当前位置: 首页 > ds >正文

强化学习算法实战:一个例子实现sarsa、dqn、ddqn、qac、a2c、trpo、ppo

简介

在学习强化学习算法:sarsa、dqn、ddqn、qac、a2c、trpo、ppo时,由于有大量数据公式的推导,觉得十分晦涩,且听过就忘记了。
但是当把算法应用于实战时,代码的实现要比数据推导要直观很多。
接下来通过不同的算法实现gym中的CartPole-v1游戏。

游戏介绍

CartPole(推车倒立摆) 是强化学习中经典的基准测试任务,因为其直观可视、方便调试、状态和动作空间小等特性,常用于入门教学和算法验证。它的目标是训练一个智能体(agent)通过左右移动小车,使车顶的杆子尽可能长时间保持竖直不倒。
在这里插入图片描述

  • 环境:小车(cart)可以在水平轨道上左右移动,顶部通过关节连接一根自由摆动的杆子(pole)。
  • 目标:通过左右移动小车,使杆子的倾斜角度不超出阈值(±12°或±15°),同时小车不超出轨道范围(如轨道长度的±2.4单位)。简单理解为,就是杆子不会倒下里,小车不会飞出屏幕。
  • 状态:状态空间包含4个连续变量,分别是小车位置(x),小车速度(v),杆子角度(θ),杆子角速度(ω)
  • 动作:动作空间只有2个离线动作,分别是0(向左移动)或1(向右移动)
    奖励机制:每成功保持杆子不倒+1分,目前是让奖励最大化,即杆子永远不倒

DQN&DDQN&Dueling DQN

DQN

构建价值网络

DQN需要一个价值网络和目标网络,用来评估执行的动作得到的动作价值。这两个网络使用的是同一个网络结构,通常使用神经网络来实现。
输入的维度是状态空间的4个变量,分别是小车位置(x),小车速度(v),杆子角度(θ),杆子角速度(ω)。输出的维度是动作空间的维度,分别表示向左、向右移动的动作价值。
代码如下:

class QNetWork(nn.Module):def __init__(self, state_dim, action_dim):super().__init__()self.fc = nn.Sequential(nn.Linear(state_dim, 64),nn.ReLU(),nn.Linear(64, 64),nn.ReLU(),nn.Linear(64, action_dim))def forward(self,x):return self.fc(x)

构建DQN智能体

DQN智能体具备如下功能:

  1. 选择动作
  2. 存储经验
  3. 训练上面的神经网络:目的是返回给定状态下尽可能接近真实的动作价值
  4. 模型保存
  5. 评估价值网络
初始化参数

初始化q网络、目标网络、设置优化器、设置经验回放使用的缓存大小、训练的batch_size
dqn的折扣因子、探索率、更新目标网络的频率、智能体步数计数器初始化、记录最佳网络分数值参数初始化、评估轮数设置

    def __init__(self,state_dim, action_dim):# 神经网络网络相关self.q_net = QNetWork(state_dim,action_dim)self.target_net = QNetWork(state_dim,action_dim)self.target_net.load_state_dict(self.q_net.state_dict())self.optimizer = optim.Adam(self.q_net.parameters(), lr=1e-3)self.replay_buffer = deque(maxlen=10000)self.batch_size = 64# DQN相关self.gamma = 0.99self.epsilon = 0.1self.update_target_freq = 100self.step_count=0self.best_avg_reward = 0self.eval_episodes=5
动作选择

在DQN算法中,会采用epsilon参数来增加智能体选择动作的探索性,因此,动作选择的代码逻辑为:

  • 以epsilon的概率随机选择一个动作
  • 以1-epsilon的概率来选择价值网络返回结果中动作价值更大的动作
    def choose_action(self, state):if np.random.rand() < self.epsilon:return np.random.randint(0,2) //随机选择动作else:state_tensor = torch.FloatTensor(state)q_values = self.q_net(state_tensor) //调用价值网络选择动作return q_values.cpu().detach().numpy().argmax()
存储经验

DQN中,使用经验回放,那么我们需要预留缓冲区来存放历史的轨迹数据,方便后续取出用来训练网络
存储当前的状态、选择的动作、获得的奖励、下一个状态、游戏是否结束(即杆子是不是倒下)

    def store_experience(self,state, action, reward, next_state, done):self.replay_buffer.append((state, action, reward, next_state, done))
训练神经网络⭐️

