Source code for l2rpn_baselines.utils.nnParam

# Copyright (c) 2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of L2RPN Baselines, a repository to host baselines for l2rpn competitions.
import os
import json
import numpy as np
from collections.abc import Iterable

import grid2op
from l2rpn_baselines.utils.baseDeepQ import BaseDeepQ


class NNParam(object):
    """
    This class provides an easy way to save and restore, as json, the shape of your neural
    networks (number of layers, non linearities, size of each layer, etc.)

    It is recommended to overload this class for each specific model.

    .. warning::
        This baseline recodes the entire RL training procedure. You can use it if you
        want to have a deeper look at the Deep Q Learning algorithm and a possible
        (non optimized, slow, etc.) implementation.

        For a much better implementation, you can reuse the code of the "PPO_RLLIB"
        or the "PPO_SB3" baseline.

        Prefer to use the :class:`GymAgent` class and the :class:`GymEnvWithHeuristics`
        classes to train agents interacting with grid2op and fully compatible
        with the gym framework.

    Attributes
    ----------
    nn_class: :class:`l2rpn_baselines.BaseDeepQ`
        The neural network class that will be created with each call of :func:`l2rpn_baselines.make_nn`

    observation_size: ``int``
        The size of the observation space.

    action_size: ``int``
        The size of the action space.

    sizes: ``list``
        A list of integers, each representing the number of hidden units. The number of
        hidden layers is given by the length of this list.

    activs: ``list``
        List of activation functions (given as strings). It should have the same length
        as :attr:`NNParam.sizes`. Each element should be the name of a keras activation
        function.

    list_attr_obs: ``list``
        List of the attributes that will be extracted from the observation and
        concatenated to be fed to the neural network.

    """
    _int_attr = ["action_size", "observation_size"]
    _float_attr = []
    _str_attr = []
    _list_float = []
    _list_str = ["activs", "list_attr_obs"]
    _list_int = ["sizes"]
    nn_class = BaseDeepQ

    def __init__(self,
                 action_size,
                 observation_size,
                 sizes,
                 activs,
                 list_attr_obs,
                 ):
        self.observation_size = observation_size
        self.action_size = action_size
        self.sizes = [int(el) for el in sizes]
        self.activs = [str(el) for el in activs]
        if len(self.sizes) != len(self.activs):
            raise RuntimeError("\"sizes\" and \"activs\" lists do not have the same length. "
                               "It's not clear how many layers you want your neural network to have.")
        self.list_attr_obs = [str(el) for el in list_attr_obs]
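
    # Illustrative sketch (not in the original source): how such an architecture
    # description is typically built. The sizes, activations and observation
    # attributes below are made-up examples.
    #
    # >>> nn_archi = NNParam(action_size=100,
    # ...                    observation_size=454,
    # ...                    sizes=[300, 300, 300],
    # ...                    activs=["relu", "relu", "relu"],
    # ...                    list_attr_obs=["rho", "prod_p", "load_p"])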

    @classmethod
    def get_path_model(cls, path, name=None):
        """get the path at which the model will be saved"""
        return cls.nn_class.get_path_model(path, name=name)

    def make_nn(self, training_param):
        """build the appropriate BaseDeepQ"""
        res = self.nn_class(self, training_param)
        return res
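
    # Illustrative sketch (not in the original source): for a concrete subclass
    # (one whose nn_class implements construct_q_network), the network itself is
    # built from a TrainingParam instance. "nn_archi" refers to the hypothetical
    # instance from the example above.
    #
    # >>> from l2rpn_baselines.utils import TrainingParam
    # >>> deep_q = nn_archi.make_nn(TrainingParam())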

    @staticmethod
    def get_obs_size(env, list_attr_name):
        """get the size of the flattened observation"""
        res = 0
        for obs_attr_name in list_attr_name:
            beg_, end_, dtype_ = env.observation_space.get_indx_extract(obs_attr_name)
            res += end_ - beg_  # no "+1" needed because "end_" is excluded by python convention
        return res
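
    # Illustrative sketch (not in the original source): computing the flattened
    # observation size from a grid2op environment; the environment name is an
    # assumption.
    #
    # >>> import grid2op
    # >>> env = grid2op.make("l2rpn_case14_sandbox")
    # >>> obs_size = NNParam.get_obs_size(env, ["rho", "prod_p", "load_p"])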

    def get_obs_attr(self):
        """get the names of the observation attributes that will be extracted"""
        return self.list_attr_obs

    # utilities, do not change
    def to_dict(self):
        """convert this instance to a dictionary"""
        # TODO copy and paste from TrainingParam
        res = {}
        for attr_nm in self._int_attr:
            tmp = getattr(self, attr_nm)
            if tmp is not None:
                res[attr_nm] = int(tmp)
            else:
                res[attr_nm] = None
        for attr_nm in self._float_attr:
            tmp = getattr(self, attr_nm)
            if tmp is not None:
                res[attr_nm] = float(tmp)
            else:
                res[attr_nm] = None
        for attr_nm in self._str_attr:
            tmp = getattr(self, attr_nm)
            if tmp is not None:
                res[attr_nm] = str(tmp)
            else:
                res[attr_nm] = None
        for attr_nm in self._list_float:
            tmp = getattr(self, attr_nm)
            res[attr_nm] = self._convert_list_to_json(tmp, float)
        for attr_nm in self._list_int:
            tmp = getattr(self, attr_nm)
            res[attr_nm] = self._convert_list_to_json(tmp, int)
        for attr_nm in self._list_str:
            tmp = getattr(self, attr_nm)
            res[attr_nm] = self._convert_list_to_json(tmp, str)
        return res
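
    # Illustrative sketch (not in the original source): to_dict / from_dict
    # give a json-serializable view of the architecture ("nn_archi" is the
    # hypothetical instance from the example above).
    #
    # >>> as_dict = nn_archi.to_dict()
    # >>> same_archi = NNParam.from_dict(as_dict)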

    @classmethod
    def _convert_list_to_json(cls, obj, type_):
        if isinstance(obj, type_):
            res = obj
        elif isinstance(obj, np.ndarray):
            if len(obj.shape) == 1:
                res = [type_(el) for el in obj]
            else:
                res = [cls._convert_list_to_json(el, type_) for el in obj]
        elif isinstance(obj, Iterable):
            res = [cls._convert_list_to_json(el, type_) for el in obj]
        else:
            res = type_(obj)
        return res

    @classmethod
    def _attr_from_json(cls, json, type_):
        if isinstance(json, type_):
            res = json
        elif isinstance(json, list):
            res = [cls._convert_list_to_json(obj=el, type_=type_) for el in json]
        else:
            res = type_(json)
        return res

    @classmethod
    def from_dict(cls, tmp):
        """load from a dictionary"""
        # TODO copy and paste from TrainingParam (more or less)
        cls_as_dict = {}
        for attr_nm in cls._int_attr:
            if attr_nm in tmp:
                tmp_ = tmp[attr_nm]
                if tmp_ is not None:
                    cls_as_dict[attr_nm] = int(tmp_)
                else:
                    cls_as_dict[attr_nm] = None
        for attr_nm in cls._float_attr:
            if attr_nm in tmp:
                tmp_ = tmp[attr_nm]
                if tmp_ is not None:
                    cls_as_dict[attr_nm] = float(tmp_)
                else:
                    cls_as_dict[attr_nm] = None
        for attr_nm in cls._str_attr:
            if attr_nm in tmp:
                tmp_ = tmp[attr_nm]
                if tmp_ is not None:
                    cls_as_dict[attr_nm] = str(tmp_)
                else:
                    cls_as_dict[attr_nm] = None
        for attr_nm in cls._list_float:
            if attr_nm in tmp:
                cls_as_dict[attr_nm] = cls._attr_from_json(tmp[attr_nm], float)
        for attr_nm in cls._list_int:
            if attr_nm in tmp:
                cls_as_dict[attr_nm] = cls._attr_from_json(tmp[attr_nm], int)
        for attr_nm in cls._list_str:
            if attr_nm in tmp:
                cls_as_dict[attr_nm] = cls._attr_from_json(tmp[attr_nm], str)
        res = cls(**cls_as_dict)
        return res

    @classmethod
    def from_json(cls, json_path):
        """load from a json file"""
        # TODO copy and paste from TrainingParam
        if not os.path.exists(json_path):
            raise FileNotFoundError("No file located at \"{}\"".format(json_path))
        with open(json_path, "r") as f:
            dict_ = json.load(f)
        return cls.from_dict(dict_)

    def save_as_json(self, path, name=None):
        """save as a json file"""
        # TODO copy and paste from TrainingParam
        res = self.to_dict()
        if name is None:
            name = "neural_net_parameters.json"
        if not os.path.exists(path):
            raise RuntimeError("Directory \"{}\" not found to save the NN parameters".format(path))
        if not os.path.isdir(path):
            raise NotADirectoryError("\"{}\" should be a directory".format(path))
        path_out = os.path.join(path, name)
        with open(path_out, "w", encoding="utf-8") as f:
            json.dump(res, fp=f, indent=4, sort_keys=True)

    def center_reduce(self, env):
        """currently not implemented for this class, "coming soon" as we might say"""
        # TODO see TestLeapNet for this feature
        self._center_reduce_vect(env.get_obs(), "x")

    def _get_adds_mults_from_name(self, obs, attr_nm):
        if attr_nm in ["prod_p"]:
            add_tmp = np.array([-0.5 * (pmax + pmin) for pmin, pmax in zip(obs.gen_pmin, obs.gen_pmax)])
            # guard against pmax == pmin to avoid a division by zero
            mult_tmp = np.array([1. / max((pmax - pmin), 1.0) for pmin, pmax in zip(obs.gen_pmin, obs.gen_pmax)])
        elif attr_nm in ["prod_q"]:
            add_tmp = 0.
            mult_tmp = np.array([1. / max(abs(val), 1.0) for val in obs.prod_q])
        elif attr_nm in ["load_p", "load_q"]:
            add_tmp = np.array([-val for val in getattr(obs, attr_nm)])
            mult_tmp = 0.5
        elif attr_nm in ["load_v", "prod_v", "v_or", "v_ex"]:
            add_tmp = 0.
            mult_tmp = np.array([1. / val for val in getattr(obs, attr_nm)])
        elif attr_nm == "hour_of_day":
            add_tmp = -12.
            mult_tmp = 1.0 / 12
        elif attr_nm == "minute_of_hour":
            add_tmp = -30.
            mult_tmp = 1.0 / 30
        elif attr_nm == "day_of_week":
            add_tmp = -4.
            mult_tmp = 1.0 / 4
        elif attr_nm == "day":
            add_tmp = -15.
            mult_tmp = 1.0 / 15.
        elif attr_nm in ["target_dispatch", "actual_dispatch"]:
            add_tmp = 0.
            # same guard as for prod_p to avoid a division by zero when pmax == pmin
            mult_tmp = np.array([1. / max((pmax - pmin), 1.0) for pmin, pmax in zip(obs.gen_pmin, obs.gen_pmax)])
        elif attr_nm in ["a_or", "a_ex", "p_or", "p_ex", "q_or", "q_ex"]:
            add_tmp = 0.
            mult_tmp = np.array([1.0 / max(val, 1.0) for val in getattr(obs, attr_nm)])
        else:
            add_tmp = 0.
            mult_tmp = 1.0
        return add_tmp, mult_tmp

    def _center_reduce_vect(self, obs, nn_part):
        """
        compute the xxxx_adds and xxxx_mults for one part of the neural network,
        called nn_part, depending on which attributes of the observation are extracted
        """
        if not isinstance(obs, grid2op.Observation.BaseObservation):
            # in multiprocessing a list of observations may be received here,
            # so extract only the first one
            obs = obs[0]
        li_attr_obs = getattr(self, "list_attr_obs_{}".format(nn_part))
        adds = []
        mults = []
        for attr_nm in li_attr_obs:
            add_tmp, mult_tmp = self._get_adds_mults_from_name(obs, attr_nm)
            mults.append(mult_tmp)
            adds.append(add_tmp)
        setattr(self, "{}_adds".format(nn_part), adds)
        setattr(self, "{}_mults".format(nn_part), mults)
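

if __name__ == "__main__":
    # Illustrative sketch (not part of the original module): a full json
    # round-trip of the network description. The sizes, activations and
    # observation attributes are made-up examples.
    import tempfile

    nn_archi = NNParam(action_size=100,
                       observation_size=454,
                       sizes=[300, 300, 300],
                       activs=["relu", "relu", "relu"],
                       list_attr_obs=["rho", "prod_p", "load_p"])
    with tempfile.TemporaryDirectory() as tmp_dir:
        nn_archi.save_as_json(tmp_dir, name="nn_archi.json")
        restored = NNParam.from_json(os.path.join(tmp_dir, "nn_archi.json"))
    # both instances describe the same architecture
    assert nn_archi.to_dict() == restored.to_dict()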