# Copyright (c) 2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of L2RPN Baselines, a repository to host baselines for l2rpn competitions.
import numpy as np
# tf2.0 friendly
import warnings
try:
import tensorflow as tf
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Input, Lambda, subtract, add
import tensorflow.keras.backend as K
# TODO implement that in the leap net package too
from tensorflow.keras.layers import Layer
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import add as tfk_add
from tensorflow.keras.layers import multiply as tfk_multiply
_CAN_USE_TENSORFLOW = True
except ImportError:
_CAN_USE_TENSORFLOW = False
class Layer(object):
"""Empty class to be used in the documentation. This should
be `from tensorflow.keras.layers import Layer`
"""
pass
from l2rpn_baselines.utils import BaseDeepQ, TrainingParam
class LtauBis(Layer):
"""
    This layer implements the Ltau leap net layer.

    From its two inputs `x` and `tau`, it computes `d.(e.x * tau)`, where `.` denotes
    the matrix multiplication and `*` the elementwise multiplication.
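
    For instance (a rough sketch, with names and dimensions made up for illustration)::

        # hypothetical wiring with the Keras functional API
        x = Input(shape=(10,), name="x")      # "regular" input
        tau = Input(shape=(5,), name="tau")   # "leap" input
        out = LtauBis(name="leap_demo")([x, tau])  # output has the same dimension as x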
"""
def __init__(self, initializer='glorot_uniform', use_bias=True, trainable=True, name=None, **kwargs):
if not _CAN_USE_TENSORFLOW:
raise RuntimeError("Cannot import tensorflow, this function cannot be used.")
super(LtauBis, self).__init__(trainable=trainable, name=name, **kwargs)
self.initializer = initializer
self.use_bias = use_bias
self.e = None
self.d = None
def build(self, input_shape):
is_x, is_tau = input_shape
nm_e = None
nm_d = None
if self.name is not None:
nm_e = '{}_e'.format(self.name)
nm_d = '{}_d'.format(self.name)
self.e = Dense(is_tau[-1],
kernel_initializer=self.initializer,
use_bias=self.use_bias,
trainable=self.trainable,
name=nm_e)
self.d = Dense(is_x[-1],
kernel_initializer=self.initializer,
use_bias=False,
trainable=self.trainable,
name=nm_d)
def get_config(self):
config = super().get_config().copy()
config.update({
'initializer': self.initializer,
'use_bias': self.use_bias
})
return config
def call(self, inputs, **kwargs):
x, tau = inputs
tmp = self.e(x)
tmp = tfk_multiply([tau, tmp]) # element wise multiplication
res = self.d(tmp) # no addition of x
# res = tfk_add([x, tmp])
return res
class DuelQLeapNet_NN(BaseDeepQ):
"""
    Constructs the desired duelling deep Q learning network, using a leap neural
    network to model the Q function.

    .. warning::
        This baseline recodes the entire RL training procedure. You can use it if you
        want to have a deeper look at the Deep Q Learning algorithm and a possible
        (non optimized, slow, etc.) implementation.

        For a much better implementation, you can reuse the code of the "PPO_RLLIB"
        or the "PPO_SB3" baseline.
"""
def __init__(self,
nn_params,
training_param=None):
if not _CAN_USE_TENSORFLOW:
raise RuntimeError("Cannot import tensorflow, this function cannot be used.")
if training_param is None:
training_param = TrainingParam()
BaseDeepQ.__init__(self,
nn_params,
training_param)
self._custom_objects = {"LtauBis": LtauBis}
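        # registered so that keras can de-serialize saved models that contain the custom LtauBis layer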
self.construct_q_network()
self._max_global_norm_grad = training_param.max_global_norm_grad
self._max_value_grad = training_param.max_value_grad
self._max_loss = training_param.max_loss
    def construct_q_network(self):
"""
        Build the Q network appropriately.

        It first builds a standard fully connected network on the regular input `x`,
        then combines it with the encoded `tau` inputs through the leap (Ltau) layers.
        The result feeds both the "value" and the "advantage" heads, as is usually
        done in D3QN.
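
        The Q values are then obtained with the standard dueling aggregation:
        `Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a')`.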
"""
        # Uses a network architecture inspired by the DeepMind paper:
        # the input and output sizes have changed, and the convolutions are replaced by dense layers.
self._model = Sequential()
input_x = Input(shape=(self._nn_archi.x_dim,),
name="x")
inputs_tau = [Input(shape=(el,), name="tau_{}".format(nm_)) for el, nm_ in
zip(self._nn_archi.tau_dims, self._nn_archi.list_attr_obs_tau)]
lay = input_x
for (size, act) in zip(self._nn_archi.sizes, self._nn_archi.activs):
            lay = Dense(size)(lay)  # fully connected layers encoding the regular input x
lay = Activation(act)(lay)
# TODO multiple taus
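        # one leap (Ltau) connection per "tau" input, summed with the encoded state (skip-connection style)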
l_tau = lay
for el, nm_ in zip(inputs_tau, self._nn_archi.list_attr_obs_tau):
l_tau = l_tau + LtauBis(name="leap_{}".format(nm_))([lay, el])
advantage = Dense(self._action_size)(l_tau)
value = Dense(1, name="value")(l_tau)
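        # dueling aggregation: Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a')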
meaner = Lambda(lambda x: K.mean(x, axis=1))
mn_ = meaner(advantage)
tmp = subtract([advantage, mn_])
policy = add([tmp, value], name="policy")
self._model = Model(inputs=[input_x, *inputs_tau], outputs=[policy])
self._schedule_model, self._optimizer_model = self.make_optimiser()
self._model.compile(loss='mse', optimizer=self._optimizer_model)
self._target_model = Model(inputs=[input_x, *inputs_tau], outputs=[policy])
def _make_x_tau(self, data):
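        """Split a flat batch of observations into the `x` part and the (shifted, scaled)
        `tau` parts expected by the leap net.

        For example (a sketch, with made-up dimensions): with `x_dim = 3` and
        `tau_dims = [2]`, a row `[x0, x1, x2, t0, t1]` becomes
        `[x0, x1, x2]` and `[(t0 + add) * mult, (t1 + add) * mult]`.
        """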
data_x = data[:, :self._nn_archi.x_dim]
# for the taus
data_tau = []
prev = self._nn_archi.x_dim
for sz, add_, mul_ in zip(self._nn_archi.tau_dims, self._nn_archi.tau_adds, self._nn_archi.tau_mults):
data_tau.append((data[:, prev:prev+sz] + add_) * mul_)
prev += sz
res = [data_x, *data_tau]
return res
    def predict_movement(self, data, epsilon, batch_size=None, training=False):
"""Predict movement of game controler where is epsilon
probability randomly move."""
if batch_size is None:
batch_size = data.shape[0]
data_split = self._make_x_tau(data)
res = super().predict_movement(data_split, epsilon=epsilon, batch_size=batch_size, training=training)
return res
    def train(self, s_batch, a_batch, r_batch, d_batch, s2_batch, tf_writer=None, batch_size=None):
if batch_size is None:
batch_size = s_batch.shape[0]
s_batch_split = self._make_x_tau(s_batch)
s2_batch_split = self._make_x_tau(s2_batch)
res = super().train(s_batch_split,
a_batch,
r_batch,
d_batch,
s2_batch_split,
tf_writer=tf_writer,
batch_size=batch_size)
return res
    def train_on_batch(self, model, optimizer_model, x, y_true):
"""
        Train the model `model` on a single batch `(x, y_true)`, clipping the loss and the gradients.
"""
with tf.GradientTape() as tape:
# Get y_pred for batch
y_pred = model(x)
# Compute loss for each sample in the batch
# and then clip it
batch_loss = self._clipped_batch_loss(y_true, y_pred)
# Compute mean scalar loss
loss = tf.math.reduce_mean(batch_loss)
loss_npy = loss.numpy()
# Compute gradients
grads = tape.gradient(loss, model.trainable_variables)
# clip gradients
if self._max_global_norm_grad is not None:
grads, _ = tf.clip_by_global_norm(grads, self._max_global_norm_grad)
if self._max_value_grad is not None:
grads = [tf.clip_by_value(grad, -self._max_value_grad, self._max_value_grad) for grad in grads]
# Apply gradients
optimizer_model.apply_gradients(zip(grads, model.trainable_variables))
# Store LR
if hasattr(optimizer_model, "_decayed_lr"):
self.train_lr = optimizer_model._decayed_lr('float32').numpy()
else:
self.train_lr = optimizer_model.learning_rate.numpy()
# Return loss scalar
return loss_npy
def _clipped_batch_loss(self, y_true, y_pred):
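        """Squared error per sample of the batch, summed over the output dimension and,
        if `self._max_loss` is set, clipped to `[0, self._max_loss]`."""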
sq_error = tf.math.square(y_true - y_pred, name="sq_error")
batch_sq_error = tf.math.reduce_sum(sq_error, axis=1, name="batch_sq_error")
if self._max_loss is not None:
res = tf.clip_by_value(batch_sq_error, 0.0, self._max_loss, name="batch_sq_error_clip")
else:
res = batch_sq_error
return res