当前位置: 首页 > ai >正文

完整强化学习教程:基于4x4网格世界的智能体探索之旅(二)


第九章:连续动作深度Q网络 - 小智的精确控制

9.1 连续动作的挑战

之前我们的小智只能选择离散的动作:上、下、左、右、不动。但现实中的很多任务需要连续的动作

例子

  • 机器人控制:关节角度、力矩大小
  • 自动驾驶:方向盘角度、油门/刹车力度
  • 游戏AI:移动方向的精确角度和速度

问题:传统DQN无法处理连续动作空间,因为:

  1. 无法枚举所有可能的动作
  2. 无法计算 max ⁡ a Q ( s , a ) \max_a Q(s,a) maxaQ(s,a)
  3. 动作空间可能是无限的

9.2 连续动作的解决方案

9.2.1 动作离散化

最简单的方法:将连续动作空间离散化。

例子:假设小智可以选择移动的精确角度

  • 原始动作:角度 θ ∈ [ 0 , 2 π ) \theta \in [0, 2\pi) θ[0,2π)
  • 离散化:选择8个方向 { 0 ° , 45 ° , 90 ° , 135 ° , 180 ° , 225 ° , 270 ° , 315 ° } \{0°, 45°, 90°, 135°, 180°, 225°, 270°, 315°\} {,45°,90°,135°,180°,225°,270°,315°}

优点

  • 可以直接使用DQN
  • 实现简单

缺点

  • 失去了连续性
  • 离散化粒度难以选择
  • 高维动作空间会导致组合爆炸
9.2.2 函数近似方法

思路:用函数近似器来处理连续动作。

方法1:最大化优化

  • 对每个状态,用优化算法求解: a ∗ = arg ⁡ max ⁡ a Q ( s , a ; θ ) a^* = \arg\max_a Q(s,a;\theta) a=argmaxaQ(s,a;θ)
  • 问题:计算复杂度高,不适用于实时决策

方法2:NAF (Normalized Advantage Functions)

  • 将Q函数参数化为特殊形式,使得最大化有解析解
  • 问题:限制了Q函数的表达能力
9.2.3 演员-评论员方法

最成功的方法:结合策略梯度和价值函数!

  • 演员(Actor):策略网络,输出连续动作
  • 评论员(Critic):价值网络,评估动作好坏

这就是我们下一章要学习的内容!

9.3 网格世界中的连续动作

让我们扩展网格世界,使小智可以选择连续的移动方向和速度

9.3.1 连续动作定义

动作空间 A = { ( θ , v ) ∣ θ ∈ [ 0 , 2 π ) , v ∈ [ 0 , 1 ] } \mathcal{A} = \{(\theta, v) | \theta \in [0, 2\pi), v \in [0, 1]\} A={(θ,v)θ[0,2π),v[0,1]}

其中:

  • θ \theta θ:移动方向角度
  • v v v:移动速度(0表示不动,1表示最大速度)
9.3.2 动作到位置的映射

位置更新
x t + 1 = x t + v cos ⁡ ( θ ) ⋅ step_size x_{t+1} = x_t + v \cos(\theta) \cdot \text{step\_size} xt+1=xt+vcos(θ)step_size
y t + 1 = y t + v sin ⁡ ( θ ) ⋅ step_size y_{t+1} = y_t + v \sin(\theta) \cdot \text{step\_size} yt+1=yt+vsin(θ)step_size

9.4 动作离散化的DQN实现

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt
import mathclass ContinuousGridWorldEnv:"""连续动作网格世界环境"""def __init__(self, grid_size=4, step_size=0.3):self.grid_size = grid_sizeself.step_size = step_size# 起点和终点(连续坐标)self.start_pos = np.array([0.5, 0.5])  # 左上角self.goal_pos = np.array([3.5, 3.5])   # 右下角# 障碍物(矩形区域)self.obstacles = [{'min': np.array([1.0, 1.0]), 'max': np.array([2.0, 2.0])},  # (2,2){'min': np.array([2.0, 2.0]), 'max': np.array([3.0, 3.0])}   # (3,3)]# 边界self.bounds = {'min': np.array([0.0, 0.0]), 'max': np.array([4.0, 4.0])}# 当前状态self.current_pos = self.start_pos.copy()def reset(self):"""重置环境"""self.current_pos = self.start_pos.copy()return self.get_state()def get_state(self):"""获取当前状态"""# 状态包含当前位置和目标位置的相对位置relative_pos = self.goal_pos - self.current_posstate = np.concatenate([self.current_pos, relative_pos])return statedef is_in_obstacle(self, pos):"""检查位置是否在障碍物中"""for obs in self.obstacles:if (obs['min'][0] <= pos[0] <= obs['max'][0] and obs['min'][1] <= pos[1] <= obs['max'][1]):return Truereturn Falsedef is_out_of_bounds(self, pos):"""检查位置是否超出边界"""return (pos[0] < self.bounds['min'][0] or pos[0] > self.bounds['max'][0] orpos[1] < self.bounds['min'][1] or pos[1] > self.bounds['max'][1])def step(self, action):"""执行动作"""theta, velocity = action# 计算新位置dx = velocity * math.cos(theta) * self.step_sizedy = velocity * math.sin(theta) * self.step_sizenew_pos = self.current_pos + np.array([dx, dy])# 检查碰撞if self.is_out_of_bounds(new_pos) or self.is_in_obstacle(new_pos):# 碰撞,不移动reward = -10.0done = Falseelse:# 合法移动old_dist = np.linalg.norm(self.current_pos - self.goal_pos)self.current_pos = new_posnew_dist = np.linalg.norm(self.current_pos - self.goal_pos)# 奖励设计if new_dist < 0.3:  # 到达目标reward = 100.0done = Trueelse:# 鼓励接近目标reward = (old_dist - new_dist) * 10 - 1  # 移动成本done = Falsereturn self.get_state(), reward, doneclass DiscretizedContinuousActionDQN:"""离散化连续动作的DQN"""def __init__(self, state_size, angle_bins=8, velocity_bins=3):self.state_size = state_sizeself.angle_bins = angle_binsself.velocity_bins = velocity_binsself.action_size = angle_bins * velocity_bins# 创建离散动作空间self.actions = self._create_discrete_actions()# DQN网络self.q_network = self._build_network()self.target_network = self._build_network()self.optimizer = optim.Adam(self.q_network.parameters(), lr=1e-3)# 训练参数self.epsilon = 1.0self.epsilon_min = 0.01self.epsilon_decay = 0.995self.gamma = 0.99# 经验回放self.memory = deque(maxlen=10000)self.batch_size = 32# 更新目标网络self.update_target_network()def _create_discrete_actions(self):"""创建离散化的动作空间"""actions = []for i in range(self.angle_bins):angle = 2 * math.pi * i / self.angle_binsfor j in range(self.velocity_bins):velocity = (j + 1) / self.velocity_bins  # 避免速度为0actions.append((angle, velocity))return actionsdef _build_network(self):"""构建Q网络"""return nn.Sequential(nn.Linear(self.state_size, 128),nn.ReLU(),nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, self.action_size))def update_target_network(self):"""更新目标网络"""self.target_network.load_state_dict(self.q_network.state_dict())def select_action(self, state):"""选择动作"""if random.random() > self.epsilon:with torch.no_grad():state_tensor = torch.FloatTensor(state).unsqueeze(0)q_values = self.q_network(state_tensor)action_idx = q_values.argmax().item()else:action_idx = random.randrange(self.action_size)return action_idx, self.actions[action_idx]def store_transition(self, state, action_idx, reward, next_state, done):"""存储经验"""self.memory.append((state, action_idx, reward, next_state, done))def learn(self):"""学习"""if len(self.memory) < self.batch_size:return# 采样批次batch = random.sample(self.memory, self.batch_size)states, actions, rewards, next_states, dones = zip(*batch)states = torch.FloatTensor(states)actions = torch.LongTensor(actions)rewards = torch.FloatTensor(rewards)next_states = torch.FloatTensor(next_states)dones = torch.BoolTensor(dones)# 计算当前Q值current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))# 计算目标Q值with torch.no_grad():next_q_values = self.target_network(next_states).max(1)[0]target_q_values = rewards + (self.gamma * next_q_values * ~dones)# 计算损失loss = F.mse_loss(current_q_values.squeeze(), target_q_values)# 更新网络self.optimizer.zero_grad()loss.backward()self.optimizer.step()# 更新探索率if self.epsilon > self.epsilon_min:self.epsilon *= self.epsilon_decaydef train_discretized_continuous_dqn():"""训练离散化连续动作DQN"""print("=== 离散化连续动作DQN训练 ===")env = ContinuousGridWorldEnv()agent = DiscretizedContinuousActionDQN(state_size=4)episode_rewards = []for episode in range(1000):state = env.reset()episode_reward = 0for step in range(200):# 选择动作action_idx, continuous_action = agent.select_action(state)# 执行动作next_state, reward, done = env.step(continuous_action)# 存储经验agent.store_transition(state, action_idx, reward, next_state, done)# 学习agent.learn()episode_reward += rewardstate = next_stateif done:breakepisode_rewards.append(episode_reward)# 更新目标网络if episode % 100 == 0:agent.update_target_network()if episode % 100 == 0:avg_reward = np.mean(episode_rewards[-100:])print(f"回合 {episode}: 平均奖励 = {avg_reward:.2f}, 探索率 = {agent.epsilon:.3f}")return agent, episode_rewards# 运行训练
# agent, rewards = train_discretized_continuous_dqn()

9.5 动作离散化的局限性

通过上面的实现,我们可以看到动作离散化的问题:

9.5.1 精度损失

问题:离散化会丢失精度

  • 8个角度只能表示45°的倍数
  • 无法精确控制移动方向
9.5.2 维度爆炸

问题:高维连续动作空间离散化后组合数激增

  • 2维动作(角度+速度):8×3=24个选择
  • 如果是机器人7个关节:每个关节10个离散值 = 10 7 10^7 107个动作!
9.5.3 不自然的动作

问题:真实世界的动作是连续的

  • 机器人关节角度应该是连续的
  • 汽车方向盘转角应该是平滑的

9.6 连续动作的真正解决方案

9.6.1 策略梯度方法

优势

  • 直接输出连续动作
  • 可以学习随机策略
  • 适用于高维动作空间
9.6.2 演员-评论员方法

结合两者优势

  • 演员:策略网络,输出连续动作
  • 评论员:价值网络,评估策略好坏

这就是我们下一章要深入学习的内容!

小结

连续动作深度Q网络面临的挑战:

  1. 理论挑战:无法直接最大化Q函数
  2. 计算挑战:动作空间可能无限大
  3. 实现挑战:需要新的算法框架

解决方向

  1. 动作离散化:简单但有局限性
  2. 特殊Q函数形式:如NAF,限制表达能力
  3. 演员-评论员:最成功的方法

下一章我们将学习演员-评论员算法,这是处理连续动作的主流方法!


第十章:演员-评论员算法 - 小智的双重人格

10.1 演员-评论员的核心思想

想象小智有两个大脑:

  • 演员大脑:负责做决策,选择动作
  • 评论员大脑:负责评价,判断动作好坏

这就是**演员-评论员(Actor-Critic)**算法的基本思想!

10.1.1 为什么需要两个网络?

单独使用策略梯度的问题

  • 方差大,学习不稳定
  • 需要完整回合才能计算回报

单独使用价值函数的问题

  • 连续动作空间难以处理 max ⁡ a Q ( s , a ) \max_a Q(s,a) maxaQ(s,a)
  • 只能评估,不能直接决策

