虽然强化学习不需要有监督学习中的标签数据,但它十分依赖奖励函数的设置。有时在奖励函数上做一些微小的改动,训练出来的策略就会有天差地别。在很多现实场景中,奖励函数并未给定,或者奖励信号极其稀疏,此时随机设计奖励函数将无法保证强化学习训练出来的策略满足实际需要。例如,对于无人驾驶车辆智能体的规控,其观测是当前的环境感知恢复的 3D 局部环境,动作是车辆接下来数秒的具体路径规划,那么奖励是什么?如果只是规定正常行驶而不发生碰撞的奖励为+1,发生碰撞为-100,那么智能体学习的结果则很可能是找个地方停滞不前。具体能帮助无人驾驶小车规控的奖励函数往往需要专家的精心设计和调试。
假设存在一个专家智能体,其策略可以看成最优策略,我们就可以直接模仿这个专家在环境中交互的状态动作数据来训练一个策略,并且不需要用到环境提供的奖励信号。模仿学习(imitation learning)研究的便是这一类问题,在模仿学习的框架下,专家能够提供一系列状态动作对
,表示专家在环境
下做出了的动作
,而模仿者的任务则是利用这些专家数据进行训练,无须奖励信号就可以达到一个接近专家的策略。目前学术界模仿学习的方法基本上可以分为 3 类:
在本章将主要介绍行为克隆方法和生成式对抗模仿学习方法。尽管逆强化学习有良好的学术贡献,但由于其计算复杂度较高,实际应用的价值较小。
行为克隆(BC)就是直接使用监督学习方法,将专家数据中
的
看作样本输入,
视为标签,学习的目标为
其中,
是专家的数据集,
是对应监督学习框架下的损失函数。若动作是离散的,该损失函数可以是最大似然估计得到的。若动作是连续的,该损失函数可以是均方误差函数。
在训练数据量比较大的时候,BC 能够很快地学习到一个不错的策略。例如,围棋人工智能 AlphaGo 就是首先在 16 万盘棋局的 3000 万次落子数据中学习人类选手是如何下棋的,仅仅凭这个行为克隆方法,AlphaGo 的棋力就已经超过了很多业余围棋爱好者。由于 BC 的实现十分简单,因此在很多实际场景下它都可以作为策略预训练的方法。BC 能使得策略无须在较差时仍然低效地通过和环境交互来探索较好的动作,而是通过模仿专家智能体的行为数据来快速达到较高水平,为接下来的强化学习创造一个高起点。
BC 也存在很大的局限性,该局限在数据量比较小的时候犹为明显。具体来说,由于通过 BC 学习得到的策略只是拿小部分专家数据进行训练,因此 BC 只能在专家数据的状态分布下预测得比较准。然而,强化学习面对的是一个序贯决策问题,通过 BC 学习得到的策略在和环境交互过程中不可能完全学成最优,只要存在一点偏差,就有可能导致下一个遇到的状态是在专家数据中没有见过的。此时,由于没有在此状态(或者比较相近的状态)下训练过,策略可能就会随机选择一个动作,这会导致下一个状态进一步偏离专家策略遇到的的数据分布。最终,该策略在真实环境下不能得到比较好的效果,这被称为行为克隆的复合误差(compounding error)问题,如图 15-1 所示。
图15-1 行为克隆带来的复合误差问题
生成式对抗模仿学习(generative adversarial imitation learning,GAIL)是 2016 年由斯坦福大学研究团队提出的基于生成式对抗网络的模仿学习,它诠释了生成式对抗网络的本质其实就是模仿学习。GAIL 实质上是模仿了专家策略的占用度量
,即尽量使得策略在环境中的所有状态动作对
的占用度量
和专家策略的占用度量
一致。为了达成这个目标,策略需要和环境进行交互,收集下一个状态的信息并进一步做出动作。这一点和 BC 不同,BC 完全不需要和环境交互。GAIL 算法中有一个判别器和一个策略,策略
就相当于是生成式对抗网络中的生成器(generator),给定一个状态,策略会输出这个状态下应该采取的动作,而判别器(discriminator)
将状态动作对
作为输入,输出一个
到
之间的实数,表示判别器认为该状态动作对
是来自智能体策略而非专家的概率。判别器
的目标是尽量将专家数据的输出靠近
,将模仿者策略的输出靠近
,这样就可以将两组数据分辨开来。于是,判别器
的损失函数为
其中
是判别器
的参数。有了判别器
之后,模仿者策略的目标就是其交互产生的轨迹能被判别器误认为专家轨迹。于是,我们可以用判别器
的输出来作为奖励函数来训练模仿者策略。具体来说,若模仿者策略在环境中采样到状态
,并且采取动作
,此时该状态动作对
会输入到判别器
中,输出
的值,然后将奖励设置为
。于是,我们可以用任意强化学习算法,使用这些数据继续训练模仿者策略。最后,在对抗过程不断进行后,模仿者策略生成的数据分布将接近真实的专家数据分布,达到模仿学习的目标。GAIL 的优化目标如图 15-2 所示。
图15-2 GAIL 的优化目标
第 3 章介绍过一个策略和给定 MDP 交互的占用度量呈一一对应的关系。因此,模仿学习的本质就是通过更新策略使其占用度量尽量靠近专家的占用度量,而这正是 GAIL 的训练目标。由于一旦策略改变,其占用度量就会改变,因此为了训练好最新的判别器,策略需要不断和环境做交互,采样出最新的状态动作对样本。
首先,我们需要有一定量的专家数据,为此,预先通过 PPO 算法训练出一个表现良好的专家模型,再利用专家模型生成专家数据。本次代码实践的环境是 CartPole-v0,以下是 PPO 代码内容。
import gym
import torch
import torch.nn.functional as F
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import random
import rl_utils
class PolicyNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(PolicyNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, action_dim)
def forward(self, x):
x = F.relu(self.fc1(x))
return F.softmax(self.fc2(x), dim=1)
class ValueNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim):
super(ValueNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, 1)
def forward(self, x):
x = F.relu(self.fc1(x))
return self.fc2(x)
class PPO:
''' PPO算法,采用截断方式 '''
def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device):
self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
self.critic = ValueNet(state_dim, hidden_dim).to(device)
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
self.gamma = gamma
self.lmbda = lmbda
self.epochs = epochs ## 一条序列的数据用于训练轮数
self.eps = eps ## PPO中截断范围的参数
self.device = device
def take_action(self, state):
state = torch.tensor([state], dtype=torch.float).to(self.device)
probs = self.actor(state)
action_dist = torch.distributions.Categorical(probs)
action = action_dist.sample()
return action.item()
def update(self, transition_dict):
states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict['actions']).view(-1, 1).to( self.device)
rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
td_delta = td_target - self.critic(states)
advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
for _ in range(self.epochs):
log_probs = torch.log(self.actor(states).gather(1, actions))
ratio = torch.exp(log_probs - old_log_probs)
surr1 = ratio * advantage
surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage ## 截断
actor_loss = torch.mean(-torch.min(surr1, surr2)) ## PPO损失函数
critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
self.actor_optimizer.zero_grad()
self.critic_optimizer.zero_grad()
actor_loss.backward()
critic_loss.backward()
self.actor_optimizer.step()
self.critic_optimizer.step()
actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 250
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
ppo_agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, ppo_agent, num_episodes)
Iteration 0: 100%|██████████| 25/25 [00:00<00:00, 29.67it/s, episode=20, return=40.700]
Iteration 1: 100%|██████████| 25/25 [00:01<00:00, 15.19it/s, episode=45, return=182.800]
Iteration 2: 100%|██████████| 25/25 [00:01<00:00, 14.11it/s, episode=70, return=176.100]
Iteration 3: 100%|██████████| 25/25 [00:01<00:00, 14.44it/s, episode=95, return=191.500]
Iteration 4: 100%|██████████| 25/25 [00:01<00:00, 14.45it/s, episode=120, return=151.300]
Iteration 5: 100%|██████████| 25/25 [00:02<00:00, 12.15it/s, episode=145, return=200.000]
Iteration 6: 100%|██████████| 25/25 [00:01<00:00, 13.47it/s, episode=170, return=200.000]
Iteration 7: 100%|██████████| 25/25 [00:01<00:00, 13.20it/s, episode=195, return=200.000]
Iteration 8: 100%|██████████| 25/25 [00:01<00:00, 14.43it/s, episode=220, return=188.100]
Iteration 9: 100%|██████████| 25/25 [00:01<00:00, 13.13it/s, episode=245, return=200.000]
接下来开始生成专家数据。因为车杆环境比较简单,我们只生成一条轨迹,并且从中采样 30 个状态动作对(s,a)样本。我们只用这 30 个专家数据样本来训练模仿策略。
def sample_expert_data(n_episode):
states = []
actions = []
for episode in range(n_episode):
state = env.reset()
done = False
while not done:
action = ppo_agent.take_action(state)
states.append(state)
actions.append(action)
next_state, reward, done, _ = env.step(action)
state = next_state
return np.array(states), np.array(actions)
env.seed(0)
torch.manual_seed(0)
random.seed(0)
n_episode = 1
expert_s, expert_a = sample_expert_data(n_episode)
n_samples = 30 ## 采样30个数据
random_index = random.sample(range(expert_s.shape[0]), n_samples)
expert_s = expert_s[random_index]
expert_a = expert_a[random_index]
在 BC 中,我们将专家数据中的(st,at)中的ata_tat视为标签,BC 则转化成监督学习中经典的分类问题,采用最大似然估计的训练方法可得到分类结果。
class BehaviorClone:
def __init__(self, state_dim, hidden_dim, action_dim, lr):
self.policy = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
def learn(self, states, actions):
states = torch.tensor(states, dtype=torch.float).to(device)
actions = torch.tensor(actions).view(-1, 1).to(device)
log_probs = torch.log(self.policy(states).gather(1, actions))
bc_loss = torch.mean(-log_probs) ## 最大似然估计
self.optimizer.zero_grad()
bc_loss.backward()
self.optimizer.step()
def take_action(self, state):
state = torch.tensor([state], dtype=torch.float).to(device)
probs = self.policy(state)
action_dist = torch.distributions.Categorical(probs)
action = action_dist.sample()
return action.item()
def test_agent(agent, env, n_episode):
return_list = []
for episode in range(n_episode):
episode_return = 0
state = env.reset()
done = False
while not done:
action = agent.take_action(state)
next_state, reward, done, _ = env.step(action)
state = next_state
episode_return += reward
return_list.append(episode_return)
return np.mean(return_list)
env.seed(0)
torch.manual_seed(0)
np.random.seed(0)
lr = 1e-3
bc_agent = BehaviorClone(state_dim, hidden_dim, action_dim, lr)
n_iterations = 1000
batch_size = 64
test_returns = []
with tqdm(total=n_iterations, desc="进度条") as pbar:
for i in range(n_iterations):
sample_indices = np.random.randint(low = 0, high = expert_s.shape[0], size=batch_size)
bc_agent.learn(expert_s[sample_indices], expert_a[sample_indices])
current_return = test_agent(bc_agent, env, 5)
test_returns.append(current_return)
if (i + 1) % 10 == 0:
pbar.set_postfix({'return': '%.3f' % np.mean(test_returns[-10:])})
pbar.update(1)
进度条: 100%|██████████| 1000/1000 [03:05<00:00, 5.40it/s, return=199.320]
iteration_list = list(range(len(test_returns)))
plt.plot(iteration_list, test_returns)
plt.xlabel('Iterations')
plt.ylabel('Returns')
plt.title('BC on {}'.format(env_name))
plt.show()
我们发现 BC 无法学习到最优策略(不同设备运行结果可能会有不同),这主要是因为在数据量比较少的情况下,学习容易发生过拟合。
接下来我们实现 GAIL 的代码。
首先实现判别器模型,其模型架构为一个两层的全连接网络,模型输入为一个状态动作对,输出一个概率标量。
class Discriminator(nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(Discriminator, self).__init__()
self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, 1)
def forward(self, x, a):
cat = torch.cat([x, a], dim=1)
x = F.relu(self.fc1(cat))
return torch.sigmoid(self.fc2(x))
接下来正式实现 GAIL 的代码。每一轮迭代中,GAIL 中的策略和环境交互,采样新的状态动作对。基于专家数据和策略新采样的数据,首先训练判别器,然后将判别器的输出转换为策略的奖励信号,指导策略用 PPO 算法做训练。
class GAIL:
def __init__(self, agent, state_dim, action_dim, hidden_dim, lr_d):
self.discriminator = Discriminator(state_dim, hidden_dim, action_dim).to(device)
self.discriminator_optimizer = torch.optim.Adam(self.discriminator.parameters(), lr_d)
self.agent = agent
def learn(self, expert_s, expert_a, agent_s, agent_a, next_s, dones):
expert_states = torch.tensor(expert_s, dtype=torch.float).to(device)
expert_actions = torch.tensor(expert_a).to(device)
agent_states = torch.tensor(agent_s, dtype=torch.float).to(device)
agent_actions = torch.tensor(agent_a).to(device)
expert_actions = F.one_hot(expert_actions, num_classes=2).float()
agent_actions = F.one_hot(agent_actions, num_classes=2).float()
expert_prob = self.discriminator(expert_states, expert_actions)
agent_prob = self.discriminator(agent_states, agent_actions)
discriminator_loss = nn.BCELoss()(agent_prob, torch.ones_like(agent_prob))
+ nn.BCELoss()(expert_prob, torch.zeros_like(expert_prob))
self.discriminator_optimizer.zero_grad()
discriminator_loss.backward()
self.discriminator_optimizer.step()
rewards = -torch.log(agent_prob).detach().cpu().numpy()
transition_dict = {
'states': agent_s,
'actions': agent_a,
'rewards': rewards,
'next_states': next_s,
'dones': dones
}
self.agent.update(transition_dict)
env.seed(0)
torch.manual_seed(0)
lr_d = 1e-3
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device)
gail = GAIL(agent, state_dim, action_dim, hidden_dim, lr_d)
n_episode = 500
return_list = []
with tqdm(total=n_episode, desc="进度条") as pbar:
for i in range(n_episode):
episode_return = 0
state = env.reset()
done = False
state_list = []
action_list = []
next_state_list = []
done_list = []
while not done:
action = agent.take_action(state)
next_state, reward, done, _ = env.step(action)
state_list.append(state)
action_list.append(action)
next_state_list.append(next_state)
done_list.append(done)
state = next_state
episode_return += reward
return_list.append(episode_return)
gail.learn(expert_s, expert_a, state_list, action_list, next_state_list, done_list)
if (i + 1) % 10 == 0:
pbar.set_postfix({'return': '%.3f' % np.mean(return_list[-10:])})
pbar.update(1)
进度条: 100%|██████████| 500/500 [00:35<00:00, 14.20it/s, return=200.000]
iteration_list = list(range(len(return_list)))
plt.plot(iteration_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('GAIL on {}'.format(env_name))
plt.show()
通过上面两个实验的对比我们可以直观地感受到,在数据样本有限的情况下,BC 不能学习到最优策略,但是 GAIL 在相同的专家数据下可以取得非常好的结果。这一方面归因于 GAIL 的训练目标(拉近策略和专家的占用度量)十分贴合模仿学习任务的目标,避免了 BC 中的复合误差问题;另一方面得益于 GAIL 训练中,策略可以和环境交互出更多的数据,以此训练判别器,进而生成对基于策略“量身定做”的指导奖励信号。
本章讲解了模仿学习的基础概念,即根据一些专家数据来学习一个策略,数据中不包含奖励,和环境交互也不能获得奖励。本章还介绍了模仿学习中的两类方法,分别是行为克隆(BC)和生成式对抗模仿学习(GAIL)。通过实验对比发现,在少量专家数据的情况下,GAIL 能获得更好的效果。
此外,逆向强化学习(IRL)也是模仿学习中的重要方法,它假设环境的奖励函数应该使得专家轨迹获得最高的奖励值,进而学习背后的奖励函数,最后基于该奖励函数做正向强化学习,从而得到模仿策略。感兴趣的读者可以查阅相关文献进行学习。
[1] SYED U, BOWLING M, SCHAPIRE R E. Apprenticeship learning using linear programming [C]// Proceedings of the 25th international conference on Machine learning, 2008: 1032-1039.
[2] HO J, ERMON S. Generative adversarial imitation learning [J]. Advances in neural information processing systems 2016, 29: 4565-4573.
[3] ABBEEL P, NG A Y. Apprenticeship learning via inverse reinforcement learning [C] // Proceedings of the twenty-first international conference on machine learning, 2004.
之前几章介绍了基于值函数的方法 DQN、基于策略的方法 REINFORCE 以及两者结合的方法 Actor-Critic。它们都是无模型(model-free)的方法,即没有建立一个环境模型来帮助智能体决策。而在深度强化学习领域下,基于模型(model-based)的方法通常用神经网络学习一个环境模型,然后利用该环境模型来帮助智能体训练和决策。利用环境模型帮助智能体训练和决策的方法有很多种,例如可以用与之前的 Dyna 类似的思想生成一些数据来加入策略训练中。本章要介绍的模型预测控制(model predictive control,MPC)算法并不构建一个显式的策略,只根据环境模型来选择当前步要采取的动作。
首先,让我们用一个形象的比喻来帮助理解模型预测控制方法。假设我们在下围棋,现在根据棋盘的布局,我们要选择现在落子的位置。一个优秀的棋手会根据目前局势来推演落子几步可能发生的局势,然后选择局势最好的一种情况来决定当前落子位置。
模型预测控制方法就是这样一种迭代的、基于模型的控制方法。值得注意的是,MPC 方法中不存在一个显式的策略。具体而言,MPC 方法在每次采取动作时,首先会生成一些候选动作序列,然后根据当前状态来确定每一条候选序列能得到多好的结果,最终选择结果最好的那条动作序列的第一个动作来执行。因此,在使用 MPC 方法时,主要在两个过程中迭代,一是根据历史数据学习环境模型P^(s,a),二是在和真实环境交互过程中用环境模型来选择动作。
首先,我们定义模型预测方法的目标。在第
步时,我们要想做的就是最大化智能体的累积奖励,具体来说就是:
其中
为推演的长度,
表示从所有动作序列中选取累积奖励最大的序列。我们每次取最优序列中的第一个动作
来与环境交互。MPC 方法中的一个关键是如何生成一些候选动作序列,候选动作生成的好坏将直接影响到 MPC 方法得到的动作。生成候选动作序列的过程我们称为打靶(shooting).
随机打靶法(random shooting method)的做法便是随机生成N条动作序列,即在生成每条动作序列的每一个动作时,都是从动作空间中随机采样一个动作,最终组合成NNN条长度为K的动作序列。
对于一些简单的环境,这个方法不但十分简单,而且效果还不错。那么,能不能在随机的基础上,根据已有的结果做得更好一些呢?接下来,我们来介绍另外一种打靶法:交叉熵方法。
交叉熵方法(cross entropy method,CEM)是一种进化策略方法,它的核心思想是维护一个带参数的分布,根据每次采样的结果来更新分布中的参数,使得分布中能获得较高累积奖励的动作序列的概率比较高。相比于随机打靶法,交叉熵方法能够利用之前采样到的比较好的结果,在一定程度上减少采样到一些较差动作的概率,从而使得算法更加高效。对于一个与连续动作交互的环境来说,每次交互时交叉熵方法的做法如下:
for 次数
do
中选取
条动作序列
,用环境模型评估累积奖励
条最优的动作序列
去更新分布
我们可以使用如下的代码来实现交叉熵方法,其中将采用截断正态分布。
import numpy as np
from scipy.stats import truncnorm
import gym
import itertools
import torch
import torch.nn as nn
import torch.nn.functional as F
import collections
import matplotlib.pyplot as plt
class CEM:
def __init__(self, n_sequence, elite_ratio, fake_env, upper_bound, lower_bound):
self.n_sequence = n_sequence
self.elite_ratio = elite_ratio
self.upper_bound = upper_bound
self.lower_bound = lower_bound
self.fake_env = fake_env
def optimize(self, state, init_mean, init_var):
mean, var = init_mean, init_var
X = truncnorm(-2, 2, loc=np.zeros_like(mean), scale=np.ones_like(var))
state = np.tile(state, (self.n_sequence, 1))
for _ in range(5):
lb_dist, ub_dist = mean - self.lower_bound, self.upper_bound - mean
constrained_var = np.minimum(
np.minimum(
np.square(lb_dist / 2),
np.square(ub_dist / 2)
),
var
)
## 生成动作序列
action_sequences = [X.rvs() for _ in range(self.n_sequence)] * np.sqrt(constrained_var) + mean
## 计算每条动作序列的累积奖励
returns = self.fake_env.propagate(state, action_sequences)[:, 0]
## 选取累积奖励最高的若干条动作序列
elites = action_sequences[np.argsort(returns)][-int(self.elite_ratio * self.n_sequence):]
new_mean = np.mean(elites, axis=0)
new_var = np.var(elites, axis=0)
## 更新动作序列分布
mean = 0.1 * mean + 0.9 * new_mean
var = 0.1 * var + 0.9 * new_var
return mean
带有轨迹采样的概率集成(probabilistic ensembles with trajectory sampling,PETS)是一种使用 MPC 的基于模型的强化学习算法。在 PETS 中,环境模型采用了集成学习的方法,即会构建多个环境模型,然后用这多个环境模型来进行预测,最后使用 CEM 进行模型预测控制。接下来,我们来详细介绍模型构建与模型预测的方法。
在强化学习中,与智能体交互的环境是一个动态系统,所以拟合它的环境模型也通常是一个动态模型。我们通常认为一个系统中有两种不确定性,分别是偶然不确定性(aleatoric uncertainty)和认知不确定性(epistemic uncertainty)。偶然不确定性是由于系统中本身存在的随机性引起的,而认知不确定性是由“见”过的数据较少导致的自身认知的不足而引起的,如图 16-1 所示。
图16-1 偶然不确定性和认知不确定性
在 PET 算法中,环境模型的构建会同时考虑到这两种不确定性。首先,我们定义环境模型的输出为一个高斯分布,用来捕捉偶然不确定性。令环境模型为
,其参数为
,那么基于当前状态动作对
,下一个状态
的分布可以写为
这里我们可以采用神经网络来构建
和
。这样,神经网络的损失函数则为
这样我们就得到了一个由神经网络表示的环境模型。在此基础之上,我们选择用集成(ensemble)方法来捕捉认知不确定性。具体而言,我们构建BBB个网络框架一样的神经网络,它们的输入都是状态动作对,输出都是下一个状态的高斯分布的均值向量和协方差矩阵。但是它们的参数采用不同的随机初始化方式,并且当每次训练时,会从真实数据中随机采样不同的数据来训练。
有了环境模型的集成后,MPC 算法会用其来预测奖励和下一个状态。具体来说,每一次预测会从B个模型中挑选一个来进行预测,因此一条轨迹的采样会使用到多个环境模型,如图 16-2 所示。
图16-2 PETS 算法利用各个环境模型选取动作
首先,为了搭建这样一个较为复杂的模型,我们定义模型中每一层的构造。在定义时就必须考虑每一层都是一个集成。
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
class Swish(nn.Module):
''' Swish激活函数 '''
def __init__(self):
super(Swish, self).__init__()
def forward(self, x):
return x * torch.sigmoid(x)
def init_weights(m):
''' 初始化模型权重 '''
def truncated_normal_init(t, mean=0.0, std=0.01):
torch.nn.init.normal_(t, mean=mean, std=std)
while True:
cond = (t < mean - 2 * std) | (t > mean + 2 * std)
if not torch.sum(cond):
break
t = torch.where(
cond,
torch.nn.init.normal_(
torch.ones(t.shape, device=device),
mean=mean,
std=std
),
t
)
return t
if type(m) == nn.Linear or isinstance(m, FCLayer):
truncated_normal_init(m.weight, std=1 / (2 * np.sqrt(m._input_dim)))
m.bias.data.fill_(0.0)
class FCLayer(nn.Module):
''' 集成之后的全连接层 '''
def __init__(self, input_dim, output_dim, ensemble_size, activation):
super(FCLayer, self).__init__()
self._input_dim, self._output_dim = input_dim, output_dim
self.weight = nn.Parameter(torch.Tensor(ensemble_size, input_dim, output_dim).to(device))
self._activation = activation
self.bias = nn.Parameter(torch.Tensor(ensemble_size, output_dim).to(device))
def forward(self, x):
return self._activation(
torch.add(torch.bmm(x, self.weight), self.bias[:, None, :]))
接着,使用高斯分布的概率模型来定义一个集成模型。
class EnsembleModel(nn.Module):
''' 环境模型集成 '''
def __init__(
self,
state_dim,
action_dim,
ensemble_size=5,
learning_rate=1e-3
):
super(EnsembleModel, self).__init__()
## 输出包括均值和方差,因此是状态与奖励维度之和的两倍
self._output_dim = (state_dim + 1) * 2
self._max_logvar = nn.Parameter(
(torch.ones((1, self._output_dim // 2)).float() / 2).to(device),
requires_grad=False
)
self._min_logvar = nn.Parameter(
(-torch.ones((1, self._output_dim // 2)).float() * 10).to(device),
requires_grad=False
)
self.layer1 = FCLayer(state_dim + action_dim, 200, ensemble_size, Swish())
self.layer2 = FCLayer(200, 200, ensemble_size, Swish())
self.layer3 = FCLayer(200, 200, ensemble_size, Swish())
self.layer4 = FCLayer(200, 200, ensemble_size, Swish())
self.layer5 = FCLayer(200, self._output_dim, ensemble_size, nn.Identity())
self.apply(init_weights) ## 初始化环境模型中的参数
self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)
def forward(self, x, return_log_var=False):
ret = self.layer5(self.layer4(self.layer3(self.layer2(self.layer1(x)))))
mean = ret[:, :, :self._output_dim // 2]
## 在PETS算法中,将方差控制在最小值和最大值之间
logvar = self._max_logvar - F.softplus(self._max_logvar - ret[:, :, self._output_dim // 2:])
logvar = self._min_logvar + F.softplus(logvar - self._min_logvar)
return mean, logvar if return_log_var else torch.exp(logvar)
def loss(self, mean, logvar, labels, use_var_loss=True):
inverse_var = torch.exp(-logvar)
if use_var_loss:
mse_loss = torch.mean(
torch.mean(torch.pow(mean - labels, 2) * inverse_var, dim=-1),
dim=-1
)
var_loss = torch.mean(torch.mean(logvar, dim=-1), dim=-1)
total_loss = torch.sum(mse_loss) + torch.sum(var_loss)
else:
mse_loss = torch.mean(torch.pow(mean - labels, 2), dim=(1, 2))
total_loss = torch.sum(mse_loss)
return total_loss, mse_loss
def train(self, loss):
self.optimizer.zero_grad()
loss += 0.01 * torch.sum(self._max_logvar) - 0.01 * torch.sum(self._min_logvar)
loss.backward()
self.optimizer.step()
接下来,我们定义一个EnsembleDynamicsModel
的类,把模型集成的训练设计得更加精细化。具体而言,我们并不会选择模型训练的轮数,而是在每次训练的时候将一部分数据单独取出来,用于验证模型的表现,在 5 次没有获得表现提升时就结束训练。
class EnsembleDynamicsModel:
''' 环境模型集成,加入精细化的训练 '''
def __init__(self, state_dim, action_dim, num_network=5):
self._num_network = num_network
self._state_dim, self._action_dim = state_dim, action_dim
self.model = EnsembleModel(
state_dim,
action_dim,
ensemble_size=num_network
)
self._epoch_since_last_update = 0
def train(
self,
inputs,
labels,
batch_size=64,
holdout_ratio=0.1,
max_iter=20
):
## 设置训练集与验证集
permutation = np.random.permutation(inputs.shape[0])
inputs, labels = inputs[permutation], labels[permutation]
num_holdout = int(inputs.shape[0] * holdout_ratio)
train_inputs, train_labels = inputs[num_holdout:], labels[num_holdout:]
holdout_inputs, holdout_labels = inputs[:num_holdout], labels[:num_holdout]
holdout_inputs = torch.from_numpy(holdout_inputs).float().to(device)
holdout_labels = torch.from_numpy(holdout_labels).float().to(device)
holdout_inputs = holdout_inputs[None, :, :].repeat([self._num_network, 1, 1])
holdout_labels = holdout_labels[None, :, :].repeat([self._num_network, 1, 1])
## 保留最好的结果
self._snapshots = {i: (None, 1e10) for i in range(self._num_network)}
for epoch in itertools.count():
## 定义每一个网络的训练数据
train_index = np.vstack([
np.random.permutation(train_inputs.shape[0])
for _ in range(self._num_network)
])
## 所有真实数据都用来训练
for batch_start_pos in range(0, train_inputs.shape[0], batch_size):
batch_index = train_index[:, batch_start_pos:batch_start_pos + batch_size]
train_input = torch.from_numpy(train_inputs[batch_index]).float().to(device)
train_label = torch.from_numpy(train_labels[batch_index]).float().to(device)
mean, logvar = self.model(train_input, return_log_var=True)
loss, _ = self.model.loss(mean, logvar, train_label)
self.model.train(loss)
with torch.no_grad():
mean, logvar = self.model(holdout_inputs, return_log_var=True)
_, holdout_losses = self.model.loss(
mean,
logvar,
holdout_labels,
use_var_loss=False
)
holdout_losses = holdout_losses.cpu()
break_condition = self._save_best(epoch, holdout_losses)
if break_condition or epoch > max_iter: ## 结束训练
break
def _save_best(self, epoch, losses, threshold=0.1):
updated = False
for i in range(len(losses)):
current = losses[i]
_, best = self._snapshots[i]
improvement = (best - current) / best
if improvement > threshold:
self._snapshots[i] = (epoch, current)
updated = True
self._epoch_since_last_update = 0 if updated else self._epoch_since_last_update + 1
return self._epoch_since_last_update > 5
def predict(self, inputs, batch_size=64):
mean, var = [], []
for i in range(0, inputs.shape[0], batch_size):
input = torch.from_numpy(
inputs[i:min(i + batch_size, inputs.shape[0])]
).float().to(device)
cur_mean, cur_var = self.model(input[None, :, :].repeat([self._num_network, 1, 1]), return_log_var=False)
mean.append(cur_mean.detach().cpu().numpy())
var.append(cur_var.detach().cpu().numpy())
return np.hstack(mean), np.hstack(var)
有了环境模型之后,我们就可以定义一个FakeEnv,主要用于实现给定状态和动作,用模型集成来进行预测。该功能会用在 MPC 算法中。
class FakeEnv:
def __init__(self, model):
self.model = model
def step(self, obs, act):
inputs = np.concatenate((obs, act), axis=-1)
ensemble_model_means, ensemble_model_vars = self.model.predict(inputs)
ensemble_model_means[:, :, 1:] += obs.numpy()
ensemble_model_stds = np.sqrt(ensemble_model_vars)
ensemble_samples = ensemble_model_means + np.random.normal(
size=ensemble_model_means.shape
) * ensemble_model_stds
num_models, batch_size, _ = ensemble_model_means.shape
models_to_use = np.random.choice(
[i for i in range(self.model._num_network)],
size=batch_size
)
batch_inds = np.arange(0, batch_size)
samples = ensemble_samples[models_to_use, batch_inds]
rewards, next_obs = samples[:, :1], samples[:, 1:]
return rewards, next_obs
def propagate(self, obs, actions):
with torch.no_grad():
obs = np.copy(obs)
total_reward = np.expand_dims(np.zeros(obs.shape[0]), axis=-1)
obs, actions = torch.as_tensor(obs), torch.as_tensor(actions)
for i in range(actions.shape[1]):
action = torch.unsqueeze(actions[:, i], 1)
rewards, next_obs = self.step(obs, action)
total_reward += rewards
obs = torch.as_tensor(next_obs)
return total_reward
接下来定义经验回放池的类Replay Buffer
。与之前的章节对比,此处经验回放缓冲区会额外实现一个返回所有数据的函数。
class ReplayBuffer:
def __init__(self, capacity):
self.buffer = collections.deque(maxlen=capacity)
def add(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def size(self):
return len(self.buffer)
def return_all_samples(self):
all_transitions = list(self.buffer)
state, action, reward, next_state, done = zip(*all_transitions)
return np.array(state), action, reward, np.array(next_state), done
接下来是 PETS
算法的主体部分。
class PETS:
''' PETS算法 '''
def __init__(self, env, replay_buffer, n_sequence, elite_ratio,
plan_horizon, num_episodes):
self._env = env
self._env_pool = replay_buffer
obs_dim = env.observation_space.shape[0]
self._action_dim = env.action_space.shape[0]
self._model = EnsembleDynamicsModel(obs_dim, self._action_dim)
self._fake_env = FakeEnv(self._model)
self.upper_bound = env.action_space.high[0]
self.lower_bound = env.action_space.low[0]
self._cem = CEM(n_sequence, elite_ratio, self._fake_env,
self.upper_bound, self.lower_bound)
self.plan_horizon = plan_horizon
self.num_episodes = num_episodes
def train_model(self):
env_samples = self._env_pool.return_all_samples()
obs = env_samples[0]
actions = np.array(env_samples[1])
rewards = np.array(env_samples[2]).reshape(-1, 1)
next_obs = env_samples[3]
inputs = np.concatenate((obs, actions), axis=-1)
labels = np.concatenate((rewards, next_obs - obs), axis=-1)
self._model.train(inputs, labels)
def mpc(self):
mean = np.tile(
(self.upper_bound + self.lower_bound) / 2.0,
self.plan_horizon
)
var = np.tile(
np.square(self.upper_bound - self.lower_bound) / 16,
self.plan_horizon
)
obs, done, episode_return = self._env.reset(), False, 0
while not done:
actions = self._cem.optimize(obs, mean, var)
action = actions[:self._action_dim] ## 选取第一个动作
next_obs, reward, done, _ = self._env.step(action)
self._env_pool.add(obs, action, reward, next_obs, done)
obs = next_obs
episode_return += reward
mean = np.concatenate([
np.copy(actions)[self._action_dim:],
np.zeros(self._action_dim)
])
return episode_return
def explore(self):
obs, done, episode_return = self._env.reset(), False, 0
while not done:
action = self._env.action_space.sample()
next_obs, reward, done, _ = self._env.step(action)
self._env_pool.add(obs, action, reward, next_obs, done)
obs = next_obs
episode_return += reward
return episode_return
def train(self):
return_list = []
explore_return = self.explore() ## 先进行随机策略的探索来收集一条序列的数据
print('episode: 1, return: %d' % explore_return)
return_list.append(explore_return)
for i_episode in range(self.num_episodes - 1):
self.train_model()
episode_return = self.mpc()
return_list.append(episode_return)
print('episode: %d, return: %d' % (i_episode + 2, episode_return))
return return_list
大功告成!让我们在倒立摆环境上试一下吧,以下代码需要一定的运行时间。
buffer_size = 100000
n_sequence = 50
elite_ratio = 0.2
plan_horizon = 25
num_episodes = 10
env_name = 'Pendulum-v0'
env = gym.make(env_name)
replay_buffer = ReplayBuffer(buffer_size)
pets = PETS(env, replay_buffer, n_sequence, elite_ratio, plan_horizon, num_episodes)
return_list = pets.train()
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PETS on {}'.format(env_name))
plt.show()
episode: 1, return: -985
episode: 2, return: -1384
episode: 3, return: -1006
episode: 4, return: -1853
episode: 5, return: -378
episode: 6, return: -123
episode: 7, return: -124
episode: 8, return: -122
episode: 9, return: -124
episode: 10, return: -125
可以看出,PETS 算法的效果非常好,但是由于每次选取动作都需要在环境模型上进行大量的模拟,因此运行速度非常慢。与 SAC 算法的结果进行对比可以看出,PETS 算法大大提高了样本效率,在比 SAC 算法的环境交互次数少得多的情况下就取得了差不多的效果。
通过学习与实践,我们可以看出模型预测控制(MPC)方法有着其独特的优势,例如它不用构建和训练策略,可以更好地利用环境,可以进行更长步数的规划。但是 MPC 也有其局限性,例如模型在多步推演之后的准确性会大大降低,简单的控制策略对于复杂系统可能不够。MPC 还有一个更为严重的问题,即每次计算动作的复杂度太大,这使其在一些策略及时性要求较高的系统中应用就变得不太现实。
[1] CHUA K, CALANDRA R, MCALLISTER R, et al. Deep reinforcement learning in a handful of trials using probabilistic dynamics models [J]. Advances in neural information processing systems, 2018: 31.
[2] LAKSHMINARAYANAN B, PRITZEL A, BLUNDELL C. Simple and scalable predictive uncertainty estimation using deep ensembles [J]. Advances in neural information processing systems, 2017: 30.
第 16 章介绍的 PETS 算法是基于模型的强化学习算法中的一种,它没有显式构建一个策略(即一个从状态到动作的映射函数)。回顾一下之前介绍过的 Dyna-Q 算法,它也是一种基于模型的强化学习算法。但是 Dyna-Q 算法中的模型只存储之前遇到的数据,只适用于表格型环境。而在连续型状态和动作的环境中,我们需要像 PETS 算法一样学习一个用神经网络表示的环境模型,此时若继续利用 Dyna 的思想,可以在任意状态和动作下用环境模型来生成一些虚拟数据,这些虚拟数据可以帮助进行策略的学习。如此,通过和模型进行交互产生额外的虚拟数据,对真实环境中样本的需求量就会减少,因此通常会比无模型的强化学习方法具有更高的采样效率。本章将介绍这样一种算法——MBPO 算法。
基于模型的策略优化(model-based policy optimization,MBPO)算法是加州大学伯克利分校的研究员在 2019 年的 NeurIPS 会议中提出的。随即 MBPO 成为深度强化学习中最重要的基于模型的强化学习算法之一。
MBPO 算法基于以下两个关键的观察: (1) 随着环境模型的推演步数变长,模型累积的复合误差会快速增加,使得环境模型得出的结果变得很不可靠; (2) 必须要权衡推演步数增加后模型增加的误差带来的负面作用与步数增加后使得训练的策略更优的正面作用,二者的权衡决定了推演的步数。
MBPO 算法在这两个观察的基础之上,提出只使用模型来从之前访问过的真实状态开始进行较短步数的推演,而非从初始状态开始进行完整的推演。这就是 MBPO 中的分支推演(branched rollout)的概念,即在原来真实环境中采样的轨迹上面推演出新的“短分支”,如图 17-1 所示。这样做可以使模型的累积误差不至于过大,从而保证最后的采样效率和策略表现。
图17-1 分支推演示意图
MBPO 与第 6 章讲解的经典的 Dyna-Q 算法十分类似。Dyna-Q 采用的无模型强化学习部分是 Q-learning,而 MBPO 采用的是 SAC。此外,MBPO 算法中环境模型的构建和 PETS 算法中一致,都使用模型集成的方式,并且其中每一个环境模型的输出都是一个高斯分布。接下来,我们来看一下 MBPO 的具体算法框架。MBPO 算法会把真实环境样本作为分支推演的起点,使用模型进行一定步数的推演,并用推演得到的模型数据用来训练模型。
、环境模型参数
、真实环境数据集
、模型数据集
do
do
与环境交互,并将交互的轨迹添加到
中
do
中均匀随机采样一个状态
为初始状态,在模型中使用策略
进行
步的推演,并将生成的轨迹添加到
中
do
,使用 SAC 来更新策略参数
分支推演的长度kkk是平衡样本效率和策略性能的重要超参数。接下来我们看看 MBPO 的代码,本章最后会给出关于 MBPO 的理论推导,可以指导参数kkk的选取。
首先,我们先导入一些必要的包。
import gym
from collections import namedtuple
import itertools
from itertools import count
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions.normal import Normal
import numpy as np
import collections
import random
import matplotlib.pyplot as plt
MBPO 算法使用 SAC 算法来训练策略。和 SAC 算法相比,MBPO 多用了一些模型推演得到的数据来训练策略。要想了解 SAC 方法的详细过程,读者可以阅读第 14 章对应的内容。我们将 SAC 代码直接复制到此处。
class PolicyNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim, action_bound):
super(PolicyNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
self.fc_std = torch.nn.Linear(hidden_dim, action_dim)
self.action_bound = action_bound
def forward(self, x):
x = F.relu(self.fc1(x))
mu = self.fc_mu(x)
std = F.softplus(self.fc_std(x))
dist = Normal(mu, std)
normal_sample = dist.rsample() ## rsample()是重参数化采样函数
log_prob = dist.log_prob(normal_sample)
action = torch.tanh(normal_sample) ## 计算tanh_normal分布的对数概率密度
log_prob = log_prob - torch.log(1 - torch.tanh(action).pow(2) + 1e-7)
action = action * self.action_bound
return action, log_prob
class QValueNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(QValueNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, 1)
def forward(self, x, a):
cat = torch.cat([x, a], dim=1) ## 拼接状态和动作
x = F.relu(self.fc1(cat))
return self.fc2(x)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device(
"cpu")
class SAC:
''' 处理连续动作的SAC算法 '''
def __init__(self, state_dim, hidden_dim, action_dim, action_bound,
actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma):
self.actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device) ## 策略网络
## 第一个Q网络
self.critic_1 = QValueNet(state_dim, hidden_dim, action_dim).to(device)
## 第二个Q网络
self.critic_2 = QValueNet(state_dim, hidden_dim, action_dim).to(device)
self.target_critic_1 = QValueNet(state_dim, hidden_dim, action_dim).to(device) ## 第一个目标Q网络
self.target_critic_2 = QValueNet(state_dim, hidden_dim, action_dim).to(device) ## 第二个目标Q网络
## 令目标Q网络的初始参数和Q网络一样
self.target_critic_1.load_state_dict(self.critic_1.state_dict())
self.target_critic_2.load_state_dict(self.critic_2.state_dict())
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
self.critic_1_optimizer = torch.optim.Adam(self.critic_1.parameters(), lr=critic_lr)
self.critic_2_optimizer = torch.optim.Adam(self.critic_2.parameters(), lr=critic_lr)
## 使用alpha的log值,可以使训练结果比较稳定
self.log_alpha = torch.tensor(np.log(0.01), dtype=torch.float)
self.log_alpha.requires_grad = True ## 可以对alpha求梯度
self.log_alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=alpha_lr)
self.target_entropy = target_entropy ## 目标熵的大小
self.gamma = gamma
self.tau = tau
def take_action(self, state):
state = torch.tensor([state], dtype=torch.float).to(device)
action = self.actor(state)[0]
return [action.item()]
def calc_target(self, rewards, next_states, dones): ## 计算目标Q值
next_actions, log_prob = self.actor(next_states)
entropy = -log_prob
q1_value = self.target_critic_1(next_states, next_actions)
q2_value = self.target_critic_2(next_states, next_actions)
next_value = torch.min(q1_value, q2_value) + self.log_alpha.exp() * entropy
td_target = rewards + self.gamma * next_value * (1 - dones)
return td_target
def soft_update(self, net, target_net):
for param_target, param in zip(target_net.parameters(),
net.parameters()):
param_target.data.copy_(param_target.data * (1.0 - self.tau) +
param.data * self.tau)
def update(self, transition_dict):
states = torch.tensor(transition_dict['states'], dtype=torch.float).to(device)
actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1, 1).to(device)
rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(device)
next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(device)
dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(device)
rewards = (rewards + 8.0) / 8.0 ## 对倒立摆环境的奖励进行重塑
## 更新两个Q网络
td_target = self.calc_target(rewards, next_states, dones)
critic_1_loss = torch.mean(F.mse_loss(self.critic_1(states, actions), td_target.detach()))
critic_2_loss = torch.mean(F.mse_loss(self.critic_2(states, actions), td_target.detach()))
self.critic_1_optimizer.zero_grad()
critic_1_loss.backward()
self.critic_1_optimizer.step()
self.critic_2_optimizer.zero_grad()
critic_2_loss.backward()
self.critic_2_optimizer.step()
## 更新策略网络
new_actions, log_prob = self.actor(states)
entropy = -log_prob
q1_value = self.critic_1(states, new_actions)
q2_value = self.critic_2(states, new_actions)
actor_loss = torch.mean(-self.log_alpha.exp() * entropy - torch.min(q1_value, q2_value))
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
## 更新alpha值
alpha_loss = torch.mean((entropy - self.target_entropy).detach() * self.log_alpha.exp())
self.log_alpha_optimizer.zero_grad()
alpha_loss.backward()
self.log_alpha_optimizer.step()
self.soft_update(self.critic_1, self.target_critic_1)
self.soft_update(self.critic_2, self.target_critic_2)
接下来定义环境模型,注意这里的环境模型和 PETS 算法中的环境模型是一样的,由多个高斯分布策略的集成来构建。我们也沿用 PETS 算法中的模型构建代码。
class Swish(nn.Module):
''' Swish激活函数 '''
def __init__(self):
super(Swish, self).__init__()
def forward(self, x):
return x * torch.sigmoid(x)
def init_weights(m):
''' 初始化模型权重 '''
def truncated_normal_init(t, mean=0.0, std=0.01):
torch.nn.init.normal_(t, mean=mean, std=std)
while True:
cond = (t < mean - 2 * std) | (t > mean + 2 * std)
if not torch.sum(cond):
break
t = torch.where(
cond,
torch.nn.init.normal_(
torch.ones(t.shape, device=device),
mean=mean,
std=std
),
t
)
return t
if type(m) == nn.Linear or isinstance(m, FCLayer):
truncated_normal_init(m.weight, std= 1 / (2 * np.sqrt(m._input_dim)))
m.bias.data.fill_(0.0)
class FCLayer(nn.Module):
''' 集成之后的全连接层 '''
def __init__(self, input_dim, output_dim, ensemble_size, activation):
super(FCLayer, self).__init__()
self._input_dim, self._output_dim = input_dim, output_dim
self.weight = nn.Parameter(
torch.Tensor(ensemble_size, input_dim, output_dim).to(device))
self._activation = activation
self.bias = nn.Parameter(
torch.Tensor(ensemble_size, output_dim).to(device))
def forward(self, x):
return self._activation(
torch.add(torch.bmm(x, self.weight), self.bias[:, None, :]))
接着,我们就可以定义集成模型了,其中就会用到刚刚定义的全连接层。
class EnsembleModel(nn.Module):
''' 环境模型集成 '''
def __init__(self,
state_dim,
action_dim,
model_alpha,
ensemble_size=5,
learning_rate=1e-3):
super(EnsembleModel, self).__init__()
## 输出包括均值和方差,因此是状态与奖励维度之和的两倍
self._output_dim = (state_dim + 1) * 2
self._model_alpha = model_alpha ## 模型损失函数中加权时的权重
self._max_logvar = nn.Parameter((torch.ones(
(1, self._output_dim // 2)).float() / 2).to(device),
requires_grad=False)
self._min_logvar = nn.Parameter((-torch.ones(
(1, self._output_dim // 2)).float() * 10).to(device),
requires_grad=False)
self.layer1 = FCLayer(state_dim + action_dim, 200, ensemble_size,
Swish())
self.layer2 = FCLayer(200, 200, ensemble_size, Swish())
self.layer3 = FCLayer(200, 200, ensemble_size, Swish())
self.layer4 = FCLayer(200, 200, ensemble_size, Swish())
self.layer5 = FCLayer(200, self._output_dim, ensemble_size,
nn.Identity())
self.apply(init_weights) ## 初始化环境模型中的参数
self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)
def forward(self, x, return_log_var=False):
ret = self.layer5(self.layer4(self.layer3(self.layer2(
self.layer1(x)))))
mean = ret[:, :, :self._output_dim // 2]
## 在PETS算法中,将方差控制在最小值和最大值之间
logvar = ret[:, :, self._output_dim // 2:]
logvar = self._max_logvar - F.softplus(self._max_logvar - logvar)
logvar = self._min_logvar + F.softplus(logvar - self._min_logvar)
return mean, logvar if return_log_var else torch.exp(logvar)
def loss(self, mean, logvar, labels, use_var_loss=True):
inverse_var = torch.exp(-logvar)
if use_var_loss:
mse_loss = torch.mean(
torch.mean(
torch.pow(mean - labels, 2) * inverse_var, dim=-1),
dim=-1
)
var_loss = torch.mean(torch.mean(logvar, dim=-1), dim=-1)
total_loss = torch.sum(mse_loss) + torch.sum(var_loss)
else:
mse_loss = torch.mean(torch.pow(mean - labels, 2), dim=(1, 2))
total_loss = torch.sum(mse_loss)
return total_loss, mse_loss
def train(self, loss):
self.optimizer.zero_grad()
loss += self._model_alpha * torch.sum(
self._max_logvar) - self._model_alpha * torch.sum(self._min_logvar)
loss.backward()
self.optimizer.step()
class EnsembleDynamicsModel:
''' 环境模型集成,加入精细化的训练 '''
def __init__(self, state_dim, action_dim, model_alpha=0.01, num_network=5):
self._num_network = num_network
self._state_dim, self._action_dim = state_dim, action_dim
self.model = EnsembleModel(
state_dim,
action_dim,
model_alpha,
ensemble_size=num_network
)
self._epoch_since_last_update = 0
def train(
self,
inputs,
labels,
batch_size=64,
holdout_ratio=0.1,
max_iter=20
):
## 设置训练集与验证集
permutation = np.random.permutation(inputs.shape[0])
inputs, labels = inputs[permutation], labels[permutation]
num_holdout = int(inputs.shape[0] * holdout_ratio)
train_inputs, train_labels = inputs[num_holdout:], labels[num_holdout:]
holdout_inputs, holdout_labels = inputs[:num_holdout], labels[:num_holdout]
holdout_inputs = torch.from_numpy(holdout_inputs).float().to(device)
holdout_labels = torch.from_numpy(holdout_labels).float().to(device)
holdout_inputs = holdout_inputs[None, :, :].repeat([self._num_network, 1, 1])
holdout_labels = holdout_labels[None, :, :].repeat([self._num_network, 1, 1])
## 保留最好的结果
self._snapshots = {i: (None, 1e10) for i in range(self._num_network)}
for epoch in itertools.count():
## 定义每一个网络的训练数据
train_index = np.vstack([
np.random.permutation(train_inputs.shape[0])
for _ in range(self._num_network)
])
## 所有真实数据都用来训练
for batch_start_pos in range(0, train_inputs.shape[0], batch_size):
batch_index = train_index[:, batch_start_pos:batch_start_pos + batch_size]
train_input = torch.from_numpy(train_inputs[batch_index]).float().to(device)
train_label = torch.from_numpy(train_labels[batch_index]).float().to(device)
mean, logvar = self.model(train_input, return_log_var=True)
loss, _ = self.model.loss(mean, logvar, train_label)
self.model.train(loss)
with torch.no_grad():
mean, logvar = self.model(holdout_inputs, return_log_var=True)
_, holdout_losses = self.model.loss(mean, logvar, holdout_labels, use_var_loss=False)
holdout_losses = holdout_losses.cpu()
break_condition = self._save_best(epoch, holdout_losses)
if break_condition or epoch > max_iter: ## 结束训练
break
def _save_best(self, epoch, losses, threshold=0.1):
updated = False
for i in range(len(losses)):
current = losses[i]
_, best = self._snapshots[i]
improvement = (best - current) / best
if improvement > threshold:
self._snapshots[i] = (epoch, current)
updated = True
self._epoch_since_last_update = 0 if updated else self._epoch_since_last_update + 1
return self._epoch_since_last_update > 5
def predict(self, inputs, batch_size=64):
inputs = np.tile(inputs, (self._num_network, 1, 1))
inputs = torch.tensor(inputs, dtype=torch.float).to(device)
mean, var = self.model(inputs, return_log_var=False)
return mean.detach().cpu().numpy(), var.detach().cpu().numpy()
class FakeEnv:
def __init__(self, model):
self.model = model
def step(self, obs, act):
inputs = np.concatenate((obs, act), axis=-1)
ensemble_model_means, ensemble_model_vars = self.model.predict(inputs)
ensemble_model_means[:, :, 1:] += obs
ensemble_model_stds = np.sqrt(ensemble_model_vars)
ensemble_samples = ensemble_model_means + np.random.normal(size=ensemble_model_means.shape) * ensemble_model_stds
num_models, batch_size, _ = ensemble_model_means.shape
models_to_use = np.random.choice([i for i in range(self.model._num_network)], size=batch_size)
batch_inds = np.arange(0, batch_size)
samples = ensemble_samples[models_to_use, batch_inds]
rewards, next_obs = samples[:, :1][0][0], samples[:, 1:][0]
return rewards, next_obs
最后,我们来实现 MBPO 算法的具体流程。
class MBPO:
def __init__(self, env, agent, fake_env, env_pool, model_pool,
rollout_length, rollout_batch_size, real_ratio, num_episode):
self.env = env
self.agent = agent
self.fake_env = fake_env
self.env_pool = env_pool
self.model_pool = model_pool
self.rollout_length = rollout_length
self.rollout_batch_size = rollout_batch_size
self.real_ratio = real_ratio
self.num_episode = num_episode
def rollout_model(self):
observations, _, _, _, _ = self.env_pool.sample(self.rollout_batch_size)
for obs in observations:
for i in range(self.rollout_length):
action = self.agent.take_action(obs)
reward, next_obs = self.fake_env.step(obs, action)
self.model_pool.add(obs, action, reward, next_obs, False)
obs = next_obs
def update_agent(self, policy_train_batch_size=64):
env_batch_size = int(policy_train_batch_size * self.real_ratio)
model_batch_size = policy_train_batch_size - env_batch_size
for epoch in range(10):
env_obs, env_action, env_reward, env_next_obs, env_done = self.env_pool.sample(env_batch_size)
if self.model_pool.size() > 0:
model_obs, model_action, model_reward, model_next_obs, model_done = self.model_pool.sample(model_batch_size)
obs = np.concatenate((env_obs, model_obs), axis=0)
action = np.concatenate((env_action, model_action), axis=0)
next_obs = np.concatenate((env_next_obs, model_next_obs), axis=0)
reward = np.concatenate((env_reward, model_reward), axis=0)
done = np.concatenate((env_done, model_done), axis=0)
else:
obs, action, next_obs, reward, done = env_obs, env_action, env_next_obs, env_reward, env_done
transition_dict = {
'states': obs,
'actions': action,
'next_states': next_obs,
'rewards': reward,
'dones': done
}
self.agent.update(transition_dict)
def train_model(self):
obs, action, reward, next_obs, done = self.env_pool.return_all_samples()
inputs = np.concatenate((obs, action), axis=-1)
reward = np.array(reward)
labels = np.concatenate(
(np.reshape(reward, (reward.shape[0], -1)), next_obs - obs),
axis=-1
)
self.fake_env.model.train(inputs, labels)
def explore(self):
obs, done, episode_return = self.env.reset(), False, 0
while not done:
action = self.agent.take_action(obs)
next_obs, reward, done, _ = self.env.step(action)
self.env_pool.add(obs, action, reward, next_obs, done)
obs = next_obs
episode_return += reward
return episode_return
def train(self):
return_list = []
explore_return = self.explore() ## 随机探索采取数据
print('episode: 1, return: %d' % explore_return)
return_list.append(explore_return)
for i_episode in range(self.num_episode - 1):
obs, done, episode_return = self.env.reset(), False, 0
step = 0
while not done:
if step % 50 == 0:
self.train_model()
self.rollout_model()
action = self.agent.take_action(obs)
next_obs, reward, done, _ = self.env.step(action)
self.env_pool.add(obs, action, reward, next_obs, done)
obs = next_obs
episode_return += reward
self.update_agent()
step += 1
return_list.append(episode_return)
print('episode: %d, return: %d' % (i_episode + 2, episode_return))
return return_list
class ReplayBuffer:
def __init__(self, capacity):
self.buffer = collections.deque(maxlen=capacity)
def add(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def size(self):
return len(self.buffer)
def sample(self, batch_size):
if batch_size > len(self.buffer):
return self.return_all_samples()
else:
transitions = random.sample(self.buffer, batch_size)
state, action, reward, next_state, done = zip(*transitions)
return np.array(state), action, reward, np.array(next_state), done
def return_all_samples(self):
all_transitions = list(self.buffer)
state, action, reward, next_state, done = zip(*all_transitions)
return np.array(state), action, reward, np.array(next_state), done
对于不同的环境,我们需要设置不同的参数。这里以 OpenAI Gym 中的 Pendulum-v0 环境为例,给出一组效果较为不错的参数。读者可以试着自己调节参数,观察调节后的效果。
real_ratio = 0.5
env_name = 'Pendulum-v0'
env = gym.make(env_name)
num_episodes = 20
actor_lr = 5e-4
critic_lr = 5e-3
alpha_lr = 1e-3
hidden_dim = 128
gamma = 0.98
tau = 0.005 ## 软更新参数
buffer_size = 10000
target_entropy = -1
model_alpha = 0.01 ## 模型损失函数中的加权权重
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_bound = env.action_space.high[0] ## 动作最大值
rollout_batch_size = 1000
rollout_length = 1 ## 推演长度k,推荐更多尝试
model_pool_size = rollout_batch_size * rollout_length
agent = SAC(state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma)
model = EnsembleDynamicsModel(state_dim, action_dim, model_alpha)
fake_env = FakeEnv(model)
env_pool = ReplayBuffer(buffer_size)
model_pool = ReplayBuffer(model_pool_size)
mbpo = MBPO(env, agent, fake_env, env_pool, model_pool, rollout_length, rollout_batch_size, real_ratio, num_episodes)
return_list = mbpo.train()
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('MBPO on {}'.format(env_name))
plt.show()
episode: 1, return: -1617
episode: 2, return: -1463
episode: 3, return: -1407
episode: 4, return: -929
episode: 5, return: -860
episode: 6, return: -643
episode: 7, return: -128
episode: 8, return: -368
episode: 9, return: -118
episode: 10, return: -123
episode: 11, return: -122
episode: 12, return: -118
episode: 13, return: -119
episode: 14, return: -119
episode: 15, return: -121
episode: 16, return: -123
episode: 17, return: 0
episode: 18, return: -125
episode: 19, return: -126
episode: 20, return: -243
可以看到,相比无模型的强化学习算法,基于模型的方法 MBPO 在样本效率上要高很多。虽然这里的效果可能不如 16.3 节提到的 PETS 算法优秀,但是在许多更加复杂的环境中(如 Hopper 和 HalfCheetah),MBPO 的表现远远好于 PETS 算法。
MBPO 算法是一种前沿的基于模型的强化学习算法,它提出了一个重要的概念——分支推演。在各种复杂的环境中,作者验证了 MBPO 的效果超过了之前基于模型的方法。MBPO 对于基于模型的强化学习的发展起着重要的作用,不少之后的工作都是在此基础上进行的。
除了算法的有效性,MBPO 的重要贡献还包括它给出了关于分支推演步数与模型误差、策略偏移程度之间的定量关系,进而阐明了什么时候我们可以相信并使用环境模型,什么样的环境导出的最优分支推演步数为 0,进而建议不使用环境模型。相应的理论分析在 17.5 节给出。
基于模型的方法往往是在环境模型中提升策略的性能,但这并不能保证在真实环境中策略性能也有所提升。因此,我们希望模型环境和真实环境中的结果的差距有一定的限制,具体可形式化为:
其中,
表示策略在真实环境中的期望回报,而
表示策略在模型环境中的期望回报。这一公式保证了在模型环境中提高策略性能超过
时,就可以在真实环境中取得策略性能的提升。
在 MBPO 中,根据泛化误差和分布偏移估计出这样一个下界:
其中,
刻画了模型泛化误差,而
刻画了当前策略
与数据收集策略
之间的策略转移(policy shift)。
在上面的公式里,如果模型泛化误差很大,就可能不存在一个使得推演误差
最小的正数推演步数
,进而无法使用模型。因此作者提出了分支推演(branched rollout)的办法,即从之前访问过的状态开始进行有限制的推演,从而保证模型工作时的泛化误差不要太大。
如果我们使用当前策略
而非本轮之前的数据收集策略
来估计模型误差(记为
),那么有:
并且对其进行线性近似:
结合上
步分支推演,我们就可以得到一个新的策略期望回报界:
其中,
表示使用分支推演的方法得到的策略期望回报。通过以上公式,我们就可以得到理论最优的推演步长,即:
在上式中可以看到,对于
,当推演步数变大时,
变小,而
变大。更进一步,如果
足够小的话,由
的关系可知,如果
足够小,那么最优推演步长
就是为正,此时分支推演(或者说使用基于模型的方法)就是一个有效的方法。
MBPO 论文中展示了在主流的机器人运动环境 Mojoco 的典型场景中,
的数量级非常小,大约都在
区间,而且可以看出它随着训练数据的增多的而不断下降,说明模型的泛化能力逐渐增强,而我们对于推演步长为正数的假设也是合理的。但要知道,并不是所有的强化学习环境都可以有如此小的
。例如在高随机性的离散状态环境中,往往环境模型的拟合精度较低,以至于
较大,此时使用基于分支推演的方法效果有限。
[1] JANNER M, FU J, ZHANG M, et al. When to trust your model: model-based policy optimization [J]. Advances in neural information processing systems 2019, 32: 12519-12530.
在前面的学习中,我们已经对强化学习有了不少了解。无论是在线策略(on-policy)算法还是离线策略(off-policy)算法,都有一个共同点:智能体在训练过程中可以不断和环境交互,得到新的反馈数据。二者的区别主要在于在线策略算法会直接使用这些反馈数据,而离线策略算法会先将数据存入经验回放池中,需要时再采样。然而,在现实生活中的许多场景下,让尚未学习好的智能体和环境交互可能会导致危险发生,或是造成巨大损失。例如,在训练自动驾驶的规控智能体时,如果让智能体从零开始和真实环境进行交互,那么在训练的最初阶段,它操控的汽车无疑会横冲直撞,造成各种事故。再例如,在推荐系统中,用户的反馈往往比较滞后,统计智能体策略的回报需要很长时间。而如果策略存在问题,早期的用户体验不佳,就会导致用户流失等后果。因此,离线强化学习(offline reinforcement learning)的目标是,在智能体不和环境交互的情况下,仅从已经收集好的确定的数据集中,通过强化学习算法得到比较好的策略。离线强化学习和在线策略算法、离线策略算法的区别如图 18-1 所示。
图18-1 离线强化学习和在线策略算法、离线策略算法的区别
图 18-1 中的离线强化学习和离线策略强化学习很像,都要从经验回放池中采样进行训练,并且离线策略算法的策略评估方式也多种多样。因此,研究者们最开始尝试将离线策略算法直接照搬到离线的环境下,仅仅是去掉算法中和环境交互的部分。然而,这种做法完全失败了。研究者进行了 3 个简单的实验。第一个实验,作者使用 DDPG 算法训练了一个智能体,并将智能体与环境交互的所有数据都记录下来,再用这些数据训练离线 DDPG 智能体。第二个实验,在线 DDPG 算法在训练时每次从经验回放池中采样,并用相同的数据同步训练离线 DDPG 智能体,这样两个智能体甚至连训练时用到的数据顺序都完全相同。第三个实验,在线 DDPG 算法在训练完毕后作为专家,在环境中采集大量数据,供离线 DDPG 智能体学习。这 3 个实验,即完全回放、同步训练、模仿训练的结果依次如图 18-2 所示。
让人惊讶的是,3 个实验中,离线 DDPG 智能体的表现都远远差于在线 DDPG 智能体,即便是第二个实验的同步训练都无法提高离线智能体的表现。在第三个模仿训练实验中,离线智能体面对非常优秀的数据样本却什么都没学到!针对这种情况,研究者指出,外推误差(extrapolation error)是离线策略算法不能直接迁移到离线环境中的原因。
外推误差,是指由于当前策略可能访问到的状态动作对与从数据集中采样得到的状态动作对的分布不匹配而产生的误差。为什么在线强化学习算法没有受到外推误差的影响呢?因为对于在线强化学习,即使训练是离线策略的,智能体依然有机会通过与环境交互及时采样到新的数据,从而修正这些误差。但是在离线强化学习中,智能体无法和环境交互。因此,一般来说,离线强化学习算法要想办法尽可能地限制外推误差的大小,从而得到较好的策略。
为了减少外推误差,当前的策略需要做到只访问与数据集中相似的(s,a)数据。满足这一要求的策略称为批量限制策略(batch-constrained policy)。具体来说,这样的策略在选择动作时有 3 个目标:
对于标准的表格(tabular)型环境,状态和动作空间都是离散且有限的。标准的 Q-learning 更新公式可以写为:
这时,只需要把策略
能选择的动作限制在数据集
内,就能满足上述 3 个目标的平衡,这样就得到了表格设定下的批量限制 Q-learning(batch-constrained Q-learning,BCQ)算法:
可以证明,如果数据中包含了所有可能的
对,按上式进行迭代可以收敛到最优的价值函数
。
连续状态和动作的情况要复杂一些,因为批量限制策略的目标需要被更详细地定义。例如,该如何定义两个状态动作对的距离呢?BCQ采用了一种巧妙的方法:训练一个生成模型
。对于数据集
和其中的状态
,生成模型
能给出与
中数据接近的一系列动作
用于
网络的训练。更进一步,为了增加生成动作的多样性,减少生成次数,BCQ 还引入了扰动模型
。输入
时,模型给出一个绝对值最大为
的微扰并附加在动作上。这两个模型综合起来相当于给出了一个批量限制策略
:
其中,生成模型
用变分自动编码器(variational auto-encoder, VAE)实现;扰动模型直接通过确定性策略梯度算法训练,目标是使
函数最大化:
总结起来,BCQ 算法的流程如下:
除此之外,BCQ 还使用了一些实现上的小技巧。由于不是 BCQ 的重点,此处不再赘述。考虑到 VAE 不属于本书的讨论范围,并且 BCQ 的代码中有较多技巧,有兴趣的读者可以参阅 BCQ 原文,自行实现代码。此处介绍 BCQ 算法,一是因为它对离线强化学习的误差分析和实验很有启发性,二是因为它是无模型离线强化学习中限制策略集合算法中的经典方法。下面我们介绍另一类直接限制QQQ函数的算法的代表:保守 Q-learning。
18.2 节已经讲到,离线强化学习面对的巨大挑战是如何减少外推误差。实验证明,外推误差主要会导致在远离数据集的点上函数
的过高估计,甚至常常出现
值向上发散的情况。因此,如果能用某种方法将算法中偏离数据集的点上的函数
保持在很低的值,或许能消除部分外推误差的影响,这就是保守 Q-learning(conservative Q-learning,CQL)算法的基本思想。CQL 在普通的贝尔曼方程上引入一些额外的限制项,达到了这一目标。接下来一步步介绍 CQL 算法的思路。
在普通的 Q-learning 中,
的更新方程可以写为:
其中,
是实际计算时策略
的贝尔曼算子。为了防止
值在各个状态上(尤其是不在数据集中的状态上)的过高估计,我们要对某些状态上的高
值进行惩罚。考虑一般情况,我们希望
在某个特定分布
上的期望值最小。在上式中,
的计算需要用到
,但只有
是生成的,可能不在数据集中。因此,我们对数据集中的状态
按策略
得到的动作进行惩罚:
其中,
是平衡因子。可以证明,上式迭代收敛给出的函数
在任何
上的值都比真实值要小。不过,如果我们放宽条件,只追求
在
上的期望值
比真实值小的话,就可以略微放松对上式的约束。一个自然的想法是,对于符合用于生成数据集的行为策略
的数据点,我们可以认为
对这些点的估值较为准确,在这些点上不必限制让
值很小。作为对第一项的补偿,将上式改为:
将行为策略
写为
是因为我们无法获知真实的行为策略,只能通过数据集中已有的数据近似得到。可以证明,当
时,上式迭代收敛得到的函数
虽然不是在每一点上都小于真实值,但其期望是小于真实值的,即
。
至此,CQL 算法已经有了理论上的保证,但仍有一个缺陷:计算的时间开销太大了。当令
时,在
迭代的每一步,算法都要对策略
做完整的离线策略评估来计算上式中的arg min,再进行一次策略迭代,而离线策略评估是非常耗时的。既然
并非与
独立,而是通过
值最大的动作衍生出来的,那么我们完全可以用使
取最大值的
去近似
,即:
为了防止过拟合,再加上正则项
。综合起来就得到完整的迭代方程:
正则项采用和一个先验策略
的 KL 距离,即
。一般来说,取为均匀分布
即可,这样可以将迭代方程化简为:
可以注意到,简化后式中已经不含有
,为计算提供了很大方便。该化简需要进行一些数学推导,详细过程附在 18.6 节中,感兴趣的读者也可以先尝试自行推导。
上面给出了函数
的迭代方法。CQL 属于直接在函数
上做限制的一类方法,对策略
没有特殊要求,因此参考文献中分别给出了基于 DQN 和 SAC 两种框架的 CQL 算法。考虑到后者应用更广泛,且参考文献中大部分的实验结果都是基于后者得出的,这里只介绍基于 SAC 的版本。对 SAC 的策略迭代和自动调整熵正则系数不熟悉的读者,可以先阅读第 14 章的相关内容。
总结起来,CQL 算法流程如下:
网络
、目标
网络
和策略
、熵正则系数
do
:
下面在倒立摆环境中实现基础的 CQL 算法。该环境在前面的章节中已出现了多次,这里不再重复介绍。首先导入必要的库。
import numpy as np
import gym
from tqdm import tqdm
import random
import rl_utils
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import matplotlib.pyplot as plt
为了生成数据集,在倒立摆环境中从零开始训练一个在线 SAC 智能体,直到算法达到收敛效果,把训练过程中智能体采集的所有轨迹保存下来作为数据集。这样,数据集中既包含训练初期较差策略的采样,又包含训练后期较好策略的采样,是一个混合数据集。下面给出生成数据集的代码,SAC 部分直接使用 14.5 节中的代码,因此不再详细解释。
class PolicyNetContinuous(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim, action_bound):
super(PolicyNetContinuous, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
self.fc_std = torch.nn.Linear(hidden_dim, action_dim)
self.action_bound = action_bound
def forward(self, x):
x = F.relu(self.fc1(x))
mu = self.fc_mu(x)
std = F.softplus(self.fc_std(x))
dist = Normal(mu, std)
normal_sample = dist.rsample() ## rsample()是重参数化采样
log_prob = dist.log_prob(normal_sample)
action = torch.tanh(normal_sample)
## 计算tanh_normal分布的对数概率密度
log_prob = log_prob - torch.log(1 - torch.tanh(action).pow(2) + 1e-7)
action = action * self.action_bound
return action, log_prob
class QValueNetContinuous(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(QValueNetContinuous, self).__init__()
self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
self.fc_out = torch.nn.Linear(hidden_dim, 1)
def forward(self, x, a):
cat = torch.cat([x, a], dim=1)
x = F.relu(self.fc1(cat))
x = F.relu(self.fc2(x))
return self.fc_out(x)
class SACContinuous:
''' 处理连续动作的SAC算法 '''
def __init__(self, state_dim, hidden_dim, action_dim, action_bound,
actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma,
device):
self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim, action_bound).to(device) ## 策略网络
self.critic_1 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) ## 第一个Q网络
self.critic_2 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) ## 第二个Q网络
self.target_critic_1 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) ## 第一个目标Q网络
self.target_critic_2 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) ## 第二个目标Q网络
## 令目标Q网络的初始参数和Q网络一样
self.target_critic_1.load_state_dict(self.critic_1.state_dict())
self.target_critic_2.load_state_dict(self.critic_2.state_dict())
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
self.critic_1_optimizer = torch.optim.Adam(self.critic_1.parameters(), lr=critic_lr)
self.critic_2_optimizer = torch.optim.Adam(self.critic_2.parameters(), lr=critic_lr)
## 使用alpha的log值,可以使训练结果比较稳定
self.log_alpha = torch.tensor(np.log(0.01), dtype=torch.float)
self.log_alpha.requires_grad = True #对alpha求梯度
self.log_alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=alpha_lr)
self.target_entropy = target_entropy ## 目标熵的大小
self.gamma = gamma
self.tau = tau
self.device = device
def take_action(self, state):
state = torch.tensor([state], dtype=torch.float).to(self.device)
action = self.actor(state)[0]
return [action.item()]
def calc_target(self, rewards, next_states, dones): ## 计算目标Q值
next_actions, log_prob = self.actor(next_states)
entropy = -log_prob
q1_value = self.target_critic_1(next_states, next_actions)
q2_value = self.target_critic_2(next_states, next_actions)
next_value = torch.min(q1_value, q2_value) + self.log_alpha.exp() * entropy
td_target = rewards + self.gamma * next_value * (1 - dones)
return td_target
def soft_update(self, net, target_net):
for param_target, param in zip(target_net.parameters(), net.parameters()):
param_target.data.copy_(param_target.data * (1.0 - self.tau) + param.data * self.tau)
def update(self, transition_dict):
states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1, 1).to(self.device)
rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
rewards = (rewards + 8.0) / 8.0 ## 对倒立摆环境的奖励进行重塑
## 更新两个Q网络
td_target = self.calc_target(rewards, next_states, dones)
critic_1_loss = torch.mean(F.mse_loss(self.critic_1(states, actions), td_target.detach()))
critic_2_loss = torch.mean(F.mse_loss(self.critic_2(states, actions), td_target.detach()))
self.critic_1_optimizer.zero_grad()
critic_1_loss.backward()
self.critic_1_optimizer.step()
self.critic_2_optimizer.zero_grad()
critic_2_loss.backward()
self.critic_2_optimizer.step()
## 更新策略网络
new_actions, log_prob = self.actor(states)
entropy = -log_prob
q1_value = self.critic_1(states, new_actions)
q2_value = self.critic_2(states, new_actions)
actor_loss = torch.mean(-self.log_alpha.exp() * entropy - torch.min(q1_value, q2_value))
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
## 更新alpha值
alpha_loss = torch.mean((entropy - self.target_entropy).detach() * self.log_alpha.exp())
self.log_alpha_optimizer.zero_grad()
alpha_loss.backward()
self.log_alpha_optimizer.step()
self.soft_update(self.critic_1, self.target_critic_1)
self.soft_update(self.critic_2, self.target_critic_2)
env_name = 'Pendulum-v0'
env = gym.make(env_name)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_bound = env.action_space.high[0] ## 动作最大值
random.seed(0)
np.random.seed(0)
env.seed(0)
torch.manual_seed(0)
actor_lr = 3e-4
critic_lr = 3e-3
alpha_lr = 3e-4
num_episodes = 100
hidden_dim = 128
gamma = 0.99
tau = 0.005 ## 软更新参数
buffer_size = 100000
minimal_size = 1000
batch_size = 64
target_entropy = -env.action_space.shape[0]
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
replay_buffer = rl_utils.ReplayBuffer(buffer_size)
agent = SACContinuous(state_dim, hidden_dim, action_dim, action_bound,
actor_lr, critic_lr, alpha_lr, target_entropy, tau,
gamma, device)
return_list = rl_utils.train_off_policy_agent(env, agent, num_episodes,
replay_buffer, minimal_size,
batch_size)
Iteration 0: 100%|██████████| 10/10 [00:08<00:00, 1.23it/s, episode=10, return=-1534.655]
Iteration 1: 100%|██████████| 10/10 [00:15<00:00, 1.52s/it, episode=20, return=-1085.715]
Iteration 2: 100%|██████████| 10/10 [00:15<00:00, 1.53s/it, episode=30, return=-364.507]
Iteration 3: 100%|██████████| 10/10 [00:15<00:00, 1.53s/it, episode=40, return=-222.485]
Iteration 4: 100%|██████████| 10/10 [00:15<00:00, 1.58s/it, episode=50, return=-157.978]
Iteration 5: 100%|██████████| 10/10 [00:15<00:00, 1.55s/it, episode=60, return=-166.056]
Iteration 6: 100%|██████████| 10/10 [00:15<00:00, 1.55s/it, episode=70, return=-143.147]
Iteration 7: 100%|██████████| 10/10 [00:15<00:00, 1.57s/it, episode=80, return=-127.939]
Iteration 8: 100%|██████████| 10/10 [00:15<00:00, 1.55s/it, episode=90, return=-180.905]
Iteration 9: 100%|██████████| 10/10 [00:15<00:00, 1.53s/it, episode=100, return=-171.265]
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('SAC on {}'.format(env_name))
plt.show()
下面实现本章重点讨论的 CQL 算法,它在 SAC 的代码基础上做了修改。
class CQL:
''' CQL算法 '''
def __init__(self, state_dim, hidden_dim, action_dim, action_bound,
actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma,
device, beta, num_random):
self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim, action_bound).to(device)
self.critic_1 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device)
self.critic_2 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device)
self.target_critic_1 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device)
self.target_critic_2 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device)
self.target_critic_1.load_state_dict(self.critic_1.state_dict())
self.target_critic_2.load_state_dict(self.critic_2.state_dict())
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
self.critic_1_optimizer = torch.optim.Adam(self.critic_1.parameters(), lr=critic_lr)
self.critic_2_optimizer = torch.optim.Adam(self.critic_2.parameters(), lr=critic_lr)
self.log_alpha = torch.tensor(np.log(0.01), dtype=torch.float)
self.log_alpha.requires_grad = True #对alpha求梯度
self.log_alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=alpha_lr)
self.target_entropy = target_entropy ## 目标熵的大小
self.gamma = gamma
self.tau = tau
self.beta = beta ## CQL损失函数中的系数
self.num_random = num_random ## CQL中的动作采样数
def take_action(self, state):
state = torch.tensor([state], dtype=torch.float).to(device)
action = self.actor(state)[0]
return [action.item()]
def soft_update(self, net, target_net):
for param_target, param in zip(target_net.parameters(),
net.parameters()):
param_target.data.copy_(param_target.data * (1.0 - self.tau) +
param.data * self.tau)
def update(self, transition_dict):
states = torch.tensor(transition_dict['states'], dtype=torch.float).to(device)
actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1, 1).to(device)
rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(device)
next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(device)
dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(device)
rewards = (rewards + 8.0) / 8.0 ## 对倒立摆环境的奖励进行重塑
next_actions, log_prob = self.actor(next_states)
entropy = -log_prob
q1_value = self.target_critic_1(next_states, next_actions)
q2_value = self.target_critic_2(next_states, next_actions)
next_value = torch.min(q1_value, q2_value) + self.log_alpha.exp() * entropy
td_target = rewards + self.gamma * next_value * (1 - dones)
critic_1_loss = torch.mean(F.mse_loss(self.critic_1(states, actions), td_target.detach()))
critic_2_loss = torch.mean(F.mse_loss(self.critic_2(states, actions), td_target.detach()))
## 以上与SAC相同,以下Q网络更新是CQL的额外部分
batch_size = states.shape[0]
random_unif_actions = torch.rand(
[batch_size * self.num_random, actions.shape[-1]],
dtype=torch.float).uniform_(-1, 1).to(device)
random_unif_log_pi = np.log(0.5**next_actions.shape[-1])
tmp_states = states.unsqueeze(1).repeat(1, self.num_random, 1).view(-1, states.shape[-1])
tmp_next_states = next_states.unsqueeze(1).repeat(1, self.num_random, 1).view(-1, next_states.shape[-1])
random_curr_actions, random_curr_log_pi = self.actor(tmp_states)
random_next_actions, random_next_log_pi = self.actor(tmp_next_states)
q1_unif = self.critic_1(tmp_states, random_unif_actions).view(-1, self.num_random, 1)
q2_unif = self.critic_2(tmp_states, random_unif_actions).view(-1, self.num_random, 1)
q1_curr = self.critic_1(tmp_states, random_curr_actions).view(-1, self.num_random, 1)
q2_curr = self.critic_2(tmp_states, random_curr_actions).view(-1, self.num_random, 1)
q1_next = self.critic_1(tmp_states, random_next_actions).view(-1, self.num_random, 1)
q2_next = self.critic_2(tmp_states, random_next_actions).view(-1, self.num_random, 1)
q1_cat = torch.cat([
q1_unif - random_unif_log_pi,
q1_curr - random_curr_log_pi.detach().view(-1, self.num_random, 1),
q1_next - random_next_log_pi.detach().view(-1, self.num_random, 1)
], dim=1)
q2_cat = torch.cat([
q2_unif - random_unif_log_pi,
q2_curr - random_curr_log_pi.detach().view(-1, self.num_random, 1),
q2_next - random_next_log_pi.detach().view(-1, self.num_random, 1)
], dim=1)
qf1_loss_1 = torch.logsumexp(q1_cat, dim=1).mean()
qf2_loss_1 = torch.logsumexp(q2_cat, dim=1).mean()
qf1_loss_2 = self.critic_1(states, actions).mean()
qf2_loss_2 = self.critic_2(states, actions).mean()
qf1_loss = critic_1_loss + self.beta * (qf1_loss_1 - qf1_loss_2)
qf2_loss = critic_2_loss + self.beta * (qf2_loss_1 - qf2_loss_2)
self.critic_1_optimizer.zero_grad()
qf1_loss.backward(retain_graph=True)
self.critic_1_optimizer.step()
self.critic_2_optimizer.zero_grad()
qf2_loss.backward(retain_graph=True)
self.critic_2_optimizer.step()
## 更新策略网络
new_actions, log_prob = self.actor(states)
entropy = -log_prob
q1_value = self.critic_1(states, new_actions)
q2_value = self.critic_2(states, new_actions)
actor_loss = torch.mean(-self.log_alpha.exp() * entropy - torch.min(q1_value, q2_value))
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
## 更新alpha值
alpha_loss = torch.mean((entropy - self.target_entropy).detach() * self.log_alpha.exp())
self.log_alpha_optimizer.zero_grad()
alpha_loss.backward()
self.log_alpha_optimizer.step()
self.soft_update(self.critic_1, self.target_critic_1)
self.soft_update(self.critic_2, self.target_critic_2)
接下来设置好超参数,就可以开始训练了。最后再绘图看一下算法的表现。因为不能通过与环境交互来获得新的数据,离线算法最终的效果和数据集有很大关系,并且波动会比较大。通常来说,调参后数据集中的样本质量越高,算法的表现就越好。感兴趣的读者可以使用其他方式生成数据集,并观察算法效果的变化。
random.seed(0)
np.random.seed(0)
env.seed(0)
torch.manual_seed(0)
beta = 5.0
num_random = 5
num_epochs = 100
num_trains_per_epoch = 500
agent = CQL(state_dim, hidden_dim, action_dim, action_bound, actor_lr,
critic_lr, alpha_lr, target_entropy, tau, gamma, device, beta,
num_random)
return_list = []
for i in range(10):
with tqdm(total=int(num_epochs / 10), desc='Iteration %d' % i) as pbar:
for i_epoch in range(int(num_epochs / 10)):
## 此处与环境交互只是为了评估策略,最后作图用,不会用于训练
epoch_return = 0
state = env.reset()
done = False
while not done:
action = agent.take_action(state)
next_state, reward, done, _ = env.step(action)
state = next_state
epoch_return += reward
return_list.append(epoch_return)
for _ in range(num_trains_per_epoch):
b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)
transition_dict = {
'states': b_s,
'actions': b_a,
'next_states': b_ns,
'rewards': b_r,
'dones': b_d
}
agent.update(transition_dict)
if (i_epoch + 1) % 10 == 0:
pbar.set_postfix({
'epoch':
'%d' % (num_epochs / 10 * i + i_epoch + 1),
'return':
'%.3f' % np.mean(return_list[-10:])
})
pbar.update(1)
Iteration 0: 100%|██████████| 10/10 [01:26<00:00, 8.62s/it, epoch=10, return=-941.721]
Iteration 1: 100%|██████████| 10/10 [01:27<00:00, 8.72s/it, epoch=20, return=-432.056]
Iteration 2: 100%|██████████| 10/10 [01:26<00:00, 8.68s/it, epoch=30, return=-810.899]
Iteration 3: 100%|██████████| 10/10 [01:27<00:00, 8.70s/it, epoch=40, return=-636.281]
Iteration 4: 100%|██████████| 10/10 [01:26<00:00, 8.64s/it, epoch=50, return=-224.978]
Iteration 5: 100%|██████████| 10/10 [01:27<00:00, 8.79s/it, epoch=60, return=-298.303]
Iteration 6: 100%|██████████| 10/10 [01:30<00:00, 9.05s/it, epoch=70, return=-210.535]
Iteration 7: 100%|██████████| 10/10 [01:29<00:00, 8.99s/it, epoch=80, return=-209.631]
Iteration 8: 100%|██████████| 10/10 [01:29<00:00, 8.98s/it, epoch=90, return=-213.836]
Iteration 9: 100%|██████████| 10/10 [01:33<00:00, 9.36s/it, epoch=100, return=-206.435]
epochs_list = list(range(len(return_list)))
plt.plot(epochs_list, return_list)
plt.xlabel('Epochs')
plt.ylabel('Returns')
plt.title('CQL on {}'.format(env_name))
plt.show()
mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('CQL on {}'.format(env_name))
plt.show()
本章介绍了离线强化学习的基本概念和两个与模型无关的离线强化学习算法——BCQ 和 CQL,并讲解了 CQL 的代码。事实上,离线强化学习还有一类基于模型的方法,如 model-based offline reinforcement learning (MOReL)和 model-based offline policy optimization(MOPO),本章由于篇幅原因不再介绍。这一类算法的思路基本是通过模型生成更多数据,同时通过衡量模型预测的不确定性来对生成的偏离数据集的数据进行惩罚,感兴趣的读者可以自行查阅相关资料。
离线强化学习的另一大难点是算法通常对超参数极为敏感,非常难调参。并且在实际复杂场景中通常不能像在模拟器中那样,每训练几轮就在环境中评估策略好坏,如何确定何时停止算法也是离线强化学习在实际应用中面临的一大挑战。此外,离线强化学习在现实场景中的落地还需要关注离散策略评估和选择、数据收集策略的保守性和数据缺失性等现实问题。不过无论如何,离线强化学习和模仿学习都是为了解决在现实中训练智能体的困难而提出的,也都是强化学习真正落地的重要途径。
这里对 CQL 算法中
的情况给出详细推导。对于一般的变量
及其概率密度函数
,首先有归一化条件:
把 KL 散度展开,由于
是常数,因此得到
回到 18.3 节,可以发现,在原迭代方程中,含有
的只有
和
两项。在分布
的条件下,状态
是给定的,因此第一项中
的采样和
无关,第一项可以抽象为
。此外,概率密度函数应恒大于等于零,即
。最后舍去同样和
无关的常数
。综合起来,我们要求解如下的优化问题:
这一问题的求解要用到变分法。对于等式约束和不等式约束,引入拉格朗日乘数
和松弛函数
,得到相应的无约束优化问题:
其中,
是
的简写。可以发现,
事实上与
无关,可以写为
。代入
,得到
写出欧拉-拉格朗日方程
,分别计算
由于第二项
和
无关,直接等于
。最终拉格朗日方程简化为:
两项的乘积等于零,因此在每一点上,要么
,即
,要么
满足
。先来解后面的方程,直接计算得到
最终的解
应是
和
两者的分段组合。由于指数函数必定大于零,为了使目标泛函取到最大值,应当全部取
的部分。代回归一化条件,就得到原问题的最优解:
其中,
是归一化系数。此时,优化问题取到最大值,为
对照原迭代方程,将
改为
,积分改为在动作空间上求和,上式就变为:
此处的期望是对
而言的,与
无关,因此可以把期望移到最前面,得到
至此,式中已经不含
,完成化简。
[1] LEVINE S, KUMAR A, TUCKER. G, et al. Offline reinforcement learning: tutorial, review, and perspectives on open problems [J]. 2020.
[2] FUJIMOTO S, MEGER D, PRECUP D. Off-policy deep reinforcement learning without exploration [C] // International conference on machine learning, PMLR, 2019.
[3] KINGMA D P, WELLING M. Auto-encoding variational bayes [C] // International conference on learning representations, ICLR, 2014.
[4] KUMAR A, ZHOU A, TUCKER G, et al. Conservative q-learning for offline reinforcement learning [J]. NeurIPS, 2020.
[5] KIDAMBL R, RAJESWARAN A, NETRAPALLI P, et al. MOReL: Model-based offline reinforcement learning [J]. Advances in neural information processing systems, 2020: 33.
[6] YU T, THOMAS G, YU L, et al. MOPO: Model-based offline policy optimization [J]. Advances in neural information processing systems, 2020: 33.
前文已经学习了 PPO、SAC 等经典的深度强化学习算法,大部分算法都能在各自的任务中取得比较好的效果,但是它们都局限在单个任务上,换句话说,对于训练完的算法,在使用时它们都只能完成一个特定的任务。如果面对较为复杂的复合任务,之前的强化学习算法往往不容易训练出有效的策略。本章将介绍目标导向的强化学习(goal-oriented reinforcement learning,GoRL)以及该类别下的一种经典算法 HER。GoRL 可以学习一个策略,使其可以在不同的目标(goal)作为条件下奏效,以此来解决较为复杂的决策任务。
在介绍概念之前,先介绍一个目标导向的强化学习的实际场景。例如,策略π\piπ要操控机械臂抓取桌子上的一个物体。值得注意的是,每一次任务开始,物体的位置可能是不同的,也就是说,智能体需要完成一系列相似并不同的任务。在使用传统的强化学习算法时,采用单一策略只能抓取同一个位置的物体。对于不同的目标位置,要训练多个策略。想象一下,在悬崖漫步环境中,若目标位置变成了右上角,便只能重新训练一个策略。同一个策略无法完成一系列不同的目标。
接下来讨论 GoRL 的数学形式。有别于一般的强化学习算法中定义的马尔可夫决策过程,在目标导向的强化学习中,使用一个扩充过的元组
来定义 MDP,其中,
是状态空间,
是动作空间,
是状态转移函数,
是目标空间,
是一个将状态
从状态空间映射为目标空间内的一个目标
的函数,
是奖励函数,与目标
有关。接下来详细介绍目标导向的强化学习中与一般强化学习不同的概念。
首先是补充的目标空间
和目标
。在目标导向的强化学习中,任务是由目标定义的,并且目标本身是和状态
相关的,可以将一个状态
使用映射函数
映射为目标
。继续使用之前机械臂抓取物体的任务作为例子:状态
中包含了机械臂的力矩、物体的位置等信息。因为任务是抓取物体,所以规定目标
是物体的位置,此时映射函数
相当于一个从状态
中提取物体位置的函数。
然后介绍奖励函数,奖励函数不仅与状态
和动作
相关,在目标导向强化学习中,还与设定的目标相关,以下是其中一种常见的形式:
其中,
是一个比较小的值,表示到达目标附近就不会受到
的惩罚。在目标导向强化学习中,由于对于不同的目标,奖励函数是不同的,因此状态价值函数
也是基于目标的,动作状态价值函数
同理。接下来介绍目标导向的强化学习的优化目标。定义
为环境中初始状态
与目标
的联合分布,那么 GoRL 的目标为优化策略
,使以下目标函数最大化:
根据 19.2 节的定义,可以发现目标导向的强化学习的奖励往往是非常稀疏的。由于智能体在训练初期难以完成目标而只能得到−1-1−1的奖励,从而使整个算法的训练速度较慢。那么,有没有一种方法能有效地利用这些“失败”的经验呢?从这个角度出发,事后经验回放(hindsight experience replay,HER)算法于 2017 年神经信息处理系统(Neural Information Processing Systems,NeurIPS)大会中被提出,成为 GoRL 的一大经典方法。
假设现在使用策略
在环境中以
为目标进行探索,得到了这么一条轨迹:
,并且
。这意味着这整一条轨迹上,我们得到的奖励值都是
,这对我们的训练起到的帮助很小。那么,如果我们换一个目标
来重新审视整条轨迹呢?换句话说,虽然并没有达到目标
,但是策略在探索的过程中,完成了
等对应的目标,即完成了
等目标。如果用这些目标来将原先的目标
替换成新的目标
,重新计算轨迹中的奖励值,就能使策略从失败的经验中得到对训练有用的信息。
下面来看看具体的算法流程。值得注意的是,这里的策略优化算法可以选择任意合适的算法,比如 DQN、DDPG 等。
的参数
,初始化经验回放池
do
和初始状态
,使用
在环境中采样得到轨迹
,将其以
的形式存入
中
中采样
个
元组
,将其映射为新的目标
并计算新的奖励值
,然后用新的数据
替换原先的元组
进行训练
对于算法中状态
的选择,HER 提出了 3 种不同的方案。
处于同一个轨迹并在时间上处于
之后的某个状态作为
。
处于同一个轨迹的某个状态作为
。
。
在 HER 的实验中,future 方案给出了最好的效果,该方案也最直观。因此在代码实现中用的是 future 方案。
接下来看看如何实现 HER 算法。首先定义一个简单二维平面上的环境。在一个二维网格世界上,每个维度的位置范围是
,在每一个序列的初始,智能体都处于
的位置,环境将自动从
的矩形区域内生成一个目标。每个时刻智能体可以选择纵向和横向分别移动
作为这一时刻的动作。当智能体距离目标足够近的时候,它将收到
的奖励并结束任务,否则奖励为
。每一条序列的最大长度为
。环境示意图如图 19-1 所示。
图19-1 环境示意图
使用 Python 实现这个环境。导入一些需要用到的包,并且用代码来定义该环境。
import torch
import torch.nn.functional as F
import numpy as np
import random
from tqdm import tqdm
import collections
import matplotlib.pyplot as plt
class WorldEnv:
def __init__(self):
self.distance_threshold = 0.15
self.action_bound = 1
def reset(self): ## 重置环境
## 生成一个目标状态, 坐标范围是[3.5~4.5, 3.5~4.5]
self.goal = np.array([4 + random.uniform(-0.5, 0.5), 4 + random.uniform(-0.5, 0.5)])
self.state = np.array([0, 0]) ## 初始状态
self.count = 0
return np.hstack((self.state, self.goal))
def step(self, action):
action = np.clip(action, -self.action_bound, self.action_bound)
x = max(0, min(5, self.state[0] + action[0]))
y = max(0, min(5, self.state[1] + action[1]))
self.state = np.array([x, y])
self.count += 1
dis = np.sqrt(np.sum(np.square(self.state - self.goal)))
reward = -1.0 if dis > self.distance_threshold else 0
if dis <= self.distance_threshold or self.count == 50:
done = True
else:
done = False
return np.hstack((self.state, self.goal)), reward, done
接下来实现 DDPG 算法中用到的与 Actor 网络和 Critic 网络的网络结构相关的代码。
class PolicyNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim, action_bound):
super(PolicyNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
self.fc3 = torch.nn.Linear(hidden_dim, action_dim)
self.action_bound = action_bound ## action_bound是环境可以接受的动作最大值
def forward(self, x):
x = F.relu(self.fc2(F.relu(self.fc1(x))))
return torch.tanh(self.fc3(x)) * self.action_bound
class QValueNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(QValueNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
self.fc3 = torch.nn.Linear(hidden_dim, 1)
def forward(self, x, a):
cat = torch.cat([x, a], dim=1) ## 拼接状态和动作
x = F.relu(self.fc2(F.relu(self.fc1(cat))))
return self.fc3(x)
在定义好 Actor 和 Critic 的网络结构之后,来看一下 DDPG 算法的代码。这部分代码和 13.3 节中的代码基本一致,主要区别在于 13.3 节中的 DDPG 算法是在倒立摆环境中运行的,动作只有 1 维,而这里的环境中动作有 2 维,导致一小部分代码不同。读者可以先思考一下此时应该修改哪一部分代码,然后自行对比,就能找到不同之处。
class DDPG:
''' DDPG算法 '''
def __init__(self, state_dim, hidden_dim, action_dim, action_bound,
actor_lr, critic_lr, sigma, tau, gamma, device):
self.action_dim = action_dim
self.actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device)
self.critic = QValueNet(state_dim, hidden_dim, action_dim).to(device)
self.target_actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device)
self.target_critic = QValueNet(state_dim, hidden_dim, action_dim).to(device)
## 初始化目标价值网络并使其参数和价值网络一样
self.target_critic.load_state_dict(self.critic.state_dict())
## 初始化目标策略网络并使其参数和策略网络一样
self.target_actor.load_state_dict(self.actor.state_dict())
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr = actor_lr)
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr = critic_lr)
self.gamma = gamma
self.sigma = sigma ## 高斯噪声的标准差,均值直接设为0
self.tau = tau ## 目标网络软更新参数
self.action_bound = action_bound
self.device = device
def take_action(self, state):
state = torch.tensor([state], dtype=torch.float).to(self.device)
action = self.actor(state).detach().cpu().numpy()[0]
## 给动作添加噪声,增加探索
action = action + self.sigma * np.random.randn(self.action_dim)
return action
def soft_update(self, net, target_net):
for param_target, param in zip(target_net.parameters(), net.parameters()):
param_target.data.copy_(param_target.data * (1.0 - self.tau) + param.data * self.tau)
def update(self, transition_dict):
states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict['actions'], dtype=torch.float).to(self.device)
rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
next_q_values = self.target_critic(next_states, self.target_actor(next_states))
q_targets = rewards + self.gamma * next_q_values * (1 - dones)
## MSE损失函数
critic_loss = torch.mean(F.mse_loss(self.critic(states, actions), q_targets))
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
## 策略网络就是为了使Q值最大化
actor_loss = -torch.mean(self.critic(states, self.actor(states)))
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
self.soft_update(self.actor, self.target_actor) ## 软更新策略网络
self.soft_update(self.critic, self.target_critic) ## 软更新价值网络
接下来定义一个特殊的经验回放池,此时回放池内不再存储每一步的数据,而是存储一整条轨迹。这是 HER 算法中的核心部分,之后可以用 HER 算法从该经验回放池中构建新的数据来帮助策略训练。
class Trajectory:
''' 用来记录一条完整轨迹 '''
def __init__(self, init_state):
self.states = [init_state]
self.actions = []
self.rewards = []
self.dones = []
self.length = 0
def store_step(self, action, state, reward, done):
self.actions.append(action)
self.states.append(state)
self.rewards.append(reward)
self.dones.append(done)
self.length += 1
class ReplayBuffer_Trajectory:
''' 存储轨迹的经验回放池 '''
def __init__(self, capacity):
self.buffer = collections.deque(maxlen=capacity)
def add_trajectory(self, trajectory):
self.buffer.append(trajectory)
def size(self):
return len(self.buffer)
def sample(self, batch_size, use_her, dis_threshold=0.15, her_ratio=0.8):
batch = dict(
states=[],
actions=[],
next_states=[],
rewards=[],
dones=[]
)
for _ in range(batch_size):
traj = random.sample(self.buffer, 1)[0]
step_state = np.random.randint(traj.length)
state = traj.states[step_state]
next_state = traj.states[step_state + 1]
action = traj.actions[step_state]
reward = traj.rewards[step_state]
done = traj.dones[step_state]
if use_her and np.random.uniform() <= her_ratio:
step_goal = np.random.randint(step_state + 1, traj.length + 1)
goal = traj.states[step_goal][:2] ## 使用HER算法的future方案设置目标
dis = np.sqrt(np.sum(np.square(next_state[:2] - goal)))
reward = -1.0 if dis > dis_threshold else 0
done = False if dis > dis_threshold else True
state = np.hstack((state[:2], goal))
next_state = np.hstack((next_state[:2], goal))
batch['states'].append(state)
batch['next_states'].append(next_state)
batch['actions'].append(action)
batch['rewards'].append(reward)
batch['dones'].append(done)
batch['states'] = np.array(batch['states'])
batch['next_states'] = np.array(batch['next_states'])
batch['actions'] = np.array(batch['actions'])
return batch
最后,便可以开始在这个有目标的环境中运行采用了 HER 的 DDPG 算法,一起来看一下效果吧。
actor_lr = 1e-3
critic_lr = 1e-3
hidden_dim = 128
state_dim = 4
action_dim = 2
action_bound = 1
sigma = 0.1
tau = 0.005
gamma = 0.98
num_episodes = 2000
n_train = 20
batch_size = 256
minimal_episodes = 200
buffer_size = 10000
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
env = WorldEnv()
replay_buffer = ReplayBuffer_Trajectory(buffer_size)
agent = DDPG(state_dim, hidden_dim, action_dim, action_bound, actor_lr,
critic_lr, sigma, tau, gamma, device)
return_list = []
for i in range(10):
with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
for i_episode in range(int(num_episodes / 10)):
episode_return = 0
state = env.reset()
traj = Trajectory(state)
done = False
while not done:
action = agent.take_action(state)
state, reward, done = env.step(action)
episode_return += reward
traj.store_step(action, state, reward, done)
replay_buffer.add_trajectory(traj)
return_list.append(episode_return)
if replay_buffer.size() >= minimal_episodes:
for _ in range(n_train):
transition_dict = replay_buffer.sample(batch_size, True)
agent.update(transition_dict)
if (i_episode + 1) % 10 == 0:
pbar.set_postfix({
'episode':
'%d' % (num_episodes / 10 * i + i_episode + 1),
'return':
'%.3f' % np.mean(return_list[-10:])
})
pbar.update(1)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('DDPG with HER on {}'.format('GridWorld'))
plt.show()
Iteration 0: 100%|██████████| 200/200 [00:08<00:00, 24.96it/s, episode=200, return=-50.000]
Iteration 1: 100%|██████████| 200/200 [01:41<00:00, 1.96it/s, episode=400, return=-4.400]
Iteration 2: 100%|██████████| 200/200 [01:37<00:00, 2.06it/s, episode=600, return=-4.000]
Iteration 3: 100%|██████████| 200/200 [01:36<00:00, 2.07it/s, episode=800, return=-4.100]
Iteration 4: 100%|██████████| 200/200 [01:35<00:00, 2.09it/s, episode=1000, return=-4.500]
Iteration 5: 100%|██████████| 200/200 [01:34<00:00, 2.11it/s, episode=1200, return=-4.500]
Iteration 6: 100%|██████████| 200/200 [01:36<00:00, 2.08it/s, episode=1400, return=-4.600]
Iteration 7: 100%|██████████| 200/200 [01:35<00:00, 2.09it/s, episode=1600, return=-4.100]
Iteration 8: 100%|██████████| 200/200 [01:35<00:00, 2.09it/s, episode=1800, return=-4.300]
Iteration 9: 100%|██████████| 200/200 [01:35<00:00, 2.09it/s, episode=2000, return=-3.600]
接下来尝试不采用 HER 重新构造数据,而是直接使用收集的数据训练一个策略,看看是什么效果。
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
env = WorldEnv()
replay_buffer = ReplayBuffer_Trajectory(buffer_size)
agent = DDPG(state_dim, hidden_dim, action_dim, action_bound, actor_lr,
critic_lr, sigma, tau, gamma, device)
return_list = []
for i in range(10):
with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
for i_episode in range(int(num_episodes / 10)):
episode_return = 0
state = env.reset()
traj = Trajectory(state)
done = False
while not done:
action = agent.take_action(state)
state, reward, done = env.step(action)
episode_return += reward
traj.store_step(action, state, reward, done)
replay_buffer.add_trajectory(traj)
return_list.append(episode_return)
if replay_buffer.size() >= minimal_episodes:
for _ in range(n_train):
## 和使用HER训练的唯一区别
transition_dict = replay_buffer.sample(batch_size, False)
agent.update(transition_dict)
if (i_episode + 1) % 10 == 0:
pbar.set_postfix({
'episode':
'%d' % (num_episodes / 10 * i + i_episode + 1),
'return':
'%.3f' % np.mean(return_list[-10:])
})
pbar.update(1)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('DDPG without HER on {}'.format('GridWorld'))
plt.show()
Iteration 0: 100%|██████████| 200/200 [00:03<00:00, 65.09it/s, episode=200, return=-50.000]
Iteration 1: 100%|██████████| 200/200 [00:45<00:00, 4.42it/s, episode=400, return=-50.000]
Iteration 2: 100%|██████████| 200/200 [00:46<00:00, 4.28it/s, episode=600, return=-50.000]
Iteration 3: 100%|██████████| 200/200 [00:48<00:00, 4.14it/s, episode=800, return=-50.000]
Iteration 4: 100%|██████████| 200/200 [00:47<00:00, 4.24it/s, episode=1000, return=-50.000]
Iteration 5: 100%|██████████| 200/200 [00:46<00:00, 4.28it/s, episode=1200, return=-50.000]
Iteration 6: 100%|██████████| 200/200 [00:46<00:00, 4.27it/s, episode=1400, return=-50.000]
Iteration 7: 100%|██████████| 200/200 [00:48<00:00, 4.14it/s, episode=1600, return=-40.600]
Iteration 8: 100%|██████████| 200/200 [00:47<00:00, 4.18it/s, episode=1800, return=-50.000]
Iteration 9: 100%|██████████| 200/200 [00:50<00:00, 3.99it/s, episode=2000, return=-31.500]
通过实验对比,可以观察到使用 HER 算法后,效果有显著提升。这里 HER 算法的主要好处是通过重新对历史轨迹设置其目标(使用 future 方案)而使得奖励信号更加稠密,进而从原本失败的数据中学习到使“新任务”成功的经验,提升训练的稳定性和样本效率。
本章介绍了目标导向的强化学习(GoRL)的基本定义,以及一个解决 GoRL 的有效的经典算法 HER。通过代码实践,HER 算法的效果得到了很好的呈现。我们从 HER 的代码实践中还可以领会一种思维方式,即可以通过整条轨迹的信息来改善每个转移片段带给智能体策略的学习价值。例如,在 HER 算法的 future 方案中,采样当前轨迹后续的状态作为目标,然后根据下一步状态是否离目标足够近来修改当前步的奖励信号。此外,HER 算法只是一个经验回放的修改方式,并没有对策略网络和价值网络的架构做出任何修改。而在后续的部分 GoRL 研究中,策略函数和动作价值函数会被显式建模成
和
,即构建较为复杂的策略架构,使其直接知晓当前状态和目标,并使用更大的网络容量去完成目标。有兴趣的读者可以自行查阅相关的文献。
[1] ANDRYCHOWICZ M, WOLSKI F, RAY A, et al. Hindsight Experience Replay [J]. Advances in neural information processing systems, 2017: 5055-5065.
[2] FLORENSA C, HELD D, GENG X Y, et al. Automatic goal generation for reinforcement learning agents [C]// International conference on machine learning, PMLR, 2018: 1515-1528.
[3] REN Z Z, DONG K, ZHOU Y, et al. Exploration via Hindsight Goal Generation [J]. Advances in neural information processing systems 2019, 32: 13485-13496.
[4] PITIS S, CHAN H, ZHAO S, et al. Maximum entropy gain exploration for long horizon multi-goal reinforcement learning [C]// International conference on machine learning, PMLR, 2020: 7750-7761.
本书之前介绍的算法都是单智能体强化学习算法,其基本假设是动态环境是稳态的(stationary),即状态转移概率和奖励函数不变,并依此来设计相应的算法。而如果环境中还有其他智能体做交互和学习,那么任务则上升为多智能体强化学习(multi-agent reinforcement learning,MARL),如图 20-1 所示。
图20-1 多智能体强化学习环境概览
多智能体的情形相比于单智能体更加复杂,因为每个智能体在和环境交互的同时也在和其他智能体进行直接或者间接的交互。因此,多智能体强化学习要比单智能体更困难,其难点主要体现在以下几点:
将一个多智能体环境用一个元组
表示,其中
是智能体的数目,
是所有智能体的状态集合,
是所有智能体的动作集合,
是所有智能体奖励函数的集合,
是环境的状态转移概率。一般多智能体强化学习的目标是为每个智能体学习一个策略来最大化其自身的累积奖励。
面对上述问题形式,最直接的想法是基于已经熟悉的单智能体算法来进行学习,这主要分为两种思路。
本章介绍完全去中心化方法,在原理解读和代码实践之后,进一步通过实验结果图看看这种方法的效果。第 21 章会进一步介绍进阶的多智能体强化学习的求解范式。
接下来将介绍一个完全去中心化的算法,此类算法被称为独立学习(independent learning)。由于对于每个智能体使用单智能体算法 PPO 进行训练,所因此这个算法叫作独立PPO(Independent PPO,IPPO)算法。具体而言,这里使用的 PPO 算法版本为 PPO-截断,其算法流程如下:
个智能体,为每个智能体初始化各自的策略以及价值函数
do
下面介绍一下要使用的多智能体环境:ma_gym
库中的 Combat 环境。Combat 是一个在二维的格子世界上进行的两个队伍的对战模拟游戏,每个智能体的动作集合为:向四周移动
格,攻击周围
格范围内其他敌对智能体,或者不采取任何行动。起初每个智能体有
点生命值,如果智能体在敌人的攻击范围内被攻击到了,则会扣
生命值,生命值掉为
则死亡,最后存活的队伍获胜。每个智能体的攻击有一轮的冷却时间。
图20-2 Combat 环境示例
首先仍然导入一些需要用到的包,然后从 GitHub 中克隆ma-gym
仓库到本地,并且导入其中的 Combat 环境。
import torch
import torch.nn.functional as F
import numpy as np
import rl_utils
from tqdm import tqdm
import matplotlib.pyplot as plt
! git clone https://github.com/boyu-ai/ma-gym.git
import sys
sys.path.append("./ma-gym")
from ma_gym.envs.combat.combat import Combat
接下来的代码块与 12.4 节介绍过的 PPO 代码实践基本一致,不再赘述。
class PolicyNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(PolicyNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
self.fc3 = torch.nn.Linear(hidden_dim, action_dim)
def forward(self, x):
x = F.relu(self.fc2(F.relu(self.fc1(x))))
return F.softmax(self.fc3(x), dim=1)
class ValueNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim):
super(ValueNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
self.fc3 = torch.nn.Linear(hidden_dim, 1)
def forward(self, x):
x = F.relu(self.fc2(F.relu(self.fc1(x))))
return self.fc3(x)
class PPO:
''' PPO算法,采用截断方式 '''
def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
lmbda, eps, gamma, device):
self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
self.critic = ValueNet(state_dim, hidden_dim).to(device)
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
lr=actor_lr)
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
lr=critic_lr)
self.gamma = gamma
self.lmbda = lmbda
self.eps = eps ## PPO中截断范围的参数
self.device = device
def take_action(self, state):
state = torch.tensor([state], dtype=torch.float).to(self.device)
probs = self.actor(state)
action_dist = torch.distributions.Categorical(probs)
action = action_dist.sample()
return action.item()
def update(self, transition_dict):
states = torch.tensor(transition_dict['states'],
dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
self.device)
rewards = torch.tensor(transition_dict['rewards'],
dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(transition_dict['next_states'],
dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict['dones'],
dtype=torch.float).view(-1, 1).to(self.device)
td_target = rewards + self.gamma * self.critic(next_states) * (1 -
dones)
td_delta = td_target - self.critic(states)
advantage = rl_utils.compute_advantage(self.gamma, self.lmbda,
td_delta.cpu()).to(self.device)
old_log_probs = torch.log(self.actor(states).gather(1,
actions)).detach()
log_probs = torch.log(self.actor(states).gather(1, actions))
ratio = torch.exp(log_probs - old_log_probs)
surr1 = ratio * advantage
surr2 = torch.clamp(ratio, 1 - self.eps,
1 + self.eps) * advantage ## 截断
actor_loss = torch.mean(-torch.min(surr1, surr2)) ## PPO损失函数
critic_loss = torch.mean(
F.mse_loss(self.critic(states), td_target.detach()))
self.actor_optimizer.zero_grad()
self.critic_optimizer.zero_grad()
actor_loss.backward()
critic_loss.backward()
self.actor_optimizer.step()
self.critic_optimizer.step()
现在进入 IPPO 代码实践的最主要部分。值得注意的是,在训练时使用了参数共享(parameter sharing)的技巧,即对于所有智能体使用同一套策略参数,这样做的好处是能够使得模型训练数据更多,同时训练更稳定。能够这样做的前提是,两个智能体是同质的(homogeneous),即它们的状态空间和动作空间是完全一致的,并且它们的优化目标也完全一致。感兴趣的读者也可以自行实现非参数共享版本的 IPPO,此时每个智能体就是一个独立的 PPO 的实例。
和之前的一些实验不同,这里不再展示智能体获得的回报,而是将 IPPO 训练的两个智能体团队的胜率作为主要的实验结果。接下来就可以开始训练 IPPO 了!
actor_lr = 3e-4
critic_lr = 1e-3
num_episodes = 100000
hidden_dim = 64
gamma = 0.99
lmbda = 0.97
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device(
"cpu")
team_size = 2
grid_size = (15, 15)
#创建Combat环境,格子世界的大小为15x15,己方智能体和敌方智能体数量都为2
env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size)
state_dim = env.observation_space[0].shape[0]
action_dim = env.action_space[0].n
#两个智能体共享同一个策略
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, eps,
gamma, device)
win_list = []
for i in range(10):
with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
for i_episode in range(int(num_episodes / 10)):
transition_dict_1 = {
'states': [],
'actions': [],
'next_states': [],
'rewards': [],
'dones': []
}
transition_dict_2 = {
'states': [],
'actions': [],
'next_states': [],
'rewards': [],
'dones': []
}
s = env.reset()
terminal = False
while not terminal:
a_1 = agent.take_action(s[0])
a_2 = agent.take_action(s[1])
next_s, r, done, info = env.step([a_1, a_2])
transition_dict_1['states'].append(s[0])
transition_dict_1['actions'].append(a_1)
transition_dict_1['next_states'].append(next_s[0])
transition_dict_1['rewards'].append(
r[0] + 100 if info['win'] else r[0] - 0.1)
transition_dict_1['dones'].append(False)
transition_dict_2['states'].append(s[1])
transition_dict_2['actions'].append(a_2)
transition_dict_2['next_states'].append(next_s[1])
transition_dict_2['rewards'].append(
r[1] + 100 if info['win'] else r[1] - 0.1)
transition_dict_2['dones'].append(False)
s = next_s
terminal = all(done)
win_list.append(1 if info["win"] else 0)
agent.update(transition_dict_1)
agent.update(transition_dict_2)
if (i_episode + 1) % 100 == 0:
pbar.set_postfix({
'episode':
'%d' % (num_episodes / 10 * i + i_episode + 1),
'return':
'%.3f' % np.mean(win_list[-100:])
})
pbar.update(1)
Iteration 0: 100%|██████████| 10000/10000 [07:17<00:00, 22.85it/s, episode=10000, return=0.310]
Iteration 1: 100%|██████████| 10000/10000 [05:43<00:00, 29.08it/s, episode=20000, return=0.370]
Iteration 2: 100%|██████████| 10000/10000 [05:30<00:00, 30.26it/s, episode=30000, return=0.560]
Iteration 3: 100%|██████████| 10000/10000 [04:54<00:00, 33.96it/s, episode=40000, return=0.670]
Iteration 4: 100%|██████████| 10000/10000 [04:20<00:00, 38.46it/s, episode=50000, return=0.670]
Iteration 5: 100%|██████████| 10000/10000 [03:52<00:00, 43.09it/s, episode=60000, return=0.620]
Iteration 6: 100%|██████████| 10000/10000 [03:55<00:00, 42.53it/s, episode=70000, return=0.610]
Iteration 7: 100%|██████████| 10000/10000 [03:40<00:00, 45.26it/s, episode=80000, return=0.640]
Iteration 8: 100%|██████████| 10000/10000 [03:48<00:00, 43.81it/s, episode=90000, return=0.650]
Iteration 9: 100%|██████████| 10000/10000 [03:42<00:00, 44.91it/s, episode=100000, return=0.770]
win_array = np.array(win_list)
#每100条轨迹取一次平均
win_array = np.mean(win_array.reshape(-1, 100), axis=1)
episodes_list = np.arange(win_array.shape[0]) * 100
plt.plot(episodes_list, win_array)
plt.xlabel('Episodes')
plt.ylabel('Win rate')
plt.title('IPPO on Combat')
plt.show()
可以看出,当智能体数量较少的时候,IPPO 这种完全去中心化学习在一定程度上能够取得好的效果,但是最终达到的胜率也比较有限。这可能是因为多个智能体之间无法有效地通过合作来共同完成目标。同时,好奇的读者也可以尝试增加智能体的数量,比较一下训练结果。当数量增加到 5 时,这种完全去中心化学习的训练效果就不是很好了。这时候可能就需要引入更多的算法来考虑多个智能体之间的交互行为,或者使用中心化训练去中心化执行(centralized training with decentralized execution,CTDE)的范式来进行多智能体训练,该方法将在第 21 章中详细介绍。
本章介绍了多智能体强化学习的概念和两类基本的解决范式,并针对其中的完全去中心化方法进行了详细的介绍,讲解了一个具体的算法 IPPO,即用 PPO 算法为各个智能体训练各自的策略。在 Combat 环境中,我们共享了两个智能体之间的策略,以达到更好的效果。但这仅限于多个智能体同质的情况,若它们的状态空间或动作空间不一致,那便无法进行策略共享。
[1] HERNANDEZ L P, BILAL K, TAYLOR M E. A survey and critique of multiagent deep reinforcement learning[J]. Autonomous Agents and Multi-Agent Systems, 2019, 33(6): 750-797.
[2] TAMPUU A, MATIISEN T, KODELJA D, et al. Multiagent cooperation and competition with deep reinforcement learning [J]. PloS One, 2017; 12(4): e0172395.
[3] TAN M. Multi-agent reinforcement learning: independent vs. cooperative agents [C]// International conference on machine learning, 1993: 330-337.
[4] Combat 环境(参见 GitHub 网站中的 koulanurag/ma-gym 项目).
第 20 章中已经初步介绍了多智能体强化学习研究的问题和最基本的求解范式。本章来介绍一种比较经典且效果不错的进阶范式:中心化训练去中心化执行(centralized training with decentralized execution,CTDE)。所谓中心化训练去中心化执行是指在训练的时候使用一些单个智能体看不到的全局信息而以达到更好的训练效果,而在执行时不使用这些信息,每个智能体完全根据自己的策略直接动作以达到去中心化执行的效果。中心化训练去中心化执行的算法能够在训练时有效地利用全局信息以达到更好且更稳定的训练效果,同时在进行策略模型推断时可以仅利用局部信息,使得算法具有一定的扩展性。CTDE 可以类比成一个足球队的训练和比赛过程:在训练时,11 个球员可以直接获得教练的指导从而完成球队的整体配合,而教练本身掌握着比赛全局信息,教练的指导也是从整支队、整场比赛的角度进行的;而训练好的 11 个球员在上场比赛时,则根据场上的实时情况直接做出决策,不再有教练的指导。
CTDE 算法主要分为两种:一种是基于值函数的方法,例如 VDN,QMIX 算法等;另一种是基于 Actor-Critic 的方法,例如 MADDPG 和 COMA 等。本章将重点介绍 MADDPG 算法。
多智能体 DDPG(muli-agent DDPG,MADDPG)算法从字面意思上来看就是对于每个智能体实现一个 DDPG 的算法。所有智能体共享一个中心化的 Critic 网络,该 Critic 网络在训练的过程中同时对每个智能体的 Actor 网络给出指导,而执行时每个智能体的 Actor 网络则是完全独立做出行动,即去中心化地执行。
CTDE 算法的应用场景通常可以被建模为一个部分可观测马尔可夫博弈(partially observable Markov games):用
代表
个智能体所有可能的状态空间,这是全局的信息。对于每个智能体
,其动作空间为
,观测空间为
,每个智能体的策略
是一个概率分布,用来表示智能体在每个观测下采取各个动作的概率。环境的状态转移函数为
。每个智能体的奖励函数为
,每个智能体从全局状态得到的部分观测信息为
,初始状态分布为
。每个智能体的目标是最大化其期望累积奖励
。
接下来我们看一下 MADDPG 算法的主要细节吧!如图 21-1 所示,每个智能体用 Actor-Critic 的方法训练,但不同于传统单智能体的情况,在 MADDPG 中每个智能体的 Critic 部分都能够获得其他智能体的策略信息。具体来说,考虑一个有
个智能体的博弈,每个智能体的策略参数为
,记
为所有智能体的策略集合,那么我们可以写出在随机性策略情况下每个智能体的期望收益的策略梯度:
其中,
就是一个中心化的动作价值函数。为什么说
是一个中心化的动作价值函数呢?一般来说
包含了所有智能体的观测,另外
也需要输入所有智能体在此刻的动作,因此
工作的前提就是所有智能体要同时给出自己的观测和相应的动作。
图21-1 MADDPG 算法总览图
对于确定性策略来说,考虑现在有
个连续的策略
,可以得到 DDPG 的梯度公式:
其中,
是我们用来存储数据的经验回放池,它存储的每一个数据为
。而在 MADDPG 中,中心化动作价值函数可以按照下面的损失函数来更新:
其中,
是更新价值函数中使用的目标策略的集合,它们有着延迟更新的参数。
MADDPG 的具体算法流程如下:
do
,用于动作探索;
;
do:
,用当前的策略选择一个动作
;
并且获得奖励
和新的观测
;
存储到经验回放池
中;
中随机采样一些数据;
,中心化训练 Critic 网络
,训练自身的 Actor 网络
,更新目标 Actor 网络和目标 Critic 网络
下面我们来看看如何实现 MADDPG 算法,首先是导入一些需要用到的包。
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import random
import rl_utils
我们要使用的环境为多智能体粒子环境(multiagent particles environment,MPE),它是一些面向多智能体交互的环境的集合,在这个环境中,粒子智能体可以移动、通信、“看”到其他智能体,也可以和固定位置的地标交互。
接下来安装环境,由于 MPE 的官方仓库的代码已经不再维护了,而其依赖于 gym 的旧版本,因此我们需要重新安装 gym 库。
!git clone https://github.com/boyu-ai/multiagent-particle-envs.git --quiet
!pip install -e multiagent-particle-envs
import sys
sys.path.append("multiagent-particle-envs")
## 由于multiagent-pariticle-env底层的实现有一些版本问题,因此gym需要改为可用的版本
!pip install --upgrade gym==0.10.5 -q
import gym
from multiagent.environment import MultiAgentEnv
import multiagent.scenarios as scenarios
def make_env(scenario_name):
## 从环境文件脚本中创建环境
scenario = scenarios.load(scenario_name + ".py").Scenario()
world = scenario.make_world()
env = MultiAgentEnv(world, scenario.reset_world, scenario.reward,
scenario.observation)
return env
本章选择 MPE 中的simple_adversary环境作为代码实践的示例,如图 21-2 所示。该环境中有
个红色的对抗智能体(adversary)、
个蓝色的正常智能体,以及
个地点(一般
),这
个地点中有一个是目标地点(绿色)。这
个正常智能体知道哪一个是目标地点,但对抗智能体不知道。正常智能体是合作关系:它们其中任意一个距离目标地点足够近,则每个正常智能体都能获得相同的奖励。对抗智能体如果距离目标地点足够近,也能获得奖励,但它需要猜哪一个才是目标地点。因此,正常智能体需要进行合作,分散到不同的坐标点,以此欺骗对抗智能体。
图21-2 MPE 中的`simple_adversary`环境
需要说明的是,MPE 环境中的每个智能体的动作空间是离散的。第 13 章介绍过 DDPG 算法本身需要使智能体的动作对于其策略参数可导,这对连续的动作空间来说是成立的,但是对于离散的动作空间并不成立。但这并不意味着当前的任务不能使用 MADDPG 算法求解,因为我们可以使用一个叫作 Gumbel-Softmax 的方法来得到离散分布的近似采样。下面我们对其原理进行简要的介绍并给出实现代码。
假设有一个随机变量
服从某个离散分布
。其中,
表示
并且满足
。当我们希望按照这个分布即
进行采样时,可以发现这种离散分布的采样是不可导的。
那有没有什么办法可以让离散分布的采样可导呢?答案是肯定的!那就是重参数化方法,这一方法在第 14 章的 SAC 算法中已经介绍过,而这里要用的是 Gumbel-Softmax 技巧。具体来说,我们引入一个重参数因子
,它是一个采样自
的噪声:
Gumbel-Softmax 采样可以写成
此时,如果通过
计算离散值,该离散值就近似等价于离散采样
的值。更进一步,采样的结果
中自然地引入了对于
的梯度。
被称作分布的温度参数,通过调整它可以控制生成的 G umbel-Softmax 分布与离散分布的近似程度:
越小,生成的分布越趋向于
的结果;
越大,生成的分布越趋向于均匀分布。
接着再定义一些需要用到的工具函数,其中包括让 DDPG 可以适用于离散动作空间的 Gumbel Softmax 采样的相关函数。
def onehot_from_logits(logits, eps=0.01):
''' 生成最优动作的独热(one-hot)形式 '''
argmax_acs = (logits == logits.max(1, keepdim=True)[0]).float()
## 生成随机动作,转换成独热形式
rand_acs = torch.autograd.Variable(torch.eye(logits.shape[1])[[
np.random.choice(range(logits.shape[1]), size=logits.shape[0])
]],
requires_grad=False).to(logits.device)
## 通过epsilon-贪婪算法来选择用哪个动作
return torch.stack([
argmax_acs[i] if r > eps else rand_acs[i]
for i, r in enumerate(torch.rand(logits.shape[0]))
])
def sample_gumbel(shape, eps=1e-20, tens_type=torch.FloatTensor):
"""从Gumbel(0,1)分布中采样"""
U = torch.autograd.Variable(tens_type(*shape).uniform_(),
requires_grad=False)
return -torch.log(-torch.log(U + eps) + eps)
def gumbel_softmax_sample(logits, temperature):
""" 从Gumbel-Softmax分布中采样"""
y = logits + sample_gumbel(logits.shape, tens_type=type(logits.data)).to(
logits.device)
return F.softmax(y / temperature, dim=1)
def gumbel_softmax(logits, temperature=1.0):
"""从Gumbel-Softmax分布中采样,并进行离散化"""
y = gumbel_softmax_sample(logits, temperature)
y_hard = onehot_from_logits(y)
y = (y_hard.to(logits.device) - y).detach() + y
## 返回一个y_hard的独热量,但是它的梯度是y,我们既能够得到一个与环境交互的离散动作,又可以
## 正确地反传梯度
return y
接着实现我们的单智能体 DDPG。其中包含 Actor 网络与 Critic 网络,以及计算动作的函数,这在第 13 章中的已经介绍过,此处不再赘述。但这里没有更新网络参数的函数,其将会在 MADDPG 类中被实现。
class TwoLayerFC(torch.nn.Module):
def __init__(self, num_in, num_out, hidden_dim):
super().__init__()
self.fc1 = torch.nn.Linear(num_in, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
self.fc3 = torch.nn.Linear(hidden_dim, num_out)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)
class DDPG:
''' DDPG算法 '''
def __init__(self, state_dim, action_dim, critic_input_dim, hidden_dim,
actor_lr, critic_lr, device):
self.actor = TwoLayerFC(state_dim, action_dim, hidden_dim).to(device)
self.target_actor = TwoLayerFC(state_dim, action_dim,
hidden_dim).to(device)
self.critic = TwoLayerFC(critic_input_dim, 1, hidden_dim).to(device)
self.target_critic = TwoLayerFC(critic_input_dim, 1,
hidden_dim).to(device)
self.target_critic.load_state_dict(self.critic.state_dict())
self.target_actor.load_state_dict(self.actor.state_dict())
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
lr=actor_lr)
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
lr=critic_lr)
def take_action(self, state, explore=False):
action = self.actor(state)
if explore:
action = gumbel_softmax(action)
else:
action = onehot_from_logits(action)
return action.detach().cpu().numpy()[0]
def soft_update(self, net, target_net, tau):
for param_target, param in zip(target_net.parameters(),
net.parameters()):
param_target.data.copy_(param_target.data * (1.0 - tau) +
param.data * tau)
接下来正式实现一个 MADDPG 类,该类对于每个智能体都会维护一个 DDPG 算法。它们的策略更新和价值函数更新使用的是 21.2 节中关于
和
的公式给出的形式。
class MADDPG:
def __init__(self, env, device, actor_lr, critic_lr, hidden_dim,
state_dims, action_dims, critic_input_dim, gamma, tau):
self.agents = []
for i in range(len(env.agents)):
self.agents.append(
DDPG(state_dims[i], action_dims[i], critic_input_dim,
hidden_dim, actor_lr, critic_lr, device))
self.gamma = gamma
self.tau = tau
self.critic_criterion = torch.nn.MSELoss()
self.device = device
@property
def policies(self):
return [agt.actor for agt in self.agents]
@property
def target_policies(self):
return [agt.target_actor for agt in self.agents]
def take_action(self, states, explore):
states = [
torch.tensor([states[i]], dtype=torch.float, device=self.device)
for i in range(len(env.agents))
]
return [
agent.take_action(state, explore)
for agent, state in zip(self.agents, states)
]
def update(self, sample, i_agent):
obs, act, rew, next_obs, done = sample
cur_agent = self.agents[i_agent]
cur_agent.critic_optimizer.zero_grad()
all_target_act = [
onehot_from_logits(pi(_next_obs))
for pi, _next_obs in zip(self.target_policies, next_obs)
]
target_critic_input = torch.cat((*next_obs, *all_target_act), dim=1)
target_critic_value = rew[i_agent].view(
-1, 1) + self.gamma * cur_agent.target_critic(
target_critic_input) * (1 - done[i_agent].view(-1, 1))
critic_input = torch.cat((*obs, *act), dim=1)
critic_value = cur_agent.critic(critic_input)
critic_loss = self.critic_criterion(critic_value,
target_critic_value.detach())
critic_loss.backward()
cur_agent.critic_optimizer.step()
cur_agent.actor_optimizer.zero_grad()
cur_actor_out = cur_agent.actor(obs[i_agent])
cur_act_vf_in = gumbel_softmax(cur_actor_out)
all_actor_acs = []
for i, (pi, _obs) in enumerate(zip(self.policies, obs)):
if i == i_agent:
all_actor_acs.append(cur_act_vf_in)
else:
all_actor_acs.append(onehot_from_logits(pi(_obs)))
vf_in = torch.cat((*obs, *all_actor_acs), dim=1)
actor_loss = -cur_agent.critic(vf_in).mean()
actor_loss += (cur_actor_out**2).mean() * 1e-3
actor_loss.backward()
cur_agent.actor_optimizer.step()
def update_all_targets(self):
for agt in self.agents:
agt.soft_update(agt.actor, agt.target_actor, self.tau)
agt.soft_update(agt.critic, agt.target_critic, self.tau)
现在我们来定义一些超参数,创建环境、智能体以及经验回放池并准备训练。
num_episodes = 5000
episode_length = 25 ## 每条序列的最大长度
buffer_size = 100000
hidden_dim = 64
actor_lr = 1e-2
critic_lr = 1e-2
gamma = 0.95
tau = 1e-2
batch_size = 1024
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
update_interval = 100
minimal_size = 4000
env_id = "simple_adversary"
env = make_env(env_id)
replay_buffer = rl_utils.ReplayBuffer(buffer_size)
state_dims = []
action_dims = []
for action_space in env.action_space:
action_dims.append(action_space.n)
for state_space in env.observation_space:
state_dims.append(state_space.shape[0])
critic_input_dim = sum(state_dims) + sum(action_dims)
maddpg = MADDPG(env, device, actor_lr, critic_lr, hidden_dim, state_dims,
action_dims, critic_input_dim, gamma, tau)
接下来实现以下评估策略的方法,之后就可以开始训练了!
def evaluate(env_id, maddpg, n_episode=10, episode_length=25):
## 对学习的策略进行评估,此时不会进行探索
env = make_env(env_id)
returns = np.zeros(len(env.agents))
for _ in range(n_episode):
obs = env.reset()
for t_i in range(episode_length):
actions = maddpg.take_action(obs, explore=False)
obs, rew, done, info = env.step(actions)
rew = np.array(rew)
returns += rew / n_episode
return returns.tolist()
return_list = [] ## 记录每一轮的回报(return)
total_step = 0
for i_episode in range(num_episodes):
state = env.reset()
## ep_returns = np.zeros(len(env.agents))
for e_i in range(episode_length):
actions = maddpg.take_action(state, explore=True)
next_state, reward, done, _ = env.step(actions)
replay_buffer.add(state, actions, reward, next_state, done)
state = next_state
total_step += 1
if replay_buffer.size(
) >= minimal_size and total_step % update_interval == 0:
sample = replay_buffer.sample(batch_size)
def stack_array(x):
rearranged = [[sub_x[i] for sub_x in x]
for i in range(len(x[0]))]
return [
torch.FloatTensor(np.vstack(aa)).to(device)
for aa in rearranged
]
sample = [stack_array(x) for x in sample]
for a_i in range(len(env.agents)):
maddpg.update(sample, a_i)
maddpg.update_all_targets()
if (i_episode + 1) % 100 == 0:
ep_returns = evaluate(env_id, maddpg, n_episode=100)
return_list.append(ep_returns)
print(f"Episode: {i_episode+1}, {ep_returns}")
Episode: 100, [-162.09349111961225, 9.000666921056728, 9.000666921056728]
Episode: 200, [-121.85087049356082, 20.082544683591127, 20.082544683591127]
Episode: 300, [-28.086124816732802, -23.51493605339695, -23.51493605339695]
Episode: 400, [-35.91437846570877, -6.574264880829929, -6.574264880829929]
Episode: 500, [-12.83238365700212, -5.402338391212475, -5.402338391212475]
Episode: 600, [-11.692053500921567, 2.904343355450921, 2.904343355450921]
Episode: 700, [-11.21261001095729, 6.13003213658482, 6.13003213658482]
Episode: 800, [-12.581086056359824, 7.13450533137511, 7.13450533137511]
Episode: 900, [-10.932824468382302, 7.534917449533213, 7.534917449533213]
Episode: 1000, [-10.454432036663551, 7.467940904661571, 7.467940904661571]
Episode: 1100, [-10.099017183836345, 6.764091427064233, 6.764091427064233]
Episode: 1200, [-9.970202627245511, 6.839233648010857, 6.839233648010857]
Episode: 1300, [-8.23988889957424, 5.928539785965939, 5.928539785965939]
Episode: 1400, [-7.618319791914515, 5.4721657785273665, 5.4721657785273665]
Episode: 1500, [-9.528028248906292, 6.716548343395567, 6.716548343395567]
Episode: 1600, [-9.27198788506915, 6.25794360791615, 6.25794360791615]
Episode: 1700, [-9.439913314907297, 6.552076175517556, 6.552076175517556]
Episode: 1800, [-9.41018120255451, 6.170898260988019, 6.170898260988019]
Episode: 1900, [-8.293080671760299, 5.710058304479939, 5.710058304479939]
Episode: 2000, [-8.876670052284371, 5.804116304916539, 5.804116304916539]
Episode: 2100, [-8.20415531215746, 5.170909738207094, 5.170909738207094]
Episode: 2200, [-8.773275999321958, 4.961748911238369, 4.961748911238369]
Episode: 2300, [-8.06474017837516, 5.223795184183733, 5.223795184183733]
Episode: 2400, [-6.587706872401325, 4.366625235204875, 4.366625235204875]
Episode: 2500, [-7.691312056289927, 4.856855290592445, 4.856855290592445]
Episode: 2600, [-8.813560406139358, 5.508815842509804, 5.508815842509804]
Episode: 2700, [-7.056761924960759, 4.758538712873507, 4.758538712873507]
Episode: 2800, [-8.68842389422384, 5.661161581099521, 5.661161581099521]
Episode: 2900, [-7.930406418494052, 4.366106102743839, 4.366106102743839]
Episode: 3000, [-8.114850902595816, 5.1274853968197265, 5.1274853968197265]
Episode: 3100, [-8.381402942461598, 5.093518450135181, 5.093518450135181]
Episode: 3200, [-9.493930234055618, 5.472500034114433, 5.472500034114433]
Episode: 3300, [-8.53312311113189, 4.963767973071618, 4.963767973071618]
Episode: 3400, [-9.229941671093316, 5.555036222150763, 5.555036222150763]
Episode: 3500, [-10.67973248813069, 6.0258368192309115, 6.0258368192309115]
Episode: 3600, [-8.785648619797922, 5.360050159370962, 5.360050159370962]
Episode: 3700, [-10.050750001897885, 5.962048108721202, 5.962048108721202]
Episode: 3800, [-6.673053043055956, 3.732181204778823, 3.732181204778823]
Episode: 3900, [-10.567190838130202, 5.705831860427992, 5.705831860427992]
Episode: 4000, [-9.288291495674969, 5.298166543261745, 5.298166543261745]
Episode: 4100, [-9.433352212890984, 6.016868802323455, 6.016868802323455]
Episode: 4200, [-8.573388252905312, 4.673785791835532, 4.673785791835532]
Episode: 4300, [-8.466209564326363, 5.482892841309288, 5.482892841309288]
Episode: 4400, [-9.988322102926736, 5.5203824927807155, 5.5203824927807155]
Episode: 4500, [-7.4937676078180155, 4.730897948468445, 4.730897948468445]
Episode: 4600, [-8.755589567322176, 5.494709505886223, 5.494709505886223]
Episode: 4700, [-9.16743075823155, 5.234841527940852, 5.234841527940852]
Episode: 4800, [-8.597439825247829, 4.615078133167369, 4.615078133167369]
Episode: 4900, [-9.918505853931377, 5.08561749388552, 5.08561749388552]
Episode: 5000, [-10.16405662517592, 5.43335871613719, 5.43335871613719]
训练结束,我们来看看训练效果如何。
return_array = np.array(return_list)
for i, agent_name in enumerate(["adversary_0", "agent_0", "agent_1"]):
plt.figure()
plt.plot(
np.arange(return_array.shape[0]) * 100,
rl_utils.moving_average(return_array[:, i], 9))
plt.xlabel("Episodes")
plt.ylabel("Returns")
plt.title(f"{agent_name} by MADDPG")
可以看到,正常智能体agent_0
和agent_1
的回报结果完全一致,这是因为它们的奖励函数完全一样。正常智能体最终保持了正向的回报,说明它们通过合作成功地占领了两个不同的地点,进而让对抗智能体无法知道哪个地点是目标地点。另外,我们也可以发现 MADDPG 的收敛速度和稳定性都比较不错。
本章讲解了多智能体强化学习 CTDE 范式下的经典算法 MADDPG,MADDPG 后续也衍生了不少多智能体强化学习算法。因此,理解 MADDPG 对深入探究多智能体算法非常关键,有兴趣的读者可阅读 MADDPG 原论文加深理解。
[1] LOWE R, WU Y, TAMAR A, et al. Multi-agent actor-critic for mixed cooperative-competitive environments [J]. Advances in neural information processing systems 2017, 30: 6379-6390.
[2] MPE benchmarks(参见 GitHub 网站中 google/maddpg-replication 项目的 maddpg_replication.ipynb 文件).
亲爱的读者,你已经完成了对本书内容的学习,包括:
至此,你已经掌握了强化学习的基本知识,更拥有了第一手的强化学习代码实践经验。 但我们要知道,对于强化学习的学习是无止境的,本书只是探索强化学习浩瀚世界的开始。近年来强化学习的科研进展极快,主流的机器学习和人工智能顶级学术会议超过五分之一的论文都是关于强化学习的,计算机视觉、智能语音、自然语言处理、数据挖掘、信息检索、计算机图形学、计算机网络等方向越来越多的学术会议和期刊的研究工作在使用强化学习来解决其领域中的关键决策优化问题。越来越多的企业开始在实际业务中使用强化学习技术,让它们的决策系统变得越来越智能,而一些以强化学习为核心技术的国内外初创公司则开始在业界崭露头角。几乎每天,我们都可以从各种渠道了解到强化学习技术最新的科研进展和产业落地情况,其中的很多成果都会让人眼前一亮。
再厉害的技术都需要通过落地服务人民来创造真正的价值。强化学习技术发展的总目标就是有效落地,从而服务于广泛的决策任务。本书作者以浅薄的学识,对强化学习的技术发展做出一些展望,希望能为读者在未来对于强化学习的学习、科研和落地应用提供一些帮助。 首先我们给出在强化学习算法研究方面的展望。
(1)提升样本效率是强化学习一直以来的目标。 由于强化学习的交互式学习本质,策略或者价值函数是否能从交互得到的数据中获得有效的提升并没有保证,以至于强化学习算法总是存在样本效率低的问题(尤其是深度强化学习)。在本书的第三部分中,我们已经从多方面讨论了当前主流的提升强化学习样本效率的方法,包括模仿学习、基于模型的策略优化、目标导向的强化学习等。这些方法目前都是强化学习的前沿研究方向,但各自都具有较强的局限性。我们有理由相信,在未来的研究中,强化学习算法的样本效率会持续提升,最终在算力需求和数据采样需求方面都能降低到可观的水平。
(2)在奖励函数并不明确的场景下学习有效的策略。 在标准的强化学习任务中,奖励函数总是确定的。在不少现实场景中,甚至人类也无法确定什么样的奖励函数是好的,但可以给出一个不错的行为控制。对于这个问题,模仿学习目前是一类主流的解决方案,主要方法包括行为克隆、逆向强化学习和占用度量匹配。尽管逆向强化学习和占用度量匹配在模仿学习的研究中占主体,但其训练过程复杂、训练不稳定等问题限制了其在实际场景中的广泛应用。近年来自模仿学习(self-imitation learning)的一些研究开始进入人们的视野,其基本框架就是最简单的行为克隆,但需要对学习的目标行为做一些筛选或者权重分配,进而在训练十分简单的前提下使学习策略的性能获得可观的提升。类似这样的方法有望在各种强化学习的实际场景中落地。
(3)以离线的方式学习到一个较好的策略。 我们在本书中讨论到,离线强化学习使得智能体能从一个离线的经验数据中直接学习到一个较好的策略,在此过程中智能体不和环境交互。从理论上讲,这样的离线强化学习任务极大地拓展了强化学习适用的场景,但现在主流的离线强化学习研究仍然假设离线数据较为丰富,并且探索性较强,对学习到的策略总能完成在线的测试。这样的研究设定其实并不现实,根据离线强化学习评测平台NeoRL给出的评测结果,在离线数据较少、行为策略较为保守并且缺乏在线测试条件的情况下,大多数离线强化学习无法奏效。以学习的方式构建一个高仿真度的模拟器,进而对策略进行评测和训练,可能是一条有效的路线。
(4)真实世界中的分布式决策智能快速发展。 场景中经常出现不止一个智能体,例如多人游戏、无人驾驶、物品排名等场景。在多智能体场景下,有效训练智能体和其他智能体之间的协作和对抗策略具有很高的挑战性,而直接评估一个具体的策略则没有太大的意义,因为给定当前策略,对手总是能训练出专门克制该策略的策略。目前刚刚兴起的一种有效的解决方案是开放博弈中的种群训练,博弈双方或多方通过构建自己的策略池以及训练采样单个策略的元策略,在开放博弈中寻找元策略的均衡点,从而得到总体不败的元策略和对训练算法的总体评估。然而,此类方法的计算复杂度过高,对算法和算力都提出了更高的要求。近期出现的流水线PSRO算法以及MALib、OpenSpiel等计算框架有望让开放博弈下的多智能体强化学习取得突破,服务于真实世界中的分布式决策任务。
此外,我们也从工业落地的具体角度,浅谈强化学习落地的实际挑战。一方面,强化学习的技术门槛较高,具备成功落地强化学习完成智能决策任务能力的工程师较少;另一方面,对具体的场景任务的领域知识的了解程度对于有效落地强化学习算法十分重要。因此,我们认为克服强化学习落地的实际挑战有两种路线。
(1)自动化的强化学习。 在强化学习任务中需要进行选择,包括场景的设定、算法的选择、模型架构的设计、学习算法的超参数规划等。如果能设计一套自动搜索最佳选择的解决方案,则有望大幅度降低强化学习的落地使用门槛。在深度学习任务中,自动化的网络架构搜索、超参数调优等工作已经被证明能有效地自动学习超越人类设计的模型,这类方法被称为自动机器学习(automated machine learning,AutoML)。这样的思路对于强化学习的落地自然是很有希望的,但是自动化的强化学习很可能会耗费极大的算力,因为相比于深度有监督学习,深度强化学习中单个策略的训练已经需要高于一个数量级以上的算力了,那么自动化的强化学习则需要比AutoML耗费更多的算力。
(2)培养深入各个场景一线的强化学习工程师。 另外一个可以使强化学习平民化的路线其实更加直接,即积极培养针对不同实际场景的强化学习工程师。长期深入具体场景一线的工程师能够精准地把握强化学习问题的具体设定,例如奖励函数的设计、策略的限制、数据量是否足够、场景的探索是否充分等要素,从而通过人类智慧高效地完成强化学习的训练任务。本书则希望为强化学习工程师的培养略尽绵薄之力。