DQN(Deep Q-Learning)可谓是深度强化学习(Deep Reinforcement Learning,DRL)的开山之作,是将深度学习与强化学习结合起来从而实现从感知(Perception)到动作( Action )的端对端(End-to-end)学习的一种全新的算法 ,该如何实现深度强化学习(DQN)呢?
找了很多DQN的例子,有原版的实现Atari的,也有Flappy Bird的,但是最简单的还是莫烦大神的Demo,github地址是:https://github.com/MorvanZhou/Reinforcement-learning-with-tensorflow。
其中,红色的方块代表寻宝人,黑色的方块代表陷阱,黄色的方块代表宝藏,我们的目标就是让寻宝人找到最终的宝藏。
这里,我们的状态可以用横纵坐标表示,而动作有上下左右四个动作。使用tkinter来做这样一个动画效果。宝藏的奖励是1,陷阱的奖励是-1,而其他时候的奖励都为0。
接下来,我们重点看一下我们DQN相关的代码。
定义相关输入
这了,我们用s代表当前状态,用a代表当前状态下采取的动作,r代表获得的奖励,s_代表转移后的状态。
self.s = tf.placeholder(tf.float32,[None,self.n_features],name='s')
self.s_ = tf.placeholder(tf.float32,[None,self.n_features],name='s_')
self.r = tf.placeholder(tf.float32,[None,],name='r')
self.a = tf.placeholder(tf.int32,[None,],name='a')
经验池
def store_transition(self,s,a,r,s_):
if not hasattr(self, 'memory_counter'):
self.memory_counter = 0
# hstack:Stack arrays in sequence horizontally
transition = np.hstack((s,[a,r],s_))
index = self.memory_counter % self.memory_size
self.memory[index,:] = transition
self.memory_counter += 1
双网络结构
target_net和eval_net的网络结构必须保持一致,这里我们使用的是两层全链接的神经网络,值得注意的一点是对于eval_net来说,网络的输入是当前的状态s,而对target_net网络来说,网络的输入是下一个状态s_,因为target_net的输出要根据贝尔曼公式计算q-target值,即
代码如下:
w_initializer, b_initializer = tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)
# ------------------ build evaluate_net ------------------
with tf.variable_scope('eval_net'):
e1 = tf.layers.dense(self.s,20,tf.nn.relu,kernel_initializer=w_initializer,
bias_initializer=b_initializer,name='e1'
)
self.q_eval = tf.layers.dense(e1,self.n_actions,kernel_initializer=w_initializer,
bias_initializer=b_initializer,name='q')
# ------------------ build target_net ------------------
with tf.variable_scope('target_net'):
t1 = tf.layers.dense(self.s_, 20, tf.nn.relu, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='t1')
self.q_next = tf.layers.dense(t1, self.n_actions, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='t2')
每隔一定的步数,我们就要将target_net中的参数复制到eval_net中:
t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='target_net')
e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='eval_net')
with tf.variable_scope('soft_replacement'):
self.target_replace_op = [tf.assign(t,e) for t,e in zip(t_params,e_params)]
计算损失并优化
首先,对于eval_net来说,我们只要得到当前的网络输出即可,但是我们定义的网络输出是四个动作对应的q-eval值,我们要根据实际的a来选择对应的q-eval值,这一部分的代码如下:
with tf.variable_scope('q_eval'):
# tf.stack
#a = tf.constant([1,2,3])
# b = tf.constant([4,5,6])
# c = tf.stack([a,b],axis=1)
# [[1 4]
# [2 5]
# [3 6]]
a_indices = tf.stack([tf.range(tf.shape(self.a)[0], dtype=tf.int32), self.a], axis=1)
# 用indices从张量params得到新张量
# indices = [[0, 0], [1, 1]]
# params = [['a', 'b'], ['c', 'd']]
# output = ['a', 'd']
# 这里self.q_eval是batch * action_number,a_indices是batch * 1,也就是说选择当前估计每个动作的Q值
self.q_eval_wrt_a = tf.gather_nd(params=self.q_eval, indices=a_indices)
中间有几个函数不太了解的,上面都有详细的注释,如果还不是很理解的话,大家可以百度或者阅读相应函数的源码。
对于target_net网络来说,我们要根据下面的式子来计算q-target值:
第一部分的R我们是已经得到了的,剩下的就是根据贪心策略选择四个输出中最大的一个即可:
with tf.variable_scope('q_target'):
q_target = self.r + self.gamma * tf.reduce_max(self.q_next,axis=1,name='Qmax_s_')
# 一个节点被 stop之后,这个节点上的梯度,就无法再向前BP了
self.q_target = tf.stop_gradient(q_target)
接下来,我们就可以定义我们的损失函数并选择优化器进行优化:
with tf.variable_scope('loss'):
self.loss = tf.reduce_mean(tf.squared_difference(self.q_target,self.q_eval_wrt_a,name='TD_error'))
with tf.variable_scope('train'):
self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)
网络的训练
每隔一定的步数,我们就要将eval_net中的参数复制到target_net中,同时我们要从经验池中选择batch大小的数据输入到网络中进行训练。
def learn(self):
if self.learn_step_counter % self.replace_target_iter == 0:
self.sess.run(self.target_replace_op)
print('\ntarget_params_replaced\n')
if self.memory_counter > self.memory_size:
sample_index = np.random.choice(self.memory_size,size=self.batch_size)
else:
sample_index = np.random.choice(self.memory_counter,size = self.batch_size)
batch_memory = self.memory[sample_index,:]
_,cost = self.sess.run(
[self._train_op,self.loss],
feed_dict={
self.s:batch_memory[:,:self.n_features],
self.a:batch_memory[:,self.n_features],
self.r:batch_memory[:,self.n_features+1],
self.s_:batch_memory[:,-self.n_features:]
}
)
定义超参数 我们首先定义网络中的超参数,比如经验池的大小,两个网络的学习率等等:
MAX_EPISODES = 200
MAX_EP_STEPS = 200
LR_A = 0.001 # learning rate for actor
LR_C = 0.002 # learning rate for critic
GAMMA = 0.9 # reward discount
TAU = 0.01 # soft replacement
MEMORY_CAPACITY = 10000
BATCH_SIZE = 32
RENDER = False
ENV_NAME = 'Pendulum-v0'
定义网络输入 我们需要定义的placeholder包括当前的状态S,下一时刻的状态S',以及对应的奖励R,而动作A由Actor得到,因此不需要再定义:
self.S = tf.placeholder(tf.float32, [None, s_dim], 's')
self.S_ = tf.placeholder(tf.float32, [None, s_dim], 's_')
self.R = tf.placeholder(tf.float32, [None, 1], 'r')
构建两个网络 两个网络都是两层全链接的神经网络,Actor输出一个具体的动作,而Critic网络输出一个具体的Q值
def _build_a(self, s, scope, trainable):
with tf.variable_scope(scope):
net = tf.layers.dense(s, 30, activation=tf.nn.relu, name='l1', trainable=trainable)
a = tf.layers.dense(net, self.a_dim, activation=tf.nn.tanh, name='a', trainable=trainable)
return tf.multiply(a, self.a_bound, name='scaled_a')
def _build_c(self, s, a, scope, trainable):
with tf.variable_scope(scope):
n_l1 = 30
w1_s = tf.get_variable('w1_s', [self.s_dim, n_l1], trainable=trainable)
w1_a = tf.get_variable('w1_a', [self.a_dim, n_l1], trainable=trainable)
b1 = tf.get_variable('b1', [1, n_l1], trainable=trainable)
net = tf.nn.relu(tf.matmul(s, w1_s) + tf.matmul(a, w1_a) + b1)
return tf.layers.dense(net, 1, trainable=trainable) # Q(s,a)
soft模式参数更新 可以看到,我们这里进行的是soft模式的参数更新,每次在原来target-net参数的基础上,改变一丢丢,增加一点点eval-net的参数信息。
# networks parameters
self.ae_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Actor/eval')
self.at_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Actor/target')
self.ce_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Critic/eval')
self.ct_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Critic/target')
# target net replacement
self.soft_replace = [[tf.assign(ta, (1 - TAU) * ta + TAU * ea), tf.assign(tc, (1 - TAU) * tc + TAU * ec)]
for ta, ea, tc, ec in zip(self.at_params, self.ae_params, self.ct_params, self.ce_params)]
定义两个网络的损失 关于两个网络的损失,我们之前已经详细介绍过了,这里只是对刚才思路的一个代码实现。
q_target = self.R + GAMMA * q_
# in the feed_dic for the td_error, the self.a should change to actions in memory
td_error = tf.losses.mean_squared_error(labels=q_target, predictions=q)
self.ctrain = tf.train.AdamOptimizer(LR_C).minimize(td_error, var_list=self.ce_params)
a_loss = - tf.reduce_mean(q) # maximize the q
self.atrain = tf.train.AdamOptimizer(LR_A).minimize(a_loss, var_list=self.ae_params)
学习 我们首先要从经验池中取出一个batch的数据,然后训练我们的Actor和Critic
def learn(self):
# soft target replacement
self.sess.run(self.soft_replace)
indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
bt = self.memory[indices, :]
bs = bt[:, :self.s_dim]
ba = bt[:, self.s_dim: self.s_dim + self.a_dim]
br = bt[:, -self.s_dim - 1: -self.s_dim]
bs_ = bt[:, -self.s_dim:]
self.sess.run(self.atrain, {self.S: bs})
self.sess.run(self.ctrain, {self.S: bs, self.a: ba, self.R: br, self.S_: bs_})
存储经验
def store_transition(self, s, a, r, s_):
transition = np.hstack((s, a, [r], s_))
index = self.pointer % MEMORY_CAPACITY # replace the old memory with new memory
self.memory[index, :] = transition
self.pointer += 1
深度学习的开源类库比较多,比较著名的有tensorlow、caffe等。此处我们使用Tensorflow来训练游戏“接砖块”。
游戏截图如下:
通过点击鼠标左键、右键控制滑块的左右移动来接住小球,如果球碰到底面,则游戏结束
主要python代码如下(游戏本身的代码省略,此处主要关注算法代码):
#Game的定义类,此处Game是什么不重要,只要提供执行Action的方法,获取当前游戏区域像素的方法即可
class Game(object):
def __init__(self): #Game初始化
# action是MOVE_STAY、MOVE_LEFT、MOVE_RIGHT
# ai控制棒子左右移动;返回游戏界面像素数和对应的奖励。(像素->奖励->强化棒子往奖励高的方向移动)
def step(self, action):
# learning_rate
LEARNING_RATE = 0.99
# 跟新梯度
INITIAL_EPSILON = 1.0
FINAL_EPSILON = 0.05
# 测试观测次数
EXPLORE = 500000
OBSERVE = 500
# 记忆经验大小
REPLAY_MEMORY = 500000
# 每次训练取出的记录数
BATCH = 100
# 输出层神经元数。代表3种操作-MOVE_STAY:[1, 0, 0] MOVE_LEFT:[0, 1, 0] MOVE_RIGHT:[0, 0, 1]
output = 3 # MOVE_STAY:[1, 0, 0] MOVE_LEFT:[0, 1, 0] MOVE_RIGHT:[0, 0, 1]
input_image = tf.placeholder("float", [None, 80, 100, 4]) # 游戏像素
action = tf.placeholder("float", [None, output]) # 操作
#定义CNN-卷积神经网络
def convolutional_neural_network(input_image):
weights = {'w_conv1':tf.Variable(tf.zeros([8, 8, 4, 32])),
'w_conv2':tf.Variable(tf.zeros([4, 4, 32, 64])),
'w_conv3':tf.Variable(tf.zeros([3, 3, 64, 64])),
'w_fc4':tf.Variable(tf.zeros([3456, 784])),
'w_out':tf.Variable(tf.zeros([784, output]))}
biases = {'b_conv1':tf.Variable(tf.zeros([32])),
'b_conv2':tf.Variable(tf.zeros([64])),
'b_conv3':tf.Variable(tf.zeros([64])),
'b_fc4':tf.Variable(tf.zeros([784])),
'b_out':tf.Variable(tf.zeros([output]))}
conv1 = tf.nn.relu(tf.nn.conv2d(input_image, weights['w_conv1'], strides = [1, 4, 4, 1], padding = "VALID") + biases['b_conv1'])
conv2 = tf.nn.relu(tf.nn.conv2d(conv1, weights['w_conv2'], strides = [1, 2, 2, 1], padding = "VALID") + biases['b_conv2'])
conv3 = tf.nn.relu(tf.nn.conv2d(conv2, weights['w_conv3'], strides = [1, 1, 1, 1], padding = "VALID") + biases['b_conv3'])
conv3_flat = tf.reshape(conv3, [-1, 3456])
fc4 = tf.nn.relu(tf.matmul(conv3_flat, weights['w_fc4']) + biases['b_fc4'])
output_layer = tf.matmul(fc4, weights['w_out']) + biases['b_out']
return output_layer
#训练神经网络
def train_neural_network(input_image):
predict_action = convolutional_neural_network(input_image)
argmax = tf.placeholder("float", [None, output])
gt = tf.placeholder("float", [None])
action = tf.reduce_sum(tf.mul(predict_action, argmax), reduction_indices = 1)
cost = tf.reduce_mean(tf.square(action - gt))
optimizer = tf.train.AdamOptimizer(1e-6).minimize(cost)
game = Game()
D = deque()
_, image = game.step(MOVE_STAY)
image = cv2.cvtColor(cv2.resize(image, (100, 80)), cv2.COLOR_BGR2GRAY)
ret, image = cv2.threshold(image, 1, 255, cv2.THRESH_BINARY)
input_image_data = np.stack((image, image, image, image), axis = 2)
#print ("IMG2:%s" %input_image_data)
with tf.Session() as sess:
sess.run(tf.initialize_all_variables())
saver = tf.train.Saver()
n = 0
epsilon = INITIAL_EPSILON
while True:
#print("InputImageData:", input_image_data)
action_t = predict_action.eval(feed_dict = {input_image : [input_image_data]})[0]
argmax_t = np.zeros([output], dtype=np.int)
if(random.random() <= INITIAL_EPSILON):
maxIndex = random.randrange(output)
else:
maxIndex = np.argmax(action_t)
argmax_t[maxIndex] = 1
if epsilon > FINAL_EPSILON:
epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE
reward, image = game.step(list(argmax_t))
image = cv2.cvtColor(cv2.resize(image, (100, 80)), cv2.COLOR_BGR2GRAY)
ret, image = cv2.threshold(image, 1, 255, cv2.THRESH_BINARY)
image = np.reshape(image, (80, 100, 1))
input_image_data1 = np.append(image, input_image_data[:, :, 0:3], axis = 2)
D.append((input_image_data, argmax_t, reward, input_image_data1))
if len(D) > REPLAY_MEMORY:
D.popleft()
if n > OBSERVE:
minibatch = random.sample(D, BATCH)
input_image_data_batch = [d[0] for d in minibatch]
argmax_batch = [d[1] for d in minibatch]
reward_batch = [d[2] for d in minibatch]
input_image_data1_batch = [d[3] for d in minibatch]
gt_batch = []
out_batch = predict_action.eval(feed_dict = {input_image : input_image_data1_batch})
for i in range(0, len(minibatch)):
gt_batch.append(reward_batch[i] + LEARNING_RATE * np.max(out_batch[i]))
print("gt_batch:", gt_batch, "argmax:", argmax_batch)
optimizer.run(feed_dict = {gt : gt_batch, argmax : argmax_batch, input_image : input_image_data_batch})
input_image_data = input_image_data1
n = n+1
print(n, "epsilon:", epsilon, " " ,"action:", maxIndex, " " ,"reward:", reward)
train_neural_network(input_image)