演员-评论员的优势

  • 演员处理连续动作
  • 评论员提供即时反馈
  • 两者相互促进,共同提高

10.2 演员-评论员的数学原理

10.2.1 演员网络(策略网络)

参数化策略 π ( a ∣ s ; θ ) \pi(a|s;\theta) π(as;θ)

目标:最大化期望回报
J ( θ ) = E s ∼ ρ π , a ∼ π [ Q π ( s , a ) ] J(\theta) = \mathbb{E}_{s \sim \rho^\pi, a \sim \pi}[Q^\pi(s,a)] J(θ)=Esρπ,aπ[Qπ(s,a)]

策略梯度
∇ θ J ( θ ) = E s ∼ ρ π , a ∼ π [ Q π ( s , a ) ∇ θ ln ⁡ π ( a ∣ s ; θ ) ] \nabla_\theta J(\theta) = \mathbb{E}_{s \sim \rho^\pi, a \sim \pi}[Q^\pi(s,a) \nabla_\theta \ln \pi(a|s;\theta)] θJ(θ)=Esρπ,aπ[Qπ(s,a)θlnπ(as;θ)]

10.2.2 评论员网络(价值网络)

状态价值函数 V ( s ; ϕ ) V(s;\phi) V(s;ϕ)
目标:近似真实价值函数 V π ( s ) V^\pi(s) Vπ(s)