最终要的部分,不同的算法,基本上也就是这一部分存在差异。
代码流程如下:

  1. 缓冲区中采样一个batch的数据
  2. 计算当前q值和目标q值(不同的dqn算法,主要是这两步计算的方式不同)
  3. 计算损失,除了后面ppo需要自己构造损失函数,其他的基本都是用MSELoss,也就是当前q值和目标q值的平方差,
  4. 梯度下降&更新网络,这两部都有现成的库来完成,基本上也是固定代码,
    def train(self):# 判断是否有足够经验用例用来学习if len(self.replay_buffer) < self.batch_size:return# 从缓冲区随机采样batch = random.sample(self.replay_buffer, self.batch_size)states, actions, rewards, next_states, dones = zip(*batch)states = torch.FloatTensor(np.array(states))actions = torch.LongTensor(actions)rewards = torch.FloatTensor(rewards)next_states = torch.FloatTensor(np.array(next_states))dones = torch.FloatTensor(dones)# 计算当前q值,输入当前状态至q网络获得所有q值,使用历史经验中选择的动作的q值作为当前q值current_q = self.q_net(states).gather(1, actions.unsqueeze(1)).squeeze()# 计算目标q值with torch.no_grad():# 使用目标网络计算下一个状态的所有q值,直接选择最大的q值作为下一状态的q值# 这里其实隐含着两个步骤:使用目标网络选择动作+计算下一状态q值next_q = self.target_net(next_states).max(1)[0]# 计算目标q值,1-dones表示当前状态执行动作后如果杆子倒了,那么为1-dones=0,否则为1target_q = rewards + self.gamma * next_q * (1 - dones)# 计算损失、梯度下降、网络更新loss = nn.MSELoss()(current_q,target_q)self.optimizer.zero_grad()loss.backward()self.optimizer.step()# 计算步数,每隔一定步数更新目标网络self.step_count += 1if self.step_count % self.update_target_freq == 0:self.target_net.load_state_dict({k: v.clone() for k, v in self.q_net.state_dict().items()})
保存模型
    def save_model(self,path="./output/best_model_bak.pth"):torch.save(self.q_net.state_dict(),path)print(f"Model saved to {path}")
评估模型

这一步用来评估当前网络的好坏,我们评估得分更高的网络进行保存。
评估的逻辑如下:

  1. 从初始状态开始,使用我们训练的q网络选择动作控制杆子
  2. 累加轨迹下每个步骤获得的reward作为当前网络的得分
  3. 一直到杆子倒下(说明网络效果不够好)
  4. 或者分数足够高(说明网络可以控制杆子很长时间保持平衡,网络效果效果很好),再跳出循环
  5. 评估一定轮数后,取分数均值作为评估结果,分数越高越好

代码如下:

    def evaluate(self, env):# 入参是游戏环境,需要一个新的环境进行评估# 由于模型评估不需要随机探索,因此记录当前的epsilon,并设置智能体epsilon=0origin_epsilon = self.epsilonself.epsilon = 0total_rewards = []# self.eval_episodes表示评估的轮数for _ in range(self.eval_episodes):state = env.reset()[0]episode_reward = 0while True:# 使用智能体选择动作action = self.choose_action(state)# 与环境交互next_state, reward, done, _, _ = env.step(action)# 得到reward和下一状态episode_reward += rewardstate = next_state# 判断杆子是否倒下、分数是否足够高if done or episode_reward>2e4:breaktotal_rewards.append(episode_reward)# 网络需要回到评估前的状态继续训练,因此恢复epsilon的值self.epsilon = origin_epsilon# 返回平均分数return np.mean(total_rewards)

主流程

  1. 初始化游戏环境:游戏环境用于选择动作后交互,并获取reward和下一状态,初始化后获得初始状态
  2. 初始化智能体:构建智能体对象,设置epsilon=1,因为初始q网络效果不好,所以设置很大的epsilon让智能体自由探索环境,设置训练的episode数目
  3. 对于每个episode:
    • 对于episode中的每一步
      • 使用智能体选择动作
      • 与环境交互,并获得下一状态next_state,该动作的奖励值reward,杆子是否倒下done,存储本次经验
      • 智能体训练
    • 当一个episode结束后,更新一次epsilon参数,使epsilon慢慢衰减,这是因为随着训练,q网络效果变好,这时我们开始慢慢相信q网络给我们做出的选择
    • 每10个episode我们对q网络做一次评估,存储最佳的q网络

代码如下:

