# Copyright (c) 2020-2022 RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of L2RPN Baselines, a repository to host baselines for l2rpn competitions.

import os
import json
from grid2op.Runner import Runner
from grid2op.gym_compat import BoxGymActSpace, BoxGymObsSpace, GymEnv
from l2rpn_baselines.utils.gymenv_custom import GymEnvWithHeuristics

from l2rpn_baselines.utils.save_log_gif import save_log_gif
from l2rpn_baselines.PPO_SB3.utils import SB3Agent


def evaluate(env,
             load_path=".",
             name="PPO_SB3",
             logs_path=None,
             nb_episode=1,
             nb_process=1,
             max_steps=-1,
             verbose=False,
             save_gif=False,
             gymenv_class=GymEnv,
             gymenv_kwargs=None,
             obs_space_kwargs=None,
             act_space_kwargs=None,
             iter_num=None,
             **kwargs):
    """
    This function uses stable baselines 3 to evaluate a PPO agent (previously
    trained with stable baselines 3) on a grid2op environment "env".

    It uses the grid2op "gym_compat" module to convert the action space to a
    BoxGymActSpace and the observation space to a BoxGymObsSpace.

    It is suited for studying the impact of continuous actions:

    - on storage units
    - on dispatchable generators
    - on generators with renewable energy sources

    Parameters
    ----------
    env: :class:`grid2op.Environment`
        The environment on which to evaluate your agent.

    name: ``str``
        The name of your agent.

    load_path: ``str``
        The path where your trained agent is located.
        **NB** if a baseline is reloaded, some of the arguments provided to
        this function will not be used.

    logs_path: ``str``
        Where to store the logs generated during the evaluation.
        ``None`` if you don't want to save them.

    nb_episode: ``int``
        How many episodes to run during the assessment of the performances.

    nb_process: ``int``
        On how many processes the assessment will be made. (setting this > 1
        can lead to some speed ups but can be unstable on some platforms)

    max_steps: ``int``
        How many steps at maximum your agent will be assessed.

    verbose: ``bool``
        Whether to display a progress bar during the evaluation and print a
        summary of the results afterwards.

    save_gif: ``bool``
        Whether or not you want to save, as a gif, the performance of your
        agent. It might cause memory issues (it can take a lot of RAM) and
        drastically increase the computation time.

    gymenv_class:
        The class to use as a gym environment. By default ``GymEnv`` (from
        module grid2op.gym_compat).

    gymenv_kwargs: ``dict``
        Extra keyword arguments used to build the gym environment.

    obs_space_kwargs: ``dict``
        Extra keyword arguments used to build the BoxGymObsSpace (they should
        match the ones used during training).

    act_space_kwargs: ``dict``
        Extra keyword arguments used to build the BoxGymActSpace (they should
        match the ones used during training).

    iter_num:
        Which training iteration to restore (by default ``None``, meaning
        "the last one").

    kwargs:
        extra parameters passed to the runner (``runner.run``).

    Returns
    -------
    grid2op_agent:
        The loaded agent, wrapping the stable baselines 3 PPO model.

    res:
        The results of the runner on the evaluated episodes.

    Examples
    ---------
    Here is an example on how to evaluate a PPO agent (previously trained
    with stable baselines 3):

    .. code-block:: python

        import grid2op
        from grid2op.Reward import LinesCapacityReward  # or any other rewards
        from lightsim2grid import LightSimBackend  # highly recommended !
        from l2rpn_baselines.PPO_SB3 import evaluate

        nb_episode = 7
        nb_process = 1
        verbose = True

        env_name = "l2rpn_case14_sandbox"
        env = grid2op.make(env_name,
                           reward_class=LinesCapacityReward,
                           backend=LightSimBackend()
                           )

        try:
            evaluate(env,
                     nb_episode=nb_episode,
                     load_path="./saved_model",  # should be the same as what has been used in the train function !
                     name="test",  # should be the same as what has been used in the train function !
                     nb_process=1,
                     verbose=verbose,
                     )

            # you can also compare your agent with the do nothing agent relatively
            # easily
            runner_params = env.get_params_for_runner()
            runner = Runner(**runner_params)

            res = runner.run(nb_episode=nb_episode,
                             nb_process=nb_process
                             )

            # Print summary
            if verbose:
                print("Evaluation summary for DN:")
                for _, chron_name, cum_reward, nb_time_step, max_ts in res:
                    msg_tmp = "chronics at: {}".format(chron_name)
                    msg_tmp += "\ttotal score: {:.6f}".format(cum_reward)
                    msg_tmp += "\ttime steps: {:.0f}/{:.0f}".format(nb_time_step, max_ts)
                    print(msg_tmp)
        finally:
            env.close()

    """
    if obs_space_kwargs is None:
        obs_space_kwargs = {}
    if act_space_kwargs is None:
        act_space_kwargs = {}

    # reload the observation / action attributes kept during training
    my_path = os.path.join(load_path, name)
    if not os.path.exists(load_path):
        os.mkdir(load_path)
    if not os.path.exists(my_path):
        os.mkdir(my_path)

    with open(os.path.join(my_path, "obs_attr_to_keep.json"), encoding="utf-8", mode="r") as f:
        obs_attr_to_keep = json.load(fp=f)
    with open(os.path.join(my_path, "act_attr_to_keep.json"), encoding="utf-8", mode="r") as f:
        act_attr_to_keep = json.load(fp=f)

    # create the action and observation space
    gym_observation_space = BoxGymObsSpace(env.observation_space,
                                           attr_to_keep=obs_attr_to_keep,
                                           **obs_space_kwargs)
    gym_action_space = BoxGymActSpace(env.action_space,
                                      attr_to_keep=act_attr_to_keep,
                                      **act_space_kwargs)

    # re-apply the normalization used at training time (marker files written
    # by the train function); attributes with an explicit scaling are skipped
    if os.path.exists(os.path.join(load_path, ".normalize_act")):
        for attr_nm in act_attr_to_keep:
            if (("multiply" in act_space_kwargs and attr_nm in act_space_kwargs["multiply"]) or
                ("add" in act_space_kwargs and attr_nm in act_space_kwargs["add"])
               ):
                continue
            gym_action_space.normalize_attr(attr_nm)

    if os.path.exists(os.path.join(load_path, ".normalize_obs")):
        for attr_nm in obs_attr_to_keep:
            if (("divide" in obs_space_kwargs and attr_nm in obs_space_kwargs["divide"]) or
                ("subtract" in obs_space_kwargs and attr_nm in obs_space_kwargs["subtract"])
               ):
                continue
            gym_observation_space.normalize_attr(attr_nm)

    # build the gym environment if a heuristics-aware class is used
    gymenv = None
    if gymenv_class is not None and issubclass(gymenv_class, GymEnvWithHeuristics):
        if gymenv_kwargs is None:
            gymenv_kwargs = {}
        gymenv = gymenv_class(env, **gymenv_kwargs)

        gymenv.action_space.close()
        gymenv.action_space = gym_action_space

        gymenv.observation_space.close()
        gymenv.observation_space = gym_observation_space

    # create a grid2op agent based on that (this will reload the saved weights)
    full_path = os.path.join(load_path, name)
    grid2op_agent = SB3Agent(env.action_space,
                             gym_action_space,
                             gym_observation_space,
                             nn_path=os.path.join(full_path, name),
                             gymenv=gymenv,
                             iter_num=iter_num,
                             )

    if nb_episode == 0:
        # no evaluation requested: only return the loaded agent
        return grid2op_agent, []

    # Build the runner
    runner_params = env.get_params_for_runner()
    runner_params["verbose"] = verbose
    runner = Runner(**runner_params,
                    agentClass=None,
                    agentInstance=grid2op_agent)

    # Run the agent on the scenarios
    if logs_path is not None:
        os.makedirs(logs_path, exist_ok=True)

    res = runner.run(path_save=logs_path,
                     nb_episode=nb_episode,
                     nb_process=nb_process,
                     max_iter=max_steps,
                     pbar=verbose,
                     **kwargs)

    # Print summary
    if verbose:
        print("Evaluation summary:")
        for _, chron_name, cum_reward, nb_time_step, max_ts in res:
            msg_tmp = "chronics at: {}".format(chron_name)
            msg_tmp += "\ttotal score: {:.6f}".format(cum_reward)
            msg_tmp += "\ttime steps: {:.0f}/{:.0f}".format(nb_time_step, max_ts)
            print(msg_tmp)

    if save_gif:
        if verbose:
            print("Saving the gif of the episodes")
        save_log_gif(logs_path, res)

    return grid2op_agent, res
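

# A minimal sketch (hypothetical helper, not part of the original module)
# showing how to evaluate an agent that was trained with a heuristics-aware
# gym environment: the same `GymEnvWithHeuristics` subclass (and the same
# `gymenv_kwargs`) used at training time should be passed again here,
# otherwise the agent is evaluated without its heuristics. The class name
# `GymEnvWithRecoWithDN` and its `safe_max_rho` argument are assumptions
# taken from the l2rpn_baselines utilities; adapt them to your own setup.
def _example_evaluate_with_heuristics(env, load_path="./saved_model", name="test"):
    from l2rpn_baselines.utils import GymEnvWithRecoWithDN  # assumed import path
    return evaluate(env,
                    load_path=load_path,
                    name=name,
                    gymenv_class=GymEnvWithRecoWithDN,
                    gymenv_kwargs={"safe_max_rho": 0.9},  # same value as during training (assumption)
                    nb_episode=1,
                    verbose=True,
                    )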
if __name__ == "__main__": import grid2op from grid2op.Reward import LinesCapacityReward # or any other rewards from lightsim2grid import LightSimBackend # highly recommended ! nb_episode = 7 nb_process = 1 verbose = True env_name = "l2rpn_case14_sandbox" env = grid2op.make(env_name, reward_class=LinesCapacityReward, backend=LightSimBackend() ) try: trained_agent, res_eval = evaluate( env, nb_episode=nb_episode, load_path="./saved_model", name="test4", nb_process=1, verbose=verbose, ) # you can also compare your agent with the do nothing agent relatively # easily runner_params = env.get_params_for_runner() runner = Runner(**runner_params) res = runner.run(nb_episode=nb_episode, nb_process=nb_process ) # Print summary if verbose: print("Evaluation summary for DN:") for _, chron_name, cum_reward, nb_time_step, max_ts in res: msg_tmp = "chronics at: {}".format(chron_name) msg_tmp += "\ttotal score: {:.6f}".format(cum_reward) msg_tmp += "\ttime steps: {:.0f}/{:.0f}".format(nb_time_step, max_ts) print(msg_tmp) finally: env.close()