Reinforcement Learning in Theory and Practice: From Q-Learning to Deep RL
Overview
Reinforcement learning (RL) is a major branch of machine learning that studies how an agent can learn an optimal behavior policy by interacting with an environment. Unlike supervised learning, RL needs no labeled data; it learns through trial and error guided by reward signals.
From AlphaGo defeating human Go champions, to robots learning to walk, to game AIs surpassing human players, reinforcement learning has produced remarkable results in recent years. This tutorial takes you from the ground up through the core concepts and algorithms of RL, with five hands-on case studies covering the full path from classic Q-Learning to modern deep reinforcement learning.
You will learn:
- Fundamental RL concepts and their mathematical underpinnings
- Modeling with Markov decision processes (MDPs)
- Implementing the Q-Learning and SARSA algorithms
- Deep Q-Networks (DQN): theory and practice
- Policy gradient methods
- The Actor-Critic architecture and the PPO algorithm
- Basics of multi-agent reinforcement learning
Chapter 1 Reinforcement Learning Fundamentals
1.1 What Is Reinforcement Learning?
Reinforcement learning is a trial-and-error learning technique. An agent takes actions in an environment, receives rewards or penalties based on the outcomes, and aims to learn a policy that maximizes long-term cumulative reward.
Differences from supervised learning:
| Aspect | Supervised Learning | Reinforcement Learning |
|---|---|---|
| Data source | Labeled dataset | Environment interaction |
| Feedback | Correct answers | Reward signals |
| Objective | Minimize prediction error | Maximize cumulative reward |
| Time dimension | Independent samples | Sequential decisions |
| Typical uses | Classification, regression | Games, robot control |
Core elements:
- Agent: the entity that learns and makes decisions
- Environment: the external world the agent interacts with
- State: the current situation of the environment
- Action: a behavior the agent can take
- Reward: the environment's feedback on an action
- Policy: the mapping from states to actions
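The interaction among these elements can be sketched in a few lines of Python. This is a toy illustration, not part of the later case studies; `ToyEnv` and `run_episode` are our own names for a minimal one-state environment and the canonical agent-environment loop.

```python
class ToyEnv:
    """A minimal environment: the episode ends after 3 steps; action 1 earns +1."""
    def __init__(self):
        self.t = 0

    def reset(self):
        self.t = 0
        return 0                              # a single dummy state

    def step(self, action):
        self.t += 1
        reward = 1 if action == 1 else 0      # reward signal from the environment
        done = self.t >= 3                    # episode termination
        return 0, reward, done

def run_episode(env, policy):
    """The canonical loop: observe state, act, receive reward, repeat."""
    state, total, done = env.reset(), 0, False
    while not done:
        action = policy(state)                # the policy maps state -> action
        state, reward, done = env.step(action)
        total += reward
    return total

print(run_episode(ToyEnv(), lambda s: 1))     # an always-act-1 policy earns 3
```

Every algorithm in this tutorial is a strategy for improving `policy` inside exactly this loop.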
1.2 Markov Decision Processes (MDPs)
An MDP is the mathematical framework of reinforcement learning, defined by the five-tuple (S, A, P, R, γ):
- S: the state space
- A: the action space
- P: the state-transition probabilities P(s'|s,a)
- R: the reward function R(s,a,s')
- γ: the discount factor (0 ≤ γ ≤ 1)
Bellman equations:
State-value function:
V(s) = E[R(t+1) + γ V(S(t+1)) | S(t) = s]
Action-value function (Q function), shown here in its optimality form with a max over next actions:
Q(s,a) = E[R(t+1) + γ max_a' Q(S(t+1), a') | S(t) = s, A(t) = a]
1.3 The Exploration-Exploitation Trade-off
Reinforcement learning faces a central challenge: the trade-off between exploration and exploitation.
- Exploration: try new actions to discover better strategies
- Exploitation: use the best action known so far
Common strategies:
- ε-greedy: explore randomly with probability ε, exploit with probability 1 − ε
- Softmax: sample actions from a distribution weighted by their estimated values
- UCB (upper confidence bound): balance value estimates against uncertainty
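ε-greedy and softmax are implemented in the snippet below; UCB is not covered there, so here is our own minimal sketch of the UCB1 rule (the function name and the choice of exploration constant `c=2.0` are ours):

```python
import numpy as np

def ucb_select(q_values, counts, t, c=2.0):
    """UCB1: value estimate plus an exploration bonus that shrinks
    as an action is tried more often."""
    counts = np.asarray(counts, dtype=float)
    # Try every action at least once before applying the bonus formula
    if (counts == 0).any():
        return int(np.argmax(counts == 0))
    bonus = c * np.sqrt(np.log(t) / counts)
    return int(np.argmax(np.asarray(q_values) + bonus))

# With equal value estimates, the least-tried action wins the bonus
print(ucb_select([1.0, 1.0, 1.0], [10, 2, 10], t=22))
```

Unlike ε-greedy, UCB explores deterministically: it targets exactly the actions whose value is still uncertain.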
```python
import numpy as np

def epsilon_greedy(q_values, epsilon):
    """ε-greedy strategy"""
    if np.random.random() < epsilon:
        return np.random.randint(len(q_values))  # explore
    else:
        return np.argmax(q_values)               # exploit

def softmax(q_values, temperature=1.0):
    """Softmax strategy"""
    exp_values = np.exp(q_values / temperature)
    probabilities = exp_values / np.sum(exp_values)
    return np.random.choice(len(q_values), p=probabilities)
```
1.4 A Taxonomy of RL Algorithms
By learning method:
Value-based
- Learn a value function; derive the policy from it indirectly
- Representative algorithms: Q-Learning, DQN
- Well suited to discrete action spaces
Policy-based
- Learn the policy function directly
- Representative algorithms: policy gradient, PPO
- Well suited to continuous action spaces
Actor-Critic
- Combine value-based and policy-based methods
- Representative algorithms: A3C, SAC, TD3
- Inherit the strengths of both
By use of a model:
- Model-free: do not learn the environment dynamics
- Model-based: learn a model of the environment dynamics
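Before moving to the case studies, the Bellman state-value equation from section 1.2 can be checked numerically on a toy deterministic chain (our own example: states 0 → 1 → 2, reward −1 per move, state 2 terminal, γ = 0.9):

```python
gamma = 0.9
V = {2: 0.0}                 # the terminal state is worth 0
# Backing up values with V(s) = r + gamma * V(s'):
V[1] = -1 + gamma * V[2]     # V(1) = -1 + 0.9 * 0    = -1.0
V[0] = -1 + gamma * V[1]     # V(0) = -1 + 0.9 * (-1) = -1.9
print(V[0])
```

Every value-based algorithm below (Q-Learning, SARSA, DQN) is, at heart, an iterative way of making this backup equation hold from sampled experience instead of a known model.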
Chapter 2 Case Study 1: Solving a Maze with Q-Learning
2.1 Project Overview
We will implement the classic Q-Learning algorithm and teach an agent to find its way out of a maze. This project will help you understand the basic concepts of reinforcement learning and how Q-Learning works.
Learning goals:
- Understand how the Q-Learning algorithm works
- Implement the Q-table update rule
- Master the ε-greedy exploration strategy
- Visualize the learning process
2.2 Defining the Environment
```python
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict

class MazeEnvironment:
    """Maze environment"""
    def __init__(self, maze_size=10):
        self.maze_size = maze_size
        self.start = (0, 0)
        self.goal = (maze_size - 1, maze_size - 1)
        # Build the maze (0 = open cell, 1 = wall)
        self.maze = np.zeros((maze_size, maze_size), dtype=int)
        self._generate_maze()
        self.state = self.start
        self.actions = ['up', 'down', 'left', 'right']

    def _generate_maze(self):
        """Generate a random maze"""
        # Add walls at random (20% probability per cell).
        # Note: this does not guarantee a path from start to goal;
        # regenerate the maze if training never succeeds.
        for i in range(self.maze_size):
            for j in range(self.maze_size):
                if (i, j) != self.start and (i, j) != self.goal:
                    if np.random.random() < 0.2:
                        self.maze[i, j] = 1

    def reset(self):
        """Reset the environment"""
        self.state = self.start
        return self.state

    def step(self, action):
        """Execute an action"""
        x, y = self.state
        # Tentative new position (moving off the grid keeps the agent in place)
        if action == 'up':
            new_state = (max(0, x - 1), y)
        elif action == 'down':
            new_state = (min(self.maze_size - 1, x + 1), y)
        elif action == 'left':
            new_state = (x, max(0, y - 1))
        elif action == 'right':
            new_state = (x, min(self.maze_size - 1, y + 1))
        # Check for walls
        if self.maze[new_state] == 1:
            reward = -10  # wall-collision penalty; the agent stays put
        else:
            self.state = new_state
            if self.state == self.goal:
                reward = 100  # goal reward
            else:
                reward = -1   # per-step penalty (encourages short paths)
        done = (self.state == self.goal)
        return self.state, reward, done

    def get_action_index(self, action):
        return self.actions.index(action)

    def render(self):
        """Visualize the maze"""
        grid = np.zeros((self.maze_size, self.maze_size, 3))
        grid[self.maze == 1] = [0.5, 0.5, 0.5]  # walls
        grid[self.start] = [0, 1, 0]            # start
        grid[self.goal] = [1, 0, 0]             # goal
        grid[self.state] = [0, 0, 1]            # agent
        plt.imshow(grid)
        plt.title(f'Position: {self.state}')
        plt.axis('off')
        plt.show()
```
2.3 Implementing Q-Learning
```python
class QLearningAgent:
    """Q-Learning agent"""
    def __init__(self, env, learning_rate=0.1, discount_factor=0.95,
                 epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        self.env = env
        self.lr = learning_rate        # learning rate
        self.gamma = discount_factor   # discount factor
        self.epsilon = epsilon         # exploration rate
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        # Q-table: q_table[state][action]
        self.q_table = defaultdict(lambda: np.zeros(len(env.actions)))

    def get_action(self, state):
        """Select an action with the ε-greedy strategy"""
        if np.random.random() < self.epsilon:
            return np.random.randint(len(self.env.actions))  # explore
        else:
            return np.argmax(self.q_table[state])            # exploit

    def update(self, state, action, reward, next_state, done):
        """Update a Q-value"""
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.q_table[next_state])
        # Q-Learning update rule
        self.q_table[state][action] += self.lr * (
            target - self.q_table[state][action]
        )

    def decay_epsilon(self):
        """Decay the exploration rate"""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def get_best_action(self, state):
        """Return the greedy action (for evaluation)"""
        return np.argmax(self.q_table[state])
```
2.4 Training Loop
```python
def train_q_learning(env, agent, num_episodes=1000, max_steps=100):
    """Train the Q-Learning agent"""
    rewards_per_episode = []
    steps_per_episode = []
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        steps = 0
        for step in range(max_steps):
            # Select an action
            action = agent.get_action(state)
            # Take it
            next_state, reward, done = env.step(env.actions[action])
            # Update the Q-value
            agent.update(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
            steps += 1
            if done:
                break
        # Record statistics
        rewards_per_episode.append(total_reward)
        steps_per_episode.append(steps)
        # Decay the exploration rate
        agent.decay_epsilon()
        # Print progress
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards_per_episode[-100:])
            avg_steps = np.mean(steps_per_episode[-100:])
            print(f'Episode {episode+1}/{num_episodes} | '
                  f'Avg Reward: {avg_reward:.2f} | '
                  f'Avg Steps: {avg_steps:.2f} | '
                  f'Epsilon: {agent.epsilon:.3f}')
    return rewards_per_episode, steps_per_episode

# Create the environment and agent
env = MazeEnvironment(maze_size=10)
agent = QLearningAgent(
    env,
    learning_rate=0.1,
    discount_factor=0.95,
    epsilon=1.0,
    epsilon_decay=0.995,
    epsilon_min=0.01
)

# Train
rewards, steps = train_q_learning(env, agent, num_episodes=1000)
```
2.5 Visualizing the Results
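The plotting code below smooths the noisy per-episode curves with a moving average computed by `np.convolve`. Its behavior is easy to check on a tiny input (our own sanity check, not part of the project code):

```python
import numpy as np

window = 2
data = [1, 2, 3, 4]
# mode='valid' keeps only positions where the window fully overlaps the data,
# so the smoothed curve has len(data) - window + 1 points
smoothed = np.convolve(data, np.ones(window) / window, mode='valid')
print(smoothed)  # [1.5 2.5 3.5]
```

This is why the red averaged curves in the plots start slightly later than the raw curves: the first `window - 1` points have no full window.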
```python
def plot_training_results(rewards, steps, window=50):
    """Plot the training curves"""
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    # Reward curve
    axes[0].plot(rewards, alpha=0.3, label='Per-episode reward')
    axes[0].plot(np.convolve(rewards, np.ones(window)/window, mode='valid'),
                 color='red', linewidth=2, label=f'{window}-episode average')
    axes[0].set_xlabel('Episode')
    axes[0].set_ylabel('Total Reward')
    axes[0].set_title('Training Reward')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    # Step-count curve
    axes[1].plot(steps, alpha=0.3, label='Per-episode steps')
    axes[1].plot(np.convolve(steps, np.ones(window)/window, mode='valid'),
                 color='red', linewidth=2, label=f'{window}-episode average')
    axes[1].set_xlabel('Episode')
    axes[1].set_ylabel('Steps')
    axes[1].set_title('Training Steps')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig('q_learning_results.png', dpi=150)
    plt.show()

plot_training_results(rewards, steps)

# Visualize the Q-table
def visualize_q_table(agent, env):
    """Visualize the Q-table (the greedy action in each state)"""
    q_grid = np.zeros((env.maze_size, env.maze_size))
    for i in range(env.maze_size):
        for j in range(env.maze_size):
            state = (i, j)
            if env.maze[i, j] == 0:  # open cell
                q_grid[i, j] = agent.get_best_action(state)
            else:
                q_grid[i, j] = -1    # wall
    action_names = ['↑', '↓', '←', '→']
    plt.figure(figsize=(10, 10))
    plt.imshow(q_grid, cmap='viridis')
    # Overlay the action arrows
    for i in range(env.maze_size):
        for j in range(env.maze_size):
            if q_grid[i, j] >= 0:
                plt.text(j, i, action_names[int(q_grid[i, j])],
                         ha='center', va='center', fontsize=12, color='white')
    plt.title('Greedy Policy from the Q-table')
    plt.colorbar(label='Action index')
    plt.savefig('q_table_policy.png', dpi=150)
    plt.show()

visualize_q_table(agent, env)
```
2.6 Testing the Agent
```python
def test_agent(agent, env, num_tests=10, max_steps=100, render=False):
    """Evaluate the trained agent"""
    success_count = 0
    total_steps = 0
    for test in range(num_tests):
        state = env.reset()
        steps = 0
        for step in range(max_steps):
            # Follow the greedy policy (no exploration)
            action = agent.get_best_action(state)
            state, reward, done = env.step(env.actions[action])
            steps += 1
            if render:
                env.render()
                plt.pause(0.1)
            if done:
                success_count += 1
                break
        total_steps += steps
    success_rate = success_count / num_tests
    avg_steps = total_steps / num_tests
    print("\n=== Test results ===")
    print(f"Trials: {num_tests}")
    print(f"Success rate: {success_rate*100:.1f}%")
    print(f"Average steps: {avg_steps:.2f}")
    return success_rate, avg_steps

test_agent(agent, env, num_tests=10)
```
Chapter 3 Case Study 2: SARSA and the Taxi Problem
3.1 Project Overview
SARSA is another classic temporal-difference learning algorithm. Unlike Q-Learning, it updates Q-values using the action actually taken in the next state (on-policy learning). We will use Gymnasium's Taxi environment to compare SARSA with Q-Learning.
About the Taxi environment:
- A 5×5 grid world (Taxi-v3)
- One taxi, four designated passenger locations, one destination
- Actions: move (south/north/east/west), pick up, drop off
- Goal: carry the passenger from the pickup location to the destination
3.2 Implementing SARSA
```python
import gymnasium as gym

class SarsaAgent:
    """SARSA agent (on-policy)"""
    def __init__(self, num_states, num_actions, learning_rate=0.1,
                 discount_factor=0.95, epsilon=1.0, epsilon_decay=0.995):
        self.num_states = num_states
        self.num_actions = num_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        # Q-table
        self.q_table = np.zeros((num_states, num_actions))

    def get_action(self, state):
        """ε-greedy strategy"""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.num_actions)
        else:
            return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state, next_action, done):
        """SARSA update rule"""
        if done:
            target = reward
        else:
            # Use the Q-value of the action actually taken next
            target = reward + self.gamma * self.q_table[next_state, next_action]
        self.q_table[state, action] += self.lr * (target - self.q_table[state, action])

    def decay_epsilon(self):
        self.epsilon = max(0.01, self.epsilon * self.epsilon_decay)

def train_sarsa(env, num_episodes=5000, max_steps=200):
    """Train a SARSA agent"""
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    agent = SarsaAgent(num_states, num_actions)
    rewards_per_episode = []
    for episode in range(num_episodes):
        state, _ = env.reset()
        action = agent.get_action(state)
        total_reward = 0
        for step in range(max_steps):
            # Take the action
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            # Choose the next action (on-policy)
            next_action = agent.get_action(next_state)
            # Update the Q-value
            agent.update(state, action, reward, next_state, next_action, done)
            state = next_state
            action = next_action
            total_reward += reward
            if done:
                break
        agent.decay_epsilon()
        rewards_per_episode.append(total_reward)
        if (episode + 1) % 500 == 0:
            avg_reward = np.mean(rewards_per_episode[-500:])
            print(f'Episode {episode+1}/{num_episodes} | Avg Reward: {avg_reward:.2f}')
    return agent, rewards_per_episode

# Create the environment
env = gym.make('Taxi-v3')

# Train SARSA
sarsa_agent, sarsa_rewards = train_sarsa(env, num_episodes=5000)
```
3.3 Q-Learning vs. SARSA
```python
def train_q_learning_gym(env, num_episodes=5000, max_steps=200):
    """Q-Learning training loop for a Gymnasium environment"""
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    agent = SarsaAgent(num_states, num_actions)  # reuse the same class structure
    rewards_per_episode = []
    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        for step in range(max_steps):
            action = agent.get_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            # Q-Learning update (uses the max over next actions)
            if done:
                target = reward
            else:
                target = reward + agent.gamma * np.max(agent.q_table[next_state])
            agent.q_table[state, action] += agent.lr * (target - agent.q_table[state, action])
            state = next_state
            total_reward += reward
            if done:
                break
        agent.decay_epsilon()
        rewards_per_episode.append(total_reward)
        if (episode + 1) % 500 == 0:
            avg_reward = np.mean(rewards_per_episode[-500:])
            print(f'Episode {episode+1}/{num_episodes} | Avg Reward: {avg_reward:.2f}')
    return agent, rewards_per_episode

# Train Q-Learning
q_agent, q_rewards = train_q_learning_gym(env, num_episodes=5000)

# Comparison plot
def compare_algorithms(sarsa_rewards, q_rewards, window=200):
    """Compare SARSA with Q-Learning"""
    plt.figure(figsize=(12, 6))
    # Smoothed curves
    sarsa_smooth = np.convolve(sarsa_rewards, np.ones(window)/window, mode='valid')
    q_smooth = np.convolve(q_rewards, np.ones(window)/window, mode='valid')
    plt.plot(sarsa_smooth, label='SARSA', color='blue', linewidth=2)
    plt.plot(q_smooth, label='Q-Learning', color='red', linewidth=2)
    plt.xlabel('Episode')
    plt.ylabel(f'Average Reward ({window} episodes)')
    plt.title('SARSA vs Q-Learning')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig('sarsa_vs_qlearning.png', dpi=150)
    plt.show()

compare_algorithms(sarsa_rewards, q_rewards)

# Summary of the key differences
print("""
=== SARSA vs Q-Learning: key differences ===
SARSA (on-policy):
- Updates with the next action actually taken
- More conservative; accounts for the cost of exploration
- Learns under the behavior policy, randomness included
- Better suited to safety-critical applications

Q-Learning (off-policy):
- Updates with the best next action (max)
- More aggressive; learns the optimal policy directly
- Can learn from historical data
- Usually converges faster

Rules of thumb:
- Need safe, conservative behavior: SARSA
- Chasing optimal performance: Q-Learning
- Have historical data to learn from: Q-Learning
""")
```
3.4 Visualizing the Policy
```python
def visualize_taxi_policy(agent, env, num_samples=5):
    """Replay the greedy taxi policy and print each step.
    Expects an environment created with render_mode='ansi'."""
    for episode in range(num_samples):
        state, _ = env.reset()
        total_reward = 0
        steps = 0
        print(f"\n=== Test episode {episode + 1} ===")
        while steps < 100:
            # ANSI text rendering of the current grid
            print(env.render())
            action = np.argmax(agent.q_table[state])
            next_state, reward, terminated, truncated, _ = env.step(action)
            action_names = ['south', 'north', 'east', 'west', 'pickup', 'dropoff']
            print(f"Action: {action_names[action]}, Reward: {reward}")
            state = next_state
            total_reward += reward
            steps += 1
            if terminated or truncated:
                print(f"Episode finished! Total reward: {total_reward}, Steps: {steps}")
                break

# Optional: run the visualization
# visualize_taxi_policy(sarsa_agent, gym.make('Taxi-v3', render_mode='ansi'))
```
Chapter 4 Case Study 3: Playing CartPole with a Deep Q-Network (DQN)
4.1 Project Overview
When the state space is large or continuous, tabular Q-learning no longer scales. A Deep Q-Network (DQN) approximates the Q function with a neural network and can handle high-dimensional state spaces. We will implement DQN to solve the CartPole balancing problem.
Key DQN innovations:
- Experience replay: store past transitions and sample them at random
- Target network: compute target Q-values with a separate, periodically synced network
- Reward clipping: clip rewards to [-1, 1] (used in the original Atari DQN; unnecessary for CartPole)
4.2 Implementing DQN
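Before the full implementation, the TD target that DQN regresses toward, target = r + γ(1 − done)·max_a' Q(s', a'), can be checked on toy numbers with plain NumPy (the sample values are made up for illustration):

```python
import numpy as np

gamma = 0.99
rewards = np.array([1.0, 1.0])
dones = np.array([0.0, 1.0])    # the second transition is terminal
next_q = np.array([[2.0, 5.0],  # Q(s', .) for each sampled transition
                   [3.0, 0.5]])
# Terminal transitions bootstrap nothing: their target is just the reward
targets = rewards + gamma * (1 - dones) * next_q.max(axis=1)
print(targets)  # [1 + 0.99 * 5, 1] = [5.95 1.  ]
```

The `(1 - dones)` mask is exactly the trick the `optimize_model` code below uses to zero out the bootstrap term at episode boundaries.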
```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random

class DQN(nn.Module):
    """Deep Q-Network"""
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(DQN, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, x):
        return self.network(x)

class ReplayBuffer:
    """Experience replay buffer"""
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            torch.FloatTensor(np.array(states)),
            torch.LongTensor(actions),
            torch.FloatTensor(rewards),
            torch.FloatTensor(np.array(next_states)),
            torch.FloatTensor(dones)
        )

    def __len__(self):
        return len(self.buffer)

class DQNAgent:
    """DQN agent"""
    def __init__(self, state_dim, action_dim, learning_rate=0.001,
                 discount_factor=0.99, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.01, buffer_size=10000, batch_size=64,
                 target_update=10):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.action_dim = action_dim
        self.gamma = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.batch_size = batch_size
        self.target_update = target_update
        # Networks
        self.policy_net = DQN(state_dim, action_dim).to(self.device)
        self.target_net = DQN(state_dim, action_dim).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()
        # Optimizer
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        # Experience replay
        self.memory = ReplayBuffer(buffer_size)

    def select_action(self, state):
        """ε-greedy action selection"""
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        else:
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
                q_values = self.policy_net(state_tensor)
                return q_values.max(1)[1].item()

    def optimize_model(self):
        """One optimization step"""
        if len(self.memory) < self.batch_size:
            return
        # Sample a batch
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        states = states.to(self.device)
        actions = actions.to(self.device)
        rewards = rewards.to(self.device)
        next_states = next_states.to(self.device)
        dones = dones.to(self.device)
        # Current Q-values
        current_q = self.policy_net(states).gather(1, actions.unsqueeze(1))
        # Target Q-values
        with torch.no_grad():
            next_q = self.target_net(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * next_q
        # Loss
        loss = F.smooth_l1_loss(current_q, target_q.unsqueeze(1))
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        # Gradient clipping
        for param in self.policy_net.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()
        return loss.item()

    def update_target_network(self):
        """Sync the target network"""
        self.target_net.load_state_dict(self.policy_net.state_dict())

    def decay_epsilon(self):
        """Decay the exploration rate"""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def remember(self, state, action, reward, next_state, done):
        """Store a transition"""
        self.memory.push(state, action, reward, next_state, done)
```
4.3 Training the DQN
```python
def train_dqn(env, num_episodes=500, max_steps=500):
    """Train a DQN agent"""
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    agent = DQNAgent(
        state_dim=state_dim,
        action_dim=action_dim,
        learning_rate=0.001,
        discount_factor=0.99,
        epsilon=1.0,
        epsilon_decay=0.995,
        epsilon_min=0.01,
        buffer_size=10000,
        batch_size=64,
        target_update=10
    )
    rewards_per_episode = []
    losses = []
    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        for step in range(max_steps):
            # Select an action
            action = agent.select_action(state)
            # Take it
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            # Store the transition
            agent.remember(state, action, reward, next_state, done)
            # Optimize the model
            loss = agent.optimize_model()
            if loss:
                losses.append(loss)
            state = next_state
            total_reward += reward
            if done:
                break
        # Sync the target network
        if episode % agent.target_update == 0:
            agent.update_target_network()
        agent.decay_epsilon()
        rewards_per_episode.append(total_reward)
        if (episode + 1) % 50 == 0:
            avg_reward = np.mean(rewards_per_episode[-50:])
            avg_loss = np.mean(losses[-100:]) if losses else 0
            print(f'Episode {episode+1}/{num_episodes} | '
                  f'Avg Reward: {avg_reward:.2f} | '
                  f'Avg Loss: {avg_loss:.4f} | '
                  f'Epsilon: {agent.epsilon:.3f}')
    return agent, rewards_per_episode, losses

# Create the environment
env = gym.make('CartPole-v1')

# Train the DQN
dqn_agent, dqn_rewards, dqn_losses = train_dqn(env, num_episodes=500)
```
4.4 Visualizing the Results
```python
def plot_dqn_results(rewards, losses, window=50):
    """Plot DQN training curves"""
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    # Reward curve
    axes[0].plot(rewards, alpha=0.3, label='Per-episode reward')
    axes[0].plot(np.convolve(rewards, np.ones(window)/window, mode='valid'),
                 color='red', linewidth=2, label=f'{window}-episode average')
    axes[0].axhline(y=475, color='green', linestyle='--',
                    label='Solve threshold (475, CartPole-v1)')
    axes[0].set_xlabel('Episode')
    axes[0].set_ylabel('Total Reward')
    axes[0].set_title('DQN Training Reward (CartPole)')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    # Loss curve
    if losses:
        axes[1].plot(losses, alpha=0.5, label='Training loss')
        axes[1].plot(np.convolve(losses, np.ones(100)/100, mode='valid'),
                     color='red', linewidth=2, label='100-step average')
    axes[1].set_xlabel('Step')
    axes[1].set_ylabel('Loss')
    axes[1].set_title('DQN Training Loss')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig('dqn_results.png', dpi=150)
    plt.show()

plot_dqn_results(dqn_rewards, dqn_losses)
```
4.5 Testing and Demo
```python
def test_dqn(agent, env, num_episodes=10, render=False):
    """Evaluate a trained DQN agent with a purely greedy policy.
    Rendering requires an environment created with render_mode='human'."""
    success_count = 0
    total_steps = 0
    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        steps = 0
        for step in range(500):
            if render:
                env.render()
            # Greedy action (no exploration during evaluation)
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0).to(agent.device)
                action = agent.policy_net(state_tensor).max(1)[1].item()
            next_state, reward, terminated, truncated, _ = env.step(action)
            state = next_state
            total_reward += reward
            steps += 1
            if terminated or truncated:
                break
        total_steps += steps
        if steps >= 475:  # CartPole-v1 is considered solved at 475 steps
            success_count += 1
        print(f"Episode {episode+1}: {steps} steps, reward {total_reward}")
    env.close()
    print("\n=== Test results ===")
    print(f"Success rate: {success_count/num_episodes*100:.1f}%")
    print(f"Average steps: {total_steps/num_episodes:.1f}")
    return success_count / num_episodes

# Run the evaluation
test_dqn(dqn_agent, env, num_episodes=10)
```
4.6 DQN Variants and Improvements
```python
class DuelingDQN(nn.Module):
    """Dueling DQN: separates state value from action advantages"""
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(DuelingDQN, self).__init__()
        # Shared feature extractor
        self.features = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        # Value stream (state value)
        self.value_stream = nn.Sequential(
            nn.Linear(hidden_dim, 1)
        )
        # Advantage stream (per-action advantage)
        self.advantage_stream = nn.Sequential(
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, x):
        features = self.features(x)
        value = self.value_stream(features)
        advantage = self.advantage_stream(features)
        # Aggregation: Q = V + (A - mean(A))
        q_values = value + (advantage - advantage.mean(dim=1, keepdim=True))
        return q_values

class DoubleDQNAgent(DQNAgent):
    """Double DQN: mitigates Q-Learning's overestimation bias"""
    def optimize_model(self):
        if len(self.memory) < self.batch_size:
            return
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        states = states.to(self.device)
        actions = actions.to(self.device)
        rewards = rewards.to(self.device)
        next_states = next_states.to(self.device)
        dones = dones.to(self.device)
        # Current Q-values
        current_q = self.policy_net(states).gather(1, actions.unsqueeze(1))
        # Double DQN: select actions with policy_net, evaluate them with target_net
        with torch.no_grad():
            next_actions = self.policy_net(next_states).max(1)[1]
            next_q = self.target_net(next_states).gather(1, next_actions.unsqueeze(1)).squeeze(1)
            target_q = rewards + (1 - dones) * self.gamma * next_q
        loss = F.smooth_l1_loss(current_q, target_q.unsqueeze(1))
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()
```
Chapter 5 Case Study 4: Policy Gradient Methods
5.1 Project Overview
Policy gradient methods learn the policy function directly rather than deriving it from a value function. They are particularly well suited to continuous action spaces and can represent stochastic policies. We will implement the REINFORCE algorithm to solve CartPole.
Core ideas of policy gradients:
- Parameterize the policy directly as π(a|s; θ)
- Maximize expected reward by gradient ascent
- No value-function approximation required
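REINFORCE weights each action's log-probability by the discounted return G_t = Σ_k γ^k r_{t+k}, computed with a single backward pass over the episode. As a standalone check of that loop (the same backward accumulation appears inside the agent's `update` method):

```python
def discounted_returns(rewards, gamma):
    """Backward accumulation: G_t = r_t + gamma * G_{t+1}."""
    G, out = 0.0, []
    for r in reversed(rewards):
        G = r + gamma * G
        out.insert(0, G)     # prepend so out[t] corresponds to time step t
    return out

print(discounted_returns([1, 1, 1], gamma=0.5))  # [1.75, 1.5, 1.0]
```

Later rewards are discounted more the further they lie from the action being credited, which is what makes G_t a per-time-step learning signal rather than a single episode score.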
5.2 Implementing REINFORCE
```python
class PolicyNetwork(nn.Module):
    """Policy network"""
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(PolicyNetwork, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)  # outputs a probability distribution over actions
        )

    def forward(self, x):
        return self.network(x)

    def select_action(self, state):
        """Sample an action from the policy"""
        device = next(self.parameters()).device
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        probs = self.forward(state_tensor)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action)

class ReinforceAgent:
    """REINFORCE agent"""
    def __init__(self, state_dim, action_dim, learning_rate=0.001,
                 discount_factor=0.99):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.gamma = discount_factor
        self.policy = PolicyNetwork(state_dim, action_dim).to(self.device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=learning_rate)

    def select_action(self, state):
        return self.policy.select_action(state)

    def update(self, rewards, log_probs):
        """Update the policy network"""
        # Compute discounted returns
        discounted_returns = []
        G = 0
        for reward in reversed(rewards):
            G = reward + self.gamma * G
            discounted_returns.insert(0, G)
        # Standardize the returns (reduces variance)
        discounted_returns = torch.FloatTensor(discounted_returns).to(self.device)
        discounted_returns = (discounted_returns - discounted_returns.mean()) / (discounted_returns.std() + 1e-9)
        # Policy-gradient loss
        policy_loss = []
        for log_prob, G in zip(log_probs, discounted_returns):
            policy_loss.append(-log_prob * G)
        loss = torch.cat(policy_loss).sum()
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

def train_reinforce(env, num_episodes=1000, max_steps=500):
    """Train a REINFORCE agent"""
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    agent = ReinforceAgent(state_dim, action_dim)
    rewards_per_episode = []
    for episode in range(num_episodes):
        state, _ = env.reset()
        rewards = []
        log_probs = []
        total_reward = 0
        for step in range(max_steps):
            # Choose an action and record its log-probability
            action, log_prob = agent.select_action(state)
            log_probs.append(log_prob)
            # Take the action
            next_state, reward, terminated, truncated, _ = env.step(action)
            rewards.append(reward)
            state = next_state
            total_reward += reward
            done = terminated or truncated
            if done:
                break
        # Update the policy
        if len(rewards) > 0:
            loss = agent.update(rewards, log_probs)
        rewards_per_episode.append(total_reward)
        if (episode + 1) % 50 == 0:
            avg_reward = np.mean(rewards_per_episode[-50:])
            print(f'Episode {episode+1}/{num_episodes} | '
                  f'Avg Reward: {avg_reward:.2f}')
    return agent, rewards_per_episode

# Train REINFORCE
env = gym.make('CartPole-v1')
reinforce_agent, reinforce_rewards = train_reinforce(env, num_episodes=1000)
```
5.3 Policy Gradient with a Baseline
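The baseline idea: subtract a value estimate V(s) from the return, so that only better-than-expected outcomes push action probabilities up. A toy NumPy illustration with made-up numbers:

```python
import numpy as np

returns = np.array([10.0, 12.0, 8.0])   # sampled returns G_t
values  = np.array([9.0, 12.5, 8.5])    # a critic's estimates V(s_t)
advantages = returns - values           # A(s,a) = G - V(s)
# Only the first action did better than expected, so only it is reinforced;
# the other two are (mildly) discouraged
print(advantages)
```

Subtracting a state-dependent baseline does not change the expected policy gradient, but it can greatly reduce its variance, which is exactly what the Actor-Critic implementation below exploits.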
```python
class ActorCriticNetwork(nn.Module):
    """Actor-Critic network: outputs both a policy and a value estimate"""
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(ActorCriticNetwork, self).__init__()
        # Shared feature layers
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        # Actor (policy)
        self.actor = nn.Sequential(
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )
        # Critic (value)
        self.critic = nn.Sequential(
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, x):
        features = self.shared(x)
        return self.actor(features), self.critic(features)

    def select_action(self, state):
        device = next(self.parameters()).device
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        probs, value = self.forward(state_tensor)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action), value

class ActorCriticAgent:
    """Actor-Critic agent"""
    def __init__(self, state_dim, action_dim, learning_rate=0.001,
                 discount_factor=0.99):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.gamma = discount_factor
        self.model = ActorCriticNetwork(state_dim, action_dim).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

    def select_action(self, state):
        return self.model.select_action(state)

    def update(self, rewards, log_probs, values):
        """Update the Actor-Critic model"""
        # Compute discounted returns
        discounted_returns = []
        G = 0
        for reward in reversed(rewards):
            G = reward + self.gamma * G
            discounted_returns.insert(0, G)
        discounted_returns = torch.FloatTensor(discounted_returns).to(self.device)
        values = torch.cat(values).squeeze()
        # Advantage A(s,a) = G - V(s)
        advantages = discounted_returns - values
        # Actor loss (policy gradient)
        actor_loss = -(torch.cat(log_probs) * advantages.detach()).sum()
        # Critic loss (value-prediction error)
        critic_loss = F.smooth_l1_loss(values, discounted_returns)
        # Combined loss
        loss = actor_loss + critic_loss
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item(), actor_loss.item(), critic_loss.item()

def train_actor_critic(env, num_episodes=1000, max_steps=500):
    """Train an Actor-Critic agent"""
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    agent = ActorCriticAgent(state_dim, action_dim)
    rewards_per_episode = []
    for episode in range(num_episodes):
        state, _ = env.reset()
        rewards = []
        log_probs = []
        values = []
        total_reward = 0
        for step in range(max_steps):
            action, log_prob, value = agent.select_action(state)
            log_probs.append(log_prob)
            values.append(value)
            next_state, reward, terminated, truncated, _ = env.step(action)
            rewards.append(reward)
            state = next_state
            total_reward += reward
            done = terminated or truncated
            if done:
                break
        if len(rewards) > 0:
            loss, actor_loss, critic_loss = agent.update(rewards, log_probs, values)
        rewards_per_episode.append(total_reward)
        if (episode + 1) % 50 == 0:
            avg_reward = np.mean(rewards_per_episode[-50:])
            print(f'Episode {episode+1}/{num_episodes} | '
                  f'Avg Reward: {avg_reward:.2f}')
    return agent, rewards_per_episode

# Train Actor-Critic
ac_agent, ac_rewards = train_actor_critic(env, num_episodes=1000)
```
5.4 Comparing the Algorithms
```python
def compare_policy_methods(reinforce_rewards, ac_rewards, window=50):
    """Compare REINFORCE with Actor-Critic"""
    plt.figure(figsize=(12, 6))
    reinforce_smooth = np.convolve(reinforce_rewards, np.ones(window)/window, mode='valid')
    ac_smooth = np.convolve(ac_rewards, np.ones(window)/window, mode='valid')
    plt.plot(reinforce_smooth, label='REINFORCE', color='blue', linewidth=2)
    plt.plot(ac_smooth, label='Actor-Critic', color='red', linewidth=2)
    plt.xlabel('Episode')
    plt.ylabel(f'Average Reward ({window} episodes)')
    plt.title('REINFORCE vs Actor-Critic')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig('policy_gradient_comparison.png', dpi=150)
    plt.show()

compare_policy_methods(reinforce_rewards, ac_rewards)

print("""
=== Policy gradient methods: summary ===
REINFORCE:
- Pure policy gradient, no baseline
- High variance; needs more samples
- Simple to implement
- Good for learning the basics

Actor-Critic:
- Uses a value function as a baseline
- Lower variance, faster convergence
- Learns policy and value jointly
- The foundation of modern RL algorithms

Rules of thumb:
- Learning/demos: REINFORCE
- Real applications: Actor-Critic or its variants (PPO, A3C)
""")
```
Chapter 6 Case Study 5: Playing LunarLander with PPO
6.1 Project Overview
PPO (Proximal Policy Optimization) is one of the most widely used RL algorithms today. It keeps training stable by limiting how far each update can move the policy. We will implement PPO to solve the LunarLander landing problem.
Key PPO innovations:
- Clipped objective: bounds the size of each policy update
- GAE advantage estimation: a more accurate advantage computation
- Multiple epochs of minibatch updates: better sample efficiency
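The effect of the clipped objective can be seen on toy numbers before the full implementation (plain NumPy; the ratio and advantage values are made up for illustration):

```python
import numpy as np

clip_eps = 0.2
ratio = np.array([0.5, 1.0, 2.0])        # pi_new(a|s) / pi_old(a|s) for three samples
advantages = np.array([1.0, 1.0, 1.0])   # all-positive advantages
surr1 = ratio * advantages
surr2 = np.clip(ratio, 1 - clip_eps, 1 + clip_eps) * advantages
# The elementwise min caps how much a large ratio can increase the objective:
# the 2.0 ratio contributes only 1.2, so its gradient incentive vanishes
objective = np.minimum(surr1, surr2)
print(objective)  # [0.5 1.  1.2]
```

Once the ratio leaves the [1 − ε, 1 + ε] band in the profitable direction, the clipped term dominates the min and further movement of the policy on that sample earns nothing, which is what keeps updates proximal.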
6.2 Implementing PPO
class PPOActor(nn.Module):
"""PPO Actor 网络"""
def __init__(self, state_dim, action_dim, hidden_dim=256):
super(PPOActor, self).__init__()
self.network = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, action_dim)
)
def forward(self, x):
return self.network(x)
def get_action(self, state):
state_tensor = torch.FloatTensor(state).unsqueeze(0)
logits = self.forward(state_tensor)
probs = F.softmax(logits, dim=-1)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
return action.item(), dist.log_prob(action), probs
class PPOCritic(nn.Module):
"""PPO Critic 网络"""
def __init__(self, state_dim, hidden_dim=256):
super(PPOCritic, self).__init__()
self.network = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, 1)
)
def forward(self, x):
return self.network(x)
class PPOAgent:
"""PPO 智能体"""
def __init__(self, state_dim, action_dim, learning_rate=0.0003,
discount_factor=0.99, gae_lambda=0.95,
clip_epsilon=0.2, epochs=10, batch_size=64):
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.gamma = discount_factor
self.lam = gae_lambda
self.clip_epsilon = clip_epsilon
self.epochs = epochs
self.batch_size = batch_size
# 网络
self.actor = PPOActor(state_dim, action_dim).to(self.device)
self.critic = PPOCritic(state_dim).to(self.device)
# 优化器
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=learning_rate)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=learning_rate)
def select_action(self, state):
action, log_prob, probs = self.actor.get_action(state)
value = self.critic(torch.FloatTensor(state).unsqueeze(0).to(self.device))
return action, log_prob, value.item()
def compute_gae(self, rewards, values, dones):
"""计算 GAE 优势"""
advantages = []
gae = 0
for t in reversed(range(len(rewards))):
if t == len(rewards) - 1:
next_value = 0
else:
next_value = values[t + 1]
delta = rewards[t] + self.gamma * next_value * (1 - dones[t]) - values[t]
gae = delta + self.gamma * self.lam * (1 - dones[t]) * gae
advantages.insert(0, gae)
advantages = torch.FloatTensor(advantages).to(self.device)
returns = advantages + torch.FloatTensor(values).to(self.device)
return advantages, returns
def update(self, states, actions, old_log_probs, rewards, dones):
"""PPO 更新"""
states = torch.FloatTensor(np.array(states)).to(self.device)
actions = torch.LongTensor(actions).to(self.device)
old_log_probs = torch.FloatTensor(old_log_probs).to(self.device)
# 计算价值
with torch.no_grad():
values = [self.critic(s.unsqueeze(0)).item() for s in states]
# 计算 GAE
advantages, returns = self.compute_gae(rewards, values, dones)
# 标准化优势
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-9)
# 多轮更新
dataset_size = len(states)
for _ in range(self.epochs):
indices = np.random.permutation(dataset_size)
for start in range(0, dataset_size, self.batch_size):
end = start + self.batch_size
batch_indices = indices[start:end]
batch_states = states[batch_indices]
batch_actions = actions[batch_indices]
batch_old_log_probs = old_log_probs[batch_indices]
batch_advantages = advantages[batch_indices]
batch_returns = returns[batch_indices]
# 计算新策略的 log 概率(对整个批次做前向传播,而不是逐条调用 get_action)
probs = self.actor(batch_states)
dist = torch.distributions.Categorical(probs)
new_log_probs = dist.log_prob(batch_actions)
# 计算比率 r = exp(log π_new - log π_old)
ratio = torch.exp(new_log_probs - batch_old_log_probs)
# 截断目标
surr1 = ratio * batch_advantages
surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * batch_advantages
actor_loss = -torch.min(surr1, surr2).mean()
# Critic 损失
values_pred = self.critic(batch_states).squeeze()
critic_loss = F.smooth_l1_loss(values_pred, batch_returns)
# 熵正则化(鼓励探索)
entropy = dist.entropy().mean()
# 总损失
loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy
# 优化
self.actor_optimizer.zero_grad()
self.critic_optimizer.zero_grad()
loss.backward()
nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
self.actor_optimizer.step()
self.critic_optimizer.step()
return loss.item()
def train_ppo(env, num_episodes=500, update_timestep=2000):
"""训练 PPO 智能体"""
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = PPOAgent(state_dim, action_dim)
rewards_per_episode = []
for episode in range(num_episodes):
state, _ = env.reset()
episode_reward = 0
states, actions, log_probs, rewards, dones = [], [], [], [], []
for step in range(1000):
action, log_prob, value = agent.select_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
states.append(state)
actions.append(action)
log_probs.append(log_prob.item())
rewards.append(reward)
dones.append(done)
state = next_state
episode_reward += reward
# 达到更新步数
if len(states) >= update_timestep or done:
agent.update(states, actions, log_probs, rewards, dones)
states, actions, log_probs, rewards, dones = [], [], [], [], []
if done:
break
rewards_per_episode.append(episode_reward)
if (episode + 1) % 20 == 0:
avg_reward = np.mean(rewards_per_episode[-20:])
print(f'Episode {episode+1}/{num_episodes} | '
f'Avg Reward: {avg_reward:.2f}')
return agent, rewards_per_episode
# 训练 PPO
env = gym.make('LunarLander-v2')
ppo_agent, ppo_rewards = train_ppo(env, num_episodes=500)

6.3 结果可视化与测试
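在绘图和测试之前,可以先对上面 compute_gae 的递推做一次数值自检。下面是一个独立的 NumPy 小例子(`gae_numpy` 为本文自拟的辅助函数,与 compute_gae 的递推一致):当 λ=0 时,GAE 应退化为单步 TD 误差 δ_t = r_t + γV(s_{t+1}) - V(s_t)。

```python
import numpy as np

def gae_numpy(rewards, values, dones, gamma=0.99, lam=0.95):
    """与上文 compute_gae 相同的递推,用 NumPy 独立实现以便自检"""
    advantages = np.zeros(len(rewards))
    g = 0.0
    for t in reversed(range(len(rewards))):
        next_value = 0.0 if t == len(rewards) - 1 else values[t + 1]
        delta = rewards[t] + gamma * next_value * (1 - dones[t]) - values[t]
        g = delta + gamma * lam * (1 - dones[t]) * g
        advantages[t] = g
    return advantages

# λ=0 时,GAE 应逐项等于单步 TD 误差
rewards = [1.0, 1.0, 1.0]
values = [0.5, 0.6, 0.7]
dones = [0, 0, 1]
adv = gae_numpy(rewards, values, dones, lam=0.0)
deltas = [1.0 + 0.99 * 0.6 - 0.5, 1.0 + 0.99 * 0.7 - 0.6, 1.0 - 0.7]
assert np.allclose(adv, deltas)
```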
def plot_ppo_results(rewards, window=20):
"""绘制 PPO 训练结果"""
plt.figure(figsize=(12, 6))
plt.plot(rewards, alpha=0.3, label='每集奖励')
plt.plot(np.convolve(rewards, np.ones(window)/window, mode='valid'),
color='red', linewidth=2, label=f'{window}集平均')
plt.axhline(y=200, color='green', linestyle='--', label='成功阈值 (200)')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('PPO 训练结果 (LunarLander)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.savefig('ppo_results.png', dpi=150)
plt.show()
plot_ppo_results(ppo_rewards)
def test_ppo(agent, env, num_episodes=10, render=True):
"""测试 PPO 智能体"""
success_count = 0
total_reward = 0
for episode in range(num_episodes):
state, _ = env.reset()
episode_reward = 0
for step in range(1000):
if render:
env.render()
plt.pause(0.02)
action, _, _ = agent.select_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
state = next_state
episode_reward += reward
done = terminated or truncated
if done:
if episode_reward >= 200:  # LunarLander 约定:单集总奖励 ≥ 200 视为成功着陆
success_count += 1
break
total_reward += episode_reward
print(f"回合 {episode+1}: 奖励 {episode_reward:.1f}")
env.close()
print(f"\n=== 测试结果 ===")
print(f"成功率:{success_count/num_episodes*100:.1f}%")
print(f"平均奖励:{total_reward/num_episodes:.1f}")
# 运行测试(如需渲染画面,创建环境时需指定 render_mode,例如 gym.make('LunarLander-v2', render_mode='human'))
test_ppo(ppo_agent, env, num_episodes=10, render=False)

第七章 强化学习进阶话题
7.1 连续动作空间
对于连续动作空间(如机器人控制),需要使用不同的方法:
class ContinuousActor(nn.Module):
"""连续动作空间的 Actor"""
def __init__(self, state_dim, action_dim, action_bounds, hidden_dim=256):
super(ContinuousActor, self).__init__()
self.action_bounds = action_bounds
self.network = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, action_dim),
nn.Tanh() # 输出 [-1, 1]
)
def forward(self, x):
return self.network(x) * self.action_bounds
def get_action(self, state, noise=0.1):
state_tensor = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():  # 推理阶段不需要梯度,否则 .numpy() 会报错
action = self.forward(state_tensor)
# 添加高斯探索噪声
if noise > 0:
action += torch.randn_like(action) * noise
return action.clamp(-self.action_bounds, self.action_bounds).squeeze(0).numpy()
# 使用 TD3 或 SAC 处理连续动作
# TD3: Twin Delayed DDPG
# SAC: Soft Actor-Critic

7.2 多智能体强化学习
# 多智能体场景
# 使用 MADDPG 或 QMIX 等算法
class MultiAgentEnv:
"""多智能体环境接口示例"""
def __init__(self, num_agents=2):
self.num_agents = num_agents
# 每个智能体有自己的观测空间和动作空间
def step(self, actions):
"""所有智能体同时执行动作,返回各自的观测、奖励和 done 信号"""
assert len(actions) == self.num_agents
observations = [None] * self.num_agents  # 各智能体的局部观测(此处为占位)
rewards = [0.0] * self.num_agents
dones = [False] * self.num_agents
return observations, rewards, dones
# 推荐库:
# - PettingZoo: 多智能体环境集合
# - Ray RLlib: 分布式 RL 训练
# - Stable Baselines3: 单智能体 RL 库

7.3 模仿学习
# 从专家演示中学习
class ImitationLearning:
"""行为克隆(Behavior Cloning)"""
def __init__(self, state_dim, action_dim):
self.policy = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, action_dim)
)
self.optimizer = optim.Adam(self.policy.parameters())
self.criterion = nn.MSELoss()  # 连续动作用 MSE;离散动作应改用 nn.CrossEntropyLoss
def train(self, expert_states, expert_actions, epochs=100):
"""从专家数据中学习"""
states = torch.FloatTensor(expert_states)
actions = torch.FloatTensor(expert_actions)
for epoch in range(epochs):
predictions = self.policy(states)
loss = self.criterion(predictions, actions)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
# 进阶:逆强化学习(IRL)
# 从专家行为中推断奖励函数

7.4 基于模型的强化学习
# 学习环境动态模型
class WorldModel(nn.Module):
"""世界模型:预测下一个状态和奖励"""
def __init__(self, state_dim, action_dim):
super(WorldModel, self).__init__()
# 状态转移模型
self.transition = nn.Sequential(
nn.Linear(state_dim + action_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, state_dim)
)
# 奖励模型
self.reward = nn.Sequential(
nn.Linear(state_dim + action_dim, 128),
nn.ReLU(),
nn.Linear(128, 1)
)
def predict(self, state, action):
x = torch.cat([state, action], dim=-1)
next_state = self.transition(x)
reward = self.reward(x)
return next_state, reward
def fit(self, states, actions, next_states, rewards, epochs=100, lr=1e-3):
"""从真实交互数据中学习世界模型(命名为 fit,避免覆盖 nn.Module.train)"""
optimizer = optim.Adam(self.parameters(), lr=lr)
for _ in range(epochs):
pred_next, pred_reward = self.predict(states, actions)
loss = F.mse_loss(pred_next, next_states) + F.mse_loss(pred_reward.squeeze(-1), rewards)
optimizer.zero_grad()
loss.backward()
optimizer.step()
return loss.item()
# 使用世界模型进行规划
# Dreamer、MuZero 等算法

第八章 最佳实践与常见问题
8.1 超参数调优指南
关键超参数:
| 参数 | 典型范围 | 影响 |
|---|---|---|
| 学习率 | 1e-5 ~ 1e-3 | 收敛速度和稳定性 |
| 折扣因子γ | 0.9 ~ 0.999 | 长期 vs 短期奖励 |
| 探索率ε | 1.0 → 0.01 | 探索与利用平衡 |
| 批次大小 | 32 ~ 512 | 梯度估计方差 |
| 回放缓冲区 | 1e4 ~ 1e6 | 样本效率 |
调优建议:
- 从文献中的默认值开始
- 先调学习率,再调其他参数
- 使用网格搜索或贝叶斯优化
- 记录所有实验配置
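上面的调优建议可以落成一个很小的网格搜索骨架。以下为示意代码:`train_and_evaluate` 是一个假设的训练入口(此处用占位返回值代替),实际使用时应替换成你自己的训练循环并返回评估指标(如最后若干集的平均奖励)。

```python
import itertools
import json

# 待搜索的超参数网格(典型范围见上表)
grid = {
    'learning_rate': [1e-4, 3e-4, 1e-3],
    'gamma': [0.99, 0.995],
}

def train_and_evaluate(config):
    """假设的训练入口:应返回该配置下的评估得分(此处为占位实现)"""
    return -config['learning_rate']  # 占位:实际应调用训练循环并返回平均奖励

results = []
for values in itertools.product(*grid.values()):
    config = dict(zip(grid.keys(), values))
    score = train_and_evaluate(config)
    results.append((score, config))
    print(json.dumps(config), '->', score)  # 记录所有实验配置

best_score, best_config = max(results, key=lambda r: r[0])
```

网格搜索在参数少时简单可靠;参数多时建议改用随机搜索或贝叶斯优化,以减少训练次数。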
8.2 训练不稳定问题
常见问题及解决方案:
奖励不收敛
- 检查奖励缩放
- 增加探索
- 调整学习率
策略崩溃
- 使用 PPO 的截断机制
- 添加熵正则化
- 限制梯度范数
过拟合
- 增加环境随机性
- 使用领域随机化
- 正则化网络
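其中「增加探索」和「调整学习率」通常通过衰减调度实现。下面是一个线性衰减 ε 的最小示意(`epsilon_schedule` 为本文自拟的辅助函数,参数均为常见默认值):

```python
def epsilon_schedule(step, eps_start=1.0, eps_end=0.01, decay_steps=10000):
    """ε 从 eps_start 线性衰减到 eps_end,超过 decay_steps 后保持 eps_end"""
    frac = min(step / decay_steps, 1.0)
    return eps_start + frac * (eps_end - eps_start)

# 衰减曲线上的几个点
assert epsilon_schedule(0) == 1.0
assert abs(epsilon_schedule(5000) - 0.505) < 1e-9
assert abs(epsilon_schedule(10000) - 0.01) < 1e-9
```

学习率同理,可使用 torch.optim.lr_scheduler 中的调度器(如 StepLR、LinearLR)按步衰减。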
8.3 实用技巧
# 1. 奖励缩放
class RewardScaler:
def __init__(self):
self.running_mean = 0
self.running_var = 1
self.count = 0
def normalize(self, reward):
self.count += 1
delta = reward - self.running_mean
self.running_mean += delta / self.count
self.running_var += delta * (reward - self.running_mean)
std = np.sqrt(self.running_var / self.count + 1e-8)
return reward / std
# 2. 课程学习
def curriculum_learning(episode, total_episodes, min_difficulty=0.1):
"""从简单任务开始,随训练进度线性增加难度,返回 [min_difficulty, 1] 内的难度系数"""
return min(1.0, min_difficulty + (1.0 - min_difficulty) * episode / total_episodes)
# 3. 并行训练
# 使用 A3C、IMPALA 等异步算法
# 或使用 Ray 进行分布式训练

8.4 推荐资源
书籍:
- 《Reinforcement Learning: An Introduction》(Sutton & Barto)
- 《Deep Reinforcement Learning Hands-On》
课程:
- David Silver RL Course (YouTube)
- Berkeley CS285
库和框架:
- Stable Baselines3
- Ray RLlib
- CleanRL
- Tianshou
环境:
- Gymnasium
- Procgen
- DeepMind Control Suite
- MetaWorld
总结
本教程系统介绍了强化学习的核心算法和应用:
- Q-Learning:经典的基于价值的算法,适合离散动作空间
- SARSA:on-policy 学习,更保守安全
- DQN:深度强化学习的里程碑,处理高维状态
- 策略梯度:直接学习策略,适合连续动作
- PPO:当前最流行的算法,稳定高效
强化学习应用前景:
- 游戏 AI(AlphaGo、AlphaStar)
- 机器人控制
- 自动驾驶
- 资源调度
- 推荐系统
- 金融交易
下一步学习建议:
- 深入理解每个算法的数学原理
- 在更多环境中实验(Atari、MuJoCo)
- 阅读经典论文(DQN、PPO、SAC)
- 参与 RL 竞赛和项目
记住,强化学习需要大量实验和调优。不要气馁,持续实践是成功的关键!
参考资料:
- Gymnasium 文档:https://gymnasium.farama.org/
- Stable Baselines3:https://stable-baselines3.readthedocs.io/
- Spinning Up in Deep RL:https://spinningup.openai.com/
- Papers With Code RL:https://paperswithcode.com/area/reinforcement-learning