if __name__ == '__main__':env = gym.make('CartPole-v1')state_dim = env.observation_space.shape[0]action_dim = env.action_space.nagent = DQNAgent(state_dim, action_dim)config = {"episode": 600,"epsilon_start": 1.0,"epsilon_end": 0.01,"epsilon_decay": 0.995,}agent.epsilon = config["epsilon_start"]for episode in range(config["episode"]):state = env.reset()[0]total_reward = 0while True:action = agent.choose_action(state)next_state, reward, done, _, _ = env.step(action)agent.store_experience(state, action, reward, next_state, done)agent.train()total_reward += rewardstate=next_stateif done or total_reward > 2e4:breakagent.epsilon = max(config["epsilon_end"],agent.epsilon*config["epsilon_decay"])if episode % 10 == 0:eval_env = gym.make('CartPole-v1')avg_reward = agent.evaluate(eval_env)eval_env.close()if avg_reward > agent.best_avg_reward:agent.best_avg_reward = avg_rewardagent.save_model(path=f"output/best_model.pth")print(f"new best model saved with average reward: {avg_reward}")print(f"Episode: {episode},Train Reward: {total_reward},Best Eval Avg Reward: {agent.best_avg_reward}")

DDQN

DDQN与DQN的区别是:

  • DQN使用目标网络选择动作并计算q值
  • DDQN使用主网络(q_net)选择动作,使用目标网络计算q值

因此DDQN和DQN的代码区别仅有一行代码:

        # DQNcurrent_q = self.q_net(states).gather(1, actions.unsqueeze(1)).squeeze()with torch.no_grad():# 根据目标网络计算出来的所有Q值,选择了Q值最大的动作next_q = self.target_net(next_states).max(1)[0]target_q = rewards + self.gamma * next_q * (1 - dones)# DDQNcurrent_q = self.q_net(states).gather(1, actions.unsqueeze(1)).squeeze()with torch.no_grad():# 使用主网络选择下一状态要执行的动作next_actions = torch.LongTensor(torch.argmax(self.q_net(next_states),dim=1))# 使用目标网络计算执行这个动作后的q值next_q=self.target_net(next_states).gather(1,next_actions.unsqueeze(1)).squeeze()target_q = rewards + self.gamma * next_q * (1 - dones)

Dueling DQN

Dueling DQN与DQN的区别主要是神经网络结构不同

  1. DQN简单的神经网络,输入状态,输出不同动作的价值
  2. Dueling DQN的网络内有两个分支
    • 价值流:记录当前状态的状态价值,为标量
    • 优势流:表示该动作的优势,与动作价值的作用相同,都是评估动作的好坏,是一个动作维度的响亮

直接看代码,Dueling DQN的网络结构如下:

class QNetwork(nn.Module):def __init__(self, state_dim, action_dim):super(QNetwork, self).__init__()self.feature = nn.Sequential(nn.Linear(state_dim, 128),nn.ReLU())self.advantage = nn.Sequential(nn.Linear(128, 128),nn.ReLU(),nn.Linear(128, action_dim))self.value = nn.Sequential(nn.Linear(128, 128),nn.ReLU(),nn.Linear(128, 1))def forward(self, x):x = self.feature(x)# 优势流advantage = self.advantage(x)# 价值流value = self.value(x)# advantage - advantage.mean()消除优势函数基线影响return value + advantage - advantage.mean()
http://www.xdnf.cn/news/6851.html

相关文章:

  • RAGFlow升级到最新0.18.0新手指南
  • 【全解析】EN18031 标准下的 AUM 身份认证机制[上篇]
  • 国产三维CAD皇冠CAD(CrownCAD)建模教程:插接箱
  • B2C 商城转型指南:传统企业如何用 ZKmall模板商城实现电商化
  • 线上问题排查:JVM OOM问题如何排查和解决
  • Protobuf——Protocol Buffer详解(1)
  • RFID系统集成业务中,通过产业链上下游挖掘客户
  • Kubernetes + GlusterFS + Heketi 动态卷管理实践 !
  • 中大型水闸安全监测系统解决方案
  • 深度学习驱动下的目标检测技术:原理、算法与应用创新(三)
  • 【C#】 lock 关键字
  • 【笔记】导出Conda环境依赖以复现项目虚拟环境
  • 深度学习驱动下的目标检测技术:原理、算法与应用创新(二)
  • LLM学习笔记(七)注意力机制
  • C# NX二次开发-实体离散成点
  • 使用pyinstaller生成exe时,如何指定生成文件名字
  • Linux!启动~
  • WHAT - 前端同构 Isomorphic Javascript
  • Ubuntu系统安装VsCode
  • UAI 2025重磅揭晓:录取数据公布(附往届数据)
  • Python字符串常用内置函数详解
  • 独立开发者利用AI工具快速制作产品MVP
  • Qt功能区:Ribbon使用
  • Linux复习笔记(六)shell编程
  • 实现书签-第一部分
  • 中大型水闸安全监测系统建设实施方案
  • 在服务器上安装AlphaFold2遇到的问题(2)
  • 【C++】 —— 笔试刷题day_30
  • 【C++ | 内存管理】C++ weak_ptr 详解:成员函数、使用例子与实现原理
  • 力扣654题:最大二叉树(递归)