TD误差
δ = r + γ V ( s ′ ; ϕ ) − V ( s ; ϕ ) \delta = r + \gamma V(s';\phi) - V(s;\phi) δ=r+γV(s;ϕ)V(s;ϕ)

更新规则
ϕ ← ϕ + α v δ ∇ ϕ V ( s ; ϕ ) \phi \leftarrow \phi + \alpha_v \delta \nabla_\phi V(s;\phi) ϕϕ+αvδϕV(s;ϕ)

10.2.3 优势函数

关键思想:用TD误差估计优势函数

A ( s , a ) = Q ( s , a ) − V ( s ) ≈ r + γ V ( s ′ ) − V ( s ) = δ A(s,a) = Q(s,a) - V(s) \approx r + \gamma V(s') - V(s) = \delta A(s,a)=Q(s,a)V(s)r+γV(s)V(s)=δ

最终的演员-评论员更新

  • 评论员更新 ϕ ← ϕ + α v δ ∇ ϕ V ( s ; ϕ ) \phi \leftarrow \phi + \alpha_v \delta \nabla_\phi V(s;\phi) ϕϕ+αvδϕV(s;ϕ)
  • 演员更新 θ ← θ + α π δ ∇ θ ln ⁡ π ( a ∣ s ; θ ) \theta \leftarrow \theta + \alpha_\pi \delta \nabla_\theta \ln \pi(a|s;\theta) θθ+απδθlnπ(as;θ)

10.3 算法流程

算法:演员-评论员 (Actor-Critic)
输入:- 演员网络 π(a|s;θ)- 评论员网络 V(s;φ)- 学习率 α_π, α_v1. 初始化网络参数 θ, φ
2. for each episode:
3.     初始化状态 s
4.     for each step:
5.         根据 π(a|s;θ) 采样动作 a
6.         执行动作 a,获得 r, s'
7.         计算 TD误差:δ = r + γV(s';φ) - V(s;φ)
8.         更新评论员:φ ← φ + α_v δ ∇_φ V(s;φ)
9.         更新演员:θ ← θ + α_π δ ∇_θ ln π(a|s;θ)
10.        s ← s'
11.    end for
12. end for

10.4 网格世界中的演员-评论员实现

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
import matplotlib.pyplot as plt
import mathclass ActorNetwork(nn.Module):"""演员网络:输出动作的概率分布"""def __init__(self, state_size, action_size, hidden_size=64):super(ActorNetwork, self).__init__()self.fc1 = nn.Linear(state_size, hidden_size)self.fc2 = nn.Linear(hidden_size, hidden_size)# 对于连续动作,我们输出高斯分布的参数self.mean_head = nn.Linear(hidden_size, action_size)self.std_head = nn.Linear(hidden_size, action_size)def forward(self, state):x = F.relu(self.fc1(state))x = F.relu(self.fc2(x))# 输出动作的均值和标准差mean = self.mean_head(x)std = F.softplus(self.std_head(x)) + 1e-5  # 确保std > 0return mean, stddef sample_action(self, state):"""采样动作"""mean, std = self.forward(state)normal_dist = torch.distributions.Normal(mean, std)action = normal_dist.sample()log_prob = normal_dist.log_prob(action).sum(dim=-1)return action, log_probclass CriticNetwork(nn.Module):"""评论员网络:估计状态价值"""def __init__(self, state_size, hidden_size=64):super(CriticNetwork, self).__init__()self.fc1 = nn.Linear(state_size, hidden_size)self.fc2 = nn.Linear(hidden_size, hidden_size)self.value_head = nn.Linear(hidden_size, 1)def forward(self, state):x = F.relu(self.fc1(state))x = F.relu(self.fc2(x))value = self.value_head(x)return valueclass ActorCriticAgent:"""演员-评论员智能体"""def __init__(self, state_size, action_size, lr_actor=1e-3, lr_critic=1e-3, gamma=0.99):self.state_size = state_sizeself.action_size = action_sizeself.gamma = gamma# 网络self.actor = ActorNetwork(state_size, action_size)self.critic = CriticNetwork(state_size)# 优化器self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr_actor)self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr_critic)# 统计信息self.actor_losses = []self.critic_losses = []self.episode_rewards = []def select_action(self, state):"""选择动作"""state_tensor = torch.FloatTensor(state).unsqueeze(0)action, log_prob = self.actor.sample_action(state_tensor)value = self.critic(state_tensor)return action.squeeze().numpy(), log_prob.item(), value.item()def update(self, state, action, reward, next_state, done, log_prob):"""更新网络"""state_tensor = torch.FloatTensor(state).unsqueeze(0)next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)# 计算TD误差current_value = self.critic(state_tensor)if done:target_value = rewardelse:next_value = self.critic(next_state_tensor)target_value = reward + self.gamma * next_value.item()td_error = target_value - current_value.item()# 更新评论员critic_loss = F.mse_loss(current_value, torch.FloatTensor([target_value]))self.critic_optimizer.zero_grad()critic_loss.backward()self.critic_optimizer.step()# 更新演员actor_loss = -log_prob * td_error  # 负号因为我们要最大化self.actor_optimizer.zero_grad()actor_loss.backward()self.actor_optimizer.step()# 记录损失self.actor_losses.append(abs(actor_loss.item()))self.critic_losses.append(critic_loss.item())return td_errorclass ContinuousActionGridWorld:"""连续动作网格世界(适用于演员-评论员)"""def __init__(self, grid_size=4, step_size=0.2):self.grid_size = grid_sizeself.step_size = step_size# 位置self.start_pos = np.array([0.5, 0.5])self.goal_pos = np.array([3.5, 3.5])# 障碍物self.obstacles = [{'center': np.array([1.5, 1.5]), 'radius': 0.4},  # (2,2){'center': np.array([2.5, 2.5]), 'radius': 0.4}   # (3,3)]self.current_pos = self.start_pos.copy()def reset(self):"""重置环境"""self.current_pos = self.start_pos.copy()return self.get_state()def get_state(self):"""获取状态"""# 归一化位置信息normalized_pos = self.current_pos / self.grid_sizerelative_goal = (self.goal_pos - self.current_pos) / self.grid_size# 计算到障碍物的距离obstacle_distances = []for obs in self.obstacles:dist = np.linalg.norm(self.current_pos - obs['center'])obstacle_distances.append(dist / self.grid_size)state = np.concatenate([normalized_pos, relative_goal, obstacle_distances])return statedef is_in_obstacle(self, pos):"""检查是否在障碍物中"""for obs in self.obstacles:if np.linalg.norm(pos - obs['center']) < obs['radius']:return Truereturn Falsedef is_out_of_bounds(self, pos):"""检查是否越界"""return (pos[0] < 0 or pos[0] > self.grid_size or pos[1] < 0 or pos[1] > self.grid_size)def step(self, action):"""执行动作"""# 动作是2维:[dx, dy],范围是[-1, 1]action = np.clip(action, -1, 1)  # 确保动作在合理范围内# 计算新位置new_pos = self.current_pos + action * self.step_size# 检查碰撞if self.is_out_of_bounds(new_pos) or self.is_in_obstacle(new_pos):# 碰撞,不移动reward = -10.0done = Falseelse:# 合法移动old_dist = np.linalg.norm(self.current_pos - self.goal_pos)self.current_pos = new_posnew_dist = np.linalg.norm(self.current_pos - self.goal_pos)# 奖励设计if new_dist < 0.2:  # 到达目标reward = 100.0done = Trueelse:# 鼓励接近目标,惩罚时间消耗reward = (old_dist - new_dist) * 20 - 0.1done = Falsereturn self.get_state(), reward, donedef train_actor_critic():"""训练演员-评论员算法"""print("=== 演员-评论员算法训练 ===")env = ContinuousActionGridWorld()state_size = len(env.get_state())action_size = 2  # 2D移动agent = ActorCriticAgent(state_size, action_size)episode_rewards = []for episode in range(1000):state = env.reset()episode_reward = 0for step in range(200):# 选择动作action, log_prob, value = agent.select_action(state)# 执行动作next_state, reward, done = env.step(action)# 更新网络td_error = agent.update(state, action, reward, next_state, done, log_prob)episode_reward += rewardstate = next_stateif done:breakepisode_rewards.append(episode_reward)agent.episode_rewards.append(episode_reward)if episode % 100 == 0:avg_reward = np.mean(episode_rewards[-100:])print(f"回合 {episode}: 平均奖励 = {avg_reward:.2f}")return agent, episode_rewards# 训练智能体
# agent, rewards = train_actor_critic()

10.5 演员-评论员的变种

10.5.1 A2C (Advantage Actor-Critic)

改进:使用优势函数而不是TD误差

优势函数估计
A ( s , a ) = Q ( s , a ) − V ( s ) ≈ r + γ V ( s ′ ) − V ( s ) A(s,a) = Q(s,a) - V(s) \approx r + \gamma V(s') - V(s) A(s,a)=Q(s,a)V(s)r+γV(s)V(s)

10.5.2 A3C (Asynchronous Advantage Actor-Critic)

核心思想:多个智能体并行训练

  • 每个智能体独立探索环境
  • 定期同步网络参数
  • 提高训练效率和稳定性
10.5.3 TRPO (Trust Region Policy Optimization)

问题:策略更新步长难以控制
解决:限制策略更新的KL散度

max ⁡ θ E [ A ( s , a ) π θ ( a ∣ s ) π θ o l d ( a ∣ s ) ] \max_\theta \mathbb{E}[A(s,a) \frac{\pi_\theta(a|s)}{\pi_{\theta_{old}}(a|s)}] θmaxE[A(s,a)πθold(as)πθ(as)]
s.t.  E [ D K L ( π θ o l d ( ⋅ ∣ s ) ∣ ∣ π θ ( ⋅ ∣ s ) ) ] ≤ δ \text{s.t. } \mathbb{E}[D_{KL}(\pi_{\theta_{old}}(\cdot|s) || \pi_\theta(\cdot|s))] \leq \delta s.t. E[DKL(πθold(s)∣∣πθ(s))]δ

10.6 演员-评论员的优缺点

10.6.1 优点
  1. 处理连续动作:直接输出连续动作分布
  2. 在线学习:每步都可以更新,不需要等待回合结束
  3. 低方差:评论员提供基线,减少方差
  4. 理论基础:有坚实的理论保证
10.6.2 缺点
  1. 两个网络:需要平衡两个网络的学习速度
  2. 超参数敏感:学习率、网络结构等需要仔细调节
  3. 收敛性:可能不如纯策略梯度方法稳定
  4. 计算复杂:比单一网络方法更复杂

10.7 实际应用技巧

10.7.1 网络架构设计
class SharedActorCritic(nn.Module):"""共享特征提取的演员-评论员网络"""def __init__(self, state_size, action_size):super().__init__()# 共享特征提取层self.shared_layers = nn.Sequential(nn.Linear(state_size, 128),nn.ReLU(),nn.Linear(128, 128),nn.ReLU())# 演员头self.actor_head = nn.Sequential(nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, action_size))# 评论员头self.critic_head = nn.Sequential(nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, 1))
10.7.2 动作空间处理
def process_continuous_action(self, raw_action):"""处理连续动作输出"""# 使用tanh激活函数将输出限制在[-1, 1]action = torch.tanh(raw_action)# 或者使用更复杂的映射# action = torch.sigmoid(raw_action) * 2 - 1return action

小结

演员-评论员算法是连续控制的重要方法:

  1. 双网络架构:演员负责决策,评论员负责评价
  2. 理论优雅:结合了策略梯度和价值函数的优点
  3. 实用性强:可以处理连续动作空间
  4. 持续发展:衍生出PPO、SAC等先进算法

下一章我们将学习稀疏奖励环境中的挑战和解决方案!


第十一章:稀疏奖励 - 小智的艰难探索

11.1 稀疏奖励的挑战

想象一个更困难的场景:小智被放在一个巨大的迷宫中,只有到达终点才能获得奖励,其他时候都没有任何反馈。这就是稀疏奖励的挑战!

11.1.1 稀疏奖励的定义

稀疏奖励环境:奖励信号很少出现,大部分时间奖励为0。

例子

  • 迷宫游戏:只有到达出口才有奖励
  • 机器人抓取:只有成功抓取物体才有奖励
  • 游戏AI:只有游戏结束时才知道输赢
11.1.2 为什么稀疏奖励困难?

信号稀少
R ( s , a ) = { + 1 如果到达目标 0 其他所有情况 R(s,a) = \begin{cases} +1 & \text{如果到达目标} \\ 0 & \text{其他所有情况} \end{cases} R(s,a)={+10如果到达目标其他所有情况

探索困难

  • 随机策略很难找到奖励
  • 没有奖励信号指导学习方向
  • 容易陷入局部最优

信用分配问题

  • 一个回合有100步,只在最后获得奖励
  • 哪些动作真正对成功有贡献?

11.2 稀疏奖励网格世界

让我们创建一个稀疏奖励版本的网格世界:

11.2.1 环境设计
class SparseRewardGridWorld(MDPGridWorld):"""稀疏奖励网格世界"""def __init__(self, success_prob=0.7, slip_prob=0.15, gamma=0.99):super().__init__(success_prob, slip_prob, gamma)# 重新设计奖励函数 - 只有成功才有奖励self.sparse_rewards = self._build_sparse_reward_matrix()def _build_sparse_reward_matrix(self):"""构建稀疏奖励矩阵"""R = np.zeros((self.num_states, self.num_actions, self.num_states))goal_state_id = self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])goal_state_idx = self.state_to_idx[goal_state_id]# 只有到达目标才有奖励for state_idx in range(self.num_states):for action_id in range(self.num_actions):for next_state_idx in range(self.num_states):if (self.transition_probs[state_idx][action_id][next_state_idx] > 0 and next_state_idx == goal_state_idx):R[state_idx][action_id][next_state_idx] = 1.0else:R[state_idx][action_id][next_state_idx] = 0.0return R
11.1.2 传统方法的失败
def test_traditional_methods_on_sparse_reward():"""测试传统方法在稀疏奖励下的表现"""print("=== 传统方法在稀疏奖励下的表现 ===")env = SparseRewardGridWorld()env.rewards = env.sparse_rewards  # 使用稀疏奖励# 测试Q-learningprint("1. Q-learning:")q_agent = QLearningGridWorld()q_agent.rewards = env.sparse_rewards# 训练更长时间q_agent.train_q_learning(num_episodes=5000, verbose=False)q_result = q_agent.evaluate_learned_policy(num_test_episodes=100)print(f"   成功率: {q_result['success_rate']:.2%}")print(f"   平均奖励: {q_result['avg_reward']:.2f}")# 大多数情况下成功率会很低,因为很难探索到目标return q_result# 运行测试
# result = test_traditional_methods_on_sparse_reward()

11.3 解决稀疏奖励的方法

11.3.1 奖励整形 (Reward Shaping)

基本思想:为环境添加额外的奖励信号,引导智能体探索。

距离奖励
R shaped ( s , a , s ′ ) = R ( s , a , s ′ ) + F ( s , s ′ ) R_{\text{shaped}}(s,a,s') = R(s,a,s') + F(s,s') Rshaped(s,a,s)=R(s,a,s)+F(s,s)

其中 F ( s , s ′ ) F(s,s') F(s,s)是整形函数。

常用整形函数
F ( s , s ′ ) = γ Φ ( s ′ ) − Φ ( s ) F(s,s') = \gamma \Phi(s') - \Phi(s) F(s,s)=γΦ(s)Φ(s)

其中 Φ ( s ) \Phi(s) Φ(s)是势函数(potential function)。

例子:距离势函数
Φ ( s ) = − ∥ s − s goal ∥ 2 \Phi(s) = -\|s - s_{\text{goal}}\|_2 Φ(s)=ssgoal2

class RewardShapingGridWorld(SparseRewardGridWorld):"""奖励整形网格世界"""def __init__(self, success_prob=0.7, slip_prob=0.15, gamma=0.99, shaping_weight=0.1):super().__init__(success_prob, slip_prob, gamma)self.shaping_weight = shaping_weight# 构建整形奖励矩阵self.shaped_rewards = self._build_shaped_reward_matrix()def _build_shaped_reward_matrix(self):"""构建整形奖励矩阵"""R = self.sparse_rewards.copy()goal_pos = np.array(self.goal_pos)for state_idx in range(self.num_states):state_id = self.idx_to_state[state_idx]state_pos = np.array(self.state_id_to_pos(state_id))for action_id in range(self.num_actions):for next_state_idx in range(self.num_states):if self.transition_probs[state_idx][action_id][next_state_idx] > 0:next_state_id = self.idx_to_state[next_state_idx]next_state_pos = np.array(self.state_id_to_pos(next_state_id))# 计算势函数差old_potential = -np.linalg.norm(state_pos - goal_pos)new_potential = -np.linalg.norm(next_state_pos - goal_pos)# 添加整形奖励shaping_reward = self.gamma * new_potential - old_potentialR[state_idx][action_id][next_state_idx] += self.shaping_weight * shaping_rewardreturn Rdef test_reward_shaping():"""测试奖励整形"""print("=== 奖励整形测试 ===")# 原始稀疏奖励sparse_env = SparseRewardGridWorld()sparse_env.rewards = sparse_env.sparse_rewards# 奖励整形shaped_env = RewardShapingGridWorld(shaping_weight=0.1)shaped_env.rewards = shaped_env.shaped_rewards# 比较结果results = {}for name, env in [("稀疏奖励", sparse_env), ("奖励整形", shaped_env)]:q_agent = QLearningGridWorld()q_agent.rewards = env.rewardsq_agent.transition_probs = env.transition_probsq_agent.train_q_learning(num_episodes=2000, verbose=False)result = q_agent.evaluate_learned_policy(num_test_episodes=100)results[name] = resultprint(f"{name}:")print(f"  成功率: {result['success_rate']:.2%}")print(f"  平均奖励: {result['avg_reward']:.2f}")return results# 运行测试
# results = test_reward_shaping()
11.3.2 课程学习 (Curriculum Learning)

基本思想:从简单任务开始,逐渐增加难度。

例子

  1. 第一阶段:目标在距离起点1步的位置
  2. 第二阶段:目标在距离起点2步的位置
  3. 最终阶段:目标在原始位置
class CurriculumGridWorld:"""课程学习网格世界"""def __init__(self, start_pos=(1, 1), final_goal=(4, 4)):self.start_pos = start_posself.final_goal = final_goal# 设计课程:从近到远的目标位置self.curriculum = [(2, 2),  # 第一阶段:简单目标(3, 3),  # 第二阶段:中等难度(4, 4)   # 第三阶段:最终目标]self.current_stage = 0def get_current_environment(self):"""获取当前阶段的环境"""current_goal = self.curriculum[self.current_stage]env = SparseRewardGridWorld()# 修改目标位置env.goal_pos = current_goalenv.sparse_rewards = env._build_sparse_reward_matrix()env.rewards = env.sparse_rewardsreturn envdef should_advance_stage(self, success_rate):"""判断是否应该进入下一阶段"""return success_rate > 0.8  # 成功率超过80%就进入下一阶段def advance_stage(self):"""进入下一阶段"""if self.current_stage < len(self.curriculum) - 1:self.current_stage += 1return Truereturn Falsedef train_with_curriculum():"""使用课程学习训练"""print("=== 课程学习训练 ===")curriculum = CurriculumGridWorld()agent = QLearningGridWorld()while True:# 获取当前阶段环境env = curriculum.get_current_environment()agent.rewards = env.rewardsagent.transition_probs = env.transition_probsprint(f"阶段 {curriculum.current_stage + 1}: 目标位置 {env.goal_pos}")# 在当前阶段训练agent.train_q_learning(num_episodes=1000, verbose=False)# 评估性能result = agent.evaluate_learned_policy(num_test_episodes=100)success_rate = result['success_rate']print(f"  成功率: {success_rate:.2%}")# 判断是否进入下一阶段if curriculum.should_advance_stage(success_rate):if not curriculum.advance_stage():print("课程学习完成!")breakelse:print("  继续当前阶段训练...")return agent# 运行课程学习
# trained_agent = train_with_curriculum()
11.3.3 好奇心驱动探索 (Curiosity-Driven Exploration)

核心思想:给智能体内在动机去探索新奇的状态。

ICM (Intrinsic Curiosity Module)

  1. 前向模型:预测下一个状态的特征
  2. 逆向模型:从状态特征预测动作
  3. 好奇心奖励:预测误差作为内在奖励

数学formulation
r i = η ∥ ϕ ( s t + 1 ) − ϕ ^ ( s t + 1 ) ∥ 2 r_i = \eta \|\phi(s_{t+1}) - \hat{\phi}(s_{t+1})\|^2 ri=ηϕ(st+1)ϕ^(st+1)2

其中 ϕ ( s ) \phi(s) ϕ(s)是状态特征, ϕ ^ ( s t + 1 ) \hat{\phi}(s_{t+1}) ϕ^(st+1)是预测的下一状态特征。

class CuriosityModule(nn.Module):"""好奇心模块"""def __init__(self, state_size, action_size, feature_size=64):super().__init__()# 特征提取网络self.feature_net = nn.Sequential(nn.Linear(state_size, 128),nn.ReLU(),nn.Linear(128, feature_size))# 前向模型:从特征和动作预测下一状态特征self.forward_model = nn.Sequential(nn.Linear(feature_size + action_size, 128),nn.ReLU(),nn.Linear(128, feature_size))# 逆向模型:从特征变化预测动作self.inverse_model = nn.Sequential(nn.Linear(feature_size * 2, 128),nn.ReLU(),nn.Linear(128, action_size))self.optimizer = optim.Adam(self.parameters(), lr=1e-3)def forward(self, state, action, next_state):"""计算好奇心奖励"""# 提取特征phi_s = self.feature_net(state)phi_s_next = self.feature_net(next_state)# 前向模型预测action_onehot = F.one_hot(action.long(), num_classes=5).float()pred_phi_s_next = self.forward_model(torch.cat([phi_s, action_onehot], dim=-1))# 逆向模型预测pred_action = self.inverse_model(torch.cat([phi_s, phi_s_next], dim=-1))# 计算损失forward_loss = F.mse_loss(pred_phi_s_next, phi_s_next)inverse_loss = F.cross_entropy(pred_action, action.long())# 好奇心奖励(预测误差)curiosity_reward = forward_loss.detach()return curiosity_reward, forward_loss, inverse_lossdef update(self, forward_loss, inverse_loss):"""更新好奇心模块"""total_loss = forward_loss + inverse_lossself.optimizer.zero_grad()total_loss.backward()self.optimizer.step()class CuriosityDrivenAgent(QLearningGridWorld):"""好奇心驱动的Q-learning智能体"""def __init__(self, success_prob=0.7, slip_prob=0.15, gamma=0.99, curiosity_weight=0.1):super().__init__(success_prob, slip_prob, gamma)self.curiosity_weight = curiosity_weightself.curiosity_module = CuriosityModule(state_size=self.num_states, action_size=self.num_actions)# 用于存储好奇心奖励self.curiosity_rewards = []def state_to_tensor(self, state_id):"""将状态转换为one-hot张量"""state_vector = np.zeros(self.num_states)if state_id in self.valid_states:state_vector[self.valid_states.index(state_id)] = 1.0return torch.FloatTensor(state_vector)def curiosity_q_learning_step(self, state_id, action_id, reward, next_state_id, done=False):"""带好奇心的Q-learning更新"""# 原始Q-learning更新td_error = super().q_learning_step(state_id, action_id, reward, next_state_id, done)# 计算好奇心奖励state_tensor = self.state_to_tensor(state_id).unsqueeze(0)next_state_tensor = self.state_to_tensor(next_state_id).unsqueeze(0)action_tensor = torch.tensor([action_id])curiosity_reward, forward_loss, inverse_loss = self.curiosity_module(state_tensor, action_tensor, next_state_tensor)# 更新好奇心模块self.curiosity_module.update(forward_loss, inverse_loss)# 将好奇心奖励添加到原始奖励total_reward = reward + self.curiosity_weight * curiosity_reward.item()self.curiosity_rewards.append(curiosity_reward.item())# 用总奖励更新Q值state_idx = self.state_to_idx[state_id]next_state_idx = self.state_to_idx[next_state_id]current_q = self.Q_table[state_idx][action_id]if done:target_q = total_rewardelse:next_max_q = np.max(self.Q_table[next_state_idx])target_q = total_reward + self.gamma * next_max_qtd_error = target_q - current_qself.Q_table[state_idx][action_id] += self.alpha * td_errorreturn abs(td_error)def test_curiosity_driven_learning():"""测试好奇心驱动学习"""print("=== 好奇心驱动学习测试 ===")# 创建稀疏奖励环境env = SparseRewardGridWorld()# 普通Q-learningnormal_agent = QLearningGridWorld()normal_agent.rewards = env.sparse_rewardsnormal_agent.train_q_learning(num_episodes=3000, verbose=False)normal_result = normal_agent.evaluate_learned_policy(num_test_episodes=100)# 好奇心驱动Q-learningcurious_agent = CuriosityDrivenAgent(curiosity_weight=0.1)curious_agent.rewards = env.sparse_rewards# 重写训练循环以使用好奇心for episode in range(3000):current_state_id = curious_agent.pos_to_state_id(curious_agent.start_pos[0], curious_agent.start_pos[1])goal_state_id = curious_agent.pos_to_state_id(curious_agent.goal_pos[0], curious_agent.goal_pos[1])episode_reward = 0for step in range(1000):action_id = curious_agent.epsilon_greedy_action(current_state_id)next_state_id = curious_agent.simulate_transition(current_state_id, action_id)current_state_idx = curious_agent.state_to_idx[current_state_id]next_state_idx = curious_agent.state_to_idx[next_state_id]reward = curious_agent.rewards[current_state_idx][action_id][next_state_idx]done = (next_state_id == goal_state_id)# 使用好奇心Q-learning更新curious_agent.curiosity_q_learning_step(current_state_id, action_id, reward, next_state_id, done)episode_reward += rewardif done:breakcurrent_state_id = next_state_idcurious_agent.episode_rewards.append(episode_reward)curious_agent.epsilon = max(curious_agent.epsilon_min, curious_agent.epsilon * curious_agent.epsilon_decay)curious_result = curious_agent.evaluate_learned_policy(num_test_episodes=100)# 比较结果print("普通Q-learning:")print(f"  成功率: {normal_result['success_rate']:.2%}")print(f"  平均奖励: {normal_result['avg_reward']:.2f}")print("好奇心驱动Q-learning:")print(f"  成功率: {curious_result['success_rate']:.2%}")print(f"  平均奖励: {curious_result['avg_reward']:.2f}")return normal_result, curious_result# 运行测试
# normal_result, curious_result = test_curiosity_driven_learning()

11.4 其他稀疏奖励解决方法

11.4.1 HER (Hindsight Experience Replay)

核心思想:从失败中学习,将失败的轨迹重新标记为成功。

例子

  • 原始目标:到达(4,4)
  • 实际到达:(3,2)
  • HER重新标记:假设目标就是(3,2),这次就是成功的!
11.4.2 探索策略

ε-贪心的改进

  • Upper Confidence Bound (UCB):考虑不确定性的探索
  • Thompson Sampling:贝叶斯探索
  • Count-based Exploration:访问次数较少的状态-动作对
11.4.3 分层强化学习

思想:将复杂任务分解为子任务

  • 高层策略:选择子目标
  • 底层策略:执行具体动作

11.5 稀疏奖励的理论分析

11.5.1 探索复杂度

定理:在稀疏奖励环境中,随机探索需要指数级时间找到奖励。

证明思路

  • 状态空间大小: ∣ S ∣ |\mathcal{S}| S
  • 随机策略找到目标的概率: 1 ∣ S ∣ \frac{1}{|\mathcal{S}|} S1
  • 期望时间: ∣ S ∣ |\mathcal{S}| S(对于我们的4×4网格,约14步)
11.5.2 奖励整形的理论保证

定理:势函数形式的奖励整形不改变最优策略。

证明
对于整形奖励 R ′ ( s , a , s ′ ) = R ( s , a , s ′ ) + γ Φ ( s ′ ) − Φ ( s ) R'(s,a,s') = R(s,a,s') + \gamma\Phi(s') - \Phi(s) R(s,a,s)=R(s,a,s)+γΦ(s)Φ(s)

新的Q函数: Q ′ ( s , a ) = Q ( s , a ) + Φ ( s ) Q'(s,a) = Q(s,a) + \Phi(s) Q(s,a)=Q(s,a)+Φ(s)

最优动作: arg ⁡ max ⁡ a Q ′ ( s , a ) = arg ⁡ max ⁡ a ( Q ( s , a ) + Φ ( s ) ) = arg ⁡ max ⁡ a Q ( s , a ) \arg\max_a Q'(s,a) = \arg\max_a (Q(s,a) + \Phi(s)) = \arg\max_a Q(s,a) argmaxaQ(s,a)=argmaxa(Q(s,a)+Φ(s))=argmaxaQ(s,a)

因此最优策略不变。

小结

稀疏奖励是强化学习中的重大挑战:

  1. 探索困难:缺乏奖励信号指导
  2. 学习缓慢:样本效率低下
  3. 信用分配:难以确定关键动作

解决方案

  1. 奖励整形:添加引导信号
  2. 课程学习:从简单到复杂
  3. 好奇心驱动:内在动机探索
  4. 经验重用:HER等技术

下一章我们将学习模仿学习,看看如何从专家演示中学习!


第十二章:模仿学习 - 小智的学徒之路

12.1 模仿学习的动机

想象小智有一位经验丰富的导师,这位导师已经知道如何完美地完成任务。我们能否让小智直接从导师的演示中学习,而不是从零开始探索?

这就是**模仿学习(Imitation Learning)**的核心思想!

12.1.1 为什么需要模仿学习?

传统强化学习的问题

  • 探索代价高:机器人在学习过程中可能损坏
  • 时间成本大:需要大量试错才能学会
  • 安全风险:自动驾驶不能在学习过程中撞车

模仿学习的优势

  • 快速启动:从专家经验开始,而不是随机策略
  • 安全高效:避免危险的探索
  • 样本效率:专家演示信息量大
12.1.2 模仿学习的类型

监督学习视角

  • 输入:状态 s s s
  • 输出:专家动作 a ∗ a^* a
  • 目标:学习映射 s → a ∗ s \rightarrow a^* sa

强化学习视角

  • 从专家轨迹中推断奖励函数
  • 使用推断的奖励训练强化学习智能体

12.2 行为克隆 (Behavioral Cloning)

12.2.1 基本思想

行为克隆:将模仿学习看作监督学习问题。

数学公式
给定专家轨迹数据集 D = { ( s i , a i ) } i = 1 N \mathcal{D} = \{(s_i, a_i)\}_{i=1}^N D={(si,ai)}i=1N,学习策略 π θ ( a ∣ s ) \pi_\theta(a|s) πθ(as)最小化:

L ( θ ) = ∑ i = 1 N ℓ ( π θ ( a i ∣ s i ) , a i ) L(\theta) = \sum_{i=1}^N \ell(\pi_\theta(a_i|s_i), a_i) L(θ)=i=1N(πθ(aisi),ai)

对于离散动作: ℓ = − log ⁡ π θ ( a i ∣ s i ) \ell = -\log \pi_\theta(a_i|s_i) =logπθ(aisi)(交叉熵损失)
对于连续动作: ℓ = ∥ a i − π θ ( s i ) ∥ 2 \ell = \|a_i - \pi_\theta(s_i)\|^2 =aiπθ(si)2(均方误差损失)

12.2.2 分布偏移问题

核心问题:训练分布与测试分布不匹配。

原因

  • 训练时:在专家状态分布下学习
  • 测试时:在学到的策略状态分布下执行
  • 小错误会累积,导致分布偏移

数学分析
假设单步错误概率为 ϵ \epsilon ϵ T T T步后期望错误为 O ( ϵ T 2 ) O(\epsilon T^2) O(ϵT2)

12.3 网格世界中的行为克隆

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as pltclass ExpertGridWorld(MDPGridWorld):"""生成专家演示的网格世界"""def __init__(self, success_prob=0.7, slip_prob=0.15, gamma=0.99):super().__init__(success_prob, slip_prob, gamma)# 计算最优策略作为专家策略self.expert_policy = self._compute_expert_policy()def _compute_expert_policy(self):"""计算专家策略(最优策略)"""optimal_V = self.value_iteration()optimal_policy = self.extract_optimal_policy(optimal_V)return optimal_policydef generate_expert_demonstrations(self, num_episodes=100):"""生成专家演示数据"""demonstrations = []for episode in range(num_episodes):trajectory = []current_state_id = self.pos_to_state_id(self.start_pos[0], self.start_pos[1])goal_state_id = self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])for step in range(1000):  # 防止无限循环# 专家选择最优动作action_id = self.expert_policy[current_state_id]# 记录状态-动作对trajectory.append((current_state_id, action_id))# 执行动作next_state_id = self.simulate_transition(current_state_id, action_id)if next_state_id == goal_state_id:breakcurrent_state_id = next_state_iddemonstrations.extend(trajectory)return demonstrationsclass BehavioralCloningAgent:"""行为克隆智能体"""def __init__(self, state_size, action_size, lr=1e-3):self.state_size = state_sizeself.action_size = action_size# 策略网络self.policy_net = nn.Sequential(nn.Linear(state_size, 128),nn.ReLU(),nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, action_size),nn.Softmax(dim=-1))self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)self.losses = []def state_to_tensor(self, state_id, valid_states):"""将状态ID转换为one-hot张量"""state_vector = np.zeros(len(valid_states))if state_id in valid_states:state_vector[valid_states.index(state_id)] = 1.0return torch.FloatTensor(state_vector)def train(self, demonstrations, valid_states, num_epochs=100, batch_size=32):"""训练行为克隆模型"""print(f"开始行为克隆训练,样本数: {len(demonstrations)}")# 转换数据格式states = []actions = []for state_id, action_id in demonstrations:state_tensor = self.state_to_tensor(state_id, valid_states)states.append(state_tensor)actions.append(action_id)states = torch.stack(states)actions = torch.LongTensor(actions)# 训练dataset = torch.utils.data.TensorDataset(states, actions)dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)for epoch in range(num_epochs):epoch_loss = 0for batch_states, batch_actions in dataloader:# 前向传播action_probs = self.policy_net(batch_states)# 计算交叉熵损失loss = F.cross_entropy(action_probs, batch_actions)# 反向传播self.optimizer.zero_grad()loss.backward()self.optimizer.step()epoch_loss += loss.item()avg_loss = epoch_loss / len(dataloader)self.losses.append(avg_loss)if (epoch + 1) % 10 == 0:print(f"  Epoch {epoch + 1}: Loss = {avg_loss:.4f}")print("行为克隆训练完成!")def select_action(self, state_tensor):"""选择动作"""with torch.no_grad():action_probs = self.policy_net(state_tensor)action = torch.argmax(action_probs).item()return actiondef evaluate(self, env, valid_states, num_episodes=100):"""评估学到的策略"""rewards = []lengths = []for _ in range(num_episodes):current_state_id = env.pos_to_state_id(env.start_pos[0], env.start_pos[1])goal_state_id = env.pos_to_state_id(env.goal_pos[0], env.goal_pos[1])episode_reward = 0steps = 0for step in range(1000):state_tensor = self.state_to_tensor(current_state_id, valid_states)action_id = self.select_action(state_tensor)next_state_id = env.simulate_transition(current_state_id, action_id)# 计算奖励current_state_idx = env.state_to_idx[current_state_id]next_state_idx = env.state_to_idx[next_state_id]reward = env.rewards[current_state_idx][action_id][next_state_idx]episode_reward += rewardsteps += 1if next_state_id == goal_state_id:breakcurrent_state_id = next_state_idrewards.append(episode_reward)lengths.append(steps)avg_reward = np.mean(rewards)avg_length = np.mean(lengths)success_rate = sum(1 for r in rewards if r > 50) / len(rewards)return {'avg_reward': avg_reward,'avg_length': avg_length,'success_rate': success_rate}def test_behavioral_cloning():"""测试行为克隆"""print("=== 行为克隆测试 ===")# 创建专家环境expert_env = ExpertGridWorld()# 生成专家演示print("生成专家演示...")demonstrations = expert_env.generate_expert_demonstrations(num_episodes=50)print(f"收集到 {len(demonstrations)} 个状态-动作对")# 分析专家演示action_counts = defaultdict(int)for _, action_id in demonstrations:action_counts[action_id] += 1print("专家动作分布:")for action_id, count in action_counts.items():action_name = expert_env.actions[action_id]percentage = count / len(demonstrations) * 100print(f"  {action_name}: {count} ({percentage:.1f}%)")# 训练行为克隆智能体bc_agent = BehavioralCloningAgent(state_size=expert_env.num_states,action_size=expert_env.num_actions)bc_agent.train(demonstrations, expert_env.valid_states, num_epochs=50)# 评估行为克隆智能体bc_result = bc_agent.evaluate(expert_env, expert_env.valid_states)print(f"\n行为克隆结果:")print(f"  平均奖励: {bc_result['avg_reward']:.2f}")print(f"  平均步数: {bc_result['avg_length']:.1f}")print(f"  成功率: {bc_result['success_rate']:.2%}")# 与专家策略比较expert_result = expert_env.evaluate_policy_exactly(expert_env.expert_policy)print(f"\n专家策略(理论值):")print(f"  平均价值: {np.mean(expert_result):.2f}")return bc_agent, bc_result# 运行测试
# bc_agent, bc_result = test_behavioral_cloning()

12.4 DAgger算法

12.4.1 DAgger的基本思想

Dataset Aggregation (DAgger):解决分布偏移问题的在线方法。

核心思想

  1. 用当前策略收集数据
  2. 专家为这些状态提供标签
  3. 将新数据加入训练集
  4. 重新训练策略

算法流程

DAgger算法:
1. 初始化 D = ∅, π̂₁ = π*(专家策略)
2. for i = 1 to N:
3.     用 π̂ᵢ 收集轨迹,获得状态分布
4.     请专家为这些状态标注动作
5.     将新数据加入 D
6.     训练 π̂ᵢ₊₁ 在 D 上
7. end for
12.4.2 DAgger的理论分析

定理:DAgger的性能界限

如果专家策略是最优的,单步错误率为 ϵ \epsilon ϵ,则DAgger在 T T T步后的期望成本为:
E [ c T ] ≤ c π ∗ + O ( ϵ T ) \mathbb{E}[c_T] \leq c_{\pi^*} + O(\epsilon T) E[cT]cπ+O(ϵT)

相比行为克隆的 O ( ϵ T 2 ) O(\epsilon T^2) O(ϵT2),这是显著的改进!

12.5 DAgger实现

class DAggerAgent(BehavioralCloningAgent):"""DAgger智能体"""def __init__(self, state_size, action_size, lr=1e-3):super().__init__(state_size, action_size, lr)self.all_demonstrations = []def collect_trajectories(self, env, valid_states, num_episodes=20):"""用当前策略收集轨迹"""new_states = []for episode in range(num_episodes):current_state_id = env.pos_to_state_id(env.start_pos[0], env.start_pos[1])goal_state_id = env.pos_to_state_id(env.goal_pos[0], env.goal_pos[1])for step in range(1000):new_states.append(current_state_id)# 用当前策略选择动作state_tensor = self.state_to_tensor(current_state_id, valid_states)action_id = self.select_action(state_tensor)# 执行动作next_state_id = env.simulate_transition(current_state_id, action_id)if next_state_id == goal_state_id:breakcurrent_state_id = next_state_idreturn new_statesdef get_expert_labels(self, states, expert_policy):"""获取专家对这些状态的动作标签"""expert_actions = []for state_id in states:expert_action = expert_policy[state_id]expert_actions.append(expert_action)return list(zip(states, expert_actions))def dagger_train(self, env, expert_policy, valid_states, num_iterations=10):"""DAgger训练"""print("=== DAgger训练 ===")for iteration in range(num_iterations):print(f"DAgger迭代 {iteration + 1}")# 1. 收集轨迹new_states = self.collect_trajectories(env, valid_states)print(f"  收集了 {len(new_states)} 个新状态")# 2. 获取专家标签new_demonstrations = self.get_expert_labels(new_states, expert_policy)# 3. 添加到数据集self.all_demonstrations.extend(new_demonstrations)print(f"  总数据集大小: {len(self.all_demonstrations)}")# 4. 重新训练self.train(self.all_demonstrations, valid_states, num_epochs=20, batch_size=32)# 5. 评估当前性能result = self.evaluate(env, valid_states, num_episodes=50)print(f"  成功率: {result['success_rate']:.2%}")print(f"  平均奖励: {result['avg_reward']:.2f}")print("DAgger训练完成!")def compare_bc_and_dagger():"""比较行为克隆和DAgger"""print("=== 行为克隆 vs DAgger 比较 ===")# 创建环境env = ExpertGridWorld()# 生成初始专家演示(较少)initial_demos = env.generate_expert_demonstrations(num_episodes=20)print(f"初始专家演示: {len(initial_demos)} 个样本")# 行为克隆print("\n1. 行为克隆:")bc_agent = BehavioralCloningAgent(env.num_states, env.num_actions)bc_agent.train(initial_demos, env.valid_states, num_epochs=50)bc_result = bc_agent.evaluate(env, env.valid_states)# DAggerprint("\n2. DAgger:")dagger_agent = DAggerAgent(env.num_states, env.num_actions)dagger_agent.all_demonstrations = initial_demos.copy()dagger_agent.train(initial_demos, env.valid_states, num_epochs=20)dagger_agent.dagger_train(env, env.expert_policy, env.valid_states, num_iterations=5)dagger_result = dagger_agent.evaluate(env, env.valid_states)# 结果比较print("\n=== 结果比较 ===")print(f"{'方法':<15} | {'成功率':<10} | {'平均奖励':<10} | {'平均步数':<10}")print("-" * 55)print(f"{'行为克隆':<15} | {bc_result['success_rate']:<10.2%} | "f"{bc_result['avg_reward']:<10.2f} | {bc_result['avg_length']:<10.1f}")print(f"{'DAgger':<15} | {dagger_result['success_rate']:<10.2%} | "f"{dagger_result['avg_reward']:<10.2f} | {dagger_result['avg_length']:<10.1f}")return bc_result, dagger_result# 运行比较
# bc_result, dagger_result = compare_bc_and_dagger()

12.6 逆向强化学习 (Inverse Reinforcement Learning)

12.6.1 核心思想

问题:我们能从专家行为中推断出奖励函数吗?

逆向强化学习:从专家演示中学习奖励函数,然后用这个奖励函数训练策略。

数学表述
给定专家轨迹 { ( s i , a i ) } \{(s_i, a_i)\} {(si,ai)},找到奖励函数 R ( s , a ) R(s,a) R(s,a)使得专家策略是最优的。

12.6.2 最大熵IRL

思想:在满足约束的奖励函数中,选择使策略熵最大的那个。

目标函数
max ⁡ R H ( π ∗ ) − λ ∥ E π ∗ [ ϕ ( s , a ) ] − E expert [ ϕ ( s , a ) ] ∥ \max_{R} H(\pi^*) - \lambda \|\mathbb{E}_{\pi^*}[\phi(s,a)] - \mathbb{E}_{\text{expert}}[\phi(s,a)]\| RmaxH(π)λEπ[ϕ(s,a)]Eexpert[ϕ(s,a)]

其中 ϕ ( s , a ) \phi(s,a) ϕ(s,a)是特征函数, H ( π ∗ ) H(\pi^*) H(π)是策略熵。

12.7 生成对抗模仿学习 (GAIL)

12.7.1 基本思想

GAIL:用生成对抗网络的思想来做模仿学习。

组件

  • 生成器:策略网络 π θ \pi_\theta πθ,生成轨迹
  • 判别器:判别网络 D ϕ D_\phi Dϕ,区分专家轨迹和生成轨迹

目标函数

  • 判别器: max ⁡ ϕ E expert [ log ⁡ D ϕ ( s , a ) ] + E π θ [ log ⁡ ( 1 − D ϕ ( s , a ) ) ] \max_\phi \mathbb{E}_{\text{expert}}[\log D_\phi(s,a)] + \mathbb{E}_{\pi_\theta}[\log(1-D_\phi(s,a))] maxϕEexpert[logDϕ(s,a)]+Eπθ[log(1Dϕ(s,a))]
  • 生成器: min ⁡ θ E π θ [ log ⁡ ( 1 − D ϕ ( s , a ) ) ] − λ H ( π θ ) \min_\theta \mathbb{E}_{\pi_\theta}[\log(1-D_\phi(s,a))] - \lambda H(\pi_\theta) minθEπθ[log(1Dϕ(s,a))]λH(πθ)
class GAILAgent:"""GAIL智能体(简化版)"""def __init__(self, state_size, action_size):self.state_size = state_sizeself.action_size = action_size# 策略网络(生成器)self.policy_net = PolicyNetwork(state_size, action_size)self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=1e-3)# 判别器网络self.discriminator = nn.Sequential(nn.Linear(state_size + action_size, 128),nn.ReLU(),nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, 1),nn.Sigmoid())self.discriminator_optimizer = optim.Adam(self.discriminator.parameters(), lr=1e-3)def train_discriminator(self, expert_data, generated_data):"""训练判别器"""# 专家数据标签为1,生成数据标签为0expert_labels = torch.ones(len(expert_data), 1)generated_labels = torch.zeros(len(generated_data), 1)# 计算损失expert_pred = self.discriminator(expert_data)generated_pred = self.discriminator(generated_data)loss = F.binary_cross_entropy(expert_pred, expert_labels) + \F.binary_cross_entropy(generated_pred, generated_labels)# 更新判别器self.discriminator_optimizer.zero_grad()loss.backward()self.discriminator_optimizer.step()return loss.item()def train_policy(self, states, actions):"""训练策略网络"""# 使用判别器的输出作为奖励信号state_action = torch.cat([states, F.one_hot(actions, self.action_size).float()], dim=1)fake_reward = -torch.log(1 - self.discriminator(state_action) + 1e-8)  # 避免log(0)# 策略梯度更新(简化版)action_probs = self.policy_net(states)log_probs = torch.log(action_probs.gather(1, actions.unsqueeze(1)))policy_loss = -(log_probs * fake_reward.detach()).mean()self.policy_optimizer.zero_grad()policy_loss.backward()self.policy_optimizer.step()return policy_loss.item()

12.8 模仿学习的挑战与解决方案

12.8.1 主要挑战
  1. 分布偏移:训练和测试分布不匹配
  2. 专家次优:专家可能不是最优的
  3. 多模态行为:专家可能有多种合理策略
  4. 部分观测:观测可能不完整
12.8.2 解决方案
  1. 在线方法:DAgger等交互式学习
  2. 鲁棒性:考虑专家噪声和错误
  3. 多样性:学习行为的分布而不是单一策略
  4. 状态增强:增加历史信息

12.9 模仿学习的应用

12.9.1 成功案例
  1. 自动驾驶:从人类司机演示学习
  2. 机器人控制:从动作捕捉数据学习
  3. 游戏AI:从人类玩家录像学习
  4. 对话系统:从人类对话数据学习
12.9.2 实际考虑
  1. 数据质量:专家演示的质量很重要
  2. 计算成本:在线方法需要专家实时标注
  3. 安全性:确保学到的策略是安全的
  4. 泛化性:需要在不同环境下工作

小结

模仿学习为强化学习提供了新的范式:

  1. 快速启动:从专家知识开始,避免随机探索
  2. 安全高效:特别适用于安全关键应用
  3. 理论发展:从行为克隆到GAIL的理论进步
  4. 实际应用:在多个领域都有成功案例

主要方法对比

  • 行为克隆:简单但有分布偏移问题
  • DAgger:解决分布偏移,但需要在线专家
  • IRL:学习奖励函数,更灵活
  • GAIL:对抗学习,处理复杂行为

下一章我们将学习深度确定性策略梯度(DDPG),这是处理连续控制的重要算法!


第十三章:深度确定性策略梯度 (DDPG) - 小智的精确控制大师

13.1 DDPG的核心思想

想象小智现在需要学会精确控制:不是简单的"向左"或"向右",而是"向左偏上30度移动,速度为0.7"。这种连续、精确的控制就是DDPG要解决的问题!

**DDPG (Deep Deterministic Policy Gradient)**结合了:

  • DQN的经验回放和目标网络
  • 演员-评论员的架构
  • 确定性策略梯度理论

13.2 从随机到确定性策略

13.2.1 随机策略的问题

之前我们学的策略都是随机的: π ( a ∣ s ) \pi(a|s) π(as),输出动作的概率分布。

连续动作空间的挑战

  • 动作空间可能是无限的
  • 需要积分计算期望: E a ∼ π [ Q ( s , a ) ] \mathbb{E}_{a \sim \pi}[Q(s,a)] Eaπ[Q(s,a)]
  • 计算复杂度高
13.2.2 确定性策略的优势

确定性策略 μ ( s ) = a \mu(s) = a μ(s)=a,直接输出一个动作。

优势

  • 不需要积分,直接计算 Q ( s , μ ( s ) ) Q(s, \mu(s)) Q(s,μ(s))
  • 更适合连续控制任务
  • 计算效率高

挑战

  • 如何探索?确定性策略没有随机性
  • 如何保证学习稳定性?

13.3 确定性策略梯度定理

13.3.1 数学推导

目标:最大化期望回报
J ( θ ) = E s ∼ ρ μ θ [ Q ( s , μ θ ( s ) ) ] J(\theta) = \mathbb{E}_{s \sim \rho^{\mu_\theta}}[Q(s, \mu_\theta(s))] J(θ)=Esρμθ[Q(s,μθ(s))]

确定性策略梯度定理
∇ θ J ( θ ) = E s ∼ ρ μ θ [ ∇ θ μ θ ( s ) ∇ a Q ( s , a ) ∣ a = μ θ ( s ) ] \nabla_\theta J(\theta) = \mathbb{E}_{s \sim \rho^{\mu_\theta}}[\nabla_\theta \mu_\theta(s) \nabla_a Q(s,a)|_{a=\mu_\theta(s)}] θJ(θ)=Esρμθ[θμθ(s)aQ(s,a)a=μθ(s)]

直观理解

  • ∇ a Q ( s , a ) \nabla_a Q(s,a) aQ(s,a):动作的价值梯度(哪个方向动作更好)
  • ∇ θ μ θ ( s ) \nabla_\theta \mu_\theta(s) θμθ(s):策略参数的梯度(如何调整参数)
  • 两者相乘:参数应该朝着提高动作价值的方向调整
13.3.2 与随机策略梯度的比较

随机策略梯度
∇ θ J ( θ ) = E [ ∇ θ log ⁡ π θ ( a ∣ s ) Q ( s , a ) ] \nabla_\theta J(\theta) = \mathbb{E}[\nabla_\theta \log \pi_\theta(a|s) Q(s,a)] θJ(θ)=E[θlogπθ(as)Q(s,a)]

确定性策略梯度
∇ θ J ( θ ) = E [ ∇ θ μ θ ( s ) ∇ a Q ( s , a ) ] \nabla_\theta J(\theta) = \mathbb{E}[\nabla_\theta \mu_\theta(s) \nabla_a Q(s,a)] θJ(θ)=E[θμθ(s)aQ(s,a)]

确定性版本避免了对动作空间的积分!

13.4 DDPG算法架构

13.4.1 四个网络

DDPG使用四个神经网络:

  1. 演员网络 μ ( s ∣ θ μ ) \mu(s|\theta^\mu) μ(sθμ):确定性策略
  2. 评论员网络 Q ( s , a ∣ θ Q ) Q(s,a|\theta^Q) Q(s,aθQ):动作价值函数
  3. 目标演员网络 μ ′ ( s ∣ θ μ ′ ) \mu'(s|\theta^{\mu'}) μ(sθμ):用于计算目标
  4. 目标评论员网络 Q ′ ( s , a ∣ θ Q ′ ) Q'(s,a|\theta^{Q'}) Q(s,aθQ):用于计算目标
13.4.2 更新规则

评论员更新(类似DQN):
y t = r t + γ Q ′ ( s t + 1 , μ ′ ( s t + 1 ∣ θ μ ′ ) ∣ θ Q ′ ) y_t = r_t + \gamma Q'(s_{t+1}, \mu'(s_{t+1}|\theta^{\mu'})|\theta^{Q'}) yt=rt+γQ(st+1,μ(st+1θμ)θQ)
L = 1 N ∑ i ( y i − Q ( s i , a i ∣ θ Q ) ) 2 L = \frac{1}{N}\sum_i (y_i - Q(s_i, a_i|\theta^Q))^2 L=N1i(yiQ(si,aiθQ))2

演员更新(策略梯度):
∇ θ μ J ≈ 1 N ∑ i ∇ a Q ( s i , a ∣ θ Q ) ∣ a = μ ( s i ) ∇ θ μ μ ( s i ∣ θ μ ) \nabla_{\theta^\mu} J \approx \frac{1}{N}\sum_i \nabla_a Q(s_i, a|\theta^Q)|_{a=\mu(s_i)} \nabla_{\theta^\mu} \mu(s_i|\theta^\mu) θμJN1iaQ(si,aθQ)a=μ(si)θμμ(siθμ)

目标网络软更新
θ Q ′ ← τ θ Q + ( 1 − τ ) θ Q ′ \theta^{Q'} \leftarrow \tau \theta^Q + (1-\tau)\theta^{Q'} θQτθQ+(1τ)θQ
θ μ ′ ← τ θ μ + ( 1 − τ ) θ μ ′ \theta^{\mu'} \leftarrow \tau \theta^\mu + (1-\tau)\theta^{\mu'} θμτθμ+(1τ)θμ

13.5 探索策略

13.5.1 噪声探索

确定性策略需要添加噪声来探索:
a t = μ ( s t ∣ θ μ ) + N t a_t = \mu(s_t|\theta^\mu) + \mathcal{N}_t at=μ(stθμ)+Nt

常用噪声

  • 高斯噪声 N t ∼ N ( 0 , σ 2 ) \mathcal{N}_t \sim \mathcal{N}(0, \sigma^2) NtN(0,σ2)
  • OU噪声:有时间相关性的噪声,更适合物理系统
13.5.2 OU噪声 (Ornstein-Uhlenbeck Process)

公式
d x t = θ ( μ − x t ) d t + σ d W t dx_t = \theta(\mu - x_t)dt + \sigma dW_t dxt=θ(μxt)dt+σdWt

离散化
x t + 1 = x t + θ ( μ − x t ) + σ ϵ t x_{t+1} = x_t + \theta(\mu - x_t) + \sigma \epsilon_t xt+1=xt+θ(μxt)+σϵt

其中 ϵ t ∼ N ( 0 , 1 ) \epsilon_t \sim \mathcal{N}(0,1) ϵtN(0,1)

特点

  • 有均值回归特性
  • 适合需要平滑探索的连续控制

13.6 网格世界中的DDPG实现

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as pltclass ActorNetwork(nn.Module):"""DDPG演员网络"""def __init__(self, state_size, action_size, hidden_size=256):super(ActorNetwork, self).__init__()self.fc1 = nn.Linear(state_size, hidden_size)self.fc2 = nn.Linear(hidden_size, hidden_size)self.fc3 = nn.Linear(hidden_size, action_size)# 初始化最后一层权重self.fc3.weight.data.uniform_(-3e-3, 3e-3)def forward(self, state):x = F.relu(self.fc1(state))x = F.relu(self.fc2(x))# 使用tanh激活函数确保输出在[-1, 1]范围action = torch.tanh(self.fc3(x))return actionclass CriticNetwork(nn.Module):"""DDPG评论员网络"""def __init__(self, state_size, action_size, hidden_size=256):super(CriticNetwork, self).__init__()# 状态处理层self.fc1 = nn.Linear(state_size, hidden_size)# 状态+动作处理层self.fc2 = nn.Linear(hidden_size + action_size, hidden_size)self.fc3 = nn.Linear(hidden_size, 1)# 初始化最后一层权重self.fc3.weight.data.uniform_(-3e-3, 3e-3)def forward(self, state, action):# 先处理状态x = F.relu(self.fc1(state))# 然后结合动作x = torch.cat([x, action], dim=1)x = F.relu(self.fc2(x))q_value = self.fc3(x)return q_valueclass OUNoise:"""Ornstein-Uhlenbeck噪声"""def __init__(self, action_size, mu=0.0, theta=0.15, sigma=0.2):self.action_size = action_sizeself.mu = muself.theta = thetaself.sigma = sigmaself.state = np.ones(self.action_size) * self.mudef reset(self):"""重置噪声"""self.state = np.ones(self.action_size) * self.mudef sample(self):"""采样噪声"""dx = self.theta * (self.mu - self.state) + self.sigma * np.random.randn(self.action_size)self.state += dxreturn self.stateclass ReplayBuffer:"""经验回放缓冲区"""def __init__(self, capacity=100000):self.buffer = deque(maxlen=capacity)def push(self, state, action, reward, next_state, done):"""添加经验"""self.buffer.append((state, action, reward, next_state, done))def sample(self, batch_size):"""随机采样"""batch = random.sample(self.buffer, batch_size)state, action, reward, next_state, done = zip(*batch)return (torch.FloatTensor(state),torch.FloatTensor(action),torch.FloatTensor(reward),torch.FloatTensor(next_state),torch.BoolTensor(done))def __len__(self):return len(self.buffer)class DDPGAgent:"""DDPG智能体"""def __init__(self, state_size, action_size, lr_actor=1e-4, lr_critic=1e-3, gamma=0.99, tau=0.001, noise_scale=0.1):self.state_size = state_sizeself.action_size = action_sizeself.gamma = gammaself.tau = tauself.noise_scale = noise_scale# 创建网络self.actor = ActorNetwork(state_size, action_size)self.critic = CriticNetwork(state_size, action_size)self.target_actor = ActorNetwork(state_size, action_size)self.target_critic = CriticNetwork(state_size, action_size)# 初始化目标网络self.target_actor.load_state_dict(self.actor.state_dict())self.target_critic.load_state_dict(self.critic.state_dict())# 优化器self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr_actor)self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr_critic)# 经验回放和噪声self.memory = ReplayBuffer()self.noise = OUNoise(action_size)# 统计信息self.actor_losses = []self.critic_losses = []def select_action(self, state, add_noise=True):"""选择动作"""state = torch.FloatTensor(state).unsqueeze(0)with torch.no_grad():action = self.actor(state).cpu().numpy()[0]if add_noise:action += self.noise.sample() * self.noise_scaleaction = np.clip(action, -1, 1)  # 确保动作在合理范围内return actiondef store_transition(self, state, action, reward, next_state, done):"""存储经验"""self.memory.push(state, action, reward, next_state, done)def learn(self, batch_size=64):"""学习过程"""if len(self.memory) < batch_size:return# 采样经验states, actions, rewards, next_states, dones = self.memory.sample(batch_size)# 更新评论员with torch.no_grad():next_actions = self.target_actor(next_states)next_q_values = self.target_critic(next_states, next_actions)target_q_values = rewards.unsqueeze(1) + (self.gamma * next_q_values * ~dones.unsqueeze(1))current_q_values = self.critic(states, actions)critic_loss = F.mse_loss(current_q_values, target_q_values)self.critic_optimizer.zero_grad()critic_loss.backward()self.critic_optimizer.step()# 更新演员predicted_actions = self.actor(states)actor_loss = -self.critic(states, predicted_actions).mean()self.actor_optimizer.zero_grad()actor_loss.backward()self.actor_optimizer.step()# 软更新目标网络self.soft_update(self.target_actor, self.actor)self.soft_update(self.target_critic, self.critic)# 记录损失self.actor_losses.append(actor_loss.item())self.critic_losses.append(critic_loss.item())def soft_update(self, target, source):"""软更新目标网络"""for target_param, param in zip(target.parameters(), source.parameters()):target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)class ContinuousControlGridWorld:"""连续控制网格世界"""def __init__(self, grid_size=4, step_size=0.1):self.grid_size = grid_sizeself.step_size = step_size# 位置范围self.start_pos = np.array([0.5, 0.5])self.goal_pos = np.array([3.5, 3.5])# 障碍物(圆形)self.obstacles = [{'center': np.array([1.5, 1.5]), 'radius': 0.3},{'center': np.array([2.5, 2.5]), 'radius': 0.3}]self.current_pos = self.start_pos.copy()def reset(self):"""重置环境"""self.current_pos = self.start_pos.copy()return self.get_state()def get_state(self):"""获取归一化状态"""# 当前位置(归一化)normalized_pos = self.current_pos / self.grid_size# 到目标的相对位置(归一化)relative_goal = (self.goal_pos - self.current_pos) / self.grid_size# 到障碍物的距离obstacle_distances = []for obs in self.obstacles:dist = np.linalg.norm(self.current_pos - obs['center']) / self.grid_sizeobstacle_distances.append(dist)state = np.concatenate([normalized_pos, relative_goal, obstacle_distances])return state.astype(np.float32)def is_collision(self, pos):"""检查碰撞"""# 边界检查if pos[0] < 0 or pos[0] > self.grid_size or pos[1] < 0 or pos[1] > self.grid_size:return True# 障碍物检查for obs in self.obstacles:if np.linalg.norm(pos - obs['center']) < obs['radius']:return Truereturn Falsedef step(self, action):"""执行动作"""# 动作是2D向量,范围[-1, 1]action = np.clip(action, -1, 1)# 转换为实际移动delta = action * self.step_sizenew_pos = self.current_pos + delta# 检查碰撞if self.is_collision(new_pos):# 碰撞,不移动,给予惩罚reward = -5.0done = Falseelse:# 正常移动old_dist = np.linalg.norm(self.current_pos - self.goal_pos)self.current_pos = new_posnew_dist = np.linalg.norm(self.current_pos - self.goal_pos)# 奖励设计if new_dist < 0.2:  # 到达目标reward = 100.0done = Trueelse:# 距离奖励 + 时间惩罚reward = (old_dist - new_dist) * 10 - 0.1done = Falsereturn self.get_state(), reward, donedef train_ddpg():"""训练DDPG智能体"""print("=== DDPG训练 ===")# 创建环境和智能体env = ContinuousControlGridWorld()state_size = len(env.get_state())action_size = 2  # 2D连续动作agent = DDPGAgent(state_size, action_size)# 训练参数num_episodes = 1000max_steps = 200episode_rewards = []for episode in range(num_episodes):state = env.reset()agent.noise.reset()  # 重置噪声episode_reward = 0for step in range(max_steps):# 选择动作action = agent.select_action(state, add_noise=True)# 执行动作next_state, reward, done = env.step(action)# 存储经验agent.store_transition(state, action, reward, next_state, done)# 学习if len(agent.memory) > 64:agent.learn()episode_reward += rewardstate = next_stateif done:breakepisode_rewards.append(episode_reward)# 衰减噪声if episode % 100 == 0:agent.noise_scale *= 0.95if episode % 100 == 0:avg_reward = np.mean(episode_rewards[-100:])print(f"Episode {episode}: Average Reward = {avg_reward:.2f}, "f"Noise Scale = {agent.noise_scale:.3f}")print("DDPG训练完成!")return agent, episode_rewardsdef evaluate_ddpg(agent, env, num_episodes=100):"""评估DDPG智能体"""print("评估DDPG智能体...")episode_rewards = []episode_lengths = []success_count = 0for episode in range(num_episodes):state = env.reset()episode_reward = 0steps = 0for step in range(200):# 不添加噪声的确定性策略action = agent.select_action(state, add_noise=False)next_state, reward, done = env.step(action)episode_reward += rewardsteps += 1state = next_stateif done:if reward > 50:  # 成功到达目标success_count += 1breakepisode_rewards.append(episode_reward)episode_lengths.append(steps)avg_reward = np.mean(episode_rewards)avg_length = np.mean(episode_lengths)success_rate = success_count / num_episodesprint(f"DDPG评估结果:")print(f"  平均奖励: {avg_reward:.2f}")print(f"  平均步数: {avg_length:.1f}")print(f"  成功率: {success_rate:.2%}")return {'avg_reward': avg_reward,'avg_length': avg_length,'success_rate': success_rate}def compare_all_algorithms():"""比较所有算法的性能"""print("=== 所有算法性能比较 ===")# 注意:这里只是展示框架,实际运行需要前面的环境algorithms = {'Q-learning': {'avg_reward': 85.2, 'success_rate': 0.82, 'avg_length': 12.3},'SARSA': {'avg_reward': 80.1, 'success_rate': 0.78, 'avg_length': 13.1},'Policy Gradient': {'avg_reward': 87.5, 'success_rate': 0.85, 'avg_length': 11.8},'PPO': {'avg_reward': 92.3, 'success_rate': 0.91, 'avg_length': 10.5},'DQN': {'avg_reward': 89.1, 'success_rate': 0.87, 'avg_length': 11.2},'Actor-Critic': {'avg_reward': 88.7, 'success_rate': 0.86, 'avg_length': 11.5},'DDPG': {'avg_reward': 94.5, 'success_rate': 0.93, 'avg_length': 9.8}}print(f"{'算法':<15} | {'平均奖励':<10} | {'成功率':<10} | {'平均步数':<10}")print("-" * 55)for name, results in algorithms.items():print(f"{name:<15} | {results['avg_reward']:<10.1f} | "f"{results['success_rate']:<10.2%} | {results['avg_length']:<10.1f}")# 训练和评估
# agent, rewards = train_ddpg()
# env = ContinuousControlGridWorld()
# results = evaluate_ddpg(agent, env)# 比较所有算法
# compare_all_algorithms()

13.7 DDPG的优缺点

13.7.1 优点
  1. 连续控制:专门为连续动作空间设计
  2. 样本效率:使用经验回放,提高样本利用率
  3. 稳定性:目标网络和软更新提高训练稳定性
  4. 确定性:输出确定性动作,适合需要精确控制的任务
13.7.2 缺点
  1. 超参数敏感:需要仔细调节学习率、噪声等参数
  2. 探索困难:确定性策略依赖噪声探索
  3. Q函数高估:可能存在Q值高估问题
  4. 收敛慢:相比一些简单方法,收敛可能较慢

13.8 DDPG的改进版本

13.8.1 TD3 (Twin Delayed DDPG)

主要改进

  1. 双评论员网络:减少Q值高估
  2. 延迟策略更新:降低更新频率
  3. 目标动作噪声:在目标动作上添加噪声
13.8.2 SAC (Soft Actor-Critic)

主要改进

  1. 最大熵强化学习:在奖励中加入熵项
  2. 随机策略:输出动作分布而不是确定性动作
  3. 自动温度调节:自适应调节探索程度

13.9 实际应用考虑

13.9.1 动作空间归一化
def normalize_action(action, action_low, action_high):"""将[-1, 1]的动作映射到实际动作空间"""return action_low + (action + 1.0) * 0.5 * (action_high - action_low)def denormalize_action(action, action_low, action_high):"""将实际动作映射到[-1, 1]"""return 2.0 * (action - action_low) / (action_high - action_low) - 1.0
13.9.2 奖励设计

好的奖励设计原则

  1. 稠密奖励:提供连续的反馈
  2. 平衡项:避免某一项奖励过于突出
  3. 归一化:保持奖励在合理范围内

小结

DDPG是连续控制领域的重要算法:

  1. 理论创新:确定性策略梯度定理的应用
  2. 工程实践:结合DQN和Actor-Critic的优点
  3. 广泛应用:机器人控制、游戏AI等领域
  4. 持续发展:启发了TD3、SAC等后续算法

DDPG的核心贡献

  • 解决了连续动作空间的强化学习问题
  • 提供了稳定的训练框架
  • 为后续算法奠定了基础

总结:小智的强化学习之旅

完整的学习路径

通过这个完整的教程,我们跟随小智走过了强化学习的完整旅程:

基础篇
  1. 状态表示 - 小智学会了如何理解世界
  2. 马尔科夫过程 - 小智理解了"活在当下"的重要性
  3. 贝尔曼方程 - 小智掌握了价值的递归本质
  4. 表格方法 - 小智学会了最基本的学习方法
进阶篇
  1. Q-learning - 小智学会了独立探索和学习
  2. 策略梯度 - 小智学会了直接优化行为
  3. PPO - 小智学会了稳定的策略改进
  4. 深度Q网络 - 小智拥有了神经网络大脑
高级篇
  1. 连续动作 - 小智学会了精确控制
  2. 演员-评论员 - 小智拥有了双重人格
  3. 稀疏奖励 - 小智学会了在困难中坚持
  4. 模仿学习 - 小智学会了从导师那里学习
  5. DDPG - 小智成为了连续控制大师

核心概念总结

数学基础
  • 马尔科夫性质 P ( s t + 1 ∣ s t , s t − 1 , . . . ) = P ( s t + 1 ∣ s t ) P(s_{t+1}|s_t, s_{t-1}, ...) = P(s_{t+1}|s_t) P(st+1st,st1,...)=P(st+1st)
  • 贝尔曼方程 V ( s ) = max ⁡ a ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) + γ V ( s ′ ) ] V(s) = \max_a \sum_{s'} P(s'|s,a)[R(s,a,s') + \gamma V(s')] V(s)=maxasP(ss,a)[R(s,a,s)+γV(s)]
  • 策略梯度定理 ∇ J ( θ ) = E [ ∇ log ⁡ π ( a ∣ s ) A ( s , a ) ] \nabla J(\theta) = \mathbb{E}[\nabla \log \pi(a|s) A(s,a)] J(θ)=E[logπ(as)A(s,a)]
算法分类
  • 基于价值:Q-learning, DQN
  • 基于策略:Policy Gradient, PPO
  • 演员-评论员:A2C, DDPG
  • 模仿学习:BC, DAgger, GAIL
关键技术
  • 经验回放:提高样本效率
  • 目标网络:稳定训练过程
  • 探索策略:平衡探索与利用
  • 函数近似:处理大状态空间

实践指南

选择算法的建议
  1. 离散动作 + 简单环境 → Q-learning
  2. 离散动作 + 复杂环境 → DQN, PPO
  3. 连续动作 + 确定性控制 → DDPG
  4. 连续动作 + 随机策略 → PPO, SAC
  5. 有专家演示 → 模仿学习
  6. 稀疏奖励 → 奖励整形, 好奇心驱动
调试技巧
  1. 从简单开始:先在小环境验证算法
  2. 监控指标:奖励、损失、探索率等
  3. 可视化学习:绘制学习曲线
  4. 对比基线:与简单方法比较
  5. 参数搜索:系统性地调节超参数

未来展望

强化学习仍在快速发展:

新兴方向
  • 元学习:学会如何学习
  • 多智能体:多个智能体协作/竞争
  • 分层强化学习:复杂任务的分解
  • 安全强化学习:保证安全约束
应用领域
  • 自动驾驶:路径规划和控制
  • 机器人:操作和导航
  • 游戏AI:策略游戏和电竞
  • 推荐系统:个性化推荐
  • 金融:算法交易
  • 医疗:治疗方案优化

最后的话

强化学习是一门结合了数学理论、计算机科学和心理学的综合性学科。通过这个教程,我们不仅学会了各种算法,更重要的是理解了智能决策的本质。

小智的故事告诉我们:

  • 探索很重要:没有探索就没有发现
  • 经验很宝贵:过去的经历指导未来的决策
  • 平衡是关键:探索与利用、偏差与方差
  • 持续学习:环境在变化,学习永不停止

希望这个教程能够帮助你在强化学习的道路上越走越远,就像小智一样,从一个简单的网格世界开始,最终掌握复杂世界的智能决策!

强化学习的核心思想:通过与环境的交互,在试错中学习最优的行为策略。这不仅是机器学习的重要分支,更是理解智能本质的重要途径。

愿你的AI之旅精彩而充实!🚀


完整强化学习教程到此结束。感谢陪伴小智走过这段学习之旅!


代码运行说明:

强化学习教程运行示例

概述

本文件 运行示例.py 包含了《完整强化学习教程-合并版.md》中所有主要算法的Python实现。通过小智在4×4网格世界中的冒险,演示了从基础状态表示到高级深度强化学习算法的完整学习路径。

环境设置

网格世界参数

  • 网格大小: 4×4
  • 起点: (1,1) - 左上角
  • 目标: (4,4) - 右下角
  • 障碍物: (2,2), (3,3)
  • 动作空间: 上、下、左、右、不动

奖励设计

  • 到达目标: +100
  • 每步移动: -1
  • 撞墙/撞障碍物: -10

包含的算法

基础理论 (Chapter 1-3)

  • 状态表示: GridWorldEnvironment
  • 马尔可夫过程: MarkovGridWorld (带转移概率)
  • 贝尔曼方程: MDPGridWorld + 价值迭代

经典算法 (Chapter 4-6)

  • Q-learning: QLearningAgent (off-policy)
  • SARSA: SARSAAgent (on-policy)
  • 表格方法: 蒙特卡洛、时间差分

深度强化学习 (Chapter 7-8)

  • 深度Q网络(DQN): DQNAgent (需要PyTorch)
  • ⚠️ 策略梯度: PolicyGradientAgent (需要PyTorch)
  • ⚠️ PPO算法: 在教程中有详细实现

高级算法 (Chapter 9-13)

  • 📚 连续动作深度Q网络
  • 📚 Actor-Critic算法
  • 📚 稀疏奖励处理
  • 📚 模仿学习
  • 📚 DDPG算法

依赖安装

基础依赖 (必需)

pip install numpy matplotlib

可选依赖

# 深度学习功能
pip install torch torchvision# 数据分析
pip install pandas

使用方法

直接运行

python3 运行示例.py

选择测试模式

运行后会显示菜单:

  1. 环境测试 (状态表示)
  2. 马尔可夫过程测试
  3. 贝尔曼方程和价值迭代
  4. Q-learning算法
  5. SARSA算法
  6. 深度Q网络 (需要PyTorch)
  7. 算法性能比较
  8. 完整算法演示 (推荐)
  9. 运行所有测试

程序化使用

import 运行示例# 创建环境
env = 运行示例.MDPGridWorld(gamma=0.9)# 训练Q-learning智能体
agent = 运行示例.QLearningAgent(env)
rewards = agent.train(num_episodes=1000)# 提取和可视化策略
policy = agent.extract_policy()
运行示例.visualize_policy(env, policy)

输出示例

策略可视化

=== 学到的策略 ===
符号含义: ↑上 ↓下 ←左 →右 ●不动
--------------------
| →  | →  | ↓  | 💎 |
--------------------
| ↑  | 🚫 | ↓  | ↓  |
--------------------
| ↑  | ↑  | 🚫 | ↓  |
--------------------
| →  | →  | →  | ↑  |

学习曲线

程序会自动生成matplotlib图表,显示不同算法的学习进度和性能比较。

教程对应关系

文件中的类/函数教程章节说明
GridWorldEnvironmentChapter 1状态表示基础
MarkovGridWorldChapter 2马尔可夫转移概率
MDPGridWorldChapter 3贝尔曼方程和价值迭代
QLearningAgentChapter 6Q-learning算法
SARSAAgentChapter 6SARSA算法
DQNAgentChapter 8深度Q网络

扩展学习

  1. 理论学习: 阅读《完整强化学习教程-合并版.md》了解详细数学推导
  2. 实验拓展: 尝试修改网格大小、障碍物位置、奖励函数
  3. 算法比较: 观察不同算法在相同环境下的表现差异
  4. 参数调优: 调整学习率、探索率、折扣因子等超参数

常见问题

Q: PyTorch相关功能无法使用

A: 安装PyTorch: pip install torch,或跳过深度学习相关测试

Q: matplotlib图表无法显示

A: 确保安装了GUI后端: pip install matplotlib[gui]

Q: 算法收敛慢

A: 尝试调整超参数,如增加训练回合数或调整学习率

Q: 想要实现其他算法

A: 参考现有代码结构,继承基础类并实现新的学习算法

http://www.xdnf.cn/news/13973.html

相关文章:

  • 【深尚想】SM8102ABC 95%高效降压芯片!TWS/物联网专用,2A输出+1.5mm超小体积
  • 中级统计师-经济学基础知识-第二章 企业生产理论
  • Python装饰器详解和默认装饰器
  • 网络流量分析之Heavy hitters和heavy changers
  • Jmeter本身耗资源导致压测不上去解决方案
  • Java中extends与implements深度解析:继承与接口实现的本质区别
  • 【Docker基础】Docker核心概念:仓库(Registry)详解
  • Vue实现悬浮图片弹出大图预览弹窗,弹窗顶部与图片顶部平齐
  • 隧道代理IP的使用与技术优势分析
  • 几种经典排序算法的C++实现
  • jenkins连接git仓库
  • 03 - ECA模块
  • git checkout 详解
  • 危险品运输行业观察
  • Kafka环境搭建全攻略:从Docker到Java实战
  • Logback-spring.xml 配置屏蔽特定路径的日志
  • Vue3+Element Plus动态表格列宽设置
  • 【写实交互数字人】实时数字人助力政务变革:技术、产品与应用价值的全景剖析
  • 【插件推荐】WebRTC Protect — 防止 IP 泄漏
  • 苹果越来越像安卓,华为越来越像苹果
  • 电路图识图基础知识-电动机软启动器技术解析与应用(二十五)
  • 【Zephyr 系列 22】从单机开发到平台化:构建你自己的 Zephyr 物联网开发平台
  • 【结合JSR380自定义校验】
  • Altera系列FPGA基于ADV7180解码PAL视频,纯verilog去隔行,提供2套Quartus工程源码和技术支持
  • 智慧物流园区——解读华为智慧物流园区解决方案【附全文阅读】
  • 上海市计算机学会竞赛平台2022年4月月赛丙组圆环独立集(一)
  • 基于 Spring Cloud Gateway + Sentinel 实现高并发限流保护机制
  • PHP基础-控制结构
  • 全链路实时感知:网络专线端到端监控运维
  • SwiftUI隐藏返回按钮保留右滑手势方案