Reinforcement Learning (RL) is an important branch of machine learning that studies how an agent learns an optimal policy by interacting with an environment. From game AI to robot control, and from recommender systems to autonomous driving, reinforcement learning is changing how we interact with intelligent systems. This article takes a deep look at the core concepts of reinforcement learning, from the classic Q-Learning algorithm to modern deep reinforcement learning methods.
A reinforcement learning system is built from a few key components: the agent that selects actions, the environment it acts in, the state (observation) the environment exposes, the reward signal that scores each transition, and the policy the agent is trying to learn. The code below defines a minimal environment interface and a grid-world example that exercises all of these pieces.
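Before defining the real environment, here is a minimal, self-contained sketch of the interaction loop that ties these components together. `DummyEnv` and the random action choice are hypothetical stand-ins used purely for illustration; the actual `GridWorld` environment follows below.

```python
import random

class DummyEnv:
    """A toy two-state environment used only to illustrate the RL loop."""
    def reset(self):
        self.state = 0
        return self.state
    def step(self, action):
        # Action 1 moves toward the terminal state and earns a reward.
        self.state = min(1, self.state + action)
        reward = 1.0 if self.state == 1 else 0.0
        done = self.state == 1
        return self.state, reward, done, {}

env = DummyEnv()
state = env.reset()
done = False
while not done:
    action = random.choice([0, 1])                 # the agent picks an action (here: randomly)
    state, reward, done, info = env.step(action)   # the environment returns next state, reward, done flag
```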
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
import random
class Environment:
"""强化学习环境基类"""
def __init__(self):
self.state = None
self.action_space = None
self.observation_space = None
def reset(self):
"""重置环境到初始状态"""
raise NotImplementedError
def step(self, action):
"""执行动作,返回(next_state, reward, done, info)"""
raise NotImplementedError
def render(self):
"""可视化环境"""
raise NotImplementedError
# Example: a simple grid-world environment
class GridWorld(Environment):
def __init__(self, width=5, height=5):
super().__init__()
self.width = width
self.height = height
self.action_space = 4 # 上、下、左、右
self.observation_space = width * height
# 定义特殊位置
self.start_pos = (0, 0)
self.goal_pos = (width-1, height-1)
self.obstacles = [(2, 2), (3, 2)]
self.reset()
def reset(self):
self.agent_pos = self.start_pos
self.steps = 0
return self._get_state()
def _get_state(self):
"""将位置转换为状态编号"""
return self.agent_pos[0] * self.height + self.agent_pos[1]
def step(self, action):
self.steps += 1
# 保存旧位置
old_pos = self.agent_pos
# 执行动作
if action == 0: # 上
new_pos = (self.agent_pos[0], max(0, self.agent_pos[1] - 1))
elif action == 1: # 下
new_pos = (self.agent_pos[0], min(self.height - 1, self.agent_pos[1] + 1))
elif action == 2: # 左
new_pos = (max(0, self.agent_pos[0] - 1), self.agent_pos[1])
elif action == 3: # 右
new_pos = (min(self.width - 1, self.agent_pos[0] + 1), self.agent_pos[1])
else:
new_pos = self.agent_pos
# 检查是否碰到障碍物
if new_pos not in self.obstacles:
self.agent_pos = new_pos
# 计算奖励
if self.agent_pos == self.goal_pos:
reward = 10 # 到达目标
done = True
elif self.agent_pos == old_pos: # 撞墙或障碍物
reward = -1
done = False
else:
reward = -0.1 # 每步的小惩罚,鼓励尽快到达目标
done = False
# 限制最大步数
if self.steps >= 100:
done = True
return self._get_state(), reward, done, {}
def render(self):
"""打印网格世界"""
grid = np.full((self.height, self.width), '.', dtype=str)
grid[self.goal_pos[1], self.goal_pos[0]] = 'G'
for obs in self.obstacles:
grid[obs[1], obs[0]] = '#'
grid[self.agent_pos[1], self.agent_pos[0]] = 'A'
print("\n".join(" ".join(row) for row in grid))
print()
# Create the environment and take a few random steps to test it
env = GridWorld(5, 5)
state = env.reset()
print(f"Initial state: {state}")
env.render()
for i in range(10):
    action = random.randint(0, 3)
    state, reward, done, info = env.step(action)
    print(f"Step {i+1}: action={action}, reward={reward}, state={state}, done={done}")
    env.render()
    if done:
        print("Task complete!")
        break

Q-Learning is one of the most classic reinforcement learning algorithms: it finds an optimal policy by learning the action-value function Q.
Q-Learning uses the Bellman equation to update the Q-values:

$$Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right]$$

where:
- $\alpha$ is the learning rate
- $\gamma$ is the discount factor
- $r$ is the immediate reward
- $s'$ is the next state
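As a quick sanity check of the formula, the sketch below applies a single update by hand; the numbers (α = 0.1, γ = 0.95, a 2-state/2-action table) are arbitrary values chosen just for illustration.

```python
import numpy as np

alpha, gamma = 0.1, 0.95
q = np.zeros((2, 2))            # Q-table for 2 states x 2 actions
s, a, r, s_next = 0, 1, -0.1, 1

# One Bellman update: move Q(s, a) toward r + gamma * max_a' Q(s', a')
td_target = r + gamma * np.max(q[s_next])
q[s, a] += alpha * (td_target - q[s, a])
print(q[s, a])  # -0.01, since the table started at zero
```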
class QLearningAgent:
def __init__(self, state_space, action_space, learning_rate=0.1, discount_factor=0.95, epsilon=0.1):
self.state_space = state_space
self.action_space = action_space
self.learning_rate = learning_rate
self.discount_factor = discount_factor
self.epsilon = epsilon
# Q表:状态-动作值函数
self.q_table = np.zeros((state_space, action_space))
# 统计信息
self.episode_rewards = []
def choose_action(self, state, training=True):
"""使用epsilon-贪心策略选择动作"""
if training and random.random() < self.epsilon:
# 探索:随机选择动作
return random.randint(0, self.action_space - 1)
else:
# 利用:选择Q值最高的动作
return np.argmax(self.q_table[state])
def update(self, state, action, reward, next_state):
"""更新Q表"""
# 获取当前Q值
current_q = self.q_table[state, action]
# 计算最大Q值(用于下一状态)
max_next_q = np.max(self.q_table[next_state])
# 贝尔曼方程更新
new_q = current_q + self.learning_rate * (
reward + self.discount_factor * max_next_q - current_q
)
self.q_table[state, action] = new_q
def decay_epsilon(self, decay_rate=0.995, min_epsilon=0.01):
"""衰减探索率"""
self.epsilon = max(min_epsilon, self.epsilon * decay_rate)
def train(self, env, episodes=1000, max_steps=100):
"""训练智能体"""
for episode in range(episodes):
state = env.reset()
total_reward = 0
for step in range(max_steps):
# 选择动作
action = self.choose_action(state)
# 执行动作
next_state, reward, done, _ = env.step(action)
# 更新Q表
self.update(state, action, reward, next_state)
# 更新状态
state = next_state
total_reward += reward
if done:
break
# 记录总奖励
self.episode_rewards.append(total_reward)
# 衰减epsilon
self.decay_epsilon()
# 打印进度
if episode % 100 == 0:
avg_reward = np.mean(self.episode_rewards[-100:])
print(f"Episode {episode}, Average Reward (last 100): {avg_reward:.2f}, Epsilon: {self.epsilon:.3f}")
def test(self, env, episodes=10):
"""测试训练好的智能体"""
total_rewards = []
for episode in range(episodes):
state = env.reset()
total_reward = 0
done = False
steps = 0
while not done and steps < 100:
# 选择动作(不探索)
action = self.choose_action(state, training=False)
state, reward, done, _ = env.step(action)
total_reward += reward
steps += 1
total_rewards.append(total_reward)
print(f"Test Episode {episode + 1}: Reward = {total_reward}, Steps = {steps}")
print(f"Average Test Reward: {np.mean(total_rewards):.2f}")
# Train the Q-Learning agent
env = GridWorld(5, 5)
agent = QLearningAgent(
    state_space=env.observation_space,
    action_space=env.action_space,
    learning_rate=0.1,
    discount_factor=0.95,
    epsilon=0.1
)
print("Training the Q-Learning agent...")
agent.train(env, episodes=1000)
# Test the trained agent
print("\nTesting the trained agent:")
agent.test(env, episodes=5)
# Visualize the learning curves
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(agent.episode_rewards)
plt.title("Episode Rewards")
plt.xlabel("Episode")
plt.ylabel("Total Reward")
plt.subplot(1, 2, 2)
# Compute a moving average
window = 100
moving_avg = [np.mean(agent.episode_rewards[i:i+window]) for i in range(len(agent.episode_rewards)-window)]
plt.plot(moving_avg)
plt.title(f"Moving Average Reward (window={window})")
plt.xlabel("Episode")
plt.ylabel("Average Reward")
plt.show()

SARSA is another value-based control algorithm. Unlike Q-Learning, it updates Q-values using the next action that is actually executed, which makes it an on-policy method.
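To make the on-policy/off-policy distinction concrete, here is a small self-contained sketch (with an arbitrary toy Q-table) contrasting the two TD targets side by side.

```python
import numpy as np

gamma = 0.95
q = np.array([[0.0, 0.5],        # toy Q-table: 2 states x 2 actions
              [1.0, 0.2]])
r, s_next, a_next = -0.1, 1, 1   # a_next is the action the epsilon-greedy policy actually picked

q_learning_target = r + gamma * np.max(q[s_next])   # off-policy: bootstrap on the greedy action
sarsa_target      = r + gamma * q[s_next, a_next]   # on-policy: bootstrap on the action actually taken
print(q_learning_target, sarsa_target)              # 0.85 vs 0.09
```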
class SARSAAgent:
def __init__(self, state_space, action_space, learning_rate=0.1, discount_factor=0.95, epsilon=0.1):
self.state_space = state_space
self.action_space = action_space
self.learning_rate = learning_rate
self.discount_factor = discount_factor
self.epsilon = epsilon
self.q_table = np.zeros((state_space, action_space))
self.episode_rewards = []
def choose_action(self, state, training=True):
"""选择动作"""
if training and random.random() < self.epsilon:
return random.randint(0, self.action_space - 1)
else:
return np.argmax(self.q_table[state])
def update(self, state, action, reward, next_state, next_action):
"""SARSA更新规则"""
current_q = self.q_table[state, action]
next_q = self.q_table[next_state, next_action]
new_q = current_q + self.learning_rate * (
reward + self.discount_factor * next_q - current_q
)
self.q_table[state, action] = new_q
def train(self, env, episodes=1000, max_steps=100):
"""训练SARSA智能体"""
for episode in range(episodes):
state = env.reset()
action = self.choose_action(state)
total_reward = 0
for step in range(max_steps):
# 执行动作
next_state, reward, done, _ = env.step(action)
# 选择下一个动作
next_action = self.choose_action(next_state)
# 更新Q表
self.update(state, action, reward, next_state, next_action)
# 更新状态和动作
state = next_state
action = next_action
total_reward += reward
if done:
break
self.episode_rewards.append(total_reward)
self.epsilon = max(0.01, self.epsilon * 0.995)
if episode % 100 == 0:
avg_reward = np.mean(self.episode_rewards[-100:])
print(f"SARSA Episode {episode}, Average Reward: {avg_reward:.2f}")
# Compare Q-Learning and SARSA
print("\nComparing Q-Learning and SARSA:")
# Train the SARSA agent
sarsa_agent = SARSAAgent(
    state_space=env.observation_space,
    action_space=env.action_space,
    learning_rate=0.1,
    discount_factor=0.95,
    epsilon=0.1
)
print("Training the SARSA agent...")
sarsa_agent.train(env, episodes=1000)
# Plot the comparison
plt.figure(figsize=(10, 5))
window = 100
# Q-Learning moving average
q_avg = [np.mean(agent.episode_rewards[i:i+window]) for i in range(len(agent.episode_rewards)-window)]
plt.plot(q_avg, label='Q-Learning')
# SARSA moving average
sarsa_avg = [np.mean(sarsa_agent.episode_rewards[i:i+window]) for i in range(len(sarsa_agent.episode_rewards)-window)]
plt.plot(sarsa_avg, label='SARSA')
plt.title("Q-Learning vs SARSA Performance")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.legend()
plt.show()

When the state space becomes large, the tabular Q-learning approach is no longer practical. A Deep Q-Network (DQN) instead uses a neural network to approximate the Q-function.
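Concretely, the implementation below regresses the network's Q-value estimates toward a bootstrapped target computed with a separate, periodically synchronized target network (parameters $\theta^{-}$ in the notation used here), averaged over minibatches sampled from a replay buffer. With $d$ the done flag:

$$y = r + \gamma\,(1 - d)\,\max_{a'} Q(s', a'; \theta^{-}), \qquad \mathcal{L}(\theta) = \big(Q(s, a; \theta) - y\big)^2$$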
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
class DQN(nn.Module):
"""深度Q网络"""
def __init__(self, state_size, action_size, hidden_size=64):
super(DQN, self).__init__()
self.fc1 = nn.Linear(state_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, action_size)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)
class ReplayBuffer:
"""经验回放缓冲区"""
def __init__(self, capacity):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
return random.sample(self.buffer, batch_size)
def __len__(self):
return len(self.buffer)
class DQNAgent:
def __init__(self, state_size, action_size, learning_rate=0.001):
self.state_size = state_size
self.action_size = action_size
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 主网络和目标网络
self.q_network = DQN(state_size, action_size).to(self.device)
self.target_network = DQN(state_size, action_size).to(self.device)
# 复制主网络的权重到目标网络
self.update_target_network()
# 优化器
self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)
# 经验回放
self.memory = ReplayBuffer(10000)
# 超参数
self.epsilon = 1.0
self.epsilon_min = 0.01
self.epsilon_decay = 0.995
self.gamma = 0.95
self.batch_size = 64
self.target_update_freq = 100
self.episode_rewards = []
def update_target_network(self):
"""更新目标网络"""
self.target_network.load_state_dict(self.q_network.state_dict())
    def one_hot_encode(self, state):
        """Convert a discrete state index to a one-hot vector; pass continuous state vectors through unchanged."""
        if isinstance(state, (int, np.integer)):
            state_vector = np.zeros(self.state_size, dtype=np.float32)
            state_vector[state] = 1
            return state_vector
        # Already a feature vector (e.g. CartPole observations), use it as-is
        return np.asarray(state, dtype=np.float32)
def choose_action(self, state, training=True):
"""选择动作"""
if training and random.random() < self.epsilon:
return random.randint(0, self.action_size - 1)
else:
state_vector = self.one_hot_encode(state)
state_tensor = torch.FloatTensor(state_vector).unsqueeze(0).to(self.device)
q_values = self.q_network(state_tensor)
return q_values.argmax().item()
def remember(self, state, action, reward, next_state, done):
"""存储经验"""
self.memory.push(state, action, reward, next_state, done)
def replay(self):
"""经验回放训练"""
if len(self.memory) < self.batch_size:
return
# 采样批次
batch = self.memory.sample(self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
# 转换为tensor
states = torch.FloatTensor([self.one_hot_encode(s) for s in states]).to(self.device)
actions = torch.LongTensor(actions).to(self.device)
rewards = torch.FloatTensor(rewards).to(self.device)
next_states = torch.FloatTensor([self.one_hot_encode(s) for s in next_states]).to(self.device)
dones = torch.BoolTensor(dones).to(self.device)
# 计算当前Q值
current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))
# 计算目标Q值
next_q_values = self.target_network(next_states).max(1)[0].detach()
target_q_values = rewards + (self.gamma * next_q_values * ~dones)
# 计算损失
loss = F.mse_loss(current_q_values.squeeze(), target_q_values)
# 反向传播
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def train(self, env, episodes=1000):
"""训练DQN智能体"""
for episode in range(episodes):
state = env.reset()
total_reward = 0
steps = 0
while steps < 200: # 限制每集最大步数
# 选择动作
action = self.choose_action(state)
# 执行动作
next_state, reward, done, _ = env.step(action)
# 存储经验
self.remember(state, action, reward, next_state, done)
# 更新状态
state = next_state
total_reward += reward
steps += 1
# 经验回放
self.replay()
if done:
break
# 更新目标网络
if episode % self.target_update_freq == 0:
self.update_target_network()
# 衰减epsilon
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
# 记录奖励
self.episode_rewards.append(total_reward)
if episode % 100 == 0:
avg_reward = np.mean(self.episode_rewards[-100:])
print(f"DQN Episode {episode}, Average Reward: {avg_reward:.2f}, Epsilon: {self.epsilon:.3f}")
# Train the DQN agent
dqn_agent = DQNAgent(
    state_size=env.observation_space,
    action_size=env.action_space,  # fixed: the constructor parameter is action_size, not action_space
    learning_rate=0.001
)
print("\nTraining the DQN agent...")
dqn_agent.train(env, episodes=1000)
# Visualize DQN performance
plt.figure(figsize=(10, 5))
window = 100
dqn_avg = [np.mean(dqn_agent.episode_rewards[i:i+window]) for i in range(len(dqn_agent.episode_rewards)-window)]
plt.plot(dqn_avg)
plt.title("DQN Learning Curve")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.show()

class DoubleDQNAgent(DQNAgent):
"""Double DQN解决过估计问题"""
def __init__(self, state_size, action_size, learning_rate=0.001):
super().__init__(state_size, action_size, learning_rate)
def replay(self):
"""Double DQN经验回放"""
if len(self.memory) < self.batch_size:
return
batch = self.memory.sample(self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.FloatTensor([self.one_hot_encode(s) for s in states]).to(self.device)
actions = torch.LongTensor(actions).to(self.device)
rewards = torch.FloatTensor(rewards).to(self.device)
next_states = torch.FloatTensor([self.one_hot_encode(s) for s in next_states]).to(self.device)
dones = torch.BoolTensor(dones).to(self.device)
# 当前Q值
current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))
        # Double DQN target: the online network selects the next action,
        # and the target network evaluates it (this decoupling reduces overestimation)
        next_actions = self.q_network(next_states).argmax(1)
        next_q_values = self.target_network(next_states).gather(1, next_actions.unsqueeze(1)).squeeze().detach()
target_q_values = rewards + (self.gamma * next_q_values * ~dones)
loss = F.mse_loss(current_q_values.squeeze(), target_q_values)
self.optimizer.zero_grad()
loss.backward()
        self.optimizer.step()

REINFORCE is the simplest policy-gradient algorithm: it optimizes the policy function directly.
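For reference, the update implemented below follows the standard REINFORCE gradient estimate over one episode, with the discounted returns additionally normalized as a simple variance-reduction step:

$$\nabla_\theta J(\theta) \approx \sum_{t} G_t\, \nabla_\theta \log \pi_\theta(a_t \mid s_t), \qquad G_t = \sum_{k=t}^{T} \gamma^{\,k-t} r_k$$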
class PolicyNetwork(nn.Module):
"""策略网络"""
def __init__(self, state_size, action_size, hidden_size=128):
super(PolicyNetwork, self).__init__()
self.fc1 = nn.Linear(state_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, action_size)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return F.softmax(self.fc3(x), dim=-1)
class REINFORCEAgent:
def __init__(self, state_size, action_size, learning_rate=0.01, gamma=0.99):
self.state_size = state_size
self.action_size = action_size
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 策略网络
self.policy_net = PolicyNetwork(state_size, action_size).to(self.device)
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
self.gamma = gamma
self.episode_rewards = []
self.saved_log_probs = []
self.rewards = []
    def one_hot_encode(self, state):
        """Convert a discrete state index to a one-hot vector; pass continuous state vectors through unchanged."""
        if isinstance(state, (int, np.integer)):
            state_vector = np.zeros(self.state_size, dtype=np.float32)
            state_vector[state] = 1
            return state_vector
        # Already a feature vector (e.g. CartPole observations), use it as-is
        return np.asarray(state, dtype=np.float32)
def choose_action(self, state):
"""根据策略选择动作"""
state_vector = self.one_hot_encode(state)
state_tensor = torch.FloatTensor(state_vector).to(self.device)
probs = self.policy_net(state_tensor)
# 从概率分布中采样动作
action_dist = torch.distributions.Categorical(probs)
action = action_dist.sample()
# 保存对数概率用于训练
self.saved_log_probs.append(action_dist.log_prob(action))
return action.item()
def update_policy(self):
"""更新策略"""
R = 0
policy_loss = []
returns = []
# 计算折扣回报
for r in self.rewards[::-1]:
R = r + self.gamma * R
returns.insert(0, R)
# 标准化回报
returns = torch.tensor(returns)
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
# 计算策略损失
for log_prob, R in zip(self.saved_log_probs, returns):
policy_loss.append(-log_prob * R)
# 更新策略
self.optimizer.zero_grad()
policy_loss = torch.stack(policy_loss).sum()
policy_loss.backward()
self.optimizer.step()
# 清空缓存
self.saved_log_probs = []
self.rewards = []
def train(self, env, episodes=1000):
"""训练REINFORCE智能体"""
for episode in range(episodes):
state = env.reset()
total_reward = 0
done = False
while not done:
# 选择动作
action = self.choose_action(state)
# 执行动作
state, reward, done, _ = env.step(action)
# 记录奖励
self.rewards.append(reward)
total_reward += reward
# 更新策略
self.update_policy()
# 记录总奖励
self.episode_rewards.append(total_reward)
if episode % 100 == 0:
avg_reward = np.mean(self.episode_rewards[-100:])
print(f"REINFORCE Episode {episode}, Average Reward: {avg_reward:.2f}")
# Train the REINFORCE agent
reinforce_agent = REINFORCEAgent(
    state_size=env.observation_space,
    action_size=env.action_space,
    learning_rate=0.01,
    gamma=0.99
)
print("\nTraining the REINFORCE agent...")
reinforce_agent.train(env, episodes=1000)

Actor-Critic combines the strengths of value-based and policy-based methods: an actor learns the policy while a critic learns a value function that is used to reduce the variance of the policy gradient.
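The network below shares a hidden layer between the actor and critic heads and is trained with roughly the combined objective sketched here (the code uses a mean-squared critic term), where $G_t$ is the discounted return and $A_t$ the advantage:

$$A_t = G_t - V_\phi(s_t), \qquad \mathcal{L} = -\sum_t A_t \log \pi_\theta(a_t \mid s_t) \;+\; \sum_t \big(V_\phi(s_t) - G_t\big)^2$$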
class ActorCritic(nn.Module):
"""Actor-Critic网络"""
def __init__(self, state_size, action_size, hidden_size=128):
super(ActorCritic, self).__init__()
self.shared = nn.Linear(state_size, hidden_size)
# Actor头(策略)
self.actor = nn.Linear(hidden_size, action_size)
# Critic头(价值)
self.critic = nn.Linear(hidden_size, 1)
def forward(self, x):
x = F.relu(self.shared(x))
policy = F.softmax(self.actor(x), dim=-1)
value = self.critic(x)
return policy, value
class ActorCriticAgent:
def __init__(self, state_size, action_size, learning_rate=0.01, gamma=0.99):
self.state_size = state_size
self.action_size = action_size
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Actor-Critic网络
self.ac_net = ActorCritic(state_size, action_size).to(self.device)
self.optimizer = optim.Adam(self.ac_net.parameters(), lr=learning_rate)
self.gamma = gamma
self.episode_rewards = []
self.log_probs = []
self.values = []
self.rewards = []
    def one_hot_encode(self, state):
        """Convert a discrete state index to a one-hot vector; pass continuous state vectors through unchanged."""
        if isinstance(state, (int, np.integer)):
            state_vector = np.zeros(self.state_size, dtype=np.float32)
            state_vector[state] = 1
            return state_vector
        # Already a feature vector (e.g. CartPole observations), use it as-is
        return np.asarray(state, dtype=np.float32)
def choose_action(self, state):
state_vector = self.one_hot_encode(state)
state_tensor = torch.FloatTensor(state_vector).to(self.device)
policy, value = self.ac_net(state_tensor)
# 采样动作
action_dist = torch.distributions.Categorical(policy)
action = action_dist.sample()
# 保存用于训练
self.log_probs.append(action_dist.log_prob(action))
self.values.append(value)
return action.item()
def update(self):
"""更新Actor-Critic网络"""
# 计算回报
returns = []
R = 0
for r in self.rewards[::-1]:
R = r + self.gamma * R
returns.insert(0, R)
returns = torch.tensor(returns).float()
        # Compute advantages; detach so the actor loss does not backpropagate into the critic
        values = torch.cat(self.values).squeeze()
        advantages = (returns - values).detach()
# 计算Actor损失(策略梯度)
actor_loss = []
for log_prob, advantage in zip(self.log_probs, advantages):
actor_loss.append(-log_prob * advantage)
# 计算Critic损失(价值函数)
critic_loss = F.mse_loss(values, returns)
# 总损失
loss = torch.stack(actor_loss).sum() + critic_loss
# 更新网络
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# 清空缓存
self.log_probs = []
self.values = []
self.rewards = []
def train(self, env, episodes=1000):
"""训练Actor-Critic智能体"""
for episode in range(episodes):
state = env.reset()
total_reward = 0
done = False
while not done:
# 选择动作
action = self.choose_action(state)
# 执行动作
state, reward, done, _ = env.step(action)
# 记录
self.rewards.append(reward)
total_reward += reward
# 更新网络
self.update()
# 记录奖励
self.episode_rewards.append(total_reward)
if episode % 100 == 0:
avg_reward = np.mean(self.episode_rewards[-100:])
print(f"Actor-Critic Episode {episode}, Average Reward: {avg_reward:.2f}")
# Train the Actor-Critic agent
ac_agent = ActorCriticAgent(
    state_size=env.observation_space,
    action_size=env.action_space,
    learning_rate=0.01,
    gamma=0.99
)
print("\nTraining the Actor-Critic agent...")
ac_agent.train(env, episodes=1000)

# Compare the performance of all algorithms
plt.figure(figsize=(15, 8))
# Compute moving averages for all algorithms
window = 50
# Q-Learning
if len(agent.episode_rewards) > window:
q_avg = [np.mean(agent.episode_rewards[i:i+window]) for i in range(len(agent.episode_rewards)-window)]
plt.plot(q_avg, label='Q-Learning', alpha=0.7)
# SARSA
if len(sarsa_agent.episode_rewards) > window:
sarsa_avg = [np.mean(sarsa_agent.episode_rewards[i:i+window]) for i in range(len(sarsa_agent.episode_rewards)-window)]
plt.plot(sarsa_avg, label='SARSA', alpha=0.7)
# DQN
if len(dqn_agent.episode_rewards) > window:
dqn_avg = [np.mean(dqn_agent.episode_rewards[i:i+window]) for i in range(len(dqn_agent.episode_rewards)-window)]
plt.plot(dqn_avg, label='DQN', alpha=0.7)
# REINFORCE
if len(reinforce_agent.episode_rewards) > window:
reinforce_avg = [np.mean(reinforce_agent.episode_rewards[i:i+window]) for i in range(len(reinforce_agent.episode_rewards)-window)]
plt.plot(reinforce_avg, label='REINFORCE', alpha=0.7)
# Actor-Critic
if len(ac_agent.episode_rewards) > window:
ac_avg = [np.mean(ac_agent.episode_rewards[i:i+window]) for i in range(len(ac_agent.episode_rewards)-window)]
plt.plot(ac_avg, label='Actor-Critic', alpha=0.7)
plt.title("Comparison of Different RL Algorithms")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

Let's test our algorithms on the classic CartPole environment. Note that the code below uses the classic gym API, in which reset() returns only the observation and step() returns a 4-tuple.
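If you are running a newer gym (>= 0.26) or gymnasium, reset() and step() have different signatures. A minimal sketch of the adaptation (assuming the gymnasium package is installed) looks like this:

```python
import gymnasium as gym  # assumption: gymnasium is installed

env = gym.make("CartPole-v1")
obs, info = env.reset()                       # new API: reset() returns (observation, info)
done = False
while not done:
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)  # new API: 5-tuple
    done = terminated or truncated            # combine both flags into the old-style "done"
env.close()
```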
import gym
class CartPoleAgent:
def __init__(self, algorithm='dqn'):
self.env = gym.make('CartPole-v1')
self.state_size = self.env.observation_space.shape[0]
self.action_size = self.env.action_space.n
self.algorithm = algorithm
# 根据算法创建智能体
if algorithm == 'dqn':
self.agent = DQNAgent(self.state_size, self.action_size)
self.agent.epsilon = 0.1 # 降低探索率
elif algorithm == 'reinforce':
self.agent = REINFORCEAgent(self.state_size, self.action_size)
elif algorithm == 'actor-critic':
self.agent = ActorCriticAgent(self.state_size, self.action_size)
def train(self, episodes=500):
"""训练智能体"""
print(f"Training {self.algorithm} agent on CartPole...")
for episode in range(episodes):
state = self.env.reset()
total_reward = 0
done = False
while not done:
if self.algorithm == 'dqn':
action = self.agent.choose_action(state)
next_state, reward, done, _ = self.env.step(action)
self.agent.remember(state, action, reward, next_state, done)
self.agent.replay()
else:
action = self.agent.choose_action(state)
next_state, reward, done, _ = self.env.step(action)
self.agent.rewards.append(reward)
state = next_state
total_reward += reward
# 对于策略梯度方法,更新策略
if self.algorithm != 'dqn':
if self.algorithm == 'reinforce':
self.agent.update_policy()
else:
self.agent.update()
self.agent.episode_rewards.append(total_reward)
if episode % 50 == 0:
avg_reward = np.mean(self.agent.episode_rewards[-50:])
print(f"Episode {episode}, Average Reward: {avg_reward:.2f}")
                # Stop early once the average reward exceeds 195 (the CartPole-v0
                # "solved" threshold; CartPole-v1's official bar is 475)
                if avg_reward >= 195:
                    print(f"CartPole solved in {episode} episodes!")
break
def test(self, episodes=10):
"""测试训练好的智能体"""
print(f"\nTesting {self.algorithm} agent...")
total_rewards = []
for episode in range(episodes):
state = self.env.reset()
total_reward = 0
done = False
while not done:
if self.algorithm == 'dqn':
action = self.agent.choose_action(state, training=False)
else:
action = self.agent.choose_action(state)
state, reward, done, _ = self.env.step(action)
total_reward += reward
total_rewards.append(total_reward)
print(f"Test Episode {episode + 1}: Reward = {total_reward}")
print(f"Average Test Reward: {np.mean(total_rewards):.2f}")
# Test the different algorithms
algorithms = ['dqn', 'reinforce', 'actor-critic']
results = {}
for algo in algorithms:
cartpole = CartPoleAgent(algo)
cartpole.train(episodes=500)
cartpole.test()
results[algo] = cartpole.agent.episode_rewards
# Visualize the results
plt.figure(figsize=(12, 5))
for algo, rewards in results.items():
window = 50
if len(rewards) > window:
avg = [np.mean(rewards[i:i+window]) for i in range(len(rewards)-window)]
plt.plot(avg, label=algo)
plt.title("CartPole: Algorithm Comparison")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={50})")
plt.axhline(y=195, color='r', linestyle='--', label='Solved Threshold')
plt.legend()
plt.show()

class MultiAgentEnvironment:
"""多智能体环境示例"""
def __init__(self, num_agents=2, grid_size=5):
self.num_agents = num_agents
self.grid_size = grid_size
self.agents_pos = [(0, i) for i in range(num_agents)]
self.goal_pos = (grid_size-1, grid_size-1)
self.obstacles = [(2, 2), (3, 2)]
def reset(self):
self.agents_pos = [(0, i) for i in range(self.num_agents)]
return [self._get_agent_state(i) for i in range(self.num_agents)]
def _get_agent_state(self, agent_id):
"""获取单个智能体的状态"""
x, y = self.agents_pos[agent_id]
return x * self.grid_size + y
def step(self, actions):
"""执行所有智能体的动作"""
rewards = []
dones = []
next_states = []
for i in range(self.num_agents):
old_pos = self.agents_pos[i]
action = actions[i]
# 执行动作
if action == 0: # 上
new_pos = (self.agents_pos[i][0], max(0, self.agents_pos[i][1] - 1))
elif action == 1: # 下
new_pos = (self.agents_pos[i][0], min(self.grid_size - 1, self.agents_pos[i][1] + 1))
elif action == 2: # 左
new_pos = (max(0, self.agents_pos[i][0] - 1), self.agents_pos[i][1])
elif action == 3: # 右
new_pos = (min(self.grid_size - 1, self.agents_pos[i][0] + 1), self.agents_pos[i][1])
else:
new_pos = self.agents_pos[i]
# 检查碰撞
if new_pos not in self.obstacles and new_pos not in self.agents_pos[:i] + self.agents_pos[i+1:]:
self.agents_pos[i] = new_pos
# 计算奖励
if self.agents_pos[i] == self.goal_pos:
reward = 10
done = True
elif self.agents_pos[i] == old_pos:
reward = -1
done = False
else:
# 团队奖励:靠近其他智能体
min_dist = min([abs(self.agents_pos[i][0] - ap[0]) + abs(self.agents_pos[i][1] - ap[1])
for j, ap in enumerate(self.agents_pos) if i != j])
reward = 0.1 * (4 - min_dist) # 越近奖励越高
done = False
rewards.append(reward)
dones.append(done)
next_states.append(self._get_agent_state(i))
return next_states, rewards, dones
# Multi-agent training
class MultiAgentTrainer:
def __init__(self, env, num_agents):
self.env = env
self.num_agents = num_agents
self.agents = []
# 为每个智能体创建独立的DQN
for _ in range(num_agents):
agent = DQNAgent(env.grid_size * env.grid_size, 4)
agent.epsilon = 0.1
self.agents.append(agent)
def train(self, episodes=1000):
"""训练多智能体系统"""
for episode in range(episodes):
states = self.env.reset()
total_rewards = [0] * self.num_agents
            done = False
            steps = 0
            # Cap episode length so untrained agents cannot wander forever
            while not done and steps < 100:
                actions = []
                for i in range(self.num_agents):
                    action = self.agents[i].choose_action(states[i])
                    actions.append(action)
                next_states, rewards, dones = self.env.step(actions)
                for i in range(self.num_agents):
                    self.agents[i].remember(states[i], actions[i], rewards[i],
                                            next_states[i], dones[i])
                    self.agents[i].replay()
                    total_rewards[i] += rewards[i]
                states = next_states
                steps += 1
                done = any(dones)  # end the episode as soon as any agent reaches the goal
if episode % 100 == 0:
avg_reward = np.mean(total_rewards)
print(f"Episode {episode}, Average Team Reward: {avg_reward:.2f}")
# Train the multi-agent system
multi_env = MultiAgentEnvironment(num_agents=2, grid_size=5)
trainer = MultiAgentTrainer(multi_env, 2)
trainer.train(episodes=500)

This article has walked through reinforcement learning from the basics to more advanced topics, including:
- the core agent-environment loop, illustrated with a grid-world environment
- tabular value-based methods: Q-Learning and SARSA
- deep value-based methods: DQN and Double DQN
- policy-gradient methods: REINFORCE and Actor-Critic
- a CartPole case study comparing the algorithms
- a simple multi-agent extension with independently trained DQN agents
Reinforcement learning is a vibrant research field that keeps evolving. From simple tabular methods to deep networks, and from single agents to multi-agent systems, it is tackling increasingly complex problems.