Source code for l2rpn_baselines.utils.baseDeepQ

# Copyright (c) 2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of L2RPN Baselines, a repository to host baselines for l2rpn competitions.

import os
from abc import ABC, abstractmethod
import numpy as np
import warnings

try:
    import tensorflow as tf
    import tensorflow.keras.optimizers as tfko
    _CAN_USE_TENSORFLOW = True
except ImportError:
    _CAN_USE_TENSORFLOW = False
    


from l2rpn_baselines.utils.trainingParam import TrainingParam


# refactoring of the code into a base class to avoid copy-paste.
class BaseDeepQ(ABC):
    """
    This class aims at representing the Q value (or more in the case of SAC) parametrization by
    a neural network.

    .. warning::
        This baseline entirely recodes the RL training procedure. You can use it if you want to
        have a deeper look at the Deep Q Learning algorithm and a possible (non optimized,
        slow, etc.) implementation.

        For a much better implementation, you can reuse the code of the "PPO_RLLIB"
        or the "PPO_SB3" baseline.

        Prefer to use the :class:`GymAgent` class and the :class:`GymEnvWithHeuristics` classes to
        train agents interacting with grid2op and fully compatible with the gym framework.

    It is composed of 2 different networks:

    - model: which is the main model
    - target_model: which has the same architecture and the same initial weights as "model" but is
      updated less frequently to stabilize training

    It has basic methods to make predictions, to train the model, and to train the target model.

    This class is an abstraction and needs to be overridden in order to create objects from it. The
    only pure virtual function is :func:`BaseDeepQ.construct_q_network` that creates the neural
    network from the nn_params (:class:`NNParam`) provided as input.

    Attributes
    ----------
    _action_size: ``int``
        Total number of actions

    _observation_size: ``int``
        Size of the observation space considered

    _nn_archi: :class:`NNParam`
        The parameters of the neural networks that will be created

    _training_param: :class:`TrainingParam`
        The meta parameters for the training scheme (used especially for the learning rate or
        gradient clipping, for example)

    _lr: ``float``
        The initial learning rate

    _lr_decay_steps: ``float``
        The decay steps of the learning rate

    _lr_decay_rate: ``float``
        The rate at which the learning rate will decay

    _model:
        The main neural network model, here a keras Model object.

    _target_model:
        A copy of the main neural network that will be updated less frequently (also known as the
        "target model" in the RL community)

    """
    def __init__(self,
                 nn_params,
                 training_param=None,
                 verbose=False):
        if not _CAN_USE_TENSORFLOW:
            raise RuntimeError("Cannot import tensorflow, this function cannot be used.")

        self._action_size = nn_params.action_size
        self._observation_size = nn_params.observation_size
        self._nn_archi = nn_params
        self.verbose = verbose

        if training_param is None:
            self._training_param = TrainingParam()
        else:
            self._training_param = training_param

        # read the learning rate parameters from the (possibly default) training parameters
        self._lr = self._training_param.lr
        self._lr_decay_steps = self._training_param.lr_decay_steps
        self._lr_decay_rate = self._training_param.lr_decay_rate

        self._model = None
        self._target_model = None
        self._schedule_model = None
        self._optimizer_model = None
        self._custom_objects = None  # to be able to load other keras layers type

    def make_optimiser(self):
        """
        Helper function to create the proper optimizer (Adam) with the learning rate and its
        decay parameters.
        """
        schedule = tfko.schedules.InverseTimeDecay(self._lr, self._lr_decay_steps, self._lr_decay_rate)
        return schedule, tfko.Adam(learning_rate=schedule)
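
    # With InverseTimeDecay the learning rate at optimizer step t is
    #   lr(t) = _lr / (1 + _lr_decay_rate * t / _lr_decay_steps)
    # For illustration (values chosen here, not TrainingParam defaults): with lr=1e-3,
    # decay_steps=1000 and decay_rate=1.0, the rate is 5e-4 after 1000 steps and
    # about 3.3e-4 after 2000 steps.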

    @abstractmethod
    def construct_q_network(self):
        """
        Abstract method that needs to be overridden.

        It should create :attr:`BaseDeepQ._model` and :attr:`BaseDeepQ._target_model`.
        """
        raise NotImplementedError("Not implemented")

    def predict_movement(self, data, epsilon, batch_size=None, training=False):
        """
        Predict the next action ("movement") with an epsilon-greedy policy: with probability
        ``epsilon`` a random action is chosen, otherwise the action with the highest predicted
        Q value is selected.
        """
        if batch_size is None:
            batch_size = data.shape[0]

        # q_actions = self._model.predict(data, batch_size=batch_size)  # q value of each action
        q_actions = self._model(data, training=training).numpy()
        opt_policy = np.argmax(q_actions, axis=-1)
        if epsilon > 0.:
            rand_val = np.random.random(batch_size)
            opt_policy[rand_val < epsilon] = np.random.randint(0,
                                                               self._action_size,
                                                               size=(np.sum(rand_val < epsilon)))
        return opt_policy, q_actions[np.arange(batch_size), opt_policy], q_actions
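
    # Usage sketch (hypothetical names): for a batch of observations encoded as a float
    # array `obs_batch` of shape (batch_size, observation_size):
    #   act_ids, q_chosen, q_all = net.predict_movement(obs_batch, epsilon=0.1)
    # With epsilon=0.1, roughly 10% of the returned action ids are drawn uniformly at
    # random instead of being the argmax of the predicted Q values.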

    def train(self,
              s_batch,
              a_batch,
              r_batch,
              d_batch,
              s2_batch,
              tf_writer=None,
              batch_size=None):
        """
        Trains the network to fit the given parameters.

        .. seealso::
            https://towardsdatascience.com/dueling-double-deep-q-learning-using-tensorflow-2-x-7bbbcec06a2a
            for the update rules

        Parameters
        ----------
        s_batch:
            the state vector (before the action is taken)

        a_batch:
            the action taken

        s2_batch:
            the state vector (after the action is taken)

        d_batch:
            says whether or not the episode was over

        r_batch:
            the reward obtained this step
        """
        if batch_size is None:
            batch_size = s_batch.shape[0]

        # Save the graph just the first time
        if tf_writer is not None:
            tf.summary.trace_on()
        target = self._model(s_batch, training=True).numpy()
        fut_action = self._model(s2_batch, training=True).numpy()
        if tf_writer is not None:
            with tf_writer.as_default():
                tf.summary.trace_export("model-graph", 0)
            tf.summary.trace_off()
        target_next = self._target_model(s2_batch, training=True).numpy()

        idx = np.arange(batch_size)
        target[idx, a_batch] = r_batch

        # update the value only for episodes that are not done
        nd_batch = ~d_batch  # update with this rule only the batches that did not "game over"
        next_a = np.argmax(fut_action, axis=-1)  # compute the action i will take in the next state
        fut_Q = target_next[idx, next_a]  # get its Q value
        target[nd_batch, a_batch[nd_batch]] += self._training_param.discount_factor * fut_Q[nd_batch]
        loss = self.train_on_batch(self._model, self._optimizer_model, s_batch, target)
        return loss
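
    # The update implemented above is the double DQN rule: for a non-terminal transition
    # (s, a, r, s') the regression target for Q(s, a) is
    #   y = r + discount_factor * Q_target(s', argmax_a' Q(s', a'))
    # (the inner argmax uses the main model, the evaluation uses the target model),
    # and simply y = r when the episode ended (d_batch is True).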

    def train_on_batch(self, model, optimizer_model, x, y_true):
        """Train the model on a batch of examples. This can be overridden."""
        loss = model.train_on_batch(x, y_true)
        return loss

    @staticmethod
    def get_path_model(path, name=None):
        """
        Get the location at which the neural networks will be saved.

        Returns
        -------
        path_model: ``str``
            The path at which the model will be saved (it includes both the path and the name:
            it is the full path at which the neural networks are saved)

        path_target_model: ``str``
            The path at which the target model will be saved
        """
        if name is None:
            path_model = path
        else:
            path_model = os.path.join(path, name)
        path_target_model = "{}_target".format(path_model)
        return path_model, path_target_model
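
    # For example (on a POSIX system), get_path_model("/tmp/models", name="DQN") returns
    # ("/tmp/models/DQN", "/tmp/models/DQN_target"); with name=None the first element
    # is simply "/tmp/models".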

    def save_network(self, path, name=None, ext="h5"):
        """
        Save the neural networks.

        Parameters
        ----------
        path: ``str``
            The path at which the models need to be saved
        name: ``str``
            The name given to this model
        ext: ``str``
            The file extension (by default h5)
        """
        # Saves the models at the specified path as h5 files
        path_model, path_target_model = self.get_path_model(path, name)
        self._model.save('{}.{}'.format(path_model, ext))
        self._target_model.save('{}.{}'.format(path_target_model, ext))

    def load_network(self, path, name=None, ext="h5"):
        """
        Load the neural networks.

        Parameters
        ----------
        path: ``str``
            The path from which the models need to be loaded
        name: ``str``
            The name given to this model
        ext: ``str``
            The file extension (by default h5)
        """
        path_model, path_target_model = self.get_path_model(path, name)
        # fix for issue https://github.com/keras-team/keras/issues/7440
        self.construct_q_network()
        self._model.load_weights('{}.{}'.format(path_model, ext))
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            self._target_model.load_weights('{}.{}'.format(path_target_model, ext))
        if self.verbose:
            print("Successfully loaded network.")

    def target_train(self, tau=None):
        """
        Update the target model with the parameters given in :attr:`BaseDeepQ._training_param`.
        """
        if tau is None:
            tau = self._training_param.tau
        tau_inv = 1.0 - tau

        target_params = self._target_model.trainable_variables
        source_params = self._model.trainable_variables
        for src, dest in zip(source_params, target_params):
            # Polyak averaging
            var_update = src.value() * tau
            var_persist = dest.value() * tau_inv
            dest.assign(var_update + var_persist)
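
    # The soft ("Polyak") update performed above applies, for every pair of trainable
    # variables of the two networks:
    #   theta_target <- tau * theta_model + (1 - tau) * theta_target
    # with tau read from TrainingParam when not given explicitly.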

    def save_tensorboard(self, current_step):
        """Function used to save other information to tensorboard."""
        pass
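

# The sketch below is NOT part of the original module: it is a minimal, hedged example of
# how a concrete subclass might implement `construct_q_network` and be used. The class name
# `SimpleDeepQ` and the assumption that `nn_params.sizes` lists the hidden layer sizes
# (as in :class:`NNParam`) are illustrative only.
if _CAN_USE_TENSORFLOW:
    from tensorflow.keras.layers import Dense, Input
    from tensorflow.keras.models import Sequential

    class SimpleDeepQ(BaseDeepQ):
        """Hypothetical subclass: a plain feed-forward Q network."""

        def construct_q_network(self):
            def build():
                # one hidden ReLU layer per entry of nn_params.sizes, then one
                # linear output per action (its estimated Q value)
                model = Sequential()
                model.add(Input(shape=(self._observation_size,)))
                for size in self._nn_archi.sizes:
                    model.add(Dense(size, activation="relu"))
                model.add(Dense(self._action_size))
                return model

            self._model = build()
            self._target_model = build()
            # start the target network from the same weights as the main one
            self._target_model.set_weights(self._model.get_weights())
            self._schedule_model, self._optimizer_model = self.make_optimiser()
            # compiling is required for the default train_on_batch implementation
            self._model.compile(loss="mse", optimizer=self._optimizer_model)

    # usage sketch (hypothetical variables `nn_params`, `training_param`, `obs_batch`):
    #   net = SimpleDeepQ(nn_params, training_param)
    #   net.construct_q_network()
    #   act_ids, q_chosen, q_all = net.predict_movement(obs_batch, epsilon=0.05)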