CAMEL的特色功能——数据合成
CAMEL的特色功能——数据合成
6.1 前言
在大模型时代,高质量数据正在成为越来越重要的一部分,然而通过人工的标注的方式获取数据的成本太高,并且真实世界的数据正迅速耗尽,于是就有了使用AI来合成数据的方法,下面我们来介绍如何使用CAMEL帮助我们合成SFT数据。
CAMEL 和 Unsloth 是一对出色的搭档。在此章节中,我们将两者结合起来,以训练模型精通页面上的内容。您将学习如何使用 CAMEL 进行数据生成、如何训练以及如何运行模型。
Unsloth 需要 GPU 环境,要在您自己的计算机上安装 Unsloth,请按照此处的安装说明进行操作。
以下流程,笔者使用移动版Nvidia GTX4070显卡,显存为8GB。
6.2 CoT数据生成及模型微调
设置好后面用于进行数据合成的LLM
import os
from datetime import datetime
import json
from camel.datagen.cotdatagen import CoTDataGenerator
from camel.models import ModelFactory
from camel.types import ModelPlatformType, ModelType
from camel.configs import ChatGPTConfig
from camel.agents import ChatAgentsys_msg = '你是慢思考数据和代码方面的天才'model = ModelFactory.create(model_platform=ModelPlatformType.OPENAI_COMPATIBLE_MODEL,model_type="Qwen/Qwen2.5-72B-Instruct",url='https://api-inference.modelscope.cn/v1/',api_key='你的api密钥')chat_agent = ChatAgent(system_message=sys_msg,model=model,message_window_size=10,output_language='中文'
)
数据准备
我们需要准备一份Q&A 数据,这里我们采用json格式的QA数据,格式如下:
''' { "问题1": "答案1", "问题2": "答案2", ... }''''
# 获取示例JSON数据import requests
import json# JSON文件的URL
url = 'https://raw.githubusercontent.com/zjrwtx/alldata/refs/heads/main/qa_data.json'# 发送GET请求获取JSON文件
response = requests.get(url)# 检查请求是否成功
if response.status_code == 200:# 将响应内容解析为JSONjson_data = response.json()# 指定保存JSON数据的文件路径file_path = 'qa_data.json'# 将JSON数据写入文件with open(file_path, 'w', encoding='utf-8') as json_file:json.dump(json_data, json_file, ensure_ascii=False, indent=4)print(f"JSON数据已成功保存到 {file_path}")
else:print(f"获取JSON文件失败。状态码: {response.status_code}")>>>JSON数据已成功保存到 qa_data.json
with open(file_path, 'r', encoding='utf-8') as f:qa_data = json.load(f)print(qa_data)
>>>
{'What is the coefficient of $x^2y^6$ in the expansion of $\\left(\\frac{3}{5}x-\\frac{y}{2}\\right)^8$? Express your answer as a common fraction': '\\frac{63}{400}', 'how many a in banana?': '3'}
可以发现我们的示例数据是单纯的一问一答的数据,并没有中间的思考过程,这就让人没有那么容易相信模型的回答是真正准确的,就和我们解题的时候,没有中间过程,只有最后的结果的话通常不太会让老师相信我们是真的会这套题。我们现在让模型帮助我们补上中间的思考过程。
# 创建CoT数据生成器实例
testo1 = CoTDataGenerator(chat_agent, golden_answers=qa_data)
# 初始化用于存储生成答案的字典
generated_answers = {}# 测试问答
for question in qa_data.keys():print(f"问题: {question}")# 获取AI的思考过程和答案answer = testo1.get_answer(question)generated_answers[question] = answerprint(f"AI的思考过程和答案:\n{answer}")# 验证答案是否正确is_correct = testo1.verify_answer(question, answer)print(f"答案验证结果: {'正确' if is_correct else '错误'}")print("-" * 50)print() # 每次迭代后添加空行>>>
问题: What is the coefficient of $x^2y^6$ in the expansion of $\left(\frac{3}{5}x - \frac{y}{2}\right)^8$? Express your answer as a common fraction
AI的思考过程和答案:
### 分析问题要求
我们需要找到二项式 \(\left(\frac{3}{5}x - \frac{y}{2}\right)^8\) 展开后 \(x^2y^6\) 项的系数。这是一个典型的二项式定理应用问题。### 解决问题的步骤
1. **确定二项式定理的形式**:二项式定理告诉我们,对于任意的正整数 \(n\) 和任意的实数 \(a\) 和 \(b\),有:\[(a + b)^n = \sum_{k=0}^{n} \binom{n}{k} a^{n-k} b^k\]其中 \(\binom{n}{k}\) 是组合数,表示从 \(n\) 个元素中选择 \(k\) 个元素的方式数。2. **识别 \(a\) 和 \(b\)**:在这个问题中,\(a = \frac{3}{5}x\),\(b = -\frac{y}{2}\),并且 \(n = 8\)。3. **确定 \(x^2y^6\) 项对应的 \(k\) 值**:我们需要找到 \(a^{n-k} b^k\) 中 \(x^2y^6\) 的项。这意味着:\[\left(\frac{3}{5}x\right)^{8-k} \left(-\frac{y}{2}\right)^k = x^2 y^6\]通过比较指数,我们得到:\[8 - k = 2 \quad \text{和} \quad k = 6\]
...
因此,单词 "banana" 中有 3 个字母 'a'。
答案验证结果: 正确
--------------------------------------------------
Output is truncated. View as a scrollable element or open in a text editor. Adjust cell output settings...
可以发现,Agent自动帮我们补全了中间过程,这里我们可以简单了解一下CoTDataGenerator
的工作原理,这个类实现了生成 Chain of Thought (CoT) 数据的功能,主要通过以下几个关键机制:
- 双代理系统
使用 generator_agent 生成答案
使用 verifier_agent 验证答案
也可以使用单一代理同时负责生成和验证
- 蒙特卡洛树搜索 (MCTS)
在 solve_question 方法中实现
通过多次迭代搜索最佳答案
每次迭代都会生成新的答案并评估其质量
- 二分查找错误定位
通过 binary_search_error 方法定位答案中的错误位置
可以精确找到答案中出错的部分
生成 CoT 数据的具体流程如下:
-
初始化时传入预定义的正确答案 (golden_answers)
-
对于每个问题:
先尝试直接生成答案
如果答案不正确,则启动 MCTS 搜索
在搜索过程中不断评估答案质量
使用二分查找定位错误
基于正确部分生成新的解决方案
-
将最终解决方案存储在 solution_tree 中
将生成的答案导出到 JSON 文件,并将其转换为 Alpaca traing 数据格式
# 创建简化输出结构
simplified_output = {'timestamp': datetime.now().isoformat(), # 当前时间戳'qa_pairs': generated_answers # 生成的问答对
}# 生成带时间戳的文件名
simplified_file = f'generated_answers_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'# 将数据写入JSON文件
with open(simplified_file, 'w', encoding='utf-8') as f:json.dump(simplified_output, f, ensure_ascii=False, indent=2)# 打印导出成功信息
print(f"生成的答案已导出到: {simplified_file}")
我们可以将其封装成函数:
import json
from datetime import datetimedef transform_qa_format(input_file):# 读取输入的JSON文件with open(input_file, 'r', encoding='utf-8') as f:data = json.load(f)# 转换数据格式transformed_data = []for question, answer in data['qa_pairs'].items():transformed_pair = {"instruction": question, # 指令/问题"input": "", # 输入"output": answer # 输出/答案}transformed_data.append(transformed_pair)# 生成带时间戳的输出文件名timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")output_file = f'transformed_qa_{timestamp}.json'# 写入转换后的数据with open(output_file, 'w', encoding='utf-8') as f:json.dump(transformed_data, f, ensure_ascii=False, indent=2)return output_file, transformed_data
output_file, transformed_data = transform_qa_format(simplified_file)
print(f"转换完成。输出已保存到: {output_file}")>>>
转换完成。输出已保存到: transformed_qa_20250128_134702.json
将数据上传到 Huggingface
这里定义了一个函数 upload_to_huggingface 将数据集上传到 Hugging Face。该脚本是模块化的,帮助程序函数处理特定任务,例如数据集名称生成、数据集创建、元数据卡创建和记录添加
# 导入必要的模块和类
from camel.datahubs.huggingface import HuggingFaceDatasetManager # 管理与Hugging Face数据集的交互
from camel.datahubs.models import Record # 表示数据集中的单个记录
from datetime import datetime # 处理日期和时间操作# 主函数:将数据集上传到Hugging Face
def upload_to_huggingface(transformed_data, username, dataset_name=None):r"""将转换后的数据上传到Hugging Face数据集平台。参数:transformed_data (list): 转换后的数据,通常是字典列表。username (str): Hugging Face用户名。dataset_name (str, 可选): 自定义数据集名称。返回:str: 上传的数据集URL。"""# 初始化HuggingFaceDatasetManager以与Hugging Face数据集交互manager = HuggingFaceDatasetManager()# 生成或验证数据集名称dataset_name = generate_or_validate_dataset_name(username, dataset_name)# 在Hugging Face上创建数据集并获取数据集URLdataset_url = create_dataset(manager, dataset_name)# 创建数据集卡片以添加元数据create_dataset_card(manager, dataset_name, username)# 将转换后的数据转换为Record对象列表records = create_records(transformed_data)# 将Record对象添加到数据集中add_records_to_dataset(manager, dataset_name, records)# 返回数据集URLreturn dataset_url# 生成或验证数据集名称
def generate_or_validate_dataset_name(username, dataset_name):r"""生成默认数据集名称或验证并格式化用户提供的名称。参数:username (str): Hugging Face用户名。dataset_name (str, 可选): 用户提供的自定义数据集名称。返回:str: 格式化后的数据集名称。"""if dataset_name is None:# 如果未提供数据集名称,则使用用户名和当前日期生成默认名称dataset_name = f"{username}/qa-dataset-{datetime.now().strftime('%Y%m%d')}"else:# 如果提供了数据集名称,则格式化以包含用户名dataset_name = f"{username}/{dataset_name}"return dataset_name# 在Hugging Face上创建数据集
def create_dataset(manager, dataset_name):r"""在Hugging Face上创建新数据集并返回数据集URL。参数:manager (HuggingFaceDatasetManager): HuggingFaceDatasetManager实例。dataset_name (str): 数据集名称。返回:str: 创建的数据集URL。"""print(f"正在创建数据集: {dataset_name}")# 使用HuggingFaceDatasetManager创建数据集dataset_url = manager.create_dataset(name=dataset_name)print(f"数据集创建成功: {dataset_url}")return dataset_url# 创建包含元数据的数据集卡片
def create_dataset_card(manager, dataset_name, username):r"""创建数据集卡片以添加元数据参数:manager (HuggingFaceDatasetManager): HuggingFaceDatasetManager实例。dataset_name (str): 数据集名称。username (str): Hugging Face用户名。"""print("正在创建数据集卡片...")# 使用HuggingFaceDatasetManager创建数据集卡片manager.create_dataset_card(dataset_name=dataset_name,description="COT_qa_dataset", # 数据集描述license="mit", # 数据集许可证language=["en"], # 数据集语言size_category="<1MB", # 数据集大小类别version="0.1.0", # 数据集版本tags=["camel", "question-answering"], # 数据集标签task_categories=["question-answering"], # 数据集任务类别authors=[username] # 数据集作者)print(f"数据集卡片创建成功。")# 将转换后的数据转换为Record对象
def create_records(transformed_data):r"""将转换后的数据转换为Record对象列表。参数:transformed_data (list): 转换后的数据,通常是字典列表。返回:list: Record对象列表。"""records = []# 遍历转换后的数据并将每个字典转换为Record对象for item in transformed_data:record = Record(**item) # 使用字典键值对创建Record对象records.append(record)return records# 将Record对象添加到数据集
def add_records_to_dataset(manager, dataset_name, records):r"""将Record对象列表添加到数据集中。参数:manager (HuggingFaceDatasetManager): HuggingFaceDatasetManager实例。dataset_name (str): 数据集名称。records (list): Record对象列表。"""print("正在将记录添加到数据集中...")# 使用HuggingFaceDatasetManager将记录添加到数据集manager.add_records(dataset_name=dataset_name, records=records)print("记录添加成功。")
配置 Huggingface 的 Access Token
我们可以到这里从 Huggingface 获取 API Key
from getpass import getpass
import osHUGGING_FACE_TOKEN = getpass('Enter your HUGGING_FACE_TOKEN: ')
os.environ["HUGGING_FACE_TOKEN"] = HUGGING_FACE_TOKEN
# -- coding: utf-8 --
# 设置个人HuggingFace配置,然后上传到HuggingFace
username = input("请输入您的HuggingFace用户名: ")
dataset_name = input("请输入数据集名称(按Enter使用默认值): ").strip()
if not dataset_name:dataset_name = Nonetry:dataset_url = upload_to_huggingface(transformed_data, username, dataset_name)print(f"\n数据成功上传到HuggingFace!")print(f"数据集URL: {dataset_url}")
except Exception as e:print(f"上传到HuggingFace时出错: {str(e)}")>>>
正在创建数据集: SUNTAO0213/qa-dataset-20250128
数据集创建成功: https://huggingface.co/datasets/SUNTAO0213/qa-dataset-20250128正在创建数据集卡片...
数据集卡片创建成功。
正在将记录添加到数据集中...
记录添加成功。数据成功上传到HuggingFace!
数据集URL: https://huggingface.co/datasets/SUNTAO0213/qa-dataset-20250128
我们可以在自己的主页看到自己上传的数据集。
配置 Unsloth 环境
from unsloth import FastLanguageModel
import torchmax_seq_length = 2048 # 可以选择任意长度! 我们内部自动支持RoPE缩放!
dtype = None # None表示自动检测。Tesla T4、V100使用Float16,Ampere+使用Bfloat16
load_in_4bit = True # 使用4bit量化以减少内存使用。可以设为False。# 我们支持的4bit预量化模型,可以4倍速下载 + 不会OOM
fourbit_models = ["unsloth/Meta-Llama-3.1-8B-bnb-4bit", # Llama-3.1 15万亿token模型,速度提升2倍!"unsloth/Meta-Llama-3.1-8B-Instruct-bnb-4bit","unsloth/Meta-Llama-3.1-70B-bnb-4bit","unsloth/Meta-Llama-3.1-405B-bnb-4bit", # 我们也上传了405b的4bit版本!"unsloth/Mistral-Nemo-Base-2407-bnb-4bit", # 新的Mistral 12b,速度提升2倍!"unsloth/Mistral-Nemo-Instruct-2407-bnb-4bit","unsloth/mistral-7b-v0.3-bnb-4bit", # Mistral v3,速度提升2倍!"unsloth/mistral-7b-instruct-v0.3-bnb-4bit","unsloth/Phi-3.5-mini-instruct", # Phi-3.5,速度提升2倍!"unsloth/Phi-3-medium-4k-instruct","unsloth/gemma-2-9b-bnb-4bit","unsloth/gemma-2-27b-bnb-4bit", # Gemma,速度提升2倍!
] # 更多模型请访问 https://huggingface.co/unslothmodel, tokenizer = FastLanguageModel.from_pretrained(# 可以从以下选择:# "unsloth/Qwen2.5-0.5B", "unsloth/Qwen2.5-1.5B", "unsloth/Qwen2.5-3B"# "unsloth/Qwen2.5-14B", "unsloth/Qwen2.5-32B", "unsloth/Qwen2.5-72B",# 还有所有的Instruct版本和Math、Coding版本!model_name = "unsloth/Qwen2.5-1.5B",max_seq_length = max_seq_length,dtype = dtype,load_in_4bit = load_in_4bit,# token = "hf_...", # 如果使用如meta-llama/Llama-2-7b-hf这样的受限模型时需要使用token
)
这里我们准备用一个Qwen2.5-1.5B的模型来进行微调
model = FastLanguageModel.get_peft_model(model,r = 16, # 选择任意大于0的数! 建议值为8、16、32、64、128target_modules = ["q_proj", "k_proj", "v_proj", "o_proj","gate_proj", "up_proj", "down_proj",],lora_alpha = 16,lora_dropout = 0, bias = "none", # [新特性] "unsloth"使用少30%的显存,可以容纳2倍大的批次大小!use_gradient_checkpointing = "unsloth", # 对于很长的上下文使用True或"unsloth"random_state = 3407,use_rslora = False, # 我们支持秩稳定化LoRAloftq_config = None, # 以及LoftQ
)
将 CoT 数据转换为符合 SFT 标准的训练数据格式
alpaca_prompt = """以下是描述任务的指令,以及提供更多上下文的输入。请写出恰当完成该请求的回应。### Instruction:
{}### Input:
{}### Response:
{}"""EOS_TOKEN = tokenizer.eos_token # 必须添加EOS_TOKEN
def formatting_prompts_func(examples):instructions = examples["instruction"]inputs = examples["input"] outputs = examples["output"]texts = []for instruction, input, output in zip(instructions, inputs, outputs):# 必须添加EOS_TOKEN,否则生成将无限进行下去!text = alpaca_prompt.format(instruction, input, output) + EOS_TOKENtexts.append(text)return { "text" : texts, }
passfrom datasets import load_dataset
dataset = load_dataset("SUNTAO0213/TEST", split = "train") #这里填写自己的数据地址
dataset = dataset.map(formatting_prompts_func, batched = True,)
现在我们准备训练模型 让我们使用 Huggingface TRL 的 !更多相关文档在这里: TRL SFT 文档。我们执行 60 个步骤来加快速度。
from trl import SFTTrainer
from transformers import TrainingArguments
from unsloth import is_bfloat16_supportedtrainer = SFTTrainer(model = model,tokenizer = tokenizer,train_dataset = dataset,dataset_text_field = "text", max_seq_length = max_seq_length,dataset_num_proc = 1,packing = False, # 对于短序列可以使训练速度提高5倍args = TrainingArguments(per_device_train_batch_size = 2, # 每个设备的训练批次大小gradient_accumulation_steps = 4, # 梯度累积步数warmup_steps = 5, # 预热步数# num_train_epochs = 1, # 设置完整训练运行的轮数max_steps = 60, # 最大训练步数learning_rate = 2e-4, # 学习率fp16 = not is_bfloat16_supported(), # 是否使用FP16bf16 = is_bfloat16_supported(), # 是否使用BF16logging_steps = 1, # 日志记录步数optim = "adamw_8bit", # 优化器weight_decay = 0.01, # 权重衰减lr_scheduler_type = "linear", # 学习率调度器类型seed = 3407, # 随机种子output_dir = "outputs", # 输出目录report_to = "none", # 用于WandB等工具的报告),
)
#@title 显示当前内存统计信息
gpu_stats = torch.cuda.get_device_properties(0)
start_gpu_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3)
max_memory = round(gpu_stats.total_memory / 1024 / 1024 / 1024, 3)
print(f"GPU = {gpu_stats.name}. 最大内存 = {max_memory} GB.")
print(f"已使用内存 = {start_gpu_memory} GB.")
开始模型训练
trainer_stats = trainer.train()