Q-Learning Algorithm Implementation and Maze Solving

Author: 心疼你的一切
Published: 2026-01-21

Introduction

Reinforcement Learning (RL) is an important branch of machine learning that studies how to learn an optimal behavior policy through interaction with an environment. Unlike supervised and unsupervised learning, reinforcement learning does not require a labeled dataset; instead, it maximizes cumulative reward through trial and error.

Q-Learning is one of the most classic and fundamental algorithms in reinforcement learning, and it belongs to the family of value-based methods. This article explains the principles behind Q-Learning and demonstrates its application by implementing a complete maze-solving example.


Fundamentals of Reinforcement Learning

Core Elements

A reinforcement learning problem can be abstracted into the following core elements (a minimal interaction-loop sketch follows the list):

  1. Agent: the learner and decision maker, which perceives the environment and takes actions
  2. Environment: everything external to the agent, which responds to the agent's actions
  3. State: a description of the environment, representing the current situation
  4. Action: an operation the agent can perform
  5. Reward: the environment's immediate feedback to the agent's action
  6. Policy: the rule by which the agent selects an action in a given state
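
To make these roles concrete, here is a minimal sketch of the agent-environment interaction loop. The `env` and `agent` objects are placeholders for any implementation with this interface (such as the maze environment and Q-Learning agent built later in this article); the method names are assumptions of this sketch, not a fixed API.

Code language: python
# Generic reinforcement learning interaction loop (illustrative sketch).
# Assumes env.reset() returns an initial state, env.step(action) returns
# (next_state, reward, done), and agent exposes choose_action()/learn().
def run_episode(env, agent, max_steps=100):
    state = env.reset()                      # initial state from the environment
    total_reward = 0
    for _ in range(max_steps):
        action = agent.choose_action(state)  # policy: state -> action
        next_state, reward, done = env.step(action)          # environment responds
        agent.learn(state, action, reward, next_state, done)  # update from feedback
        total_reward += reward
        state = next_state
        if done:                             # terminal state reached
            break
    return total_reward
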
Markov Decision Process (MDP)

A reinforcement learning problem is usually modeled as a Markov Decision Process (MDP), defined by the five-tuple (S, A, P, R, γ), illustrated with a toy example after the list:

  • S: the state space
  • A: the action space
  • P: the state-transition probability function
  • R: the reward function
  • γ: the discount factor (0 < γ ≤ 1)
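
As a purely illustrative example (not part of the maze project), a tiny two-state MDP can be written down directly as Python data; the names `P`, `R`, and `gamma` below simply mirror the tuple above.

Code language: python
# A toy two-state MDP written out explicitly (illustrative only).
states = ["s0", "s1"]
actions = ["stay", "move"]

# P[(s, a)] maps each next state to its transition probability
P = {
    ("s0", "stay"): {"s0": 1.0},
    ("s0", "move"): {"s1": 0.9, "s0": 0.1},
    ("s1", "stay"): {"s1": 1.0},
    ("s1", "move"): {"s0": 0.9, "s1": 0.1},
}

# R[(s, a)] is the expected immediate reward for taking a in s
R = {
    ("s0", "stay"): 0.0,
    ("s0", "move"): -1.0,
    ("s1", "stay"): 1.0,
    ("s1", "move"): -1.0,
}

gamma = 0.9  # discount factor
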
Value Functions

Value functions evaluate how good a state or a state-action pair is (formal definitions follow the list):

  1. State-value function V(s): the expected cumulative reward obtained by starting from state s and following policy π
  2. Action-value function Q(s, a): the expected cumulative reward obtained by taking action a in state s and then following policy π
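
Written out in standard notation (a sketch assuming an infinite-horizon discounted return, matching the discount factor γ introduced above), the two definitions are:

Code language: latex
V^{\pi}(s) = \mathbb{E}_{\pi}\left[ \sum_{t=0}^{\infty} \gamma^{t} r_{t+1} \;\middle|\; s_0 = s \right]

Q^{\pi}(s, a) = \mathbb{E}_{\pi}\left[ \sum_{t=0}^{\infty} \gamma^{t} r_{t+1} \;\middle|\; s_0 = s,\ a_0 = a \right]
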
The Bellman Equation

At the core of Q-Learning is the Bellman optimality equation: Q*(s, a) = E[r + γ · max_a' Q*(s', a') | s, a]

This equation says that the optimal action value equals the immediate reward plus the discounted optimal value of the next state.

Q-Learning Algorithm in Detail

Algorithm Principles

Q-Learning is an off-policy temporal-difference (TD) learning algorithm. It learns the optimal Q-function with the following update rule (a single numeric update is worked through after the list):

Q(s, a) ← Q(s, a) + α * [r + γ * max_a' Q(s', a') - Q(s, a)]

where:

  • α: the learning rate
  • r: the immediate reward received
  • γ: the discount factor
  • s': the next state
  • max_a' Q(s', a'): the maximum Q-value in the next state
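
To make the update concrete, here is one hand-checked step with made-up numbers (α = 0.1, γ = 0.9, r = -1, current Q(s, a) = 2.0, max Q(s', ·) = 5.0): the TD target is -1 + 0.9 · 5.0 = 3.5, so Q moves 10% of the way toward it.

Code language: python
# One Q-Learning update with illustrative numbers (not tied to the maze code).
alpha, gamma = 0.1, 0.9
q_sa = 2.0          # current estimate Q(s, a)
reward = -1.0       # immediate reward r
max_q_next = 5.0    # max_a' Q(s', a')

td_target = reward + gamma * max_q_next   # -1 + 0.9 * 5.0 = 3.5
td_error = td_target - q_sa               # 3.5 - 2.0 = 1.5
q_sa = q_sa + alpha * td_error            # 2.0 + 0.1 * 1.5 = 2.15
print(q_sa)  # 2.15
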
Balancing Exploration and Exploitation

A central challenge in reinforcement learning is striking a balance between exploration and exploitation:

  1. Exploration: trying new actions in order to discover better policies
  2. Exploitation: choosing the currently best-known action in order to collect the largest reward

Commonly used strategies include (a small sketch of the first two follows the list):

  • ε-greedy
  • Softmax
  • UCB (Upper Confidence Bound)
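
For illustration, here is a minimal sketch of ε-greedy and softmax action selection over one row of a Q-table; the array `q_row` and the `temperature` parameter are assumptions for this example, not part of the maze code.

Code language: python
import numpy as np

rng = np.random.default_rng(0)
q_row = np.array([1.0, 2.5, 0.3, 2.4])   # Q(s, ·) for one state (made-up values)

def epsilon_greedy(q_row, epsilon=0.1):
    """With probability epsilon pick a random action, otherwise the greedy one."""
    if rng.random() < epsilon:
        return int(rng.integers(len(q_row)))
    return int(np.argmax(q_row))

def softmax_action(q_row, temperature=1.0):
    """Sample an action with probability proportional to exp(Q / temperature)."""
    prefs = q_row / temperature
    prefs -= prefs.max()                  # shift for numerical stability
    probs = np.exp(prefs) / np.exp(prefs).sum()
    return int(rng.choice(len(q_row), p=probs))

print(epsilon_greedy(q_row), softmax_action(q_row))
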
Algorithm Flow
  1. Initialize the Q-table with zeros or small random values
  2. For each episode:
    • Initialize the state s
    • For each time step:
      • Select an action a from the Q-table using the ε-greedy strategy
      • Execute action a and observe the reward r and the new state s'
      • Update the Q-value: Q(s, a) ← Q(s, a) + α * [r + γ * max_a' Q(s', a') - Q(s, a)]
      • Update the state: s ← s'
      • If a terminal state is reached, end the episode

Maze Environment Implementation

Environment Design

We will implement a simple maze environment consisting of:

  • a grid world
  • a start cell, a goal cell, and obstacles
  • four basic actions: up, down, left, right
Code language: python
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
import random

class MazeEnvironment:
    def __init__(self, width=10, height=10, obstacles=None):
        self.width = width
        self.height = height
        self.obstacles = obstacles if obstacles else []

        # Define actions: 0 = up, 1 = down, 2 = left, 3 = right
        self.actions = [0, 1, 2, 3]
        self.action_names = ['Up', 'Down', 'Left', 'Right']

        # Set the start and goal cells
        self.start = (0, 0)
        self.goal = (width - 1, height - 1)

        # Initialize the agent's position
        self.reset()

    def reset(self):
        """Reset the environment to its initial state."""
        self.agent_pos = self.start
        self.done = False
        return self.get_state()

    def get_state(self):
        """Return the current state as a single index.
        Encoding x * height + y is unique even when width != height."""
        return self.agent_pos[0] * self.height + self.agent_pos[1]

    def step(self, action):
        """Execute an action and return (new state, reward, done)."""
        if self.done:
            return self.get_state(), 0, True

        # Remember the current position
        old_pos = self.agent_pos

        # Update the position according to the action
        # (note: "up" decreases y, which appears lower on the rendered plot)
        if action == 0:  # up
            new_pos = (self.agent_pos[0], max(0, self.agent_pos[1] - 1))
        elif action == 1:  # down
            new_pos = (self.agent_pos[0], min(self.height - 1, self.agent_pos[1] + 1))
        elif action == 2:  # left
            new_pos = (max(0, self.agent_pos[0] - 1), self.agent_pos[1])
        elif action == 3:  # right
            new_pos = (min(self.width - 1, self.agent_pos[0] + 1), self.agent_pos[1])
        else:  # unknown action: stay in place
            new_pos = old_pos

        # If the new cell is an obstacle, stay in place
        if new_pos in self.obstacles:
            new_pos = old_pos

        self.agent_pos = new_pos

        # Compute the reward
        if self.agent_pos == self.goal:
            reward = 100  # reward for reaching the goal
            self.done = True
        else:
            # Manhattan distance to the goal serves as a negative shaping term
            distance = abs(self.agent_pos[0] - self.goal[0]) + \
                      abs(self.agent_pos[1] - self.goal[1])
            reward = -1 - distance * 0.1  # step penalty + distance penalty

        return self.get_state(), reward, self.done

    def render(self, ax=None):
        """Visualize the environment."""
        if ax is None:
            fig, ax = plt.subplots(figsize=(8, 8))

        # Clear the axes
        ax.clear()

        # Draw the grid (horizontal lines over height, vertical lines over width)
        for i in range(self.height + 1):
            ax.axhline(i, color='black', linewidth=0.5)
        for i in range(self.width + 1):
            ax.axvline(i, color='black', linewidth=0.5)

        # Draw the obstacles
        for obs in self.obstacles:
            rect = patches.Rectangle(obs, 1, 1, linewidth=1,
                                    edgecolor='black', facecolor='gray')
            ax.add_patch(rect)

        # Draw the start cell
        start_rect = patches.Rectangle(self.start, 1, 1, linewidth=1,
                                      edgecolor='green', facecolor='lightgreen')
        ax.add_patch(start_rect)
        ax.text(self.start[0] + 0.5, self.start[1] + 0.5, 'S',
                ha='center', va='center', fontsize=12, fontweight='bold')

        # Draw the goal cell
        goal_rect = patches.Rectangle(self.goal, 1, 1, linewidth=1,
                                     edgecolor='red', facecolor='lightcoral')
        ax.add_patch(goal_rect)
        ax.text(self.goal[0] + 0.5, self.goal[1] + 0.5, 'G',
                ha='center', va='center', fontsize=12, fontweight='bold')

        # Draw the agent
        agent_circle = plt.Circle((self.agent_pos[0] + 0.5, self.agent_pos[1] + 0.5),
                                 0.3, color='blue')
        ax.add_patch(agent_circle)

        ax.set_xlim(0, self.width)
        ax.set_ylim(0, self.height)
        ax.set_aspect('equal')
        ax.set_title('Maze Environment')

        return ax
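
Before training anything, the environment can be sanity-checked on its own with a short random walk. This snippet is an illustrative check only and is not part of the original pipeline.

Code language: python
# Quick sanity check of MazeEnvironment with random actions (illustrative).
env = MazeEnvironment(width=10, height=10, obstacles=[(2, 2), (2, 3)])
state = env.reset()
for _ in range(5):
    action = random.choice(env.actions)          # pick a random action
    next_state, reward, done = env.step(action)  # apply it
    print(f"action={env.action_names[action]}, state={next_state}, "
          f"reward={reward:.2f}, done={done}")
    if done:
        break
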
Q-Learning Agent Implementation
Code language: python
class QLearningAgent:
    def __init__(self, state_space_size, action_space_size, learning_rate=0.1,
                 discount_factor=0.9, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.01):
        self.state_space_size = state_space_size
        self.action_space_size = action_space_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        # Initialize the Q-table
        self.q_table = np.zeros((state_space_size, action_space_size))

    def choose_action(self, state, training=True):
        """Select an action using the ε-greedy strategy."""
        if training and random.random() < self.epsilon:
            return random.choice(range(self.action_space_size))
        else:
            return np.argmax(self.q_table[state])

    def learn(self, state, action, reward, next_state, done):
        """Update the Q-table."""
        current_q = self.q_table[state, action]

        if done:
            max_next_q = 0
        else:
            max_next_q = np.max(self.q_table[next_state])

        new_q = current_q + self.learning_rate * \
                (reward + self.discount_factor * max_next_q - current_q)

        self.q_table[state, action] = new_q

        # Decay ε (note: this happens on every learning step, not once per episode)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def get_policy(self):
        """Return the learned greedy policy (best action per state)."""
        return np.argmax(self.q_table, axis=1)
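
As a quick check of the update mechanics (again purely illustrative), a single hand-made transition can be fed to the agent and the affected Q-table entry inspected:

Code language: python
# Feed one hand-made transition to the agent and inspect the update.
agent = QLearningAgent(state_space_size=100, action_space_size=4,
                       learning_rate=0.1, discount_factor=0.9)
state, action, reward, next_state = 0, 3, -1.0, 1
print("before:", agent.q_table[state, action])   # 0.0
agent.learn(state, action, reward, next_state, done=False)
print("after: ", agent.q_table[state, action])   # 0.1 * (-1 + 0.9 * 0 - 0) = -0.1
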
Training Procedure
Code language: python
def train_agent(env, agent, episodes=1000, max_steps_per_episode=100):
    """Train a Q-Learning agent."""
    episode_rewards = []
    episode_lengths = []
    epsilon_values = []

    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        steps = 0

        for step in range(max_steps_per_episode):
            # Select an action
            action = agent.choose_action(state)

            # Execute the action
            next_state, reward, done = env.step(action)

            # Learn from the transition
            agent.learn(state, action, reward, next_state, done)

            # Move to the next state
            state = next_state
            total_reward += reward
            steps += 1

            if done:
                break

        episode_rewards.append(total_reward)
        episode_lengths.append(steps)
        epsilon_values.append(agent.epsilon)

        # Print progress
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episode {episode + 1}/{episodes}")
            print(f"Average Reward (last 100): {avg_reward:.2f}")
            print(f"Epsilon: {agent.epsilon:.3f}")
            print("-" * 50)

    return episode_rewards, episode_lengths, epsilon_values
Visualizing Training Progress
Code language: python
def plot_training_progress(rewards, lengths, epsilons):
    """Plot training progress."""
    fig, axes = plt.subplots(3, 1, figsize=(10, 12))

    # Compute moving averages
    window_size = 100
    rewards_ma = np.convolve(rewards, np.ones(window_size)/window_size, mode='valid')
    lengths_ma = np.convolve(lengths, np.ones(window_size)/window_size, mode='valid')

    # Plot rewards
    axes[0].plot(rewards, alpha=0.3, label='Episode Reward')
    axes[0].plot(range(window_size-1, len(rewards)), rewards_ma, 'r-', label=f'Moving Average ({window_size} episodes)')
    axes[0].set_title('Episode Rewards')
    axes[0].set_xlabel('Episode')
    axes[0].set_ylabel('Reward')
    axes[0].legend()
    axes[0].grid(True)

    # Plot episode lengths
    axes[1].plot(lengths, alpha=0.3, label='Episode Length')
    axes[1].plot(range(window_size-1, len(lengths)), lengths_ma, 'g-', label=f'Moving Average ({window_size} episodes)')
    axes[1].set_title('Episode Lengths')
    axes[1].set_xlabel('Episode')
    axes[1].set_ylabel('Steps')
    axes[1].legend()
    axes[1].grid(True)

    # Plot epsilon values
    axes[2].plot(epsilons)
    axes[2].set_title('Epsilon (Exploration Rate)')
    axes[2].set_xlabel('Episode')
    axes[2].set_ylabel('Epsilon')
    axes[2].grid(True)

    plt.tight_layout()
    plt.show()
Testing the Trained Agent
Code language: python
def test_agent(env, agent, max_episodes=5, max_steps=100):
    """Evaluate the trained agent."""
    success_count = 0

    for episode in range(max_episodes):
        state = env.reset()
        path = [state]
        done = False
        steps = 0

        while not done and steps < max_steps:
            # No exploration: always pick the greedy action
            action = agent.choose_action(state, training=False)
            next_state, reward, done = env.step(action)

            path.append(next_state)
            state = next_state
            steps += 1

        if done and env.agent_pos == env.goal:
            success_count += 1
            print(f"Episode {episode + 1}: reached the goal in {steps} steps")
        else:
            print(f"Episode {episode + 1}: failed to reach the goal within {max_steps} steps")

    success_rate = success_count / max_episodes
    print(f"\nSuccess rate: {success_rate * 100:.1f}%")

    return success_rate
Visualizing the Learned Policy
Code language: python
def visualize_policy(env, agent):
    """Visualize the learned policy."""
    fig, ax = plt.subplots(figsize=(10, 10))

    # Draw the environment
    env.render(ax)

    # Get the greedy policy
    policy = agent.get_policy()

    # Arrow style (ax.arrow accepts head_width/head_length, not arrowstyle)
    arrow_props = dict(head_width=0.15, head_length=0.1, fc='black', ec='black')

    for i in range(env.width):
        for j in range(env.height):
            # State index must match MazeEnvironment.get_state(): x * height + y
            state = i * env.height + j
            action = policy[state]

            # Skip obstacles, the start cell, and the goal cell
            if (i, j) in env.obstacles or (i, j) == env.start or (i, j) == env.goal:
                continue

            # Arrow direction for the chosen action
            if action == 0:  # up
                dx, dy = 0, -0.3
            elif action == 1:  # down
                dx, dy = 0, 0.3
            elif action == 2:  # left
                dx, dy = -0.3, 0
            else:  # right
                dx, dy = 0.3, 0

            # Draw the arrow
            ax.arrow(i + 0.5, j + 0.5, dx, dy, **arrow_props)

    ax.set_title('Learned Policy')
    plt.show()
Animating the Agent's Behavior
Code language: python
def animate_agent_path(env, agent, max_steps=100):
    """Record frames of the agent following its learned policy."""
    frames = []
    state = env.reset()
    done = False
    steps = 0

    while not done and steps < max_steps:
        # Render the current frame
        fig, ax = plt.subplots(figsize=(8, 8))
        env.render(ax)

        # Convert the figure to an RGB image
        # (buffer_rgba() works across Matplotlib versions; tostring_rgb() is deprecated)
        fig.canvas.draw()
        image = np.asarray(fig.canvas.buffer_rgba())[:, :, :3].copy()
        frames.append(image)

        plt.close(fig)

        # Select and execute the greedy action
        action = agent.choose_action(state, training=False)
        state, _, done = env.step(action)
        steps += 1

    return frames

def create_animation(frames, interval=200):
    """Build an animation from the recorded frames."""
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.axis('off')

    im = ax.imshow(frames[0])

    def update(frame):
        im.set_array(frames[frame])
        return [im]

    anim = FuncAnimation(fig, update, frames=len(frames), interval=interval, blit=True)

    plt.close(fig)
    return anim
Full Training Pipeline
Code language: python
# Create the maze environment
obstacles = [(2, 2), (2, 3), (2, 4), (5, 5), (5, 6), (5, 7), (7, 3), (7, 4)]
env = MazeEnvironment(width=10, height=10, obstacles=obstacles)

# Create the Q-Learning agent
state_space_size = env.width * env.height
action_space_size = 4
agent = QLearningAgent(
    state_space_size=state_space_size,
    action_space_size=action_space_size,
    learning_rate=0.1,
    discount_factor=0.95,
    epsilon=1.0,
    epsilon_decay=0.995,
    epsilon_min=0.01
)

# Train the agent
print("Starting training...")
rewards, lengths, epsilons = train_agent(env, agent, episodes=1000)

# Plot training progress
plot_training_progress(rewards, lengths, epsilons)

# Test the agent
print("\nTesting the agent...")
success_rate = test_agent(env, agent, max_episodes=10)

# Visualize the learned policy
print("\nVisualizing the learned policy...")
visualize_policy(env, agent)

# Create the path animation
print("\nGenerating the agent path animation...")
frames = animate_agent_path(env, agent)
anim = create_animation(frames)
# Display the animation in a Jupyter notebook
# HTML(anim.to_jshtml())
# Save the animation as a GIF
# anim.save('agent_path.gif', writer='pillow')

Improvements and Variants of Q-Learning

Double Q-Learning

Standard Q-Learning suffers from maximization bias: taking the max over noisy Q-estimates systematically overestimates action values. Double Q-Learning mitigates this by maintaining two independent Q-tables, using one to select the greedy action and the other to evaluate it (a small numeric illustration of the bias follows, then the implementation):
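
The following sketch (illustrative only, independent of the maze code) estimates E[max_a Q̂(a)] when the true value of every action is 0 and the estimates are pure noise; the average of the max stays clearly above 0, which is exactly the bias Double Q-Learning targets.

Code language: python
import numpy as np

rng = np.random.default_rng(0)
n_actions, n_trials = 4, 10000

# True Q-values are all zero; estimates are pure noise around them.
noisy_estimates = rng.normal(loc=0.0, scale=1.0, size=(n_trials, n_actions))

# The max over noisy estimates is biased upward ...
print("E[max_a Q_hat(a)] ~", noisy_estimates.max(axis=1).mean())   # roughly +1.03

# ... while selecting with one independent estimate and evaluating with another is not.
second_estimates = rng.normal(loc=0.0, scale=1.0, size=(n_trials, n_actions))
greedy = noisy_estimates.argmax(axis=1)
print("double estimate     ~", second_estimates[np.arange(n_trials), greedy].mean())  # roughly 0
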

Code language: python
class DoubleQLearningAgent:
    def __init__(self, state_space_size, action_space_size, learning_rate=0.1,
                 discount_factor=0.9, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.01):
        self.state_space_size = state_space_size
        self.action_space_size = action_space_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        # Initialize two Q-tables
        self.q_table_a = np.zeros((state_space_size, action_space_size))
        self.q_table_b = np.zeros((state_space_size, action_space_size))

    def choose_action(self, state, training=True):
        """Select an action using the ε-greedy strategy."""
        if training and random.random() < self.epsilon:
            return random.choice(range(self.action_space_size))
        else:
            # Act greedily with respect to the average of the two Q-tables
            avg_q = (self.q_table_a[state] + self.q_table_b[state]) / 2
            return np.argmax(avg_q)

    def learn(self, state, action, reward, next_state, done):
        """Update one of the two Q-tables. A single coin flip decides which
        table is updated and which table evaluates the greedy action."""
        update_a = random.random() < 0.5

        if update_a:
            # Table A selects the greedy action, table B evaluates it
            current_q = self.q_table_a[state, action]
            best_action = np.argmax(self.q_table_a[next_state])
            max_next_q = self.q_table_b[next_state, best_action]
        else:
            # Table B selects the greedy action, table A evaluates it
            current_q = self.q_table_b[state, action]
            best_action = np.argmax(self.q_table_b[next_state])
            max_next_q = self.q_table_a[next_state, best_action]

        if done:
            max_next_q = 0

        new_q = current_q + self.learning_rate * \
                (reward + self.discount_factor * max_next_q - current_q)

        if update_a:
            self.q_table_a[state, action] = new_q
        else:
            self.q_table_b[state, action] = new_q

        # Decay ε
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
Prioritized Experience Replay

Prioritized experience replay is traditionally combined with deep Q-networks, but the same idea can also be applied to tabular Q-Learning (a usage sketch follows the class):

Code language: python
class PrioritizedQLearningAgent:
    def __init__(self, state_space_size, action_space_size, learning_rate=0.1,
                 discount_factor=0.9, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.01, buffer_size=10000):
        self.state_space_size = state_space_size
        self.action_space_size = action_space_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        # Initialize the Q-table
        self.q_table = np.zeros((state_space_size, action_space_size))

        # Experience replay buffer
        self.buffer = []
        self.buffer_size = buffer_size
        self.priorities = np.zeros(buffer_size)
        self.alpha = 0.6  # priority exponent

    def store_experience(self, state, action, reward, next_state, done):
        """Store an experience and compute its priority."""
        # Use the absolute TD error as the priority
        current_q = self.q_table[state, action]
        if done:
            target_q = reward
        else:
            target_q = reward + self.discount_factor * np.max(self.q_table[next_state])
        td_error = abs(target_q - current_q)

        experience = (state, action, reward, next_state, done)

        if len(self.buffer) < self.buffer_size:
            self.buffer.append(experience)
            self.priorities[len(self.buffer)-1] = td_error
        else:
            # Replace the entry with the smallest priority if the new one is larger
            min_idx = np.argmin(self.priorities)
            if td_error > self.priorities[min_idx]:
                self.buffer[min_idx] = experience
                self.priorities[min_idx] = td_error

    def sample_experience(self):
        """Sample an experience according to its priority."""
        if len(self.buffer) == 0:
            return None

        # Convert priorities into probabilities (small constant avoids division by zero)
        priorities = self.priorities[:len(self.buffer)] + 1e-6
        probs = priorities ** self.alpha
        probs /= probs.sum()

        # Sample one index according to these probabilities
        idx = np.random.choice(len(self.buffer), p=probs)
        return self.buffer[idx]

    def learn(self):
        """Learn from one experience sampled from the replay buffer."""
        experience = self.sample_experience()
        if experience is None:
            return

        state, action, reward, next_state, done = experience

        current_q = self.q_table[state, action]
        if done:
            max_next_q = 0
        else:
            max_next_q = np.max(self.q_table[next_state])

        new_q = current_q + self.learning_rate * \
                (reward + self.discount_factor * max_next_q - current_q)

        self.q_table[state, action] = new_q
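
The class above only defines storage, sampling, and the update itself; a minimal driver loop might look as follows. The ε-greedy selection here reads the agent's q_table and epsilon directly, and the per-episode ε decay is an assumption of this sketch, since the class defines neither a choose_action method nor its own decay schedule.

Code language: python
# Illustrative driver loop for PrioritizedQLearningAgent (a sketch, with an
# ad-hoc ε-greedy step since the class has no choose_action method).
env = MazeEnvironment(width=10, height=10,
                      obstacles=[(2, 2), (2, 3), (2, 4)])
p_agent = PrioritizedQLearningAgent(state_space_size=env.width * env.height,
                                    action_space_size=4)

for episode in range(200):
    state = env.reset()
    for _ in range(100):
        # ε-greedy action selection using the agent's Q-table
        if random.random() < p_agent.epsilon:
            action = random.choice(env.actions)
        else:
            action = int(np.argmax(p_agent.q_table[state]))

        next_state, reward, done = env.step(action)
        p_agent.store_experience(state, action, reward, next_state, done)
        p_agent.learn()                       # one prioritized replay update
        state = next_state
        if done:
            break
    # Decay ε once per episode (the class itself never decays it)
    p_agent.epsilon = max(p_agent.epsilon_min, p_agent.epsilon * p_agent.epsilon_decay)
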

Summary and Outlook

This article walked through the principles and implementation of Q-Learning and demonstrated its application with a complete maze-solving project. After working through it, you should be able to:

  1. Understand the basic concepts and framework of reinforcement learning
  2. Grasp the core principles and implementation details of Q-Learning
  3. Design a simple reinforcement learning environment
  4. Evaluate and visualize the performance of a reinforcement learning agent

Strengths and Limitations of Q-Learning

Strengths:

  • Simple and intuitive, easy to understand and implement
  • Requires no model of the environment (model-free)
  • Guaranteed to converge to the optimal Q-function (under certain conditions)
  • Well suited to discrete state and action spaces

Limitations:

  • The Q-table scales with the number of states, which grows exponentially with the state dimensionality (curse of dimensionality)
  • Not applicable to continuous action spaces
  • Training can be sample-inefficient
  • Sensitive to hyperparameters

Further Exploration

  1. Deep Q-Networks (DQN): approximate the Q-function with a neural network to handle large state spaces
  2. Policy gradient methods: learn the policy directly, e.g. REINFORCE, A2C, PPO
  3. Actor-Critic methods: combine the strengths of value functions and policy optimization
  4. Multi-agent reinforcement learning: study cooperation and competition among multiple agents
  5. Hierarchical reinforcement learning: learn hierarchically structured policies
  6. Meta reinforcement learning: learn how to adapt quickly to new tasks
  7. Offline reinforcement learning: learn policies from fixed datasets

Reinforcement learning is developing rapidly and is widely applied in games, robot control, autonomous driving, recommender systems, and more. Mastering fundamental algorithms such as Q-Learning is an essential first step toward a deeper understanding of the field. Hopefully this article has laid a solid foundation and sparked your interest in exploring reinforcement learning further.
