Reinforcement Learning: From Q-Learning to Deep Reinforcement Learning

Introduction

Reinforcement Learning (RL) is an important branch of machine learning that studies how an agent learns an optimal policy by interacting with its environment. From game AI to robot control, and from recommender systems to autonomous driving, reinforcement learning is changing how we interact with intelligent systems. This article walks through the core concepts of reinforcement learning, from the classic Q-Learning algorithm to modern deep reinforcement learning methods.

Reinforcement Learning Fundamentals

Core Components of Reinforcement Learning

A reinforcement learning system consists of the following key components:

  1. Agent: the entity that learns and makes decisions
  2. Environment: the external world the agent interacts with
  3. State: the current situation of the environment
  4. Action: an operation the agent can perform
  5. Reward: the feedback received after taking an action
  6. Policy: the agent's decision rule
Code language: Python
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
import random

class Environment:
    """强化学习环境基类"""
    def __init__(self):
        self.state = None
        self.action_space = None
        self.observation_space = None

    def reset(self):
        """重置环境到初始状态"""
        raise NotImplementedError

    def step(self, action):
        """执行动作,返回(next_state, reward, done, info)"""
        raise NotImplementedError

    def render(self):
        """可视化环境"""
        raise NotImplementedError

# 示例:简单的网格世界环境
class GridWorld(Environment):
    def __init__(self, width=5, height=5):
        super().__init__()
        self.width = width
        self.height = height
        self.action_space = 4  # 上、下、左、右
        self.observation_space = width * height

        # 定义特殊位置
        self.start_pos = (0, 0)
        self.goal_pos = (width-1, height-1)
        self.obstacles = [(2, 2), (3, 2)]

        self.reset()

    def reset(self):
        self.agent_pos = self.start_pos
        self.steps = 0
        return self._get_state()

    def _get_state(self):
        """将位置转换为状态编号"""
        return self.agent_pos[0] * self.height + self.agent_pos[1]

    def step(self, action):
        self.steps += 1

        # 保存旧位置
        old_pos = self.agent_pos

        # 执行动作
        if action == 0:  # 上
            new_pos = (self.agent_pos[0], max(0, self.agent_pos[1] - 1))
        elif action == 1:  # 下
            new_pos = (self.agent_pos[0], min(self.height - 1, self.agent_pos[1] + 1))
        elif action == 2:  # 左
            new_pos = (max(0, self.agent_pos[0] - 1), self.agent_pos[1])
        elif action == 3:  # 右
            new_pos = (min(self.width - 1, self.agent_pos[0] + 1), self.agent_pos[1])
        else:
            new_pos = self.agent_pos

        # 检查是否碰到障碍物
        if new_pos not in self.obstacles:
            self.agent_pos = new_pos

        # 计算奖励
        if self.agent_pos == self.goal_pos:
            reward = 10  # 到达目标
            done = True
        elif self.agent_pos == old_pos:  # 撞墙或障碍物
            reward = -1
            done = False
        else:
            reward = -0.1  # 每步的小惩罚,鼓励尽快到达目标
            done = False

        # 限制最大步数
        if self.steps >= 100:
            done = True

        return self._get_state(), reward, done, {}

    def render(self):
        """打印网格世界"""
        grid = np.full((self.height, self.width), '.', dtype=str)
        grid[self.goal_pos[1], self.goal_pos[0]] = 'G'
        for obs in self.obstacles:
            grid[obs[1], obs[0]] = '#'
        grid[self.agent_pos[1], self.agent_pos[0]] = 'A'

        print("\n".join(" ".join(row) for row in grid))
        print()

# 创建环境并测试
env = GridWorld(5, 5)
state = env.reset()
print(f"初始状态: {state}")

env.render()
for i in range(10):
    action = random.randint(0, 3)
    state, reward, done, info = env.step(action)
    print(f"步骤 {i+1}: 动作={action}, 奖励={reward}, 状态={state}, 完成={done}")
    env.render()
    if done:
        print("任务完成!")
        break

The Q-Learning Algorithm

Q-Learning is one of the most classic reinforcement learning algorithms: it learns the action-value function Q and derives the optimal policy from it.

How Q-Learning Works

Q-Learning updates its Q-values with the Bellman update rule:

Q(s,a) \leftarrow Q(s,a) + \alpha\left[r + \gamma \max_{a'} Q(s',a') - Q(s,a)\right]

where \alpha is the learning rate, \gamma is the discount factor, r is the immediate reward, and s' is the next state.
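
For example, with \alpha = 0.1, \gamma = 0.95, a current estimate Q(s,a) = 0, an immediate reward r = -0.1, and \max_{a'} Q(s',a') = 2 (illustrative numbers, not taken from the code below), a single update gives Q(s,a) \leftarrow 0 + 0.1 \times (-0.1 + 0.95 \times 2 - 0) = 0.18.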

Code language: Python
class QLearningAgent:
    def __init__(self, state_space, action_space, learning_rate=0.1, discount_factor=0.95, epsilon=0.1):
        self.state_space = state_space
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon

        # Q表:状态-动作值函数
        self.q_table = np.zeros((state_space, action_space))

        # 统计信息
        self.episode_rewards = []

    def choose_action(self, state, training=True):
        """使用epsilon-贪心策略选择动作"""
        if training and random.random() < self.epsilon:
            # 探索:随机选择动作
            return random.randint(0, self.action_space - 1)
        else:
            # 利用:选择Q值最高的动作
            return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state):
        """更新Q表"""
        # 获取当前Q值
        current_q = self.q_table[state, action]

        # 计算最大Q值(用于下一状态)
        max_next_q = np.max(self.q_table[next_state])

        # 贝尔曼方程更新
        new_q = current_q + self.learning_rate * (
            reward + self.discount_factor * max_next_q - current_q
        )

        self.q_table[state, action] = new_q

    def decay_epsilon(self, decay_rate=0.995, min_epsilon=0.01):
        """衰减探索率"""
        self.epsilon = max(min_epsilon, self.epsilon * decay_rate)

    def train(self, env, episodes=1000, max_steps=100):
        """训练智能体"""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0

            for step in range(max_steps):
                # 选择动作
                action = self.choose_action(state)

                # 执行动作
                next_state, reward, done, _ = env.step(action)

                # 更新Q表
                self.update(state, action, reward, next_state)

                # 更新状态
                state = next_state
                total_reward += reward

                if done:
                    break

            # 记录总奖励
            self.episode_rewards.append(total_reward)

            # 衰减epsilon
            self.decay_epsilon()

            # 打印进度
            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"Episode {episode}, Average Reward (last 100): {avg_reward:.2f}, Epsilon: {self.epsilon:.3f}")

    def test(self, env, episodes=10):
        """测试训练好的智能体"""
        total_rewards = []

        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            done = False
            steps = 0

            while not done and steps < 100:
                # 选择动作(不探索)
                action = self.choose_action(state, training=False)
                state, reward, done, _ = env.step(action)
                total_reward += reward
                steps += 1

            total_rewards.append(total_reward)
            print(f"Test Episode {episode + 1}: Reward = {total_reward}, Steps = {steps}")

        print(f"Average Test Reward: {np.mean(total_rewards):.2f}")

# 训练Q-Learning智能体
env = GridWorld(5, 5)
agent = QLearningAgent(
    state_space=env.observation_space,
    action_space=env.action_space,
    learning_rate=0.1,
    discount_factor=0.95,
    epsilon=0.1
)

print("开始训练Q-Learning智能体...")
agent.train(env, episodes=1000)

# 测试训练结果
print("\n测试训练好的智能体:")
agent.test(env, episodes=5)

# 可视化学习曲线
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(agent.episode_rewards)
plt.title("Episode Rewards")
plt.xlabel("Episode")
plt.ylabel("Total Reward")

plt.subplot(1, 2, 2)
# 计算移动平均
window = 100
moving_avg = [np.mean(agent.episode_rewards[i:i+window]) for i in range(len(agent.episode_rewards)-window)]
plt.plot(moving_avg)
plt.title(f"Moving Average Reward (window={window})")
plt.xlabel("Episode")
plt.ylabel("Average Reward")
plt.show()

The SARSA Algorithm

SARSA is another temporal-difference control algorithm. Unlike Q-Learning, which is off-policy, SARSA is on-policy: it updates its Q-values using the next action that is actually executed rather than the greedy maximum.
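
For comparison with the Q-Learning rule above, the SARSA update replaces the max over next actions with the action a' that the ε-greedy policy actually takes in state s', which is exactly what SARSAAgent.update below implements:

Q(s,a) \leftarrow Q(s,a) + \alpha\left[r + \gamma Q(s',a') - Q(s,a)\right]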

Code language: Python
class SARSAAgent:
    def __init__(self, state_space, action_space, learning_rate=0.1, discount_factor=0.95, epsilon=0.1):
        self.state_space = state_space
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon

        self.q_table = np.zeros((state_space, action_space))
        self.episode_rewards = []

    def choose_action(self, state, training=True):
        """选择动作"""
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_space - 1)
        else:
            return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state, next_action):
        """SARSA更新规则"""
        current_q = self.q_table[state, action]
        next_q = self.q_table[next_state, next_action]

        new_q = current_q + self.learning_rate * (
            reward + self.discount_factor * next_q - current_q
        )

        self.q_table[state, action] = new_q

    def train(self, env, episodes=1000, max_steps=100):
        """训练SARSA智能体"""
        for episode in range(episodes):
            state = env.reset()
            action = self.choose_action(state)
            total_reward = 0

            for step in range(max_steps):
                # 执行动作
                next_state, reward, done, _ = env.step(action)

                # 选择下一个动作
                next_action = self.choose_action(next_state)

                # 更新Q表
                self.update(state, action, reward, next_state, next_action)

                # 更新状态和动作
                state = next_state
                action = next_action
                total_reward += reward

                if done:
                    break

            self.episode_rewards.append(total_reward)
            self.epsilon = max(0.01, self.epsilon * 0.995)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"SARSA Episode {episode}, Average Reward: {avg_reward:.2f}")

# 比较Q-Learning和SARSA
print("\n比较Q-Learning和SARSA:")

# 训练SARSA智能体
sarsa_agent = SARSAAgent(
    state_space=env.observation_space,
    action_space=env.action_space,
    learning_rate=0.1,
    discount_factor=0.95,
    epsilon=0.1
)

print("训练SARSA智能体...")
sarsa_agent.train(env, episodes=1000)

# 绘制比较图
plt.figure(figsize=(10, 5))
window = 100

# Q-Learning移动平均
q_avg = [np.mean(agent.episode_rewards[i:i+window]) for i in range(len(agent.episode_rewards)-window)]
plt.plot(q_avg, label='Q-Learning')

# SARSA移动平均
sarsa_avg = [np.mean(sarsa_agent.episode_rewards[i:i+window]) for i in range(len(sarsa_agent.episode_rewards)-window)]
plt.plot(sarsa_avg, label='SARSA')

plt.title("Q-Learning vs SARSA Performance")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.legend()
plt.show()

Deep Q-Network (DQN)

When the state space is large, a tabular Q-function becomes impractical. A Deep Q-Network (DQN) approximates the Q-function with a neural network.
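
Concretely, the network parameters \theta are trained to minimize the squared temporal-difference error on transitions sampled from a replay buffer, with the bootstrap target computed by a periodically synchronized target network \theta^-; this is the objective that the replay() method below implements with an MSE loss:

L(\theta) = \mathbb{E}_{(s,a,r,s')}\left[\left(r + \gamma \max_{a'} Q_{\theta^-}(s',a') - Q_\theta(s,a)\right)^2\right]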

DQN Architecture
Code language: Python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class DQN(nn.Module):
    """深度Q网络"""
    def __init__(self, state_size, action_size, hidden_size=64):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

class ReplayBuffer:
    """经验回放缓冲区"""
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)

class DQNAgent:
    def __init__(self, state_size, action_size, learning_rate=0.001):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # 主网络和目标网络
        self.q_network = DQN(state_size, action_size).to(self.device)
        self.target_network = DQN(state_size, action_size).to(self.device)

        # 复制主网络的权重到目标网络
        self.update_target_network()

        # 优化器
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)

        # 经验回放
        self.memory = ReplayBuffer(10000)

        # 超参数
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.gamma = 0.95
        self.batch_size = 64
        self.target_update_freq = 100

        self.episode_rewards = []

    def update_target_network(self):
        """更新目标网络"""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def one_hot_encode(self, state):
        """将状态编号转换为one-hot编码"""
        state_vector = np.zeros(self.state_size)
        state_vector[state] = 1
        return state_vector

    def choose_action(self, state, training=True):
        """选择动作"""
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_size - 1)
        else:
            state_vector = self.one_hot_encode(state)
            state_tensor = torch.FloatTensor(state_vector).unsqueeze(0).to(self.device)
            q_values = self.q_network(state_tensor)
            return q_values.argmax().item()

    def remember(self, state, action, reward, next_state, done):
        """存储经验"""
        self.memory.push(state, action, reward, next_state, done)

    def replay(self):
        """经验回放训练"""
        if len(self.memory) < self.batch_size:
            return

        # 采样批次
        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # 转换为tensor
        states = torch.FloatTensor([self.one_hot_encode(s) for s in states]).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor([self.one_hot_encode(s) for s in next_states]).to(self.device)
        dones = torch.BoolTensor(dones).to(self.device)

        # 计算当前Q值
        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))

        # 计算目标Q值
        next_q_values = self.target_network(next_states).max(1)[0].detach()
        target_q_values = rewards + (self.gamma * next_q_values * ~dones)

        # 计算损失
        loss = F.mse_loss(current_q_values.squeeze(), target_q_values)

        # 反向传播
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, env, episodes=1000):
        """训练DQN智能体"""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            steps = 0

            while steps < 200:  # 限制每集最大步数
                # 选择动作
                action = self.choose_action(state)

                # 执行动作
                next_state, reward, done, _ = env.step(action)

                # 存储经验
                self.remember(state, action, reward, next_state, done)

                # 更新状态
                state = next_state
                total_reward += reward
                steps += 1

                # 经验回放
                self.replay()

                if done:
                    break

            # 更新目标网络
            if episode % self.target_update_freq == 0:
                self.update_target_network()

            # 衰减epsilon
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

            # 记录奖励
            self.episode_rewards.append(total_reward)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"DQN Episode {episode}, Average Reward: {avg_reward:.2f}, Epsilon: {self.epsilon:.3f}")

# Train the DQN agent
dqn_agent = DQNAgent(
    state_size=env.observation_space,
    action_size=env.action_space,  # the constructor's parameter is action_size, not action_space
    learning_rate=0.001
)

print("\n开始训练DQN智能体...")
dqn_agent.train(env, episodes=1000)

# 可视化DQN性能
plt.figure(figsize=(10, 5))
window = 100
dqn_avg = [np.mean(dqn_agent.episode_rewards[i:i+window]) for i in range(len(dqn_agent.episode_rewards)-window)]
plt.plot(dqn_avg)
plt.title("DQN Learning Curve")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.show()

Improved DQN Algorithms

Double DQN
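
Double DQN decouples action selection from action evaluation: the online network chooses the next action and the target network scores it, which reduces the overestimation bias introduced by the plain max in standard DQN. The bootstrap target becomes:

y = r + \gamma \, Q_{\theta^-}\left(s', \arg\max_{a'} Q_\theta(s',a')\right)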
Code language: Python
class DoubleDQNAgent(DQNAgent):
    """Double DQN解决过估计问题"""
    def __init__(self, state_size, action_size, learning_rate=0.001):
        super().__init__(state_size, action_size, learning_rate)

    def replay(self):
        """Double DQN经验回放"""
        if len(self.memory) < self.batch_size:
            return

        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor([self.one_hot_encode(s) for s in states]).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor([self.one_hot_encode(s) for s in next_states]).to(self.device)
        dones = torch.BoolTensor(dones).to(self.device)

        # 当前Q值
        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))

        # Double DQN target: the online network selects the next action,
        # and the target network evaluates it
        next_actions = self.q_network(next_states).argmax(1)
        next_q_values = self.target_network(next_states).gather(1, next_actions.unsqueeze(1)).squeeze().detach()
        target_q_values = rewards + (self.gamma * next_q_values * ~dones)

        loss = F.mse_loss(current_q_values.squeeze(), target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
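
The article does not train this variant itself; a minimal usage sketch, assuming the same GridWorld environment and hyperparameters used for the plain DQN above, could look like this:

Code language: Python
# Illustrative sketch: train Double DQN on the same GridWorld instance
double_dqn_agent = DoubleDQNAgent(
    state_size=env.observation_space,
    action_size=env.action_space,
    learning_rate=0.001
)
double_dqn_agent.train(env, episodes=1000)  # reuses DQNAgent.train unchanged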

Policy Gradient Methods

The REINFORCE Algorithm

REINFORCE is the simplest policy gradient algorithm: it optimizes the policy function directly.
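
It ascends the policy-gradient estimate, weighting the gradient of each action's log-probability by the discounted return G_t collected from that step onward (the implementation below additionally normalizes the returns to reduce variance):

\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t\right]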

Code language: Python
class PolicyNetwork(nn.Module):
    """策略网络"""
    def __init__(self, state_size, action_size, hidden_size=128):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return F.softmax(self.fc3(x), dim=-1)

class REINFORCEAgent:
    def __init__(self, state_size, action_size, learning_rate=0.01, gamma=0.99):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # 策略网络
        self.policy_net = PolicyNetwork(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)

        self.gamma = gamma
        self.episode_rewards = []
        self.saved_log_probs = []
        self.rewards = []

    def one_hot_encode(self, state):
        """one-hot编码"""
        state_vector = np.zeros(self.state_size)
        state_vector[state] = 1
        return state_vector

    def choose_action(self, state):
        """根据策略选择动作"""
        state_vector = self.one_hot_encode(state)
        state_tensor = torch.FloatTensor(state_vector).to(self.device)
        probs = self.policy_net(state_tensor)

        # 从概率分布中采样动作
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()

        # 保存对数概率用于训练
        self.saved_log_probs.append(action_dist.log_prob(action))

        return action.item()

    def update_policy(self):
        """更新策略"""
        R = 0
        policy_loss = []
        returns = []

        # 计算折扣回报
        for r in self.rewards[::-1]:
            R = r + self.gamma * R
            returns.insert(0, R)

        # 标准化回报
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        # 计算策略损失
        for log_prob, R in zip(self.saved_log_probs, returns):
            policy_loss.append(-log_prob * R)

        # 更新策略
        self.optimizer.zero_grad()
        policy_loss = torch.stack(policy_loss).sum()
        policy_loss.backward()
        self.optimizer.step()

        # 清空缓存
        self.saved_log_probs = []
        self.rewards = []

    def train(self, env, episodes=1000):
        """训练REINFORCE智能体"""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            done = False

            while not done:
                # 选择动作
                action = self.choose_action(state)

                # 执行动作
                state, reward, done, _ = env.step(action)

                # 记录奖励
                self.rewards.append(reward)
                total_reward += reward

            # 更新策略
            self.update_policy()

            # 记录总奖励
            self.episode_rewards.append(total_reward)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"REINFORCE Episode {episode}, Average Reward: {avg_reward:.2f}")

# 训练REINFORCE智能体
reinforce_agent = REINFORCEAgent(
    state_size=env.observation_space,
    action_size=env.action_space,
    learning_rate=0.01,
    gamma=0.99
)

print("\n开始训练REINFORCE智能体...")
reinforce_agent.train(env, episodes=1000)

Actor-Critic Methods

Actor-Critic methods combine the strengths of value-based and policy-based approaches.
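
The critic's value estimate V(s_t) serves as a baseline: the actor is updated with the advantage A_t = G_t - V(s_t) instead of the raw return (actor loss -\log \pi_\theta(a_t \mid s_t)\, A_t), while the critic is regressed toward G_t with a squared error. This is how the update() method below combines the two losses:

A_t = G_t - V(s_t)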

Code language: Python
class ActorCritic(nn.Module):
    """Actor-Critic网络"""
    def __init__(self, state_size, action_size, hidden_size=128):
        super(ActorCritic, self).__init__()
        self.shared = nn.Linear(state_size, hidden_size)

        # Actor头(策略)
        self.actor = nn.Linear(hidden_size, action_size)

        # Critic头(价值)
        self.critic = nn.Linear(hidden_size, 1)

    def forward(self, x):
        x = F.relu(self.shared(x))
        policy = F.softmax(self.actor(x), dim=-1)
        value = self.critic(x)
        return policy, value

class ActorCriticAgent:
    def __init__(self, state_size, action_size, learning_rate=0.01, gamma=0.99):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Actor-Critic网络
        self.ac_net = ActorCritic(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.ac_net.parameters(), lr=learning_rate)

        self.gamma = gamma
        self.episode_rewards = []
        self.log_probs = []
        self.values = []
        self.rewards = []

    def one_hot_encode(self, state):
        state_vector = np.zeros(self.state_size)
        state_vector[state] = 1
        return state_vector

    def choose_action(self, state):
        state_vector = self.one_hot_encode(state)
        state_tensor = torch.FloatTensor(state_vector).to(self.device)
        policy, value = self.ac_net(state_tensor)

        # 采样动作
        action_dist = torch.distributions.Categorical(policy)
        action = action_dist.sample()

        # 保存用于训练
        self.log_probs.append(action_dist.log_prob(action))
        self.values.append(value)

        return action.item()

    def update(self):
        """更新Actor-Critic网络"""
        # 计算回报
        returns = []
        R = 0
        for r in self.rewards[::-1]:
            R = r + self.gamma * R
            returns.insert(0, R)

        returns = torch.tensor(returns).float()

        # Advantage estimates; detach them so the actor loss does not
        # backpropagate through the critic's value head
        values = torch.cat(self.values).squeeze()
        advantages = (returns - values).detach()

        # 计算Actor损失(策略梯度)
        actor_loss = []
        for log_prob, advantage in zip(self.log_probs, advantages):
            actor_loss.append(-log_prob * advantage)

        # 计算Critic损失(价值函数)
        critic_loss = F.mse_loss(values, returns)

        # 总损失
        loss = torch.stack(actor_loss).sum() + critic_loss

        # 更新网络
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # 清空缓存
        self.log_probs = []
        self.values = []
        self.rewards = []

    def train(self, env, episodes=1000):
        """训练Actor-Critic智能体"""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            done = False

            while not done:
                # 选择动作
                action = self.choose_action(state)

                # 执行动作
                state, reward, done, _ = env.step(action)

                # 记录
                self.rewards.append(reward)
                total_reward += reward

            # 更新网络
            self.update()

            # 记录奖励
            self.episode_rewards.append(total_reward)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"Actor-Critic Episode {episode}, Average Reward: {avg_reward:.2f}")

# 训练Actor-Critic智能体
ac_agent = ActorCriticAgent(
    state_size=env.observation_space,
    action_size=env.action_space,
    learning_rate=0.01,
    gamma=0.99
)

print("\n开始训练Actor-Critic智能体...")
ac_agent.train(env, episodes=1000)

Comparing the Algorithms

Code language: Python
# 比较所有算法的性能
plt.figure(figsize=(15, 8))

# 计算所有算法的移动平均
window = 50

# Q-Learning
if len(agent.episode_rewards) > window:
    q_avg = [np.mean(agent.episode_rewards[i:i+window]) for i in range(len(agent.episode_rewards)-window)]
    plt.plot(q_avg, label='Q-Learning', alpha=0.7)

# SARSA
if len(sarsa_agent.episode_rewards) > window:
    sarsa_avg = [np.mean(sarsa_agent.episode_rewards[i:i+window]) for i in range(len(sarsa_agent.episode_rewards)-window)]
    plt.plot(sarsa_avg, label='SARSA', alpha=0.7)

# DQN
if len(dqn_agent.episode_rewards) > window:
    dqn_avg = [np.mean(dqn_agent.episode_rewards[i:i+window]) for i in range(len(dqn_agent.episode_rewards)-window)]
    plt.plot(dqn_avg, label='DQN', alpha=0.7)

# REINFORCE
if len(reinforce_agent.episode_rewards) > window:
    reinforce_avg = [np.mean(reinforce_agent.episode_rewards[i:i+window]) for i in range(len(reinforce_agent.episode_rewards)-window)]
    plt.plot(reinforce_avg, label='REINFORCE', alpha=0.7)

# Actor-Critic
if len(ac_agent.episode_rewards) > window:
    ac_avg = [np.mean(ac_agent.episode_rewards[i:i+window]) for i in range(len(ac_agent.episode_rewards)-window)]
    plt.plot(ac_avg, label='Actor-Critic', alpha=0.7)

plt.title("Comparison of Different RL Algorithms")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

Hands-On Project: The CartPole Environment

Let's test our algorithms on the classic CartPole environment.
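
A note on API versions: the code below assumes the classic Gym interface, where reset() returns only the observation and step() returns a 4-tuple (gym releases before 0.26). On newer gym or gymnasium releases, reset() returns (observation, info) and step() returns five values; a thin adapter such as the sketch below (the wrapper class is our illustration, not part of the original code) restores the interface the code expects. Also note that the agents defined above one-hot encode discrete state indices, which is why CartPoleAgent below bypasses that step for CartPole's continuous observations.

Code language: Python
import gym

class ClassicGymAPI:
    """Illustrative adapter: map the gym>=0.26 / gymnasium API back to the
    classic 4-tuple interface used by the code in this article."""
    def __init__(self, env):
        self.env = env
        self.observation_space = env.observation_space
        self.action_space = env.action_space

    def reset(self):
        obs, _info = self.env.reset()
        return obs

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        return obs, reward, terminated or truncated, info

# Example: env = ClassicGymAPI(gym.make('CartPole-v1'))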

Code language: Python
import gym

class CartPoleAgent:
    def __init__(self, algorithm='dqn'):
        self.env = gym.make('CartPole-v1')
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.algorithm = algorithm

        # Create the agent for the chosen algorithm
        if algorithm == 'dqn':
            self.agent = DQNAgent(self.state_size, self.action_size)
            self.agent.epsilon = 0.1  # lower the exploration rate
        elif algorithm == 'reinforce':
            self.agent = REINFORCEAgent(self.state_size, self.action_size)
        elif algorithm == 'actor-critic':
            self.agent = ActorCriticAgent(self.state_size, self.action_size)

        # CartPole observations are already continuous feature vectors, so bypass
        # the one-hot encoding the grid-world agents apply to discrete state indices
        self.agent.one_hot_encode = lambda s: np.asarray(s, dtype=np.float32)

    def train(self, episodes=500):
        """训练智能体"""
        print(f"Training {self.algorithm} agent on CartPole...")

        for episode in range(episodes):
            state = self.env.reset()
            total_reward = 0
            done = False

            while not done:
                if self.algorithm == 'dqn':
                    action = self.agent.choose_action(state)
                    next_state, reward, done, _ = self.env.step(action)
                    self.agent.remember(state, action, reward, next_state, done)
                    self.agent.replay()
                else:
                    action = self.agent.choose_action(state)
                    next_state, reward, done, _ = self.env.step(action)
                    self.agent.rewards.append(reward)

                state = next_state
                total_reward += reward

            # 对于策略梯度方法,更新策略
            if self.algorithm != 'dqn':
                if self.algorithm == 'reinforce':
                    self.agent.update_policy()
                else:
                    self.agent.update()

            self.agent.episode_rewards.append(total_reward)

            if episode % 50 == 0:
                avg_reward = np.mean(self.agent.episode_rewards[-50:])
                print(f"Episode {episode}, Average Reward: {avg_reward:.2f}")

                # Treat the task as solved once the average reward reaches 195
                # (the classic CartPole-v0 bar; CartPole-v1's official threshold is 475)
                if avg_reward >= 195:
                    print(f"CartPole solved in {episode} episodes!")
                    break

    def test(self, episodes=10):
        """测试训练好的智能体"""
        print(f"\nTesting {self.algorithm} agent...")
        total_rewards = []

        for episode in range(episodes):
            state = self.env.reset()
            total_reward = 0
            done = False

            while not done:
                if self.algorithm == 'dqn':
                    action = self.agent.choose_action(state, training=False)
                else:
                    action = self.agent.choose_action(state)

                state, reward, done, _ = self.env.step(action)
                total_reward += reward

            total_rewards.append(total_reward)
            print(f"Test Episode {episode + 1}: Reward = {total_reward}")

        print(f"Average Test Reward: {np.mean(total_rewards):.2f}")

# 测试不同算法
algorithms = ['dqn', 'reinforce', 'actor-critic']
results = {}

for algo in algorithms:
    cartpole = CartPoleAgent(algo)
    cartpole.train(episodes=500)
    cartpole.test()
    results[algo] = cartpole.agent.episode_rewards

# 可视化结果
plt.figure(figsize=(12, 5))
for algo, rewards in results.items():
    window = 50
    if len(rewards) > window:
        avg = [np.mean(rewards[i:i+window]) for i in range(len(rewards)-window)]
        plt.plot(avg, label=algo)

plt.title("CartPole: Algorithm Comparison")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={50})")
plt.axhline(y=195, color='r', linestyle='--', label='Solved Threshold')
plt.legend()
plt.show()

Advanced Topic: Multi-Agent Reinforcement Learning

Code language: Python
class MultiAgentEnvironment:
    """多智能体环境示例"""
    def __init__(self, num_agents=2, grid_size=5):
        self.num_agents = num_agents
        self.grid_size = grid_size
        self.agents_pos = [(0, i) for i in range(num_agents)]
        self.goal_pos = (grid_size-1, grid_size-1)
        self.obstacles = [(2, 2), (3, 2)]

    def reset(self):
        self.agents_pos = [(0, i) for i in range(self.num_agents)]
        return [self._get_agent_state(i) for i in range(self.num_agents)]

    def _get_agent_state(self, agent_id):
        """获取单个智能体的状态"""
        x, y = self.agents_pos[agent_id]
        return x * self.grid_size + y

    def step(self, actions):
        """执行所有智能体的动作"""
        rewards = []
        dones = []
        next_states = []

        for i in range(self.num_agents):
            old_pos = self.agents_pos[i]
            action = actions[i]

            # 执行动作
            if action == 0:  # 上
                new_pos = (self.agents_pos[i][0], max(0, self.agents_pos[i][1] - 1))
            elif action == 1:  # 下
                new_pos = (self.agents_pos[i][0], min(self.grid_size - 1, self.agents_pos[i][1] + 1))
            elif action == 2:  # 左
                new_pos = (max(0, self.agents_pos[i][0] - 1), self.agents_pos[i][1])
            elif action == 3:  # 右
                new_pos = (min(self.grid_size - 1, self.agents_pos[i][0] + 1), self.agents_pos[i][1])
            else:
                new_pos = self.agents_pos[i]

            # 检查碰撞
            if new_pos not in self.obstacles and new_pos not in self.agents_pos[:i] + self.agents_pos[i+1:]:
                self.agents_pos[i] = new_pos

            # 计算奖励
            if self.agents_pos[i] == self.goal_pos:
                reward = 10
                done = True
            elif self.agents_pos[i] == old_pos:
                reward = -1
                done = False
            else:
                # 团队奖励:靠近其他智能体
                min_dist = min([abs(self.agents_pos[i][0] - ap[0]) + abs(self.agents_pos[i][1] - ap[1])
                               for j, ap in enumerate(self.agents_pos) if i != j])
                reward = 0.1 * (4 - min_dist)  # 越近奖励越高
                done = False

            rewards.append(reward)
            dones.append(done)
            next_states.append(self._get_agent_state(i))

        return next_states, rewards, dones

# 多智能体训练
class MultiAgentTrainer:
    def __init__(self, env, num_agents):
        self.env = env
        self.num_agents = num_agents
        self.agents = []

        # 为每个智能体创建独立的DQN
        for _ in range(num_agents):
            agent = DQNAgent(env.grid_size * env.grid_size, 4)
            agent.epsilon = 0.1
            self.agents.append(agent)

    def train(self, episodes=1000):
        """训练多智能体系统"""
        for episode in range(episodes):
            states = self.env.reset()
            total_rewards = [0] * self.num_agents
            done = False
            step_count = 0

            while not done and step_count < 200:  # cap episode length so untrained agents cannot wander forever
                actions = []
                for i in range(self.num_agents):
                    action = self.agents[i].choose_action(states[i])
                    actions.append(action)

                next_states, rewards, dones = self.env.step(actions)

                for i in range(self.num_agents):
                    self.agents[i].remember(states[i], actions[i], rewards[i],
                                           next_states[i], dones[i])
                    self.agents[i].replay()
                    total_rewards[i] += rewards[i]

                states = next_states
                step_count += 1
                done = any(dones)  # end the episode as soon as any agent reaches the goal

            if episode % 100 == 0:
                avg_reward = np.mean(total_rewards)
                print(f"Episode {episode}, Average Team Reward: {avg_reward:.2f}")

# 训练多智能体系统
multi_env = MultiAgentEnvironment(num_agents=2, grid_size=5)
trainer = MultiAgentTrainer(multi_env, 2)
trainer.train(episodes=500)

Summary

This article has covered reinforcement learning from the basics to advanced topics, including:

  1. Fundamentals: the core components and framework of reinforcement learning
  2. Value-based methods: classic algorithms such as Q-Learning and SARSA
  3. Deep reinforcement learning: DQN and its improved variants
  4. Policy gradient methods: REINFORCE and Actor-Critic
  5. Practical application: an implementation on the CartPole environment
  6. Advanced topic: multi-agent reinforcement learning

Reinforcement learning is a vibrant research field that keeps evolving. From simple tabular methods to deep networks, and from single agents to multi-agent systems, it is being applied to increasingly complex problems.

Practical Advice

  1. Start with simple environments and increase the complexity gradually
  2. Understand the mathematics behind each algorithm
  3. Experiment with different hyperparameter combinations
  4. Use appropriate visualization tools to understand the learning process
  5. Take on challenges from platforms such as OpenAI Gym and DeepMind Control