【LLMs篇】18:基于EasyR1的Qwen2.5-VL GRPO训练
概述
本文档详细描述了使用 EasyR1 框架在 Geometry3K 数据集上运行 Qwen2.5-VL-7B-Instruct 模型进行 GRPO (Group Robust Policy Optimization) 训练的完整流程,包括算法原理、数据处理、模型集成和训练过程。
1. 训练启动脚本分析
1.1 脚本文件:examples/qwen2_5_vl_7b_geo3k_grpo.sh
#!/bin/bash
set -x
export PYTHONUNBUFFERED=1MODEL_PATH=/home/ubuntu/.cache/huggingface/hub/models--Qwen--Qwen2.5-VL-7B-Instruct/snapshots/cc594898137f460bfe9f0759e9844b3ce807cfb5python3 -m verl.trainer.main \config=examples/config.yaml \data.train_files=hiyouga/geometry3k@train \data.val_files=hiyouga/geometry3k@test \worker.actor.model.model_path=${MODEL_PATH} \trainer.experiment_name=qwen2_5_vl_7b_geo_grpo \trainer.n_gpus_per_node=4
1.2 关键参数说明
- 模型路径: 指定本地 Qwen2.5-VL-7B-Instruct 模型位置
- 数据集: 使用 HuggingFace 的
hiyouga/geometry3k
数据集 - GPU配置: 使用 4 GPUs per node
- 实验名称:
qwen2_5_vl_7b_geo_grpo
2. 配置文件详细解析
2.1 数据配置 (config.yaml
)
data:train_files: hiyouga/math12k@train # 被脚本覆盖为geometry3kval_files: hiyouga/math12k@test # 被脚本覆盖为geometry3kprompt_key: problem # 几何问题文本answer_key: answer # 标准答案image_key: images # 图像数据列表video_key: videos # 视频数据列表max_prompt_length: 2048 # 最大prompt长度max_response_length: 2048 # 最大回复长度rollout_batch_size: 512 # 数据生成批大小val_batch_size: 1024 # 验证批大小format_prompt: ./examples/format_prompt/math.jinja # Prompt模板min_pixels: 262144 # 图像最小像素数max_pixels: 4194304 # 图像最大像素数filter_overlong_prompts: true # 过滤超长prompt
2.2 算法配置
algorithm:adv_estimator: grpo # 使用GRPO算法disable_kl: false # 启用KL散度use_kl_loss: true # 使用KL损失kl_penalty: low_var_kl # KL惩罚类型kl_coef: 1.0e-2 # KL系数online_filtering: false # 在线过滤filter_key: overall # 过滤指标
2.3 Worker配置
Actor (策略网络)
worker.actor:global_batch_size: 128 # 全局批大小micro_batch_size_per_device_for_update: 1 # 更新时每设备微批大小micro_batch_size_per_device_for_experience: 2 # 体验生成时每设备微批大小max_grad_norm: 1.0 # 梯度裁剪padding_free: true # 无填充训练dynamic_batching: true # 动态批处理model:model_path: Qwen/Qwen2.5-7B-Instruct # 模型路径enable_gradient_checkpointing: true # 梯度检查点freeze_vision_tower: false # 不冻结视觉塔optim:lr: 1.0e-6 # 学习率weight_decay: 1.0e-2 # 权重衰减strategy: adamw # 优化器策略fsdp:enable_full_shard: true # 启用完全分片enable_cpu_offload: false # CPU卸载
Rollout (推理生成)
worker.rollout:n: 5 # 每个prompt生成5个回复temperature: 1.0 # 采样温度top_p: 1.0 # Top-p采样gpu_memory_utilization: 0.6 # GPU内存利用率tensor_parallel_size: 2 # 张量并行大小val_override_config:temperature: 0.6 # 验证时温度top_p: 0.95 # 验证时top-pn: 1 # 验证时每个prompt生成1个回复
Reward (奖励函数)
worker.reward:reward_type: batch # 批量奖励reward_function: ./examples/reward_function/math.py:compute_score # 奖励函数
2.4 训练器配置
trainer:total_epochs: 15 # 总训练轮数project_name: easy_r1 # 项目名称experiment_name: qwen2_5_7b_math_grpo # 实验名称nnodes: 1 # 节点数n_gpus_per_node: 8 # 每个节点GPU数val_freq: 5 # 验证频率val_before_train: true # 训练前验证save_freq: 5 # 保存频率save_limit: 3 # 保存限制
3. GRPO算法原理与实现
3.1 GRPO核心思想
Group Robust Policy Optimization (GRPO) 是一种针对强化学习中奖励分布不均匀问题的优化算法。其核心思想是:
- 组内标准化: 对于同一个prompt生成的多个回复,在组内进行标准化
- 相对比较: 重点关注同一prompt下不同回复的相对质量
- 鲁棒性: 减少不同prompt间奖励分布差异的影响
3.2 算法实现 (verl/trainer/core_algos.py:171-212
)
def compute_grpo_outcome_advantage(token_level_rewards: torch.Tensor, # token级奖励response_mask: torch.Tensor, # 回复掩码index: torch.Tensor, # 数据索引(用于分组)eps: float = 1e-6 # 数值稳定性参数
) -> torch.Tensor:"""GRPO优势计算函数算法步骤:1. 计算每个回复的总奖励分数2. 根据index将回复分组3. 计算每组的均值和标准差4. 进行组内标准化: (score - group_mean) / (group_std + eps)5. 将标准化结果广播到所有token位置"""# 计算每个回复的总分数scores = token_level_rewards.sum(dim=-1) # [batch_size]# 按index分组并计算统计量id2score = {}for i, idx in enumerate(index):idx = idx.item()if idx not in id2score:id2score[idx] = []id2score[idx].append(scores[i].item())# 计算每组的均值和标准差id2mean, id2std = {}, {}for idx in id2score:id2mean[idx] = torch.mean(torch.tensor(id2score[idx], device=scores.device))id2std[idx] = torch.std(torch.tensor(id2score[idx], device=scores.device))# 标准化并广播advantages = torch.zeros_like(token_level_rewards)for i in range(len(index)):idx = index[i].item()normalized_score = (scores[i] - id2mean[idx]) / (id2std[idx] + eps)# 将标准化分数广播到所有token位置advantages[i] = normalized_score * response_mask[i]return advantages
3.3 算法要求和特点
- 多样本要求:
rollout.n > 1
(配置中设为5) - 结果导向: 只考虑最终奖励,不使用token级奖励
- 稳定性: 通过组内标准化提供更稳定的梯度信号
- 适用场景: 特别适合数学推理等有明确正确答案的任务
4. Geometry3K数据集处理流程
4.1 数据集结构
Geometry3K数据集包含几何问题及其对应的图像:
{"problem": "几何问题描述,包含<image>标记","answer": "数值答案,如 48","images": [PIL.Image对象列表]
}
4.2 数据加载与预处理 (verl/utils/dataset.py
)
4.2.1 图像处理流程
def process_image(image, processor, min_pixels=262144, max_pixels=4194304):"""图像预处理函数处理步骤:1. 支持多种输入格式:路径字符串、字典、PIL.Image对象2. 转换为RGB格式3. 基于像素数限制调整图像尺寸4. 返回处理后的PIL.Image对象"""if isinstance(image, str):# 从路径加载图像image = Image.open(image).convert("RGB")elif isinstance(image, dict):# 处理字典格式if "bytes" in image:image = Image.open(io.BytesIO(image["bytes"])).convert("RGB")elif "path" in image:image = Image.open(image["path"]).convert("RGB")# 调整图像尺寸以符合像素限制width, height = image.sizecurrent_pixels = width * heightif current_pixels > max_pixels:# 缩小图像scale_factor = (max_pixels / current_pixels) ** 0.5new_width = int(width * scale_factor)new_height = int(height * scale_factor)image = image.resize((new_width, new_height), Image.LANCZOS)elif current_pixels < min_pixels:# 放大图像scale_factor = (min_pixels / current_pixels) ** 0.5new_width = int(width * scale_factor)new_height = int(height * scale_factor)image = image.resize((new_width, new_height), Image.LANCZOS)return image
4.2.2 多模态消息构建
def _build_messages(self, data_point):"""构建多模态消息处理流程:1. 解析问题文本中的<image>标记2. 提取对应的图像数据3. 构建content_list,交替包含文本和图像4. 生成符合HuggingFace格式的消息列表"""problem = data_point[self.prompt_key]images = data_point.get(self.image_key, [])# 解析<image>标记并构建content列表content_list = []parts = problem.split("<image>")for i, part in enumerate(parts):if part.strip():content_list.append({"type": "text", "text": part.strip()})# 在每个文本部分后添加对应的图像(如果存在)if i < len(images):processed_image = self.process_image(images[i])content_list.append({"type": "image", "image": processed_image})messages = [{"role": "user", "content": content_list}]return messages, [img for img in images if img is not None]
4.3 Prompt格式化 (examples/format_prompt/math.jinja
)
{{ content | trim }} You FIRST think about the reasoning process as an internal monologue and then provide the final answer. The reasoning process MUST BE enclosed within <think> </think> tags. The final answer MUST BE put in \boxed{}.
这个模板要求模型:
- 思考过程: 在
<think>
标签内进行推理 - 最终答案: 在
\boxed{}
内给出答案
4.4 数据转换流程
原始Geometry3K数据↓
{problem: "在三角形ABC中...<image>...求面积", answer: "48", images: [PIL.Image]}↓
应用math.jinja模板↓
"在三角形ABC中...<image>...求面积 You FIRST think about the reasoning process..."↓
构建多模态消息↓
[{"role": "user", "content": [{"type": "text", "text": "在三角形ABC中..."},{"type": "image", "image": <PIL.Image>},{"type": "text", "text": "...求面积 You FIRST think..."}
]}]↓
Qwen2VL处理器处理↓
{"input_ids": tensor([...]),"attention_mask": tensor([...]),"position_ids": tensor(3, seq_len), # 3D位置编码"pixel_values": tensor([...]), # 图像像素值"image_grid_thw": tensor([...]) # 图像网格信息
}
5. Qwen2.5-VL模型集成详解
5.1 模型加载 (verl/workers/fsdp_workers.py
)
def load_model():"""加载Qwen2.5-VL多模态模型"""model = AutoModelForImageTextToText.from_pretrained(model_path,torch_dtype=torch.bfloat16,attn_implementation="flash_attention_2", # 使用Flash Attentiontrust_remote_code=False,device_map=None # 由FSDP管理设备分配)# 启用梯度检查点以节省内存if enable_gradient_checkpointing:model.gradient_checkpointing_enable()# 视觉塔冻结选项if freeze_vision_tower:if hasattr(model, "model") and hasattr(model.model, "visual"):model.model.visual.requires_grad_(False)elif hasattr(model, "visual"):model.visual.requires_grad_(False)return model
5.2 多模态位置编码 (verl/models/transformers/qwen2_vl.py
)
Qwen2.5-VL使用特殊的3D旋转位置编码(MRoPE)处理多模态输入:
def get_rope_index(input_ids, image_grid_thw, video_grid_thw, attention_mask):"""生成多模态旋转位置编码索引返回形状:[3, seq_len]- 第0维:时间维度位置- 第1维:高度维度位置 - 第2维:宽度维度位置"""position_ids = torch.zeros(3, seq_len, dtype=torch.long, device=device)# 文本token使用标准位置编码text_positions = attention_mask.cumsum(dim=-1) - 1position_ids[0] = text_positions # 时间维度position_ids[1] = text_positions # 高度维度position_ids[2] = text_positions # 宽度维度# 图像token使用2D位置编码for image_info in image_grid_thw:t, h, w = image_infofor i in range(h):for j in range(w):pos = get_image_token_position(i, j)position_ids[0, pos] = 0 # 时间=0position_ids[1, pos] = i # 高度位置position_ids[2, pos] = j # 宽度位置return position_ids
5.3 FSDP集成和内存优化
# FSDP配置
fsdp_config = FSDPConfig(enable_full_shard=True, # 完全分片enable_cpu_offload=False, # CPU卸载enable_rank0_init=True, # rank0初始化mixed_precision=torch.bfloat16 # 混合精度
)# 参数和优化器卸载
offload_config = OffloadConfig(offload_params=True, # 参数卸载到CPUoffload_optimizer=True # 优化器状态卸载到CPU
)# 模型包装
model = FSDP(model,fsdp_config=fsdp_config,offload_config=offload_config
)
5.4 vLLM推理集成 (verl/workers/rollout/vllm_rollout_spmd.py
)
def setup_vllm_engine():"""配置vLLM推理引擎以支持多模态推理"""engine_args = EngineArgs(model=model_path,tensor_parallel_size=2, # 张量并行gpu_memory_utilization=0.6, # GPU内存利用率enforce_eager=False, # 启用CUDA图优化enable_chunked_prefill=False, # 分块预填充limit_mm_per_prompt={"image": 10}, # 限制每个prompt的图像数量trust_remote_code=False)engine = LLMEngine.from_engine_args(engine_args)return enginedef generate_responses(prompts, images_list, sampling_params):"""使用vLLM生成多模态响应"""# 构建多模态输入multi_modal_inputs = []for images in images_list:if images:multi_modal_inputs.append({"image": images})else:multi_modal_inputs.append({})# 批量生成outputs = engine.generate(prompts=prompts,sampling_params=sampling_params,multi_modal_data=multi_modal_inputs,use_tqdm=not disable_tqdm)return [output.outputs[0].text for output in outputs]
6. 奖励函数设计 (examples/reward_function/math.py
)
6.1 奖励函数组成
def compute_score(reward_inputs: list[dict[str, Any]], format_weight: float = 0.1):"""计算综合奖励分数奖励组成:1. format_reward: 格式奖励(是否包含<think>标签和\boxed{}答案)2. accuracy_reward: 准确性奖励(答案是否正确)3. overall: 综合得分 = (1-format_weight) * accuracy + format_weight * format"""scores = []for reward_input in reward_inputs:response = reward_input["response"]ground_truth = reward_input["ground_truth"]# 格式奖励:检查是否有思考过程和boxed答案format_score = format_reward(response)# 准确性奖励:使用mathruler库验证答案accuracy_score = accuracy_reward(response, ground_truth)# 综合得分overall_score = (1 - format_weight) * accuracy_score + format_weight * format_scorescores.append({"overall": overall_score,"format": format_score,"accuracy": accuracy_score})return scores
6.2 格式检查
def format_reward(response: str) -> float:"""检查回复格式是否正确要求:1. 包含<think>...</think>思考过程2. 包含\boxed{}最终答案"""pattern = re.compile(r"<think>.*</think>.*\\boxed\{.*\}.*", re.DOTALL)format_match = re.fullmatch(pattern, response)return 1.0 if format_match else 0.0
6.3 准确性检查
def accuracy_reward(response: str, ground_truth: str) -> float:"""检查答案准确性使用mathruler库:1. 从回复中提取\boxed{}中的答案2. 与标准答案进行比较"""answer = extract_boxed_content(response)return 1.0 if grade_answer(answer, ground_truth) else 0.0
7. 完整训练流程
7.1 训练循环概览
def training_loop():"""GRPO训练主循环"""for epoch in range(total_epochs):for batch in dataloader:# 1. 数据准备阶段prompts, ground_truths, images = prepare_batch(batch)# 2. Rollout阶段:生成多个回复responses = generate_multiple_responses(prompts=prompts,images=images,n_responses=5, # 每个prompt生成5个回复temperature=1.0,top_p=1.0)# 3. 奖励计算阶段rewards = compute_batch_rewards(responses, ground_truths)# 4. GRPO优势估计advantages = compute_grpo_outcome_advantage(token_level_rewards=rewards,response_mask=response_masks,index=batch_indices)# 5. 策略更新actor_loss = compute_policy_loss(log_probs=response_log_probs,advantages=advantages,old_log_probs=old_response_log_probs)# 6. KL散度惩罚kl_loss = compute_kl_loss(new_log_probs=response_log_probs,old_log_probs=old_response_log_probs)total_loss = actor_loss + kl_coef * kl_loss# 7. 反向传播和优化total_loss.backward()torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)optimizer.step()optimizer.zero_grad()# 8. 验证和保存if step % val_freq == 0:validate_model()if step % save_freq == 0:save_checkpoint()
7.2 关键训练阶段详解
7.2.1 Rollout阶段
def rollout_phase(actor_model, prompts, images, n_responses=5):"""使用vLLM进行高效的多回复生成"""all_responses = []all_log_probs = []# 为每个prompt生成n个回复for prompt, image_list in zip(prompts, images):responses = vllm_engine.generate(prompts=[prompt] * n_responses,multi_modal_data=[{"image": image_list}] * n_responses,sampling_params=SamplingParams(temperature=1.0,top_p=1.0,max_tokens=2048))# 计算回复的log概率(用于后续策略更新)log_probs = actor_model.compute_log_probs(prompt, responses, image_list)all_responses.extend(responses)all_log_probs.extend(log_probs)return all_responses, all_log_probs
7.2.2 奖励计算阶段
def reward_phase(responses, ground_truths):"""批量计算奖励"""reward_inputs = []for response, ground_truth in zip(responses, ground_truths):reward_inputs.append({"response": response,"ground_truth": ground_truth})# 使用批量奖励函数scores = compute_score(reward_inputs, format_weight=0.1)# 转换为tensoroverall_rewards = torch.tensor([s["overall"] for s in scores])format_rewards = torch.tensor([s["format"] for s in scores])accuracy_rewards = torch.tensor([s["accuracy"] for s in scores])return overall_rewards, format_rewards, accuracy_rewards
7.2.3 GRPO优势计算阶段
def grpo_advantage_phase(rewards, batch_indices):"""计算GRPO优势"""# 将奖励reshape为token级别(但值在每个position相同)token_rewards = rewards.unsqueeze(-1).expand(-1, max_seq_len)# 计算response mask(标识回复部分的token)response_masks = create_response_masks(responses, tokenizer)# 调用GRPO算法advantages = compute_grpo_outcome_advantage(token_level_rewards=token_rewards,response_mask=response_masks,index=batch_indices,eps=1e-6)return advantages
7.3 分布式训练协调
def distributed_training_setup():"""分布式训练设置"""# Ray集群初始化ray.init(address="auto")# FSDP进程组初始化torch.distributed.init_process_group("nccl")# 工作角色分配actor_workers = create_fsdp_workers(model_path=model_path,role="actor",n_gpus=4)rollout_workers = create_vllm_workers(model_path=model_path,role="rollout", tensor_parallel_size=2)ref_workers = create_fsdp_workers(model_path=model_path,role="reference",n_gpus=2,cpu_offload=True)reward_workers = create_reward_workers(reward_function_path="examples/reward_function/math.py:compute_score")return actor_workers, rollout_workers, ref_workers, reward_workers
7.4 内存和性能优化
7.4.1 内存优化策略
- FSDP完全分片: 将模型参数分片到多个GPU
- CPU卸载: 将不活跃参数卸载到CPU内存
- 梯度检查点: 重新计算部分前向传递以节省显存
- 混合精度: 使用bfloat16减少内存使用
- 动态批处理: 根据序列长度动态调整批大小
7.4.2 性能优化技术
- Flash Attention: 高效的注意力机制实现
- vLLM推理: 高吞吐量的推理引擎
- 张量并行: 在推理阶段使用多GPU并行
- Padding-Free训练: 避免不必要的填充token
- Chunked Prefill: 分块处理长序列的prefill阶段
8. 实验配置和预期结果
8.1 硬件要求
- GPU: 4x A100 80GB 或同等性能GPU
- 内存: 每GPU至少64GB系统内存
- 存储: 高速SSD存储,至少500GB可用空间
8.2 训练时间估算
- 数据集大小: Geometry3K约3000个样本
- 每轮训练时间: 约2-3小时(4xA100)
- 总训练时间: 15轮约30-45小时
- 验证频率: 每5轮进行一次验证
8.3 预期性能指标
- 格式准确率: >95% (模型学会使用正确的输出格式)
- 答案准确率: 在Geometry3K测试集上预期达到70-80%
- 综合得分: overall score预期达到0.75-0.85
- 收敛性: 通常在10-15轮内收敛
8.4 监控指标
-
训练指标:
- Actor loss (策略损失)
- KL divergence (与初始模型的KL散度)
- Reward scores (奖励分数分布)
- Gradient norms (梯度范数)
-
验证指标:
- Validation accuracy (验证集准确率)
- Format compliance (格式符合率)
- Response quality (回复质量)
-
系统指标:
- GPU memory utilization (GPU内存使用率)
- Training speed (训练速度)
- Convergence rate (收敛速率)
9. 故障排除和调试
9.1 常见问题
- OOM错误: 减小batch size或启用更多CPU卸载
- 收敛问题: 调整学习率或KL系数
- 格式问题: 检查prompt模板和奖励函数
- 性能问题: 优化数据加载和预处理流程
9.2 调试技巧
- 日志分析: 监控训练日志中的损失变化
- 样本检查: 定期检查生成的样本质量
- 梯度监控: 检查梯度的范数和分布
- 资源监控: 监控GPU和CPU资源使用情况
10. 总结
本文档详细描述了使用EasyR1框架在Geometry3K数据集上运行Qwen2.5-VL GRPO训练的完整流程。该流程集成了最新的多模态学习、强化学习和分布式训练技术,展现了现代AI系统的复杂性和先进性。
主要特色:
- 多模态RLHF: 支持图像+文本的强化学习训练
- GRPO算法: 提供更稳定的策略梯度优化
- 分布式高效: 通过FSDP和vLLM实现高效训练和推理
- 内存优化: 多层次的内存管理策略
- 结果导向: 专门为数学推理任务优化的奖励设计
这个系统为多模态推理任务的强化学习训练提供了一个完整的解决方案,可以作为类似任务的参考实现。