
Getting Started with Reinforcement Learning: From Q-Learning to Deep Reinforcement Learning

Overview

Reinforcement learning (RL) is a major branch of machine learning that studies how an agent can learn an optimal behavior policy by interacting with an environment. Unlike supervised learning, RL requires no labeled data; it learns through trial and error, guided by reward signals.

From AlphaGo defeating human Go champions, to robots learning to walk, to game AIs surpassing human players, reinforcement learning has achieved remarkable results in recent years. This tutorial takes you through the core concepts and algorithms of RL from the ground up. Through five hands-on projects, you will build a complete skill set reaching from classic Q-Learning to modern deep reinforcement learning.

What you will learn:

  • Core RL concepts and the underlying mathematics
  • Modeling with Markov decision processes (MDPs)
  • Implementing the Q-Learning and SARSA algorithms
  • Deep Q-Networks (DQN): theory and practice
  • Policy gradient methods
  • Actor-Critic architectures and the PPO algorithm
  • Basics of multi-agent reinforcement learning

Chapter 1: Reinforcement Learning Fundamentals

1.1 What Is Reinforcement Learning?

Reinforcement learning is a technique for learning through trial and error. An agent takes actions in an environment, receives rewards or penalties based on the outcomes, and aims to learn a policy that maximizes long-term cumulative reward.

Differences from supervised learning:

| Aspect | Supervised learning | Reinforcement learning |
|---|---|---|
| Data source | Labeled dataset | Environment interaction |
| Feedback type | Correct answers | Reward signal |
| Learning objective | Minimize prediction error | Maximize cumulative reward |
| Time dimension | Independent samples | Sequential decisions |
| Typical applications | Classification, regression | Games, robot control |

Core elements:

  1. Agent: the entity that learns and makes decisions
  2. Environment: the external world the agent interacts with
  3. State: the current situation of the environment
  4. Action: a behavior the agent can take
  5. Reward: the environment's feedback on an action
  6. Policy: the mapping from states to actions

1.2 Markov Decision Processes (MDPs)

The MDP is the mathematical framework of reinforcement learning, defined by the five-tuple (S, A, P, R, γ):

  • S: the state space
  • A: the action space
  • P: the state-transition probabilities P(s'|s,a)
  • R: the reward function R(s,a,s')
  • γ: the discount factor (0 ≤ γ ≤ 1)

The Bellman equations:

State-value function:

V(s) = E[R(t+1) + γV(S(t+1)) | S(t) = s]

Action-value function (Q function), shown here in its Bellman optimality form, which takes the max over next actions:

Q(s,a) = E[R(t+1) + γ max_a' Q(S(t+1), a') | S(t) = s, A(t) = a]
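To make the expectation concrete, here is a minimal numeric sketch of a single Bellman backup on an invented two-state MDP (the transition probabilities, rewards, and value estimates below are illustrative assumptions, not values from this tutorial's environments):

```python
import numpy as np

# Invented toy MDP: from state s, the chosen action leads to state 0 with
# probability 0.1 (reward 0) and to state 1 with probability 0.9 (reward 1).
gamma = 0.95
P = np.array([0.1, 0.9])   # P(s'|s,a) for next states [0, 1]
R = np.array([0.0, 1.0])   # R(s,a,s') for next states [0, 1]
V = np.array([0.5, 2.0])   # assumed current value estimates V(s')

# The expectation expands into a sum over next states:
# Q(s,a) = sum over s' of P(s'|s,a) * (R(s,a,s') + gamma * V(s'))
q = np.sum(P * (R + gamma * V))
print(q)  # 2.6575
```

Each candidate next state contributes its immediate reward plus the discounted value of whatever follows, weighted by how likely it is.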

1.3 The Exploration-Exploitation Trade-off

Reinforcement learning faces a core challenge: balancing exploration against exploitation.

  • Exploration: try new actions to discover better policies
  • Exploitation: use the best action known so far

Common strategies:

  1. ε-greedy: explore randomly with probability ε, exploit with probability 1-ε
  2. Softmax: sample actions from a probability distribution derived from their values
  3. UCB (upper confidence bound): balance value estimates against uncertainty
python
import numpy as np

def epsilon_greedy(q_values, epsilon):
    """ε-greedy strategy"""
    if np.random.random() < epsilon:
        return np.random.randint(len(q_values))  # explore
    else:
        return np.argmax(q_values)  # exploit

def softmax(q_values, temperature=1.0):
    """Softmax strategy"""
    # Subtracting the max before exponentiating avoids overflow
    # without changing the resulting distribution
    exp_values = np.exp((q_values - np.max(q_values)) / temperature)
    probabilities = exp_values / np.sum(exp_values)
    return np.random.choice(len(q_values), p=probabilities)
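The code above covers the first two strategies; for completeness, here is a sketch of UCB1 selection as well (the exploration constant c and the bookkeeping of per-action counts are assumptions of this sketch, not part of the tutorial's later projects):

```python
import numpy as np

def ucb_select(q_values, action_counts, t, c=2.0):
    """UCB1: pick the action maximizing value estimate + exploration bonus.
    The bonus grows with the total step count t and shrinks as an action
    is tried more often."""
    q_values = np.asarray(q_values, dtype=float)
    counts = np.asarray(action_counts, dtype=float)
    # Untried actions get an infinite bonus, so each action is tried at least once
    bonus = np.where(counts > 0,
                     c * np.sqrt(np.log(max(t, 1)) / np.maximum(counts, 1)),
                     np.inf)
    return int(np.argmax(q_values + bonus))
```

For example, with value estimates [1.0, 0.5] but counts [10, 1] at step t=11, the rarely tried second action wins despite its lower estimate.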

1.4 A Taxonomy of RL Algorithms

By learning approach:

  1. Value-based

    • Learn a value function; derive the policy indirectly
    • Representative algorithms: Q-Learning, DQN
    • Suited to discrete action spaces
  2. Policy-based

    • Learn the policy function directly
    • Representative algorithms: Policy Gradient, PPO
    • Suited to continuous action spaces
  3. Actor-Critic

    • Combine value-based and policy-based methods
    • Representative algorithms: A3C, SAC, TD3
    • Inherits the strengths of both

By use of a model:

  1. Model-free: do not learn the environment's dynamics
  2. Model-based: learn a model of the environment's dynamics

Chapter 2: Project 1: Solving a Maze with Q-Learning

2.1 Project Overview

We will implement the classic Q-Learning algorithm and teach an agent to find its way out of a maze. This project will help you understand the basic concepts of reinforcement learning and how Q-Learning works.

Learning goals:

  • Understand the Q-Learning algorithm
  • Implement the Q-table update rule
  • Master the ε-greedy exploration strategy
  • Visualize the learning process

2.2 Defining the Environment

python
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict

class MazeEnvironment:
    """Maze environment"""
    
    def __init__(self, maze_size=10):
        self.maze_size = maze_size
        self.start = (0, 0)
        self.goal = (maze_size - 1, maze_size - 1)
        
        # Build the maze (0 = open cell, 1 = wall)
        self.maze = np.zeros((maze_size, maze_size), dtype=int)
        self._generate_maze()
        
        self.state = self.start
        self.actions = ['up', 'down', 'left', 'right']
    
    def _generate_maze(self):
        """Generate a random maze"""
        # Add walls at random (20% probability per cell).
        # Note: this can occasionally produce an unsolvable maze;
        # recreate the environment if training never reaches the goal.
        for i in range(self.maze_size):
            for j in range(self.maze_size):
                if (i, j) != self.start and (i, j) != self.goal:
                    if np.random.random() < 0.2:
                        self.maze[i, j] = 1
    
    def reset(self):
        """Reset the environment"""
        self.state = self.start
        return self.state
    
    def step(self, action):
        """Execute an action"""
        x, y = self.state
        
        # Compute the new position (moves off the grid keep the agent in place)
        if action == 'up':
            new_state = (max(0, x - 1), y)
        elif action == 'down':
            new_state = (min(self.maze_size - 1, x + 1), y)
        elif action == 'left':
            new_state = (x, max(0, y - 1))
        elif action == 'right':
            new_state = (x, min(self.maze_size - 1, y + 1))
        else:
            raise ValueError(f'Unknown action: {action}')
        
        # Check for a wall collision
        if self.maze[new_state] == 1:
            new_state = self.state  # stay in place
            reward = -10  # wall-collision penalty
        else:
            self.state = new_state
            # Check whether the goal has been reached
            if self.state == self.goal:
                reward = 100  # goal reward
            else:
                reward = -1  # per-step penalty (encourages the shortest path)
        
        done = (self.state == self.goal)
        return self.state, reward, done
    
    def get_action_index(self, action):
        return self.actions.index(action)
    
    def render(self):
        """Visualize the maze"""
        grid = np.zeros((self.maze_size, self.maze_size, 3))
        
        # Walls
        grid[self.maze == 1] = [0.5, 0.5, 0.5]
        
        # Start
        grid[self.start] = [0, 1, 0]
        
        # Goal
        grid[self.goal] = [1, 0, 0]
        
        # Agent
        grid[self.state] = [0, 0, 1]
        
        plt.imshow(grid)
        plt.title(f'Position: {self.state}')
        plt.axis('off')
        plt.show()

2.3 Implementing Q-Learning

python
class QLearningAgent:
    """Q-Learning agent"""
    
    def __init__(self, env, learning_rate=0.1, discount_factor=0.95, 
                 epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        self.env = env
        self.lr = learning_rate  # learning rate
        self.gamma = discount_factor  # discount factor
        self.epsilon = epsilon  # exploration rate
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        
        # Q-table: q_table[state][action]
        self.q_table = defaultdict(lambda: np.zeros(len(env.actions)))
    
    def get_action(self, state):
        """Select an action with the ε-greedy strategy"""
        if np.random.random() < self.epsilon:
            return np.random.randint(len(self.env.actions))  # explore
        else:
            return np.argmax(self.q_table[state])  # exploit
    
    def update(self, state, action, reward, next_state, done):
        """Update one Q value"""
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.q_table[next_state])
        
        # Q-Learning update rule
        self.q_table[state][action] += self.lr * (
            target - self.q_table[state][action]
        )
    
    def decay_epsilon(self):
        """Decay the exploration rate"""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
    
    def get_best_action(self, state):
        """Return the greedy action (for evaluation)"""
        return np.argmax(self.q_table[state])

2.4 Training

python
def train_q_learning(env, agent, num_episodes=1000, max_steps=100):
    """Train the Q-Learning agent"""
    
    rewards_per_episode = []
    steps_per_episode = []
    
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        steps = 0
        
        for step in range(max_steps):
            # Select an action
            action = agent.get_action(state)
            
            # Execute it
            next_state, reward, done = env.step(env.actions[action])
            
            # Update the Q value
            agent.update(state, action, reward, next_state, done)
            
            state = next_state
            total_reward += reward
            steps += 1
            
            if done:
                break
        
        # Record statistics
        rewards_per_episode.append(total_reward)
        steps_per_episode.append(steps)
        
        # Decay the exploration rate
        agent.decay_epsilon()
        
        # Print progress
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards_per_episode[-100:])
            avg_steps = np.mean(steps_per_episode[-100:])
            print(f'Episode {episode+1}/{num_episodes} | '
                  f'Avg Reward: {avg_reward:.2f} | '
                  f'Avg Steps: {avg_steps:.2f} | '
                  f'Epsilon: {agent.epsilon:.3f}')
    
    return rewards_per_episode, steps_per_episode

# Create the environment and agent
env = MazeEnvironment(maze_size=10)
agent = QLearningAgent(
    env, 
    learning_rate=0.1, 
    discount_factor=0.95,
    epsilon=1.0, 
    epsilon_decay=0.995, 
    epsilon_min=0.01
)

# Train
rewards, steps = train_q_learning(env, agent, num_episodes=1000)

2.5 Visualizing the Results

python
def plot_training_results(rewards, steps, window=50):
    """Plot the training curves"""
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # Reward curve
    axes[0].plot(rewards, alpha=0.3, label='Per-episode reward')
    axes[0].plot(np.convolve(rewards, np.ones(window)/window, mode='valid'), 
                color='red', linewidth=2, label=f'{window}-episode average')
    axes[0].set_xlabel('Episode')
    axes[0].set_ylabel('Total Reward')
    axes[0].set_title('Training rewards')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # Steps curve
    axes[1].plot(steps, alpha=0.3, label='Per-episode steps')
    axes[1].plot(np.convolve(steps, np.ones(window)/window, mode='valid'), 
                color='red', linewidth=2, label=f'{window}-episode average')
    axes[1].set_xlabel('Episode')
    axes[1].set_ylabel('Steps')
    axes[1].set_title('Training steps')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('q_learning_results.png', dpi=150)
    plt.show()

plot_training_results(rewards, steps)

# Visualize the Q-table
def visualize_q_table(agent, env):
    """Visualize the Q-table (greedy action for each state)"""
    q_grid = np.zeros((env.maze_size, env.maze_size))
    
    for i in range(env.maze_size):
        for j in range(env.maze_size):
            state = (i, j)
            if env.maze[i, j] == 0:  # open cell
                best_action = agent.get_best_action(state)
                q_grid[i, j] = best_action
            else:
                q_grid[i, j] = -1  # wall
    
    action_names = ['↑', '↓', '←', '→']
    
    plt.figure(figsize=(10, 10))
    plt.imshow(q_grid, cmap='viridis')
    
    # Annotate each cell with its greedy action
    for i in range(env.maze_size):
        for j in range(env.maze_size):
            if q_grid[i, j] >= 0:
                plt.text(j, i, action_names[int(q_grid[i, j])], 
                        ha='center', va='center', fontsize=12, color='white')
    
    plt.title('Greedy policy from the Q-table')
    plt.colorbar(label='Action index')
    plt.savefig('q_table_policy.png', dpi=150)
    plt.show()

visualize_q_table(agent, env)

2.6 Testing the Agent

python
def test_agent(agent, env, num_tests=10, max_steps=100, render=False):
    """Evaluate the trained agent"""
    success_count = 0
    total_steps = 0
    
    for test in range(num_tests):
        state = env.reset()
        steps = 0
        
        for step in range(max_steps):
            # Use the greedy policy (no exploration)
            action = agent.get_best_action(state)
            state, reward, done = env.step(env.actions[action])
            steps += 1
            
            if render:
                env.render()
                plt.pause(0.1)
            
            if done:
                success_count += 1
                break
        
        total_steps += steps
    
    success_rate = success_count / num_tests
    avg_steps = total_steps / num_tests
    
    print(f"\n=== Test results ===")
    print(f"Test episodes: {num_tests}")
    print(f"Success rate: {success_rate*100:.1f}%")
    print(f"Average steps: {avg_steps:.2f}")
    
    return success_rate, avg_steps

test_agent(agent, env, num_tests=10)

Chapter 3: Project 2: SARSA and the Taxi Problem

3.1 Project Overview

SARSA is another classic temporal-difference learning algorithm. Unlike Q-Learning, it updates Q values using the next action actually taken (on-policy learning). We will use Gymnasium's Taxi environment to compare SARSA with Q-Learning.

About the Taxi environment:

  • A 5×5 grid world
  • One taxi, four pickup/dropoff locations, one destination
  • Actions: move (south, north, east, west), pick up, drop off
  • Goal: carry the passenger from the pickup location to the destination

3.2 Implementing SARSA

python
import gymnasium as gym

class SarsaAgent:
    """SARSA agent (on-policy)"""
    
    def __init__(self, num_states, num_actions, learning_rate=0.1, 
                 discount_factor=0.95, epsilon=1.0, epsilon_decay=0.995):
        self.num_states = num_states
        self.num_actions = num_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        
        # Q-table
        self.q_table = np.zeros((num_states, num_actions))
    
    def get_action(self, state):
        """ε-greedy strategy"""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.num_actions)
        else:
            return np.argmax(self.q_table[state])
    
    def update(self, state, action, reward, next_state, next_action, done):
        """SARSA update rule"""
        if done:
            target = reward
        else:
            # Use the Q value of the next action actually taken
            target = reward + self.gamma * self.q_table[next_state, next_action]
        
        self.q_table[state, action] += self.lr * (target - self.q_table[state, action])
    
    def decay_epsilon(self):
        self.epsilon = max(0.01, self.epsilon * self.epsilon_decay)

def train_sarsa(env, num_episodes=5000, max_steps=200):
    """Train the SARSA agent"""
    
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    
    agent = SarsaAgent(num_states, num_actions)
    
    rewards_per_episode = []
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        action = agent.get_action(state)
        total_reward = 0
        
        for step in range(max_steps):
            # Execute the action
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            # Choose the next action (on-policy)
            next_action = agent.get_action(next_state)
            
            # Update the Q value
            agent.update(state, action, reward, next_state, next_action, done)
            
            state = next_state
            action = next_action
            total_reward += reward
            
            if done:
                break
        
        agent.decay_epsilon()
        rewards_per_episode.append(total_reward)
        
        if (episode + 1) % 500 == 0:
            avg_reward = np.mean(rewards_per_episode[-500:])
            print(f'Episode {episode+1}/{num_episodes} | Avg Reward: {avg_reward:.2f}')
    
    return agent, rewards_per_episode

# Create the environment
env = gym.make('Taxi-v3')

# Train SARSA
sarsa_agent, sarsa_rewards = train_sarsa(env, num_episodes=5000)

3.3 Q-Learning vs. SARSA

python
def train_q_learning_gym(env, num_episodes=5000, max_steps=200):
    """Q-Learning training loop for the Gymnasium environment"""
    
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    
    agent = SarsaAgent(num_states, num_actions)  # reuse the class for its Q-table and ε-greedy selection
    
    rewards_per_episode = []
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        
        for step in range(max_steps):
            action = agent.get_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            # Q-Learning update (uses the max over next actions)
            if done:
                target = reward
            else:
                target = reward + agent.gamma * np.max(agent.q_table[next_state])
            
            agent.q_table[state, action] += agent.lr * (target - agent.q_table[state, action])
            
            state = next_state
            total_reward += reward
            
            if done:
                break
        
        agent.decay_epsilon()
        rewards_per_episode.append(total_reward)
        
        if (episode + 1) % 500 == 0:
            avg_reward = np.mean(rewards_per_episode[-500:])
            print(f'Episode {episode+1}/{num_episodes} | Avg Reward: {avg_reward:.2f}')
    
    return agent, rewards_per_episode

# Train Q-Learning
q_agent, q_rewards = train_q_learning_gym(env, num_episodes=5000)

# Compare the two visually
def compare_algorithms(sarsa_rewards, q_rewards, window=200):
    """Compare SARSA and Q-Learning"""
    plt.figure(figsize=(12, 6))
    
    # Smoothed curves
    sarsa_smooth = np.convolve(sarsa_rewards, np.ones(window)/window, mode='valid')
    q_smooth = np.convolve(q_rewards, np.ones(window)/window, mode='valid')
    
    plt.plot(sarsa_smooth, label='SARSA', color='blue', linewidth=2)
    plt.plot(q_smooth, label='Q-Learning', color='red', linewidth=2)
    
    plt.xlabel('Episode')
    plt.ylabel(f'Average Reward ({window} episodes)')
    plt.title('SARSA vs Q-Learning')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig('sarsa_vs_qlearning.png', dpi=150)
    plt.show()

compare_algorithms(sarsa_rewards, q_rewards)

# Summary of the key differences
print("""
=== SARSA vs. Q-Learning: Key Differences ===

SARSA (on-policy):
- Updates with the next action actually taken
- More conservative; accounts for the effects of exploration
- Learns under the exploratory behavior policy
- Better suited to safety-critical applications

Q-Learning (off-policy):
- Updates with the best next action (max)
- More aggressive; learns the optimal policy directly
- Can learn from historical data
- Usually converges faster

Rules of thumb:
- Need safe, conservative behavior: pick SARSA
- Want optimal performance: pick Q-Learning
- Learning from logged data: pick Q-Learning
""")

3.4 Visualizing the Policy

python
def visualize_taxi_policy(agent, env, num_samples=5):
    """Roll out and display the learned taxi policy.
    
    Assumes the environment was created with render_mode='ansi',
    e.g. env = gym.make('Taxi-v3', render_mode='ansi'), so that
    env.render() returns a text frame we can print.
    """
    
    for episode in range(num_samples):
        state, _ = env.reset()
        total_reward = 0
        steps = 0
        
        print(f"\n=== Test episode {episode+1} ===")
        
        while steps < 100:
            # Render the current state as text
            print(env.render())
            
            action = np.argmax(agent.q_table[state])
            next_state, reward, terminated, truncated, _ = env.step(action)
            
            # Taxi-v3 action order: 0=south, 1=north, 2=east, 3=west, 4=pickup, 5=dropoff
            action_names = ['↓', '↑', '→', '←', 'pick up', 'drop off']
            print(f"Action: {action_names[action]}, reward: {reward}")
            
            state = next_state
            total_reward += reward
            steps += 1
            
            if terminated or truncated:
                print(f"Episode finished! Total reward: {total_reward}, steps: {steps}")
                break

# Run the visualization (optional)
# visualize_taxi_policy(sarsa_agent, gym.make('Taxi-v3', render_mode='ansi'))

Chapter 4: Project 3: Playing CartPole with a Deep Q-Network (DQN)

4.1 Project Overview

When the state space is large or continuous, tabular Q methods no longer apply. A Deep Q-Network (DQN) approximates the Q function with a neural network and can handle high-dimensional state spaces. We will implement DQN to solve the CartPole balancing problem.

Key DQN innovations:

  1. Experience replay: store past transitions and sample them at random
  2. Target network: compute target Q values with a separate, periodically synced network
  3. Reward clipping: clip rewards to [-1, 1] (used in the original Atari setup; CartPole's per-step reward of 1 needs no clipping)

4.2 Implementing DQN

python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random

class DQN(nn.Module):
    """Deep Q-Network"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(DQN, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )
    
    def forward(self, x):
        return self.network(x)

class ReplayBuffer:
    """Experience replay buffer"""
    
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            torch.FloatTensor(np.array(states)),
            torch.LongTensor(actions),
            torch.FloatTensor(rewards),
            torch.FloatTensor(np.array(next_states)),
            torch.FloatTensor(dones)
        )
    
    def __len__(self):
        return len(self.buffer)

class DQNAgent:
    """DQN agent"""
    
    def __init__(self, state_dim, action_dim, learning_rate=0.001, 
                 discount_factor=0.99, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.01, buffer_size=10000, batch_size=64,
                 target_update=10):
        
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.action_dim = action_dim
        self.gamma = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.batch_size = batch_size
        self.target_update = target_update
        
        # Networks
        self.policy_net = DQN(state_dim, action_dim).to(self.device)
        self.target_net = DQN(state_dim, action_dim).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()
        
        # Optimizer
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        
        # Experience replay
        self.memory = ReplayBuffer(buffer_size)
        
        # Step counter
        self.steps_done = 0
    
    def select_action(self, state):
        """ε-greedy action selection"""
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        else:
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
                q_values = self.policy_net(state_tensor)
                return q_values.max(1)[1].item()
    
    def optimize_model(self):
        """Run one optimization step"""
        if len(self.memory) < self.batch_size:
            return
        
        # Sample a batch
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        states = states.to(self.device)
        actions = actions.to(self.device)
        rewards = rewards.to(self.device)
        next_states = next_states.to(self.device)
        dones = dones.to(self.device)
        
        # Current Q values
        current_q = self.policy_net(states).gather(1, actions.unsqueeze(1))
        
        # Target Q values
        with torch.no_grad():
            next_q = self.target_net(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * next_q
        
        # Loss
        loss = F.smooth_l1_loss(current_q, target_q.unsqueeze(1))
        
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        # Clip gradients element-wise to [-1, 1]
        for param in self.policy_net.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()
        
        return loss.item()
    
    def update_target_network(self):
        """Sync the target network with the policy network"""
        self.target_net.load_state_dict(self.policy_net.state_dict())
    
    def decay_epsilon(self):
        """Decay the exploration rate"""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
    
    def remember(self, state, action, reward, next_state, done):
        """Store a transition"""
        self.memory.push(state, action, reward, next_state, done)

4.3 Training the DQN

python
def train_dqn(env, num_episodes=500, max_steps=500):
    """Train the DQN agent"""
    
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    
    agent = DQNAgent(
        state_dim=state_dim,
        action_dim=action_dim,
        learning_rate=0.001,
        discount_factor=0.99,
        epsilon=1.0,
        epsilon_decay=0.995,
        epsilon_min=0.01,
        buffer_size=10000,
        batch_size=64,
        target_update=10
    )
    
    rewards_per_episode = []
    losses = []
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        
        for step in range(max_steps):
            # Select an action
            action = agent.select_action(state)
            
            # Execute it
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            # Store the transition
            agent.remember(state, action, reward, next_state, done)
            
            # Optimize the model ("is not None" so a 0.0 loss is still recorded)
            loss = agent.optimize_model()
            if loss is not None:
                losses.append(loss)
            
            state = next_state
            total_reward += reward
            
            if done:
                break
        
        # Sync the target network
        if episode % agent.target_update == 0:
            agent.update_target_network()
        
        agent.decay_epsilon()
        rewards_per_episode.append(total_reward)
        
        if (episode + 1) % 50 == 0:
            avg_reward = np.mean(rewards_per_episode[-50:])
            avg_loss = np.mean(losses[-100:]) if losses else 0
            print(f'Episode {episode+1}/{num_episodes} | '
                  f'Avg Reward: {avg_reward:.2f} | '
                  f'Avg Loss: {avg_loss:.4f} | '
                  f'Epsilon: {agent.epsilon:.3f}')
    
    return agent, rewards_per_episode, losses

# Create the environment
env = gym.make('CartPole-v1')

# Train the DQN
dqn_agent, dqn_rewards, dqn_losses = train_dqn(env, num_episodes=500)

4.4 Visualizing the Results

python
def plot_dqn_results(rewards, losses, window=50):
    """Plot the DQN training curves"""
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # Reward curve
    axes[0].plot(rewards, alpha=0.3, label='Per-episode reward')
    axes[0].plot(np.convolve(rewards, np.ones(window)/window, mode='valid'), 
                color='red', linewidth=2, label=f'{window}-episode average')
    # CartPole-v1 counts as solved at an average reward of 475 (195 was the v0 threshold)
    axes[0].axhline(y=475, color='green', linestyle='--', label='Solve threshold (475)')
    axes[0].set_xlabel('Episode')
    axes[0].set_ylabel('Total Reward')
    axes[0].set_title('DQN training rewards (CartPole)')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # Loss curve
    if losses:
        axes[1].plot(losses, alpha=0.5, label='Training loss')
        axes[1].plot(np.convolve(losses, np.ones(100)/100, mode='valid'), 
                    color='red', linewidth=2, label='100-step average')
    axes[1].set_xlabel('Step')
    axes[1].set_ylabel('Loss')
    axes[1].set_title('DQN training loss')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('dqn_results.png', dpi=150)
    plt.show()

plot_dqn_results(dqn_rewards, dqn_losses)

4.5 Testing and Demo

python
def test_dqn(agent, env, num_episodes=10):
    """Evaluate the trained DQN agent.
    
    For on-screen rendering, create the environment with
    render_mode='human'; rendering then happens inside env.step().
    """
    
    success_count = 0
    total_steps = 0
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        steps = 0
        
        for step in range(500):
            # Act greedily. Calling agent.select_action here would still
            # explore with probability ε, so we query the policy net directly.
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0).to(agent.device)
                action = agent.policy_net(state_tensor).argmax(1).item()
            next_state, reward, terminated, truncated, _ = env.step(action)
            
            state = next_state
            total_reward += reward
            steps += 1
            
            if terminated or truncated:
                break
        
        total_steps += steps
        if steps >= 475:  # CartPole-v1 solve threshold
            success_count += 1
        
        print(f"Episode {episode+1}: {steps} steps, reward {total_reward}")
    
    env.close()
    
    print(f"\n=== Test results ===")
    print(f"Success rate: {success_count/num_episodes*100:.1f}%")
    print(f"Average steps: {total_steps/num_episodes:.1f}")
    
    return success_count / num_episodes

# Run the test
test_dqn(dqn_agent, env, num_episodes=10)

4.6 DQN Variants and Improvements

python
class DuelingDQN(nn.Module):
    """Dueling DQN: separate the state value from action advantages"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(DuelingDQN, self).__init__()
        
        # Shared feature extractor
        self.features = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        
        # Value stream (state value)
        self.value_stream = nn.Sequential(
            nn.Linear(hidden_dim, 1)
        )
        
        # Advantage stream (per-action advantages)
        self.advantage_stream = nn.Sequential(
            nn.Linear(hidden_dim, action_dim)
        )
    
    def forward(self, x):
        features = self.features(x)
        value = self.value_stream(features)
        advantage = self.advantage_stream(features)
        
        # Aggregate: Q = V + (A - mean(A))
        q_values = value + (advantage - advantage.mean(dim=1, keepdim=True))
        return q_values

class DoubleDQNAgent(DQNAgent):
    """Double DQN: mitigates Q-Learning's overestimation bias"""
    
    def optimize_model(self):
        if len(self.memory) < self.batch_size:
            return
        
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        states = states.to(self.device)
        actions = actions.to(self.device)
        rewards = rewards.to(self.device)
        next_states = next_states.to(self.device)
        dones = dones.to(self.device)
        
        # Current Q values
        current_q = self.policy_net(states).gather(1, actions.unsqueeze(1))
        
        # Double DQN: policy_net selects the action, target_net evaluates it
        with torch.no_grad():
            next_actions = self.policy_net(next_states).max(1)[1]
            next_q = self.target_net(next_states).gather(1, next_actions.unsqueeze(1)).squeeze(1)
            target_q = rewards + (1 - dones) * self.gamma * next_q
        
        loss = F.smooth_l1_loss(current_q, target_q.unsqueeze(1))
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()

Chapter 5: Project 4: Policy Gradient Methods

5.1 Project Overview

Policy gradient methods learn the policy function directly instead of deriving it from a value function. They are particularly well suited to continuous action spaces and can represent stochastic policies. We will implement the REINFORCE algorithm on the CartPole problem.

Core ideas of policy gradients:

  • Parameterize the policy π(a|s; θ) directly
  • Maximize expected return by gradient ascent
  • No value-function approximation required
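The quantity being climbed is the expected return J(θ); its gradient can be estimated from sampled trajectories with the REINFORCE (score-function) estimator:

∇θ J(θ) = E[G(t) ∇θ log π(A(t)|S(t); θ)]

where G(t) is the discounted return from step t onward. The implementation that follows computes exactly this: each action's log-probability is weighted by the (normalized) return that followed it, and the negated sum is minimized.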

5.2 Implementing REINFORCE

python
class PolicyNetwork(nn.Module):
    """Policy network"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(PolicyNetwork, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)  # outputs a probability distribution over actions
        )
    
    def forward(self, x):
        return self.network(x)
    
    def select_action(self, state):
        """Sample an action from the policy"""
        device = next(self.parameters()).device  # keep the input on the network's device
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        probs = self.forward(state_tensor)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action)

class ReinforceAgent:
    """REINFORCE agent"""
    
    def __init__(self, state_dim, action_dim, learning_rate=0.001, 
                 discount_factor=0.99):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.gamma = discount_factor
        
        self.policy = PolicyNetwork(state_dim, action_dim).to(self.device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=learning_rate)
    
    def select_action(self, state):
        return self.policy.select_action(state)
    
    def update(self, rewards, log_probs):
        """Update the policy network"""
        # Compute discounted returns
        discounted_returns = []
        G = 0
        for reward in reversed(rewards):
            G = reward + self.gamma * G
            discounted_returns.insert(0, G)
        
        # Normalize the returns (reduces variance)
        discounted_returns = torch.FloatTensor(discounted_returns).to(self.device)
        discounted_returns = (discounted_returns - discounted_returns.mean()) / (discounted_returns.std() + 1e-9)
        
        # Policy-gradient loss
        policy_loss = []
        for log_prob, G in zip(log_probs, discounted_returns):
            policy_loss.append(-log_prob * G)
        
        loss = torch.cat(policy_loss).sum()
        
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()

def train_reinforce(env, num_episodes=1000, max_steps=500):
    """Train the REINFORCE agent"""
    
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    
    agent = ReinforceAgent(state_dim, action_dim)
    
    rewards_per_episode = []
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        rewards = []
        log_probs = []
        total_reward = 0
        
        for step in range(max_steps):
            # Sample an action and record its log-probability
            action, log_prob = agent.select_action(state)
            log_probs.append(log_prob)
            
            # Execute the action
            next_state, reward, terminated, truncated, _ = env.step(action)
            rewards.append(reward)
            
            state = next_state
            total_reward += reward
            
            done = terminated or truncated
            if done:
                break
        
        # Update the policy
        if len(rewards) > 0:
            loss = agent.update(rewards, log_probs)
        
        rewards_per_episode.append(total_reward)
        
        if (episode + 1) % 50 == 0:
            avg_reward = np.mean(rewards_per_episode[-50:])
            print(f'Episode {episode+1}/{num_episodes} | '
                  f'Avg Reward: {avg_reward:.2f}')
    
    return agent, rewards_per_episode

# Train REINFORCE
env = gym.make('CartPole-v1')
reinforce_agent, reinforce_rewards = train_reinforce(env, num_episodes=1000)

5.3 Policy Gradients with a Baseline

python
class ActorCriticNetwork(nn.Module):
    """Actor-Critic 网络:同时输出策略和价值"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(ActorCriticNetwork, self).__init__()
        
        # Shared feature layers
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        
        # Actor (policy head)
        self.actor = nn.Sequential(
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )
        
        # Critic (value head)
        self.critic = nn.Sequential(
            nn.Linear(hidden_dim, 1)
        )
    
    def forward(self, x):
        features = self.shared(x)
        return self.actor(features), self.critic(features)
    
    def select_action(self, state):
        # Keep the input on the same device as the model parameters
        device = next(self.parameters()).device
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        probs, value = self.forward(state_tensor)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action), value

class ActorCriticAgent:
    """Actor-Critic 智能体"""
    
    def __init__(self, state_dim, action_dim, learning_rate=0.001, 
                 discount_factor=0.99):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.gamma = discount_factor
        
        self.model = ActorCriticNetwork(state_dim, action_dim).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
    
    def select_action(self, state):
        return self.model.select_action(state)
    
    def update(self, rewards, log_probs, values):
        """Update the Actor-Critic networks"""
        # Compute discounted returns
        discounted_returns = []
        G = 0
        for reward in reversed(rewards):
            G = reward + self.gamma * G
            discounted_returns.insert(0, G)
        
        discounted_returns = torch.FloatTensor(discounted_returns).to(self.device)
        values = torch.cat(values).squeeze()
        
        # Advantage estimate: A(s,a) = G - V(s)
        advantages = discounted_returns - values
        
        # Actor loss (policy gradient weighted by the advantage)
        actor_loss = -(torch.cat(log_probs) * advantages.detach()).sum()
        
        # Critic loss (value-prediction error)
        critic_loss = F.smooth_l1_loss(values, discounted_returns)
        
        # Total loss
        loss = actor_loss + critic_loss
        
        # Optimization step
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item(), actor_loss.item(), critic_loss.item()

def train_actor_critic(env, num_episodes=1000, max_steps=500):
    """训练 Actor-Critic 智能体"""
    
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    
    agent = ActorCriticAgent(state_dim, action_dim)
    
    rewards_per_episode = []
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        rewards = []
        log_probs = []
        values = []
        total_reward = 0
        
        for step in range(max_steps):
            action, log_prob, value = agent.select_action(state)
            log_probs.append(log_prob)
            values.append(value)
            
            next_state, reward, terminated, truncated, _ = env.step(action)
            rewards.append(reward)
            
            state = next_state
            total_reward += reward
            
            done = terminated or truncated
            if done:
                break
        
        if len(rewards) > 0:
            loss, actor_loss, critic_loss = agent.update(rewards, log_probs, values)
        
        rewards_per_episode.append(total_reward)
        
        if (episode + 1) % 50 == 0:
            avg_reward = np.mean(rewards_per_episode[-50:])
            print(f'Episode {episode+1}/{num_episodes} | '
                  f'Avg Reward: {avg_reward:.2f}')
    
    return agent, rewards_per_episode

# Train Actor-Critic
ac_agent, ac_rewards = train_actor_critic(env, num_episodes=1000)

5.4 Algorithm Comparison

python
def compare_policy_methods(reinforce_rewards, ac_rewards, window=50):
    """对比 REINFORCE 和 Actor-Critic"""
    plt.figure(figsize=(12, 6))
    
    reinforce_smooth = np.convolve(reinforce_rewards, np.ones(window)/window, mode='valid')
    ac_smooth = np.convolve(ac_rewards, np.ones(window)/window, mode='valid')
    
    plt.plot(reinforce_smooth, label='REINFORCE', color='blue', linewidth=2)
    plt.plot(ac_smooth, label='Actor-Critic', color='red', linewidth=2)
    
    plt.xlabel('Episode')
    plt.ylabel(f'Average Reward ({window} episodes)')
    plt.title('REINFORCE vs Actor-Critic')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig('policy_gradient_comparison.png', dpi=150)
    plt.show()

compare_policy_methods(reinforce_rewards, ac_rewards)

print("""
=== 策略梯度方法总结 ===

REINFORCE:
- 纯策略梯度,无基线
- 高方差,需要更多样本
- 实现简单
- 适合入门学习

Actor-Critic:
- 使用价值函数作为基线
- 方差更低,收敛更快
- 同时学习策略和价值
- 现代 RL 算法的基础

选择建议:
- 学习/演示:REINFORCE
- 实际应用:Actor-Critic 或其变体(PPO、A3C)
""")

Chapter 6 Case Study 5: Playing LunarLander with PPO

6.1 Project Overview

PPO (Proximal Policy Optimization) is one of the most popular reinforcement learning algorithms today. It keeps training stable by limiting how far each update can move the policy. We will implement PPO to solve the LunarLander landing task.

Key ideas in PPO:

  1. Clipped objective: bounds the size of each policy update
  2. GAE advantage estimation: a more accurate, lower-variance advantage estimate
  3. Multiple epochs of minibatch updates: better sample efficiency
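The clipped objective in item 1 can be made concrete with a few lines of NumPy. This is our own illustrative sketch, not part of the tutorial's PPO implementation: the objective takes the minimum of the raw and the clipped surrogate terms, so the policy gains nothing by pushing the probability ratio outside [1-ε, 1+ε].

```python
import numpy as np

def clipped_surrogate(ratio, advantage, eps=0.2):
    """PPO clipped objective for one sample: min(r*A, clip(r, 1-eps, 1+eps)*A)."""
    return np.minimum(ratio * advantage,
                      np.clip(ratio, 1 - eps, 1 + eps) * advantage)

# With a positive advantage the gain is capped at (1+eps)*A;
# with a negative advantage the objective is bounded pessimistically.
```

For example, with ratio = 1.5 and advantage = 1.0 the clipped objective returns 1.2 rather than 1.5, so an overly aggressive policy change earns no extra credit.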

6.2 Implementing PPO

python
class PPOActor(nn.Module):
    """PPO Actor 网络"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(PPOActor, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, action_dim)
        )
    
    def forward(self, x):
        return self.network(x)
    
    def get_action(self, state):
        # Keep the input on the same device as the model parameters
        device = next(self.parameters()).device
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        logits = self.forward(state_tensor)
        probs = F.softmax(logits, dim=-1)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action), probs

class PPOCritic(nn.Module):
    """PPO Critic 网络"""
    
    def __init__(self, state_dim, hidden_dim=256):
        super(PPOCritic, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1)
        )
    
    def forward(self, x):
        return self.network(x)

class PPOAgent:
    """PPO 智能体"""
    
    def __init__(self, state_dim, action_dim, learning_rate=0.0003,
                 discount_factor=0.99, gae_lambda=0.95, 
                 clip_epsilon=0.2, epochs=10, batch_size=64):
        
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.gamma = discount_factor
        self.lam = gae_lambda
        self.clip_epsilon = clip_epsilon
        self.epochs = epochs
        self.batch_size = batch_size
        
        # Networks
        self.actor = PPOActor(state_dim, action_dim).to(self.device)
        self.critic = PPOCritic(state_dim).to(self.device)
        
        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=learning_rate)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=learning_rate)
    
    def select_action(self, state):
        action, log_prob, probs = self.actor.get_action(state)
        value = self.critic(torch.FloatTensor(state).unsqueeze(0).to(self.device))
        return action, log_prob, value.item()
    
    def compute_gae(self, rewards, values, dones):
        """计算 GAE 优势"""
        advantages = []
        gae = 0
        
        for t in reversed(range(len(rewards))):
            if t == len(rewards) - 1:
                next_value = 0
            else:
                next_value = values[t + 1]
            
            delta = rewards[t] + self.gamma * next_value * (1 - dones[t]) - values[t]
            gae = delta + self.gamma * self.lam * (1 - dones[t]) * gae
            advantages.insert(0, gae)
        
        advantages = torch.FloatTensor(advantages).to(self.device)
        returns = advantages + torch.FloatTensor(values).to(self.device)
        
        return advantages, returns
    
    def update(self, states, actions, old_log_probs, rewards, dones):
        """PPO update"""
        states = torch.FloatTensor(np.array(states)).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        old_log_probs = torch.FloatTensor(old_log_probs).to(self.device)
        
        # Evaluate state values with the current critic (batched)
        with torch.no_grad():
            values = self.critic(states).squeeze(-1).cpu().numpy().tolist()
        
        # Compute GAE
        advantages, returns = self.compute_gae(rewards, values, dones)
        
        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-9)
        
        # Multiple epochs of minibatch updates
        dataset_size = len(states)
        for _ in range(self.epochs):
            indices = np.random.permutation(dataset_size)
            
            for start in range(0, dataset_size, self.batch_size):
                end = start + self.batch_size
                batch_indices = indices[start:end]
                
                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_log_probs = old_log_probs[batch_indices]
                batch_advantages = advantages[batch_indices]
                batch_returns = returns[batch_indices]
                
                # Log-probabilities and entropy under the current policy,
                # evaluated on the whole minibatch
                logits = self.actor(batch_states)
                dist = torch.distributions.Categorical(logits=logits)
                new_log_probs = dist.log_prob(batch_actions)
                entropy = dist.entropy().mean()  # entropy bonus encourages exploration
                
                # Probability ratio between the new and old policies
                ratio = torch.exp(new_log_probs - batch_old_log_probs)
                
                # Clipped surrogate objective
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * batch_advantages
                actor_loss = -torch.min(surr1, surr2).mean()
                
                # Critic loss
                values_pred = self.critic(batch_states).squeeze(-1)
                critic_loss = F.smooth_l1_loss(values_pred, batch_returns)
                
                # Total loss
                loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy
                
                # Optimize with gradient-norm clipping
                self.actor_optimizer.zero_grad()
                self.critic_optimizer.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
                nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
                self.actor_optimizer.step()
                self.critic_optimizer.step()
        
        return loss.item()

def train_ppo(env, num_episodes=500, update_timestep=2000):
    """训练 PPO 智能体"""
    
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    
    agent = PPOAgent(state_dim, action_dim)
    
    rewards_per_episode = []
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        episode_reward = 0
        
        states, actions, log_probs, rewards, dones = [], [], [], [], []
        
        for step in range(1000):
            action, log_prob, value = agent.select_action(state)
            
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            states.append(state)
            actions.append(action)
            log_probs.append(log_prob.item())
            rewards.append(reward)
            dones.append(done)
            
            state = next_state
            episode_reward += reward
            
            # Update once enough steps have been collected
            if len(states) >= update_timestep or done:
                agent.update(states, actions, log_probs, rewards, dones)
                states, actions, log_probs, rewards, dones = [], [], [], [], []
            
            if done:
                break
        
        rewards_per_episode.append(episode_reward)
        
        if (episode + 1) % 20 == 0:
            avg_reward = np.mean(rewards_per_episode[-20:])
            print(f'Episode {episode+1}/{num_episodes} | '
                  f'Avg Reward: {avg_reward:.2f}')
    
    return agent, rewards_per_episode

# Train PPO
env = gym.make('LunarLander-v2')
ppo_agent, ppo_rewards = train_ppo(env, num_episodes=500)

6.3 Visualizing Results and Testing

python
def plot_ppo_results(rewards, window=20):
    """Plot PPO training results"""
    plt.figure(figsize=(12, 6))
    
    plt.plot(rewards, alpha=0.3, label='Episode reward')
    plt.plot(np.convolve(rewards, np.ones(window)/window, mode='valid'), 
            color='red', linewidth=2, label=f'{window}-episode average')
    plt.axhline(y=200, color='green', linestyle='--', label='Solved threshold (200)')
    
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.title('PPO Training Results (LunarLander)')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig('ppo_results.png', dpi=150)
    plt.show()

plot_ppo_results(ppo_rewards)

def test_ppo(agent, env, num_episodes=10):
    """Test the PPO agent (create the env with render_mode='human' to watch it)"""
    
    success_count = 0
    total_reward = 0
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        episode_reward = 0
        
        for step in range(1000):
            action, _, _ = agent.select_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            
            state = next_state
            episode_reward += reward
            
            done = terminated or truncated
            if done:
                break
        
        # LunarLander counts as solved at an episode reward of 200 or more
        if episode_reward >= 200:
            success_count += 1
        
        total_reward += episode_reward
        print(f"Episode {episode+1}: reward {episode_reward:.1f}")
    
    env.close()
    
    print(f"\n=== 测试结果 ===")
    print(f"成功率:{success_count/num_episodes*100:.1f}%")
    print(f"平均奖励:{total_reward/num_episodes:.1f}")

# Run the test
test_ppo(ppo_agent, env, num_episodes=10)

Chapter 7 Advanced Topics in Reinforcement Learning

7.1 Continuous Action Spaces

Continuous action spaces (e.g., robot control) call for different methods:

python
class ContinuousActor(nn.Module):
    """连续动作空间的 Actor"""
    
    def __init__(self, state_dim, action_dim, action_bounds, hidden_dim=256):
        super(ContinuousActor, self).__init__()
        self.action_bounds = action_bounds
        
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Tanh()  # outputs in [-1, 1]
        )
    
    def forward(self, x):
        return self.network(x) * self.action_bounds
    
    def get_action(self, state, noise=0.1):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            action = self.forward(state_tensor)
            
            # Add exploration noise
            if noise > 0:
                action += torch.randn_like(action) * noise
        
        return action.clamp(-self.action_bounds, self.action_bounds).squeeze(0).numpy()

# Use TD3 or SAC to handle continuous actions
# TD3: Twin Delayed DDPG
# SAC: Soft Actor-Critic
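SAC, for instance, replaces the deterministic output-plus-noise scheme above with a stochastic Gaussian policy squashed by tanh. Below is a NumPy sketch of that sampling step with invented names; `mean` and `log_std` are passed in as constants where a real actor network would produce them, and the constant log(action_bound) term is omitted from the log-probability.

```python
import numpy as np

def sample_tanh_gaussian(mean, log_std, action_bound, rng):
    """Sample a squashed-Gaussian action and its (per-sample) log-probability."""
    std = np.exp(log_std)
    u = mean + std * rng.standard_normal(mean.shape)   # reparameterized sample
    action = np.tanh(u) * action_bound                 # squash into bounds
    # Gaussian log-density plus the tanh change-of-variables correction
    log_prob = (-0.5 * ((u - mean) / std) ** 2 - log_std
                - 0.5 * np.log(2 * np.pi)
                - np.log(1.0 - np.tanh(u) ** 2 + 1e-6)).sum()
    return action, log_prob
```

The tanh correction term is what lets SAC compute exact log-probabilities of bounded actions, which its entropy objective requires.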

7.2 Multi-Agent Reinforcement Learning

python
# Multi-agent settings
# use algorithms such as MADDPG or QMIX

class MultiAgentEnv:
    """Sketch of a multi-agent environment"""
    
    def __init__(self, num_agents=2):
        self.num_agents = num_agents
        # Each agent has its own observation and action space
    
    def step(self, actions):
        # All agents act simultaneously; returns per-agent
        # observations, rewards, and done flags
        pass

# Recommended libraries:
# - PettingZoo: collection of multi-agent environments
# - Ray RLlib: distributed RL training
# - Stable Baselines3: single-agent RL library
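Before reaching for MADDPG or QMIX, the simplest multi-agent baseline is independent learning: each agent runs its own single-agent learner and treats the others as part of the environment. A toy sketch with two independent Q-learners in a stateless coordination game; the payoff matrix and hyperparameters are invented for illustration.

```python
import numpy as np

# Both agents are rewarded only when they pick the same action
payoff = np.array([[1.0, 0.0],
                   [0.0, 1.0]])

def train_independent_q(episodes=2000, lr=0.1, epsilon=0.1, seed=0):
    """Two independent epsilon-greedy Q-learners in a repeated matrix game."""
    rng = np.random.default_rng(seed)
    q = [np.zeros(2), np.zeros(2)]      # one Q-table per agent (stateless game)
    for _ in range(episodes):
        acts = [int(rng.integers(2)) if rng.random() < epsilon
                else int(np.argmax(q[i]))
                for i in range(2)]
        r = payoff[acts[0], acts[1]]    # shared reward
        for i in range(2):
            q[i][acts[i]] += lr * (r - q[i][acts[i]])
    return q
```

Independent learning ignores non-stationarity (each agent's environment shifts as the others learn), which is exactly the problem MADDPG-style centralized critics address.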

7.3 Imitation Learning

python
# Learning from expert demonstrations
class ImitationLearning:
    """Behavior Cloning"""
    
    def __init__(self, state_dim, action_dim):
        self.policy = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )
        self.optimizer = optim.Adam(self.policy.parameters())
        self.criterion = nn.MSELoss()
    
    def train(self, expert_states, expert_actions, epochs=100):
        """Fit the policy to expert data"""
        states = torch.FloatTensor(expert_states)
        actions = torch.FloatTensor(expert_actions)
        
        for epoch in range(epochs):
            predictions = self.policy(states)
            loss = self.criterion(predictions, actions)
            
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
        
        return loss.item()

# Going further: Inverse Reinforcement Learning (IRL)
# infers the reward function from expert behavior
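To make behavior cloning concrete, here is a self-contained toy separate from the ImitationLearning class above, with an invented linear expert: since behavior cloning is plain supervised learning, a linear policy can even be fit in closed form.

```python
import numpy as np

# Hypothetical expert: action = 2 * state, observed as (state, action) pairs
rng = np.random.default_rng(0)
expert_states = rng.standard_normal((100, 1))
expert_actions = 2.0 * expert_states

# Behavior cloning reduces to regression; here, ordinary least squares
W, *_ = np.linalg.lstsq(expert_states, expert_actions, rcond=None)
# W recovers the expert's coefficient (≈ 2.0)
```

With neural policies the same fit is done by gradient descent, as in the class above; the supervised nature of the problem is unchanged.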

7.4 Model-Based Reinforcement Learning

python
# Learning a model of the environment dynamics
class WorldModel(nn.Module):
    """World model: predicts the next state and reward"""
    
    def __init__(self, state_dim, action_dim):
        super(WorldModel, self).__init__()
        
        # Transition model
        self.transition = nn.Sequential(
            nn.Linear(state_dim + action_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, state_dim)
        )
        
        # Reward model
        self.reward = nn.Sequential(
            nn.Linear(state_dim + action_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )
    
    def predict(self, state, action):
        x = torch.cat([state, action], dim=-1)
        next_state = self.transition(x)
        reward = self.reward(x)
        return next_state, reward
    
    def train(self, transitions, rewards):
        """Learn the world model from real interaction data"""
        pass

# Planning with a learned world model:
# see algorithms such as Dreamer and MuZero
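Once a dynamics model exists, even a crude planner can use it. The sketch below does random-shooting planning in the style of model-predictive control: sample candidate action sequences, roll each through the model, and execute the first action of the best sequence. `model_step` is a hand-written 1-D toy standing in for a trained WorldModel.

```python
import numpy as np

def model_step(state, action):
    """Toy 1-D dynamics: drift toward the action, reward for staying near 0."""
    next_state = state + 0.1 * action
    reward = -abs(next_state)
    return next_state, reward

def plan_random_shooting(state, horizon=5, num_candidates=64, seed=0):
    """Pick the first action of the best randomly sampled action sequence."""
    rng = np.random.default_rng(seed)
    best_action, best_return = 0.0, -np.inf
    for _ in range(num_candidates):
        actions = rng.uniform(-1, 1, size=horizon)  # candidate action sequence
        s, total = state, 0.0
        for a in actions:
            s, r = model_step(s, a)
            total += r
        if total > best_return:
            best_action, best_return = actions[0], total
    return best_action  # MPC style: execute one action, then replan
```

Dreamer and MuZero replace the random sampling with learned policies and tree search, but the core loop of "imagine rollouts in the model, act on the best one" is the same.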

Chapter 8 Best Practices and Common Problems

8.1 Hyperparameter Tuning Guide

Key hyperparameters:

| Parameter | Typical range | Effect |
|---|---|---|
| Learning rate | 1e-5 ~ 1e-3 | Convergence speed and stability |
| Discount factor γ | 0.9 ~ 0.999 | Long-term vs. short-term reward |
| Exploration rate ε | 1.0 → 0.01 | Exploration/exploitation balance |
| Batch size | 32 ~ 512 | Variance of gradient estimates |
| Replay buffer size | 1e4 ~ 1e6 | Sample efficiency |

Tuning tips:

  1. Start from default values reported in the literature
  2. Tune the learning rate first, then the other parameters
  3. Use grid search or Bayesian optimization
  4. Log every experiment configuration
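A grid search (tip 3) can be as little as a loop over the Cartesian product of parameter values. In this sketch the `evaluate` stub stands in for a full training run that returns average reward:

```python
import itertools

param_grid = {
    'learning_rate': [1e-4, 3e-4, 1e-3],
    'discount_factor': [0.95, 0.99],
}

def evaluate(config):
    # Stand-in score; in practice, train an agent and return its average reward
    return -abs(config['learning_rate'] - 3e-4)

def grid_search(param_grid, evaluate):
    """Try every combination and keep the best-scoring configuration."""
    best_config, best_score = None, float('-inf')
    for values in itertools.product(*param_grid.values()):
        config = dict(zip(param_grid.keys(), values))
        score = evaluate(config)
        if score > best_score:
            best_config, best_score = config, score
    return best_config, best_score
```

Because RL training is noisy, average each configuration's score over several seeds before comparing.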

8.2 Training Instability

Common problems and fixes:

  1. Rewards fail to converge

    • Check reward scaling
    • Increase exploration
    • Adjust the learning rate
  2. Policy collapse

    • Use PPO's clipping mechanism
    • Add entropy regularization
    • Clip the gradient norm
  3. Overfitting

    • Add randomness to the environment
    • Use domain randomization
    • Regularize the network
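The entropy regularization suggested for policy collapse doubles as a diagnostic: track the entropy of the action distribution during training, since a sharp drop toward zero usually signals a collapsing policy. A small helper of our own:

```python
import numpy as np

def policy_entropy(probs):
    """Shannon entropy of an action distribution; higher = more exploratory."""
    probs = np.asarray(probs, dtype=float)
    return float(-(probs * np.log(probs + 1e-9)).sum())

# A uniform policy over 4 actions has entropy ln(4) ≈ 1.386;
# a collapsed (one-hot) policy has entropy ≈ 0.
```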

8.3 Practical Tricks

python
# 1. Reward scaling (running standard-deviation normalization)
class RewardScaler:
    def __init__(self):
        self.running_mean = 0
        self.running_var = 1
        self.count = 0
    
    def normalize(self, reward):
        self.count += 1
        delta = reward - self.running_mean
        self.running_mean += delta / self.count
        self.running_var += delta * (reward - self.running_mean)
        std = np.sqrt(self.running_var / self.count + 1e-8)
        return reward / std

# 2. Curriculum learning: start with easy tasks, raise the difficulty gradually
def next_difficulty(avg_reward, difficulty, threshold=180, max_level=5):
    # Hypothetical schedule: move up a level once the agent clears the current one
    return min(difficulty + 1, max_level) if avg_reward >= threshold else difficulty

# 3. Parallel training
# use asynchronous algorithms such as A3C or IMPALA,
# or distribute training with Ray

8.4 Recommended Resources

Books:

  • "Reinforcement Learning: An Introduction" (Sutton & Barto)
  • "Deep Reinforcement Learning Hands-On"

Courses:

  • David Silver's RL Course (YouTube)
  • Berkeley CS285

Libraries and frameworks:

  • Stable Baselines3
  • Ray RLlib
  • CleanRL
  • Tianshou

Environments:

  • Gymnasium
  • Procgen
  • DeepMind Control Suite
  • MetaWorld

Summary

This tutorial covered the core algorithms and applications of reinforcement learning:

  1. Q-Learning: the classic value-based algorithm, suited to discrete action spaces
  2. SARSA: on-policy learning, more conservative and safer
  3. DQN: a milestone of deep RL that handles high-dimensional states
  4. Policy gradients: learn the policy directly, suited to continuous actions
  5. PPO: today's most popular algorithm, stable and efficient

Where reinforcement learning is being applied:

  • Game AI (AlphaGo, AlphaStar)
  • Robot control
  • Autonomous driving
  • Resource scheduling
  • Recommender systems
  • Financial trading

Suggested next steps:

  1. Work through the math behind each algorithm
  2. Experiment in more environments (Atari, MuJoCo)
  3. Read the classic papers (DQN, PPO, SAC)
  4. Take part in RL competitions and projects

Remember: reinforcement learning takes a lot of experimentation and tuning. Don't get discouraged; steady practice is the key to success.



Released under the MIT License.