Building a VLA autonomous-driving reinforcement-learning system along DeepSeek-R1 lines

Below is a complete example that draws on the technical ideas behind DeepSeek-R1 to implement an end-to-end VLA (vision-language-action) deep reinforcement learning system for autonomous driving.

The example uses Gym's CarRacing-v0 environment and builds a network in which:

- a CNN extracts visual features from the image;
- the language input (a fixed instruction, "drive safely") is tokenized, embedded, and passed through an LSTM to produce a text feature;
- the two feature streams are fused and fed through fully connected layers to output continuous actions (a Gaussian policy) and a state value.

Training uses PPO (with clipping and a KL penalty) for online updates. Note that a real DeepSeek-R1-style system would involve considerably more elaborate modules and reward design; this code only demonstrates the overall structure and can be extended as needed.
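For reference, each PPO update in the code minimizes the clipped surrogate loss plus a value loss and a KL penalty. Written out to match the loss assembled in ppo_update below (with epsilon, c1 and beta corresponding to clip_epsilon, c1 and beta in the code, \hat{A}_t the GAE advantage and \hat{R}_t the discounted return; the code additionally normalizes the advantages per update batch):

$$
L(\theta) = \mathbb{E}_t\!\left[-\min\!\big(r_t(\theta)\hat{A}_t,\ \operatorname{clip}(r_t(\theta),\,1-\epsilon,\,1+\epsilon)\,\hat{A}_t\big) + c_1\big(V_\theta(s_t)-\hat{R}_t\big)^2 + \beta\, D_{\mathrm{KL}}\!\big(\pi_{\theta_\text{old}}(\cdot\mid s_t)\,\|\,\pi_\theta(\cdot\mid s_t)\big)\right],
\qquad r_t(\theta)=\frac{\pi_\theta(a_t\mid s_t)}{\pi_{\theta_\text{old}}(a_t\mid s_t)}
$$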
Here is the complete code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
End-to-end VLA (vision-language-action) deep reinforcement learning example for autonomous
driving, inspired by the DeepSeek-R1 technical approach.

The example uses Gym's CarRacing-v0 environment, combines visual input with a fixed language
instruction ("drive safely"), builds an Actor-Critic network, and trains it online with PPO.
"""

import gym  # uses the classic Gym API: reset() -> obs, step() -> (obs, reward, done, info)
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Select device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


#########################
# Image preprocessing
#########################
def preprocess(observation):
    """Convert a CarRacing-v0 RGB frame to float32, scale it to [0, 1], and change HWC to CHW."""
    obs = np.array(observation).astype(np.float32) / 255.0
    obs = np.transpose(obs, (2, 0, 1))
    return obs


#########################
# Simple text tokenization
#########################
def tokenize(text, vocab, max_len):
    """Convert text into a fixed-length sequence of token ids using simple whitespace
    splitting; out-of-vocabulary words are mapped to <unk>."""
    tokens = text.lower().strip().split()
    token_ids = [vocab.get(token, vocab["<unk>"]) for token in tokens]
    if len(token_ids) < max_len:
        # Pad with <pad> (assumed to have id 0)
        token_ids += [vocab["<pad>"]] * (max_len - len(token_ids))
    else:
        token_ids = token_ids[:max_len]
    return torch.LongTensor(token_ids)


#########################
# Language encoder: Embedding + LSTM
#########################
class LanguageEncoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1):
        super(LanguageEncoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=num_layers, batch_first=True)

    def forward(self, x):
        # x: [batch, seq_len]
        emb = self.embedding(x)  # [batch, seq_len, embed_dim]
        outputs, (h_n, _) = self.lstm(emb)
        # Use the last layer's final hidden state as the text representation: [batch, hidden_dim]
        return h_n[-1]


#########################
# VLA Actor-Critic network
#########################
class VLAActorCritic(nn.Module):
    def __init__(self, input_shape, num_actions, vocab_size, embed_dim=32, lang_hidden_dim=64):
        """
        input_shape: image shape, e.g. (3, 96, 96)
        num_actions: action-space dimension (3 for CarRacing-v0: [steering, gas, brake])
        """
        super(VLAActorCritic, self).__init__()
        # Visual CNN branch
        self.cnn = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        conv_out_size = self._get_conv_out(input_shape)
        self.visual_fc = nn.Sequential(
            nn.Linear(conv_out_size, 256),
            nn.ReLU()
        )
        # Language encoder (processes the text instruction)
        self.lang_encoder = LanguageEncoder(vocab_size, embed_dim, lang_hidden_dim)
        # Fuse visual and language features
        combined_dim = 256 + lang_hidden_dim
        self.combined_fc = nn.Sequential(
            nn.Linear(combined_dim, 256),
            nn.ReLU()
        )
        # Actor head: action means, with a learnable log standard deviation
        self.actor_mean = nn.Linear(256, num_actions)
        self.actor_logstd = nn.Parameter(torch.zeros(num_actions))
        # Critic head: state value
        self.critic = nn.Linear(256, 1)

    def _get_conv_out(self, shape):
        o = self.cnn(torch.zeros(1, *shape))
        return int(np.prod(o.size()))

    def forward(self, image, text):
        """
        image: [batch, channels, height, width]
        text:  [batch, seq_len] tensor of token ids
        Returns: action mean, action std, state value
        """
        # Visual features
        vis_feat = self.cnn(image)
        vis_feat = vis_feat.view(vis_feat.size(0), -1)
        vis_feat = self.visual_fc(vis_feat)          # [batch, 256]
        # Language features
        lang_feat = self.lang_encoder(text)          # [batch, lang_hidden_dim]
        # Fuse the two modalities
        combined = torch.cat([vis_feat, lang_feat], dim=-1)
        combined_feat = self.combined_fc(combined)   # [batch, 256]
        # Policy and value outputs
        action_mean = self.actor_mean(combined_feat)
        action_std = torch.exp(self.actor_logstd).expand_as(action_mean)
        value = self.critic(combined_feat)
        return action_mean, action_std, value


#########################
# Action selection
#########################
def select_action(model, state, instruction):
    """
    Choose an action given the current state and the text instruction.
    state: (C, H, W) numpy array
    instruction: tokenized text tensor [seq_len]
    Returns: clipped action, its log-probability, the state value, and the current policy parameters.
    """
    state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
    instruction_tensor = instruction.unsqueeze(0).to(device)  # [1, seq_len]
    action_mean, action_std, value = model(state_tensor, instruction_tensor)
    dist = torch.distributions.Normal(action_mean, action_std)
    action = dist.sample()  # sample an action
    action_logprob = dist.log_prob(action).sum(dim=-1)
    action = action.detach().cpu().numpy()[0]
    # CarRacing action bounds: steering in [-1, 1], gas and brake in [0, 1]
    lower_bound = np.array([-1.0, 0.0, 0.0])
    upper_bound = np.array([1.0, 1.0, 1.0])
    action = np.clip(action, lower_bound, upper_bound)
    return action, action_logprob.detach(), value.detach(), \
        action_mean.detach(), action_std.detach()


#########################
# RolloutBuffer
#########################
class RolloutBuffer:
    def __init__(self):
        self.states = []
        self.actions = []
        self.logprobs = []
        self.rewards = []
        self.is_terminals = []
        self.values = []
        self.action_means = []
        self.action_stds = []

    def clear(self):
        self.states.clear()
        self.actions.clear()
        self.logprobs.clear()
        self.rewards.clear()
        self.is_terminals.clear()
        self.values.clear()
        self.action_means.clear()
        self.action_stds.clear()


#########################
# GAE computation
#########################
def compute_gae(rewards, values, dones, gamma, lam, next_value):
    advantages = []
    gae = 0.0
    values = values + [next_value]
    for step in reversed(range(len(rewards))):
        delta = rewards[step] + gamma * values[step + 1] * (1 - dones[step]) - values[step]
        gae = delta + gamma * lam * (1 - dones[step]) * gae
        advantages.insert(0, gae)
    returns = [adv + val for adv, val in zip(advantages, values[:-1])]
    return advantages, returns


#########################
# PPO update (with KL penalty)
#########################
def ppo_update(model, optimizer, buffer, clip_epsilon, c1, beta, epochs, batch_size,
               gamma, lam, next_value, instr):
    # Compute advantages and discounted returns
    advantages, returns = compute_gae(buffer.rewards, buffer.values, buffer.is_terminals,
                                      gamma, lam, next_value)

    states = torch.FloatTensor(np.array(buffer.states)).to(device)
    actions = torch.FloatTensor(np.array(buffer.actions)).to(device)
    old_logprobs = torch.FloatTensor(np.array(buffer.logprobs)).to(device)
    returns = torch.FloatTensor(np.array(returns)).to(device)
    advantages = torch.FloatTensor(np.array(advantages)).to(device)
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
    old_action_means = torch.FloatTensor(np.array(buffer.action_means)).to(device)
    old_action_stds = torch.FloatTensor(np.array(buffer.action_stds)).to(device)

    dataset = TensorDataset(states, actions, old_logprobs, returns, advantages,
                            old_action_means, old_action_stds)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    for _ in range(epochs):
        for batch in loader:
            (batch_states, batch_actions, batch_old_logprobs, batch_returns,
             batch_advantages, batch_old_means, batch_old_stds) = [b.to(device) for b in batch]
            # Every batch uses the same text instruction as input
            batch_instr = instr.unsqueeze(0).repeat(batch_states.size(0), 1).to(device)

            action_mean, action_std, values = model(batch_states, batch_instr)
            dist = torch.distributions.Normal(action_mean, action_std)
            new_logprobs = dist.log_prob(batch_actions).sum(dim=-1)

            ratio = torch.exp(new_logprobs - batch_old_logprobs)
            surr1 = ratio * batch_advantages
            surr2 = torch.clamp(ratio, 1 - clip_epsilon, 1 + clip_epsilon) * batch_advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            critic_loss = nn.MSELoss()(values.squeeze(), batch_returns)

            # KL divergence between the old policy and the current policy
            old_dist = torch.distributions.Normal(batch_old_means, batch_old_stds)
            kl_loss = torch.distributions.kl_divergence(old_dist, dist).sum(dim=-1).mean()

            loss = actor_loss + c1 * critic_loss + beta * kl_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()


#########################
# Main training loop
#########################
def main():
    # Create the environment
    env = gym.make("CarRacing-v0")
    num_actions = 3
    input_shape = (3, 96, 96)  # CarRacing frame size

    # Minimal vocabulary (can be extended)
    vocab = {"<pad>": 0, "<unk>": 1, "drive": 2, "safely": 3}
    vocab_size = len(vocab)
    max_seq_len = 5

    # Fixed text instruction
    instruction_text = "drive safely"
    instruction = tokenize(instruction_text, vocab, max_seq_len)  # [seq_len]

    # Build the VLA Actor-Critic model
    model = VLAActorCritic(input_shape, num_actions, vocab_size,
                           embed_dim=32, lang_hidden_dim=64).to(device)
    optimizer = optim.Adam(model.parameters(), lr=2.5e-4)

    # PPO hyperparameters
    max_episodes = 500
    max_timesteps = 1000
    update_timestep = 2048
    ppo_epochs = 4
    batch_size = 64
    gamma = 0.99
    lam = 0.95
    clip_epsilon = 0.2
    c1 = 0.5     # value-loss coefficient
    beta = 0.01  # KL-penalty coefficient

    buffer = RolloutBuffer()
    timestep = 0

    for episode in range(max_episodes):
        state = preprocess(env.reset())
        episode_reward = 0
        done = False
        for t in range(max_timesteps):
            timestep += 1
            action, logprob, value, action_mean, action_std = select_action(model, state, instruction)
            next_state, reward, done, _ = env.step(action)
            next_state = preprocess(next_state)
            episode_reward += reward

            buffer.states.append(state)
            buffer.actions.append(action)
            buffer.logprobs.append(logprob.item())
            buffer.rewards.append(reward)
            buffer.is_terminals.append(1.0 if done else 0.0)
            buffer.values.append(value.item())
            buffer.action_means.append(action_mean.cpu().numpy()[0])
            buffer.action_stds.append(action_std.cpu().numpy()[0])  # [0] keeps the same shape as action_means

            state = next_state

            if timestep % update_timestep == 0:
                # If the episode is not over, bootstrap with the current state's value; otherwise use 0
                with torch.no_grad():
                    state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
                    instr_tensor = instruction.unsqueeze(0).to(device)
                    next_value = model(state_tensor, instr_tensor)[2].item() if not done else 0.0
                ppo_update(model, optimizer, buffer, clip_epsilon, c1, beta,
                           ppo_epochs, batch_size, gamma, lam, next_value, instruction)
                buffer.clear()
                timestep = 0

            if done:
                break

        print(f"Episode: {episode}, Reward: {episode_reward:.2f}")

    env.close()


if __name__ == "__main__":
    main()
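Before launching full training (which requires Gym with the Box2D extra for CarRacing-v0), it can be useful to sanity-check the network in isolation. The following is a minimal sketch, assuming the listing above has been saved as vla_ppo.py (the filename is just an example); it verifies the forward pass and action selection without creating the environment:

# Minimal shape check, assuming the listing above is saved as vla_ppo.py
import torch
from vla_ppo import VLAActorCritic, tokenize, select_action, device

vocab = {"<pad>": 0, "<unk>": 1, "drive": 2, "safely": 3}
instruction = tokenize("drive safely", vocab, max_len=5)               # [5]
model = VLAActorCritic((3, 96, 96), num_actions=3, vocab_size=len(vocab)).to(device)

dummy_image = torch.zeros(1, 3, 96, 96).to(device)                     # fake CarRacing frame (CHW)
mean, std, value = model(dummy_image, instruction.unsqueeze(0).to(device))
print(mean.shape, std.shape, value.shape)                              # [1, 3], [1, 3], [1, 1]

action, logprob, value, *_ = select_action(model, dummy_image[0].cpu().numpy(), instruction)
print(action)                                                          # 3 clipped action values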
Further notes

- Joint vision-language processing
  The code extracts visual features with a CNN and encodes the text instruction (fixed to "drive safely" in this example) with a simple embedding + LSTM module; the fused features then drive the decision. This design is a step toward autonomous-driving systems that can follow natural-language guidance.

- RL algorithm
  The whole system is trained with PPO, and the loss additionally includes a KL-penalty term that keeps policy updates conservative, which helps stability, safety, and robustness.

- Extensions
  In a real application, the language input can be extended to dynamic or more complex task descriptions, a pretrained large model (e.g., a Transformer) can replace the simple LSTM (see the sketch after this list), and a more realistic simulator can be used for online training and transfer.
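For the last point, one possible direction is to swap the LSTM-based LanguageEncoder for a frozen pretrained text encoder. The following is a minimal sketch using the Hugging Face transformers library; the class name PretrainedLanguageEncoder, the distilbert-base-uncased checkpoint, the projection layer, and the freezing strategy are all illustrative assumptions, not part of the original code:

import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModel  # assumes `pip install transformers`

class PretrainedLanguageEncoder(nn.Module):
    """Illustrative alternative to LanguageEncoder: encodes raw text with a frozen Transformer."""
    def __init__(self, model_name="distilbert-base-uncased", out_dim=64):
        super().__init__()
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name)
        for p in self.encoder.parameters():   # freeze the pretrained weights
            p.requires_grad = False
        # Project the Transformer hidden size down to the dimension the fusion layer expects
        self.proj = nn.Linear(self.encoder.config.hidden_size, out_dim)

    def forward(self, texts):                 # texts: list of instruction strings
        batch = self.tokenizer(texts, return_tensors="pt", padding=True, truncation=True)
        batch = {k: v.to(self.proj.weight.device) for k, v in batch.items()}
        hidden = self.encoder(**batch).last_hidden_state    # [batch, seq_len, hidden]
        return self.proj(hidden[:, 0])                       # first-token embedding -> [batch, out_dim]

# Usage sketch: lang_feat = PretrainedLanguageEncoder()(["drive safely"])  # -> [1, 64]

Note the interface change: this variant consumes raw strings rather than the hand-built token ids, so VLAActorCritic.forward and select_action would need a small corresponding adjustment.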
This complete code example gives you a starting point for building a VLA autonomous-driving reinforcement-learning system along DeepSeek-R1 lines; I'm happy to discuss further details.