Source code for l2rpn_baselines.utils.deepQAgent

# Copyright (c) 2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of L2RPN Baselines, a repository to host baselines for l2rpn competitions.

import os
import warnings
import numpy as np
from tqdm import tqdm

from grid2op.Exceptions import Grid2OpException
from grid2op.Agent import AgentWithConverter
from grid2op.Converter import IdToAct

from l2rpn_baselines.utils.replayBuffer import ReplayBuffer
from l2rpn_baselines.utils.trainingParam import TrainingParam

try:
    from grid2op.Chronics import MultifolderWithCache
    _CACHE_AVAILABLE_DEEPQAGENT = True
except ImportError:
    _CACHE_AVAILABLE_DEEPQAGENT = False

try:
    import tensorflow as tf
    _CAN_USE_TENSORFLOW = True
except ImportError:
    _CAN_USE_TENSORFLOW = False


class DeepQAgent(AgentWithConverter):
    """
    This class allows to train and log the training of different Q learning algorithms.

    .. warning::
        This baseline recodes the entire RL training procedure. You can use it if you want to have a deeper
        look at Deep Q Learning algorithms and a possible (non optimized, slow, etc.) implementation.

        For a much better implementation, you can reuse the code of the "PPO_RLLIB" or the "PPO_SB3" baseline.

        Prefer to use the :class:`GymAgent` class and the :class:`GymEnvWithHeuristics` classes to train agents
        interacting with grid2op and fully compatible with the gym framework.

    It is not meant to be a state-of-the-art implementation of some baseline. It is rather meant to be a set of
    useful functions that allows to easily get started with RL using grid2op.

    It derives from :class:`grid2op.Agent.AgentWithConverter` and as such implements
    :func:`DeepQAgent.convert_obs` and :func:`DeepQAgent.my_act`.

    It is supposed to be a Baseline, so it also implements:

    - :func:`DeepQAgent.load`: to load the agent
    - :func:`DeepQAgent.save`: to save the agent
    - :func:`DeepQAgent.train`: to train the agent

    TODO description of the training scheme!

    Attributes
    ----------
    filter_action_fun: ``callable``
        The function used to filter the actions of the action space. See the documentation of grid2op:
        :class:`grid2op.Converter.IdToAct`
        `here <https://grid2op.readthedocs.io/en/v0.9.3/converter.html#grid2op.Converter.IdToAct>`_
        for more information.

    replay_buffer:
        The experience replay buffer

    deep_q: :class:`BaseDeepQ`
        The neural network, represented as a :class:`BaseDeepQ` object.

    name: ``str``
        The name of the Agent

    store_action: ``bool``
        Whether you want to register which actions your agent took or not. Saving the actions can slow down the
        computation a bit (less than 1%) but can help understand what your agent is doing during its learning
        process.

    dict_action: ``dict``
        The actions taken by the agent, represented as a dictionary. This can be useful to know which types of
        actions are taken by your agent. Only filled if :attr:`DeepQAgent.store_action` is ``True``

    istraining: ``bool``
        Whether you are training this agent or not. Not really used anymore. Mainly kept for backward
        compatibility.

    epsilon: ``float``
        The epsilon greedy exploration parameter.

    nb_injection: ``int``
        Number of actions tagged as "injection". See the
        `official grid2op documentation <https://grid2op.readthedocs.io/en/v0.9.3/action.html?highlight=get_types#grid2op.Action.BaseAction.get_types>`_
        for more information.

    nb_voltage: ``int``
        Number of actions tagged as "voltage". See the
        `official grid2op documentation <https://grid2op.readthedocs.io/en/v0.9.3/action.html?highlight=get_types#grid2op.Action.BaseAction.get_types>`_
        for more information.

    nb_topology: ``int``
        Number of actions tagged as "topology". See the
        `official grid2op documentation <https://grid2op.readthedocs.io/en/v0.9.3/action.html?highlight=get_types#grid2op.Action.BaseAction.get_types>`_
        for more information.

    nb_redispatching: ``int``
        Number of actions tagged as "redispatching". See the
        `official grid2op documentation <https://grid2op.readthedocs.io/en/v0.9.3/action.html?highlight=get_types#grid2op.Action.BaseAction.get_types>`_
        for more information.

    nb_storage: ``int``
        Number of actions tagged as "storage". See the
        `official grid2op documentation <https://grid2op.readthedocs.io/en/v0.9.3/action.html?highlight=get_types#grid2op.Action.BaseAction.get_types>`_
        for more information.

    nb_curtail: ``int``
        Number of actions tagged as "curtailment". See the
        `official grid2op documentation <https://grid2op.readthedocs.io/en/v0.9.3/action.html?highlight=get_types#grid2op.Action.BaseAction.get_types>`_
        for more information.

    nb_do_nothing: ``int``
        Number of actions tagged as "do_nothing", *ie* when an action does not modify the state of the grid.
        See the
        `official grid2op documentation <https://grid2op.readthedocs.io/en/v0.9.3/action.html?highlight=get_types#grid2op.Action.BaseAction.get_types>`_
        for more information.

    verbose: ``bool``
        An effort will be made on the logging (outside of tensorboard) of the training. For now: verbose=True
        will allow some printing on the command prompt, and verbose=False will drastically reduce the amount of
        information printed during training.

    """
    def __init__(self,
                 action_space,
                 nn_archi,
                 name="DeepQAgent",
                 store_action=True,
                 istraining=False,
                 filter_action_fun=None,
                 verbose=False,
                 observation_space=None,
                 **kwargs_converters):
        if not _CAN_USE_TENSORFLOW:
            raise RuntimeError("Cannot import tensorflow, this function cannot be used.")

        AgentWithConverter.__init__(self, action_space, action_space_converter=IdToAct, **kwargs_converters)
        self.filter_action_fun = filter_action_fun
        if self.filter_action_fun is not None:
            self.action_space.filter_action(self.filter_action_fun)

        # and now back to the original implementation
        self.replay_buffer = None
        self.__nb_env = None

        self.deep_q = None
        self._training_param = None
        self._tf_writer = None
        self.name = name
        self._losses = None
        self.__graph_saved = False
        self.store_action = store_action
        self.dict_action = {}
        self.istraining = istraining
        self.epsilon = 1.0

        # for tensorboard
        self._train_lr = None
        self._reset_num = None
        self._max_iter_env_ = 1000000
        self._curr_iter_env = 0
        self._max_reward = 0.

        # action type
        self.nb_injection = 0
        self.nb_voltage = 0
        self.nb_topology = 0
        self.nb_line = 0
        self.nb_redispatching = 0
        self.nb_curtail = 0
        self.nb_storage = 0
        self.nb_do_nothing = 0

        # for over sampling the hard scenarios
        self._prev_obs_num = 0
        self._time_step_lived = None
        self._nb_chosen = None
        self._proba = None
        self._prev_id = 0

        # this is for the "limit the episode length" depending on your previous success
        self._total_sucesses = 0

        # neural network architecture
        self._nn_archi = nn_archi

        # observation transformers
        self._obs_as_vect = None
        self._tmp_obs = None
        self._indx_obs = None
        self.verbose = verbose

        if observation_space is None:
            pass
        else:
            self.init_obs_extraction(observation_space)

        # for the frequency of action type
        self.current_ = 0
        self.nb_ = 10
        self._nb_this_time = np.zeros((self.nb_, 8), dtype=int)

        #
        self._vector_size = None
        self._actions_per_ksteps = None
        self._illegal_actions_per_ksteps = None
        self._ambiguous_actions_per_ksteps = None

    def _fill_vectors(self, training_param):
        self._vector_size = self.nb_ * training_param.update_tensorboard_freq
        self._actions_per_ksteps = np.zeros((self._vector_size, self.action_space.size()), dtype=int)
        self._illegal_actions_per_ksteps = np.zeros(self._vector_size, dtype=int)
        self._ambiguous_actions_per_ksteps = np.zeros(self._vector_size, dtype=int)

    # grid2op.Agent interface

    def convert_obs(self, observation):
        """
        Generic way to convert an observation. This transforms it to a vector and then selects the attributes
        that were chosen in :attr:`l2rpn_baselines.utils.NNParam.list_attr_obs` (that have been extracted once
        and for all in the :attr:`DeepQAgent._indx_obs` vector).

        Parameters
        ----------
        observation: :class:`grid2op.Observation.BaseObservation`
            The current observation sent by the environment

        Returns
        -------
        _tmp_obs: ``numpy.ndarray``
            The observation as a vector with only the proper attributes selected (TODO scaling will be available
            in a future version)

        """
        obs_as_vect = observation.to_vect()
        self._tmp_obs[:] = obs_as_vect[self._indx_obs]
        return self._tmp_obs

    def my_act(self, transformed_observation, reward, done=False):
        """
        This function will return the action (its id) selected by the underlying :attr:`DeepQAgent.deep_q`
        network.

        Before being used, this method requires that :attr:`DeepQAgent.deep_q` is created. To that end a call to
        :func:`DeepQAgent.init_deep_q` needs to have been performed (this is automatically done if you use the
        baselines we provide and their `evaluate` and `train` scripts).

        Parameters
        ----------
        transformed_observation: ``numpy.ndarray``
            The observation, as transformed after :func:`DeepQAgent.convert_obs`

        reward: ``float``
            The reward of the last time step. Ignored by this method. Here for backward compatibility with the
            openAI gym interface.

        done: ``bool``
            Whether the episode is over or not. This is not used, and is only present to be compliant with the
            openAI gym interface.

        Returns
        -------
        res: ``int``
            The id of the action taken.

        """
        predict_movement_int, *_ = self.deep_q.predict_movement(transformed_observation, epsilon=0.0, training=False)
        res = int(predict_movement_int)
        self._store_action_played(res)
        return res
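
    # Usage sketch (added for illustration, not part of the original implementation; it assumes that
    # `agent.deep_q` has already been built, e.g. through `load` or `train`):
    #
    #     obs = env.reset()
    #     reward, done = 0.0, False
    #     while not done:
    #         # AgentWithConverter.act() chains convert_obs() -> my_act() -> convert_act()
    #         action = agent.act(obs, reward, done)
    #         obs, reward, done, info = env.step(action)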

    @staticmethod
    def get_action_size(action_space, filter_fun, kwargs_converters):
        """
        This function allows to get the size of the action space if we were to build a :class:`DeepQAgent`
        with these parameters.

        Parameters
        ----------
        action_space: :class:`grid2op.ActionSpace`
            The grid2op action space used.

        filter_fun: ``callable``
            see :attr:`DeepQAgent.filter_action_fun` for more information

        kwargs_converters: ``dict``
            see the documentation of grid2op for more information:
            `here <https://grid2op.readthedocs.io/en/v0.9.3/converter.html?highlight=idToAct#grid2op.Converter.IdToAct.init_converter>`_

        See Also
        --------
        The official documentation of grid2op, and especially its class "IdToAct" at this address
        `IdToAct <https://grid2op.readthedocs.io/en/v0.9.3/converter.html?highlight=idToAct#grid2op.Converter.IdToAct>`_

        """
        converter = IdToAct(action_space)
        converter.init_converter(**kwargs_converters)
        if filter_fun is not None:
            converter.filter_action(filter_fun)
        return converter.n
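
    # Illustrative sketch (not part of the original module): `get_action_size` can be called before building
    # the agent, typically to size the output layer of the neural network. The environment name and the empty
    # converter kwargs below are placeholder assumptions:
    #
    #     import grid2op
    #     env = grid2op.make("l2rpn_case14_sandbox")
    #     n_actions = DeepQAgent.get_action_size(env.action_space,
    #                                            filter_fun=None,
    #                                            kwargs_converters={})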

    def init_obs_extraction(self, observation_space):
        """
        This method should be called to initialize the observation (fed as a vector to the neural network)
        from its description as a list of its attribute names.
        """
        tmp = np.zeros(0, dtype=np.uint)  # TODO platform independent
        for obs_attr_name in self._nn_archi.get_obs_attr():
            beg_, end_, dtype_ = observation_space.get_indx_extract(obs_attr_name)
            tmp = np.concatenate((tmp, np.arange(beg_, end_, dtype=np.uint)))
        self._indx_obs = tmp
        self._tmp_obs = np.zeros((1, tmp.shape[0]), dtype=np.float32)
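
    # Worked example (hypothetical numbers, added for illustration): if the architecture lists
    # ["prod_p", "load_p"] as observation attributes and `get_indx_extract` returns (10, 15, dt) for "prod_p"
    # and (20, 31, dt) for "load_p", then `_indx_obs` becomes the concatenation [10..14, 20..30] and
    # `_tmp_obs` a (1, 16) float32 buffer that `convert_obs` refills at every step.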

    # baseline interface

    def load(self, path):
        """
        Part of the l2rpn_baselines interface, this function allows to read back a trained model, for example to
        continue the training or to evaluate its performance.

        **NB** To reload an agent, it must have exactly the same name and have been saved at the right location.

        Parameters
        ----------
        path: ``str``
            The path where the agent has previously been saved.

        """
        # not modified compared to the original implementation
        tmp_me = os.path.join(path, self.name)
        if not os.path.exists(tmp_me):
            raise RuntimeError("The model should be stored in \"{}\". But this appears to be empty".format(tmp_me))
        self._load_action_space(tmp_me)

        # TODO handle case where training param class has been overridden
        self._training_param = TrainingParam.from_json(os.path.join(tmp_me, "training_params.json"))
        self.deep_q = self._nn_archi.make_nn(self._training_param)
        try:
            self.deep_q.load_network(tmp_me, name=self.name)
        except Exception as e:
            raise RuntimeError("Impossible to load the model located at \"{}\" with error \n{}".format(path, e))

        for nm_attr in ["_time_step_lived", "_nb_chosen", "_proba"]:
            conv_path = os.path.join(tmp_me, "{}.npy".format(nm_attr))
            if os.path.exists(conv_path):
                setattr(self, nm_attr, np.load(file=conv_path))

    def save(self, path):
        """
        Part of the l2rpn_baselines interface, this allows to save a model. Its name is used at saving time. The
        same name must be reused when loading it back.

        Parameters
        ----------
        path: ``str``
            The path where to save the agent.

        """
        if path is not None:
            tmp_me = os.path.join(path, self.name)
            if not os.path.exists(tmp_me):
                os.mkdir(tmp_me)
            nm_conv = "action_space.npy"
            conv_path = os.path.join(tmp_me, nm_conv)
            if not os.path.exists(conv_path):
                self.action_space.save(path=tmp_me, name=nm_conv)

            self._training_param.save_as_json(tmp_me, name="training_params.json")
            self._nn_archi.save_as_json(tmp_me, "nn_architecture.json")
            self.deep_q.save_network(tmp_me, name=self.name)

            # TODO save the "oversampling" part, and all the other info
            for nm_attr in ["_time_step_lived", "_nb_chosen", "_proba"]:
                conv_path = os.path.join(tmp_me, "{}.npy".format(nm_attr))
                attr_ = getattr(self, nm_attr)
                if attr_ is not None:
                    np.save(arr=attr_, file=conv_path)
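
    # Save / load round trip sketch (added for illustration; paths are placeholders): the same `name` and the
    # same root directory must be used to read the model back.
    #
    #     agent.save("./saved_model")    # creates ./saved_model/<name>/ with the weights, the action space,
    #                                    # the training parameters and the NN architecture json
    #     agent_bis = DeepQAgent(env.action_space, nn_archi, name=agent.name)
    #     agent_bis.load("./saved_model")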

    def train(self,
              env,
              iterations,
              save_path,
              logdir,
              training_param=None):
        """
        Part of the public l2rpn-baselines interface, this function allows to train the baseline.

        If `save_path` is not None, then the model is saved regularly, and also at the end of training.

        TODO explain a bit more how you can train it.

        Parameters
        ----------
        env: :class:`grid2op.Environment.Environment` or :class:`grid2op.Environment.MultiEnvironment`
            The environment used to train your model.

        iterations: ``int``
            The number of training iterations. NB when reloading a model, this is **NOT** the number of training
            steps that will be used when retraining. Indeed, if `iterations` is 1000 and the model was already
            trained for 750 time steps, then when reloaded, the training will occur on 250 (=1000 - 750) time
            steps only.

        save_path: ``str``
            Location at which to save the model

        logdir: ``str``
            Location at which tensorboard related information will be kept.

        training_param: :class:`l2rpn_baselines.utils.TrainingParam`
            The meta parameters for the training procedure. This is currently ignored if the model is reloaded
            (in that case the parameters used when first created will be used)

        """

        if training_param is None:
            training_param = TrainingParam()

        self._train_lr = training_param.lr

        if self._training_param is None:
            self._training_param = training_param
        else:
            training_param = self._training_param
        self._init_deep_q(self._training_param, env)
        self._fill_vectors(self._training_param)

        self._init_replay_buffer()

        # efficient reading of the data (read them by chunks of roughly 1 day)
        nb_ts_one_day = 24 * 60 / 5  # number of time steps per day
        self._set_chunk(env, nb_ts_one_day)

        # Create file system related vars
        if save_path is not None:
            save_path = os.path.abspath(save_path)
            os.makedirs(save_path, exist_ok=True)

        if logdir is not None:
            logpath = os.path.join(logdir, self.name)
            self._tf_writer = tf.summary.create_file_writer(logpath, name=self.name)
        else:
            logpath = None
            self._tf_writer = None
        UPDATE_FREQ = training_param.update_tensorboard_freq  # update tensorboard every "UPDATE_FREQ" steps
        SAVING_NUM = training_param.save_model_each

        if hasattr(env, "nb_env"):
            nb_env = env.nb_env
            warnings.warn("Training using {} environments".format(nb_env))
            self.__nb_env = nb_env
        else:
            self.__nb_env = 1
        # if isinstance(env, grid2op.Environment.Environment):
        #     self.__nb_env = 1
        # else:
        #     import warnings
        #     nb_env = env.nb_env
        #     warnings.warn("Training using {} environments".format(nb_env))
        #     self.__nb_env = nb_env

        self.init_obs_extraction(env.observation_space)

        training_step = self._training_param.last_step

        # some parameters have been moved to a class named "training_param" for convenience
        self.epsilon = self._training_param.initial_epsilon

        # now the number of alive frames and total reward depends on the "underlying environment".
        # They are vectors instead of scalars
        alive_frame, total_reward = self._init_global_train_loop()
        reward, done = self._init_local_train_loop()
        epoch_num = 0
        self._losses = np.zeros(iterations)
        alive_frames = np.zeros(iterations)
        total_rewards = np.zeros(iterations)
        new_state = None
        self._reset_num = 0
        self._curr_iter_env = 0
        self._max_reward = env.reward_range[1]

        # action types
        # injection, voltage, topology, line, redispatching = action.get_types()
        self.nb_injection = 0
        self.nb_voltage = 0
        self.nb_topology = 0
        self.nb_line = 0
        self.nb_redispatching = 0
        self.nb_curtail = 0
        self.nb_storage = 0
        self.nb_do_nothing = 0

        # for non uniform random sampling of the scenarios
        th_size = None
        self._prev_obs_num = 0
        if self.__nb_env == 1:
            # TODO make this available for multi env too
            if _CACHE_AVAILABLE_DEEPQAGENT:
                if isinstance(env.chronics_handler.real_data, MultifolderWithCache):
                    th_size = env.chronics_handler.real_data.cache_size
            if th_size is None:
                th_size = len(env.chronics_handler.real_data.subpaths)

            # number of time steps lived per possible scenario
            if self._time_step_lived is None or self._time_step_lived.shape[0] != th_size:
                self._time_step_lived = np.zeros(th_size, dtype=np.uint64)
            # number of times a given scenario has been played
            if self._nb_chosen is None or self._nb_chosen.shape[0] != th_size:
                self._nb_chosen = np.zeros(th_size, dtype=np.uint)
            # probability that a given scenario is chosen
            if self._proba is None or self._proba.shape[0] != th_size:
                self._proba = np.ones(th_size, dtype=np.float64)
        self._prev_id = 0
        # this is for the "limit the episode length" depending on your previous success
        self._total_sucesses = 0

        with tqdm(total=iterations - training_step, disable=not self.verbose) as pbar:
            while training_step < iterations:
                # reset or build the environment
                initial_state = self._need_reset(env, training_step, epoch_num, done, new_state)

                # Slowly decay the exploration parameter epsilon
                # if self.epsilon > training_param.FINAL_EPSILON:
                self.epsilon = self._training_param.get_next_epsilon(current_step=training_step)

                # then we need to predict the next moves. Agents have been adapted to predict a batch of data
                pm_i, pq_v, act = self._next_move(initial_state, self.epsilon, training_step)

                # todo store the illegal / ambiguous / ... actions
                reward, done = self._init_local_train_loop()
                if self.__nb_env == 1:
                    # still the "hack" to have the same interface between multi env and env...
                    # yeah it's a pain
                    act = act[0]

                temp_observation_obj, temp_reward, temp_done, info = env.step(act)

                if self.__nb_env == 1:
                    # dirty hack to wrap them into lists
                    temp_observation_obj = [temp_observation_obj]
                    temp_reward = np.array([temp_reward], dtype=np.float32)
                    temp_done = np.array([temp_done], dtype=bool)
                    info = [info]
                new_state = self._convert_obs_train(temp_observation_obj)
                self._updage_illegal_ambiguous(training_step, info)
                done, reward, total_reward, alive_frame, epoch_num \
                    = self._update_loop(done, temp_reward, temp_done, alive_frame, total_reward, reward, epoch_num)

                # update the replay buffer
                self._store_new_state(initial_state, pm_i, reward, done, new_state)

                # now train the model
                if not self._train_model(training_step):
                    # infinite loss in this case
                    raise RuntimeError("ERROR INFINITE LOSS")

                # Save the network every SAVING_NUM iterations
                if training_step % SAVING_NUM == 0 or training_step == iterations - 1:
                    self.save(save_path)

                # save some information to tensorboard
                alive_frames[epoch_num] = np.mean(alive_frame)
                total_rewards[epoch_num] = np.mean(total_reward)
                self._store_action_played_train(training_step, pm_i)
                self._save_tensorboard(training_step, epoch_num, UPDATE_FREQ, total_rewards, alive_frames)
                training_step += 1
                pbar.update(1)

        self.save(save_path)

    # auxiliary functions

    # the two functions below: to train with multiple environments
    def _convert_obs_train(self, observations):
        """create the observations that are used for training."""
        if self._obs_as_vect is None:
            size_obs = self.convert_obs(observations[0]).shape[1]
            self._obs_as_vect = np.zeros((self.__nb_env, size_obs), dtype=np.float32)

        for i, obs in enumerate(observations):
            self._obs_as_vect[i, :] = self.convert_obs(obs).reshape(-1)
        return self._obs_as_vect

    def _create_action_if_not_registered(self, action_int):
        """make sure that `action_int` is present in dict_action"""
        if action_int not in self.dict_action:
            act = self.action_space.all_actions[action_int]
            is_inj, is_volt, is_topo, is_line_status, is_redisp, is_storage, is_dn, is_curtail = \
                False, False, False, False, False, False, False, False
            try:
                # feature unavailable in grid2op <= 0.9.2
                try:
                    # storage introduced in grid2op 1.5.0, so below that version it is not supported
                    is_inj, is_volt, is_topo, is_line_status, is_redisp = act.get_types()
                except ValueError as exc_:
                    try:
                        is_inj, is_volt, is_topo, is_line_status, is_redisp, is_storage = act.get_types()
                    except ValueError as exc_:
                        is_inj, is_volt, is_topo, is_line_status, is_redisp, is_storage, is_curtail = act.get_types()
                is_dn = (not is_inj) and (not is_volt) and (not is_topo) and (not is_line_status) and (not is_redisp)
                is_dn = is_dn and (not is_storage)
                is_dn = is_dn and (not is_curtail)
            except Exception as exc_:
                pass

            self.dict_action[action_int] = [0, act,
                                            (is_inj, is_volt, is_topo, is_line_status, is_redisp,
                                             is_storage, is_curtail, is_dn)]

    def _store_action_played(self, action_int):
        """if activated, this function will store the action taken by the agent."""
        if self.store_action:
            self._create_action_if_not_registered(action_int)

            self.dict_action[action_int][0] += 1
            (is_inj, is_volt, is_topo, is_line_status,
             is_redisp, is_storage, is_curtail, is_dn) = self.dict_action[action_int][2]
            if is_inj:
                self.nb_injection += 1
            if is_volt:
                self.nb_voltage += 1
            if is_topo:
                self.nb_topology += 1
            if is_line_status:
                self.nb_line += 1
            if is_redisp:
                self.nb_redispatching += 1
            if is_storage:
                self.nb_storage += 1
            if is_curtail:
                self.nb_curtail += 1
            if is_dn:
                self.nb_do_nothing += 1

    def _convert_all_act(self, act_as_integer):
        """this function converts the actions given as a list of integers. It outputs a list of valid grid2op Actions"""
        res = []
        for act_id in act_as_integer:
            res.append(self.convert_act(act_id))
            self._store_action_played(act_id)
        return res

    def _load_action_space(self, path):
        """load the action space in case the model is reloaded"""
        if not os.path.exists(path):
            raise RuntimeError("The model should be stored in \"{}\". But this appears to be empty".format(path))
        try:
            self.action_space.init_converter(
                all_actions=os.path.join(path, "action_space.npy"))
        except Exception as e:
            raise RuntimeError("Impossible to reload converter action space with error \n{}".format(e))

    # utilities for data reading
    def _set_chunk(self, env, nb):
        """
        to optimize the data reading process. See the official grid2op documentation for the effect of setting
        the chunk size for the environment.
        """
        env.set_chunk_size(int(max(100, nb)))

    def _train_model(self, training_step):
        """train the deep q networks."""
        self._training_param.tell_step(training_step)
        if training_step > max(self._training_param.min_observation, self._training_param.minibatch_size) and \
                self._training_param.do_train():
            # train the model
            s_batch, a_batch, r_batch, d_batch, s2_batch = self.replay_buffer.sample(self._training_param.minibatch_size)
            tf_writer = None
            if self.__graph_saved is False:
                tf_writer = self._tf_writer
            loss = self.deep_q.train(s_batch, a_batch, r_batch, d_batch, s2_batch, tf_writer)
            # save learning rate for later
            if hasattr(self.deep_q._optimizer_model, "_decayed_lr"):
                self._train_lr = self.deep_q._optimizer_model._decayed_lr('float32').numpy()
            else:
                self._train_lr = self.deep_q._optimizer_model.learning_rate.numpy()
            self.__graph_saved = True
            if not np.all(np.isfinite(loss)):
                # if the loss is not finite I stop the learning
                return False
            self.deep_q.target_train()
            self._losses[training_step:] = np.sum(loss)
        return True

    def _updage_illegal_ambiguous(self, curr_step, info):
        """update the count of illegal and ambiguous actions"""
        tmp_ = curr_step % self._vector_size
        self._illegal_actions_per_ksteps[tmp_] = np.sum([el["is_illegal"] for el in info])
        self._ambiguous_actions_per_ksteps[tmp_] = np.sum([el["is_ambiguous"] for el in info])

    def _store_action_played_train(self, training_step, action_id):
        """store which actions were played, for tensorboard only."""
        which_row = training_step % self._vector_size
        self._actions_per_ksteps[which_row, :] = 0
        self._actions_per_ksteps[which_row, action_id] += 1

    def _fast_forward_env(self, env, time=7*24*60/5):
        """use this function to skip some time steps when the environment is reset."""
        my_int = np.random.randint(0, min(time, env.chronics_handler.max_timestep()))
        env.fast_forward_chronics(my_int)

    def _reset_env_clean_state(self, env):
        """
        reset this environment to a proper state. This should rather be integrated in grid2op. And will probably
        be integrated partially starting from grid2op 1.0.0
        """
        # /!\ DO NOT ATTEMPT TO MODIFY OTHERWISE IT WILL PROBABLY CRASH /!\
        # /!\ THIS WILL BE PART OF THE ENVIRONMENT IN FUTURE GRID2OP RELEASE (>= 1.0.0) /!\
        # AND OF COURSE USING THIS METHOD DURING THE EVALUATION IS COMPLETELY FORBIDDEN
        if self.__nb_env > 1:
            return
        env.current_obs = None
        env.env_modification = None
        env._reset_maintenance()
        env._reset_redispatching()
        env._reset_vectors_and_timings()
        _backend_action = env._backend_action_class()
        _backend_action.all_changed()
        env._backend_action = _backend_action
        env.backend.apply_action(_backend_action)
        _backend_action.reset()
        *_, fail_to_start, info = env.step(env.action_space())
        if fail_to_start:
            # this is happening because not enough care has been taken to handle these problems
            # more care will be taken when this feature will be available in grid2op directly.
            raise Grid2OpException("Impossible to initialize the powergrid, the powerflow diverges at iteration 0. "
                                   "Available information are: {}".format(info))
        env._reset_vectors_and_timings()

    def _need_reset(self, env, observation_num, epoch_num, done, new_state):
        """perform the proper reset of the environment"""

        if self._training_param.step_increase_nb_iter is not None and \
                self._training_param.step_increase_nb_iter > 0:
            self._max_iter_env(min(max(self._training_param.min_iter,
                                       self._training_param.max_iter_fun(self._total_sucesses)),
                                   self._training_param.max_iter))  # TODO
        self._curr_iter_env += 1
        if new_state is None:
            # it's the first ever loop
            obs = env.reset()
            if self.__nb_env == 1:
                # still a hack to have the same program interface between multi env and not multi env
                obs = [obs]
            new_state = self._convert_obs_train(obs)
        elif self.__nb_env > 1:
            # in multi env this is automatically handled
            pass
        elif done[0]:
            nb_ts_one_day = 24*60/5
            if False:
                # the 3-4 lines below allow to reuse the loaded dataset and continue further up in the
                try:
                    self._reset_env_clean_state(env)
                    # random fast forward between now and next day
                    self._fast_forward_env(env, time=nb_ts_one_day)
                except (StopIteration, Grid2OpException):
                    env.reset()
                    # random fast forward between now and next week
                    self._fast_forward_env(env, time=7*nb_ts_one_day)

            # update the number of time steps it has lived
            ts_lived = observation_num - self._prev_obs_num
            if self._time_step_lived is not None:
                self._time_step_lived[self._prev_id] += ts_lived
            self._prev_obs_num = observation_num
            if self._training_param.oversampling_rate is not None:
                # proba = np.sqrt(1. / (self._time_step_lived +1))
                # # over sampling some kind of "UCB like" stuff
                # # https://banditalgs.com/2016/09/18/the-upper-confidence-bound-algorithm/
                # proba = 1. / (self._time_step_lived + 1)
                self._proba[:] = 1. / (self._time_step_lived ** self._training_param.oversampling_rate + 1)
                self._proba /= np.sum(self._proba)

            _prev_id = self._prev_id
            self._prev_id = None
            if _CACHE_AVAILABLE_DEEPQAGENT:
                if isinstance(env.chronics_handler.real_data, MultifolderWithCache):
                    self._prev_id = env.chronics_handler.real_data.sample_next_chronics(self._proba)
            if self._prev_id is None:
                self._prev_id = _prev_id + 1
                self._prev_id %= self._time_step_lived.shape[0]

            obs = self._reset_env(env, epoch_num)
            if self._training_param.sample_one_random_action_begin is not None and \
                    observation_num < self._training_param.sample_one_random_action_begin:
                done = True
                while done:
                    act = env.action_space(env.action_space._sample_set_bus())
                    obs, reward, done, info = env.step(act)
                    if info["is_illegal"] or info["is_ambiguous"]:
                        # there is no guarantee that sampled actions are legal nor perfectly
                        # correct.
                        # if that is the case, I "simply" restart the process, as if the action
                        # broke everything
                        done = True

                    if done:
                        obs = self._reset_env(env, epoch_num)
                    else:
                        if self.verbose:
                            print("step {}: {}".format(observation_num, act))
                obs = [obs]  # for compatibility with multi env...
            new_state = self._convert_obs_train(obs)
        return new_state

    def _reset_env(self, env, epoch_num):
        env.reset()
        if self._nb_chosen is not None:
            self._nb_chosen[self._prev_id] += 1

        # random fast forward between now and next week
        if self._training_param.random_sample_datetime_start is not None:
            self._fast_forward_env(env, time=self._training_param.random_sample_datetime_start)

        self._curr_iter_env = 0
        obs = [env.current_obs]
        if epoch_num % len(env.chronics_handler.real_data.subpaths) == 0:
            # re shuffle the data
            env.chronics_handler.shuffle(lambda x: x[np.random.choice(len(x), size=len(x), replace=False)])
        return obs

    def _init_replay_buffer(self):
        """create and initialize the replay buffer"""
        self.replay_buffer = ReplayBuffer(self._training_param.buffer_size)

    def _store_new_state(self, initial_state, predict_movement_int, reward, done, new_state):
        """store the new state in the replay buffer"""
        # vectorized version of the previous code
        for i_s, pm_i, reward, done, ns in zip(initial_state, predict_movement_int, reward, done, new_state):
            self.replay_buffer.add(i_s, pm_i, reward, done, ns)

    def _max_iter_env(self, new_max_iter):
        """update the maximum number of iterations allowed."""
        self._max_iter_env_ = new_max_iter

    def _next_move(self, curr_state, epsilon, training_step):
        # supposes that 0 encodes for do nothing, otherwise it will NOT work (for the observer)
        pm_i, pq_v, q_actions = self.deep_q.predict_movement(curr_state, epsilon, training=True)
        # TODO implement the "max XXX random action per scenarios"
        pm_i, pq_v = self._short_circuit_actions(training_step, pm_i, pq_v, q_actions)
        act = self._convert_all_act(pm_i)
        return pm_i, pq_v, act

    def _short_circuit_actions(self, training_step, pm_i, pq_v, q_actions):
        if self._training_param.min_observe is not None and \
                training_step < self._training_param.min_observe:
            # action is replaced by do nothing due to the "observe only" specification
            pm_i[:] = 0
            pq_v[:] = q_actions[:, 0]
        return pm_i, pq_v

    def _init_global_train_loop(self):
        alive_frame = np.zeros(self.__nb_env, dtype=int)
        total_reward = np.zeros(self.__nb_env, dtype=np.float32)
        return alive_frame, total_reward

    def _update_loop(self, done, temp_reward, temp_done, alive_frame, total_reward, reward, epoch_num):
        if self.__nb_env == 1:
            # force end of episode at early stage of learning
            if self._curr_iter_env >= self._max_iter_env_:
                temp_done[0] = True
                temp_reward[0] = self._max_reward
                self._total_sucesses += 1

        done = temp_done
        alive_frame[done] = 0
        total_reward[done] = 0.
        self._reset_num += np.sum(done)
        if self._reset_num >= self.__nb_env:
            # increase the "global epoch num" represented by "epoch_num" only when on average
            # all environments are "done"
            epoch_num += 1
            self._reset_num = 0

        total_reward[~done] += temp_reward[~done]
        alive_frame[~done] += 1
        return done, temp_reward, total_reward, alive_frame, epoch_num

    def _init_local_train_loop(self):
        # reward, done = np.zeros(self.nb_process), np.full(self.nb_process, fill_value=False, dtype=bool)
        reward = np.zeros(self.__nb_env, dtype=np.float32)
        done = np.full(self.__nb_env, fill_value=False, dtype=bool)
        return reward, done

    def _init_deep_q(self, training_param, env):
        """
        This function serves to initialize the neural network.
        """
        if self.deep_q is None:
            self.deep_q = self._nn_archi.make_nn(training_param)
            self.init_obs_extraction(env.observation_space)

    def _save_tensorboard(self, step, epoch_num, UPDATE_FREQ, epoch_rewards, epoch_alive):
        """save all the information needed in tensorboard."""
        if self._tf_writer is None:
            return

        # Log some useful metrics every even updates
        if step % UPDATE_FREQ == 0 and epoch_num > 0:

            if step % (10 * UPDATE_FREQ) == 0:
                # print the top k "hardest" scenarios (ie chosen the most number of times)
                if self.verbose:
                    top_k = 10
                    if self._nb_chosen is not None:
                        array_ = np.argsort(self._nb_chosen)[-top_k:][::-1]
                        print("hardest scenarios\n{}".format(array_))
                        print("They have been chosen respectively\n{}".format(self._nb_chosen[array_]))
                        # print("Associated proba are\n{}".format(self._proba[array_]))
                        print("The number of timesteps played is\n{}".format(self._time_step_lived[array_]))
                        print("avg (across all scenarios) number of timesteps played {}"
                              "".format(np.mean(self._time_step_lived)))
                        print("Time alive: {}".format(self._time_step_lived[array_] / (self._nb_chosen[array_] + 1)))
                        print("Avg time alive: {}".format(np.mean(self._time_step_lived / (self._nb_chosen + 1))))

            with self._tf_writer.as_default():
                last_alive = epoch_alive[(epoch_num-1)]
                last_reward = epoch_rewards[(epoch_num-1)]

                mean_reward = np.nanmean(epoch_rewards[:epoch_num])
                mean_alive = np.nanmean(epoch_alive[:epoch_num])

                mean_reward_30 = mean_reward
                mean_alive_30 = mean_alive
                mean_reward_100 = mean_reward
                mean_alive_100 = mean_alive

                tmp = self._actions_per_ksteps > 0
                tmp = tmp.sum(axis=0)
                nb_action_taken_last_kstep = np.sum(tmp > 0)

                nb_illegal_act = np.sum(self._illegal_actions_per_ksteps)
                nb_ambiguous_act = np.sum(self._ambiguous_actions_per_ksteps)

                if epoch_num >= 100:
                    mean_reward_100 = np.nanmean(epoch_rewards[(epoch_num-100):epoch_num])
                    mean_alive_100 = np.nanmean(epoch_alive[(epoch_num-100):epoch_num])

                if epoch_num >= 30:
                    mean_reward_30 = np.nanmean(epoch_rewards[(epoch_num-30):epoch_num])
                    mean_alive_30 = np.nanmean(epoch_alive[(epoch_num-30):epoch_num])

                # to ensure "fair" comparison between single env and multi env
                step_tb = step  # * self.__nb_env
                # if we multiply by the number of env we have "trouble" with random exploration at the beginning
                # because it lasts the same number of "real" steps

                # show first the Mean reward and mean time alive (hence the upper case)
                tf.summary.scalar("Mean_alive_30", mean_alive_30, step_tb,
                                  description="Average number of steps (per episode) made over the last 30 "
                                              "completed episodes")
                tf.summary.scalar("Mean_reward_30", mean_reward_30, step_tb,
                                  description="Average (final) reward obtained over the last 30 completed episodes")

                # then it's alphanumerical order, hence the "z_" in front of some information
                tf.summary.scalar("loss", self._losses[step], step_tb,
                                  description="Training loss (for the last training batch)")

                tf.summary.scalar("last_alive", last_alive, step_tb,
                                  description="Final number of steps for the last complete episode")
                tf.summary.scalar("last_reward", last_reward, step_tb,
                                  description="Final reward over the last complete episode")

                tf.summary.scalar("mean_reward", mean_reward, step_tb,
                                  description="Average reward over the whole episodes played")
                tf.summary.scalar("mean_alive", mean_alive, step_tb,
                                  description="Average time alive over the whole episodes played")

                tf.summary.scalar("mean_reward_100", mean_reward_100, step_tb,
                                  description="Average (final) reward obtained over the last 100 completed episodes")
                tf.summary.scalar("mean_alive_100", mean_alive_100, step_tb,
                                  description="Average number of steps (per episode) made over the last 100 "
                                              "completed episodes")

                tf.summary.scalar("nb_different_action_taken", nb_action_taken_last_kstep, step_tb,
                                  description="Number of different actions played the last "
                                              "{} steps".format(self.nb_ * UPDATE_FREQ))
                tf.summary.scalar("nb_illegal_act", nb_illegal_act, step_tb,
                                  description="Number of illegal actions played the last "
                                              "{} steps".format(self.nb_ * UPDATE_FREQ))
                tf.summary.scalar("nb_ambiguous_act", nb_ambiguous_act, step_tb,
                                  description="Number of ambiguous actions played the last "
                                              "{} steps".format(self.nb_ * UPDATE_FREQ))
                tf.summary.scalar("nb_total_success", self._total_sucesses, step_tb,
                                  description="Number of times the episode was completed entirely "
                                              "(no game over)")

                tf.summary.scalar("z_lr", self._train_lr, step_tb,
                                  description="Current learning rate")
                tf.summary.scalar("z_epsilon", self.epsilon, step_tb,
                                  description="Current epsilon (from the epsilon greedy)")
                tf.summary.scalar("z_max_iter", self._max_iter_env_, step_tb,
                                  description="Maximum number of time steps before deciding a scenario "
                                              "is over (=win)")
                tf.summary.scalar("z_total_episode", epoch_num, step_tb,
                                  description="Total number of episodes played (number of \"reset\")")

                self.deep_q.save_tensorboard(step_tb)

                if self.store_action:
                    self._store_frequency_action_type(UPDATE_FREQ, step_tb)

                # if self._time_step_lived is not None:
                #     tf.summary.histogram(
                #         "timestep_lived", self._time_step_lived, step=step_tb, buckets=None,
                #         description="Number of time steps lived for all scenarios"
                #     )
                # if self._nb_chosen is not None:
                #     tf.summary.histogram(
                #         "nb_chosen", self._nb_chosen, step=step_tb, buckets=None,
                #         description="Number of times this scenarios has been played"
                #     )

    def _store_frequency_action_type(self, UPDATE_FREQ, step_tb):
        self.current_ += 1
        self.current_ %= self.nb_

        nb_inj, nb_volt, nb_topo, nb_line, nb_redisp, nb_storage, nb_curtail, nb_dn = self._nb_this_time[self.current_, :]
        self._nb_this_time[self.current_, :] = [self.nb_injection,
                                                self.nb_voltage,
                                                self.nb_topology,
                                                self.nb_line,
                                                self.nb_redispatching,
                                                self.nb_storage,
                                                self.nb_curtail,
                                                self.nb_do_nothing]

        curr_inj = self.nb_injection - nb_inj
        curr_volt = self.nb_voltage - nb_volt
        curr_topo = self.nb_topology - nb_topo
        curr_line = self.nb_line - nb_line
        curr_redisp = self.nb_redispatching - nb_redisp
        curr_storage = self.nb_storage - nb_storage
        curr_curtail = self.nb_curtail - nb_curtail
        curr_dn = self.nb_do_nothing - nb_dn

        total_act_num = curr_inj + curr_volt + curr_topo + curr_line + curr_redisp + curr_dn + curr_storage + curr_curtail
        tf.summary.scalar("zz_freq_inj", curr_inj / total_act_num, step_tb,
                          description="Frequency of \"injection\" actions "
                                      "type played over the last {} actions"
                                      "".format(self.nb_ * UPDATE_FREQ))
        tf.summary.scalar("zz_freq_voltage", curr_volt / total_act_num, step_tb,
                          description="Frequency of \"voltage\" actions "
                                      "type played over the last {} actions"
                                      "".format(self.nb_ * UPDATE_FREQ))
        tf.summary.scalar("z_freq_topo", curr_topo / total_act_num, step_tb,
                          description="Frequency of \"topo\" actions "
                                      "type played over the last {} actions"
                                      "".format(self.nb_ * UPDATE_FREQ))
        tf.summary.scalar("z_freq_line_status", curr_line / total_act_num, step_tb,
                          description="Frequency of \"line status\" actions "
                                      "type played over the last {} actions"
                                      "".format(self.nb_ * UPDATE_FREQ))
        tf.summary.scalar("z_freq_redisp", curr_redisp / total_act_num, step_tb,
                          description="Frequency of \"redispatching\" actions "
                                      "type played over the last {} actions"
                                      "".format(self.nb_ * UPDATE_FREQ))
        tf.summary.scalar("z_freq_do_nothing", curr_dn / total_act_num, step_tb,
                          description="Frequency of \"do nothing\" actions "
                                      "type played over the last {} actions"
                                      "".format(self.nb_ * UPDATE_FREQ))
        tf.summary.scalar("z_freq_storage", curr_storage / total_act_num, step_tb,
                          description="Frequency of \"storage\" actions "
                                      "type played over the last {} actions"
                                      "".format(self.nb_ * UPDATE_FREQ))
        tf.summary.scalar("z_freq_curtail", curr_curtail / total_act_num, step_tb,
                          description="Frequency of \"curtailment\" actions "
                                      "type played over the last {} actions"
                                      "".format(self.nb_ * UPDATE_FREQ))