Kaggle - LLM Science Exam 大模型做科学选择题
Kaggle - LLM Science Exam
Science Exam Simple Approach w/ Model Hub | Kaggle
Platypus2-70B with Wikipedia RAG | Kaggle
5个选项只有一个选项正确,目标:回答一个选项序列(只有前三个有效)
输出正确选项 (可以多输出几个选项 防止第一选项是错的 要让正确选择尽量靠前)
以下为一个简单版本的直接调用大模型,还有一个复杂版本的 RAG + 分布式运行 70B 超大模型
目录
简单实现版: 设置prompt 直接调用大模型(无训练过程)
1. 数据集&模型导入
2. 组合构建选项问题的提示词
3. 回答第一个问题例子
4. post_process 模型输出修正
5. 输出+提交
一、RAG + 分片并行计算
1. RAG 理论概述
2. RAG 工作流程总结
3. SentenceTransformer 类 文本->向量
4. RAG 检索 整合上下文context帮助LLM
二、有限GPU内存下运行超大模型
1. 四大关键技术
2. 三阶段流程
3. 模型权重分片存储+符号链接虚拟文件系统
4. 权重加载器类 (WeightsLoader)
5. 分片LLAMA模型类 (ShardedLlama)
6. 将原始数据 tokenizer 转换为模型能理解的 标准化输入
7. 模型运行函数
8. 将之前的函数和类 总流程运行
9. 训练集 结果评估
简单实现版: 设置prompt 直接调用大模型(无训练过程)
Kaggle - LLM Science Exam-simple(直接调用大模型)
1. 数据集&模型导入
选的是 flan-t5 的小模型;适用于做小选择题;
(有个弊端是 只会输出一个选项 但根据题目要求 按正确概率从前到后输出五个选项更好)
import pandas as pdimport warnings
warnings.simplefilter("ignore")import torch
from transformers import T5Tokenizer, T5ForConditionalGenerationllm = '/kaggle/input/flan-t5/pytorch/base/4'
model = T5ForConditionalGeneration.from_pretrained(llm)
tokenizer = T5Tokenizer.from_pretrained(llm)test = pd.read_csv('/kaggle/input/kaggle-llm-science-exam/test.csv', index_col='id')
test.head()
数据格式为 prompt 对应问题;还有五个选项; train 还包含正确答案 answer
2. 组合构建选项问题的提示词
(可以将kaggle的题干发给deepseek 让它生成一个比较准确的prompt )下例为
''' 你是一个善于回答科学选择题的专家。请遵循以下步骤:
1. 仔细分析以下问题和所有选项。
2. 判断每个选项作为答案的正确可能性。
3. 将五个选项字母(A、B、C、D、E)按可能性从高到低排序。
4. 最终只输出排序后的字母序列,字母之间用单个空格分隔,不要输出任何其他文字。
例如,如果你认为B最可能正确,其次是A,然后是D、C、E,你应该输出:B A D C E
现在请回答以下问题: '''
def format_input(df, idx):preamble = 'You are an expert skilled at answering scientific multiple-choice questions. Please follow these steps:/\nCarefully analyze the following question and all options./\nEvaluate the likelihood of each option being the correct answer./\nRank the five option letters (A, B, C, D, E) in order from most likely to least likely to be correct./\nFinally, output only the sequenced letters in order, separated by single spaces, without any additional text./\nFor example, if you believe option B is most likely correct, followed by A, then D, C, and E, you should output: B A D C E /\nNow please answer the following question:.'# 前面那段话 + 问题和五个选项prompt = df.loc[idx, 'prompt']a = df.loc[idx, 'A']b = df.loc[idx, 'B']c = df.loc[idx, 'C']d = df.loc[idx, 'D']e = df.loc[idx, 'E']input_text = f"{preamble}\n\n{prompt}\n\nA) {a}\nB) {b}\nC) {c}\nD) {d}\nE) {e}"return input_text
3. 回答第一个问题例子
tokenizer input -> generate output -> decode answer
inputs = tokenizer(format_input(test, 0), return_tensors="pt")
outputs = model.generate(**inputs)
answer = tokenizer.batch_decode(outputs, skip_special_tokens=True)
print(answer)
4. post_process 模型输出修正
凑满五个选项;把少的字母拼在答案的后面
比如只输出 D 就输出 D ABCE
def post_process(predictions):valid = set(['A', 'B', 'C', 'D', 'E'])# 如果模型输出中没有任何有效字母if set(predictions).isdisjoint(valid):final_pred = 'A B C D E' # 返回默认答案else:final_pred = []for prediction in predictions:if prediction in valid: # 只保留有效字母final_pred += prediction# 添加缺失的字母to_add = valid - set(final_pred)final_pred.extend(list(to_add))# 格式化为空格分隔final_pred = ' '.join(final_pred)return final_pred
5. 输出+提交
submission = pd.read_csv('/kaggle/input/kaggle-llm-science-exam/sample_submission.csv', index_col='id')for idx in test.index:inputs = tokenizer(format_input(test, idx), return_tensors="pt")outputs = model.generate(**inputs)answer = tokenizer.batch_decode(outputs, skip_special_tokens=True)submission.loc[idx, 'prediction'] = post_process(answer)display(submission.head())
submission.to_csv('submission.csv')
方案二 运用RAG + 分片并行加载70B超大模型
Platypus2-70B with Wikipedia RAG | Kaggle
一、RAG + 分片并行计算
1. RAG 理论概述
Retrieval-Augmented Generation (RAG) 是一种结合信息检索和文本生成的混合模型架构,主要包含两个核心组件:
-
检索器 (Retriever):从大规模知识库中检索与输入相关的文档或段落
-
生成器 (Generator):基于检索到的上下文信息生成高质量的答案
RAG 的优势在于:
-
能够访问外部知识,减少模型幻觉
-
不需要重新训练整个模型即可更新知识
-
提供可追溯的信息来源
2. RAG 工作流程总结
-
查询编码:将问题+选项编码为高维向量
-
向量检索:在 FAISS 索引中查找相似文档
-
上下文构建:整合检索到的相关文档
-
准备生成:为后续的 LLM 提供增强的上下文信息
3. SentenceTransformer 类 文本->向量
功能:将文本句子转换为高维向量表示(嵌入向量)embeddings
整体工作流程
-
输入:原始文本句子列表
-
预处理:添加检索提示前缀 → 分词 → 填充/截断
-
编码:通过预训练模型获取句子表示
-
后处理:L2归一化 → 转移到CPU → 转换为numpy
-
输出:形状为
[num_sentences, embedding_dim]
的嵌入矩阵
1. 初始化 device;checkpoint;model;tokenizer
class SentenceTransformer:def __init__(self, checkpoint, device="cuda:0"):self.device = device # 设置计算设备(GPU或CPU)self.checkpoint = checkpoint # 预训练模型的路径或名称self.model = AutoModel.from_pretrained(checkpoint).to(self.device).half() # 加载模型并转换为半精度self.tokenizer = AutoTokenizer.from_pretrained(checkpoint) # 加载对应的分词器
2. 分词处理 transform 分词 → 填充/截断
def transform(self, batch):# 对批量文本进行分词处理tokens = self.tokenizer(batch["text"], truncation=True, # 截断超过最大长度的文本padding=True, # 填充较短序列以保证批次统一return_tensors="pt", # 返回PyTorch张量max_length=MAX_SEQ_LEN # 最大序列长度限制(512))return tokens.to(self.device) # 将张量移动到指定设备
3. 创建数据加载器 get_dataloader 添加前缀 + 取batch_size 加载数据,用在batch循环中
def get_dataloader(self, sentences, batch_size=32):# 添加检索专用的提示前缀sentences = ["Represent this sentence for searching relevant passages: " + x for x in sentences]# 创建Hugging Face数据集对象dataset = Dataset.from_dict({"text": sentences})# 设置实时转换函数dataset.set_transform(self.transform)# 创建PyTorch数据加载器dataloader = DataLoader(dataset, batch_size=batch_size, # 控制每次处理的样本数量shuffle=False # 不 shuffle,保持原始顺序)return dataloader
4. 编码 encode 循环batch的数据集 -> 模型输出 -> L2归一化 -> 嵌入矩阵
def encode(self, sentences, show_progress_bar=False, batch_size=32):# 创建数据加载器dataloader = self.get_dataloader(sentences, batch_size=batch_size)# 可选进度条显示pbar = tqdm(dataloader) if show_progress_bar else dataloaderembeddings = [] # 存储所有嵌入向量for batch in pbar: # 循环加载的数据集with torch.no_grad(): # 禁用梯度计算,节省内存# 前向传播获取模型输出e = self.model(**batch).pooler_output# L2归一化,使向量处于单位球面上e = F.normalize(e, p=2, dim=1)# 转移到CPU并转换为numpy数组embeddings.append(e.detach().cpu().numpy())# 合并所有批次的嵌入向量embeddings = np.concatenate(embeddings, axis=0)return embeddings
4. RAG 检索 整合上下文context帮助LLM
-
将测试集中的问题编码为向量
-
在FAISS向量数据库中搜索最相关的Wikipedia 维基百科 经过预处理和解析的科学相关内容
-
提取并整合相关文档作为上下文信息context 为后续的LLM生成阶段提供知识支持
1. 将测试集中的问题编码为向量
加载句子嵌入模型 SentenceTransformer;
构建查询文本:将问题与所有选项拼接,形成更丰富的查询 将测试集中的问题编码为向量
# 加载句子嵌入模型(BGE-small-en-v1.5),用于将文本转换为向量表示
model = SentenceTransformer(MODEL_PATH, device="cuda:0")# 构建查询文本:将问题与所有选项拼接,形成更丰富的查询 得到嵌入embedding
f = lambda row : " ".join([row["prompt"], row["A"], row["B"], row["C"], row["D"], row["E"]])
inputs = df.apply(f, axis=1).values
prompt_embeddings = model.encode(inputs, show_progress_bar=False)
2. 在FAISS向量数据库搜索
实现了大规模向量数据库的高效相似度搜索
从海量知识库中快速检索最相关的文档,为LLM提供准确的上下文信息
# 加载预构建的FAISS向量索引文件
MODEL_PATH = "/kaggle/input/bge-small-faiss/"
faiss_index = faiss.read_index(MODEL_PATH + '/faiss.index')NUM_TITLES=5 # 为每个查询检索最相关的5个文档# 在FAISS索引中执行相似度搜索,查找与每个查询最相关的文档
# [1]返回第二个元素:search()返回(distances, indices)元组,只需要索引位置
search_index = faiss_index.search(np.float32(prompt_embeddings), NUM_TITLES)[1]
3. 搜索最相关的Wikipedia文档 拼接为context 上下文
# 加载预处理好的Wikipedia段落数据集 包含解析和扩展后的科学相关文本内容
dataset = load_from_disk("/kaggle/input/all-paraphs-parsed-expanded")# 遍历测试集中的每个问题,构建对应的上下文
for i in range(len(df)):df.loc[i, "context"] = "-" + "\n-".join([dataset[int(j)]["text"] for j in search_index[i]])# 内存清理
faiss_index.reset() # 重置FAISS索引,释放内部资源
del faiss_index, prompt_embeddings, model, dataset # 删除对象引用,加速垃圾回收
clean_memory() # 执行深度内存清理:垃圾回收、C层内存整理、GPU缓存清空
二、有限GPU内存下运行超大模型
在单个T4 GPU(16GB内存)上运行Platypus2-70B模型(约140GB),面临内存容量严重不足的问题。传统方法需要将整个模型加载到GPU内存,但这里采用了创新的分层加载和流水线处理方案。
1. 四大关键技术
1. 模型分片存储与动态加载
-
分片存储:将70B模型拆分为多个部分存储在磁盘上
-
按需加载:只在需要时才将特定层加载到GPU内存
-
符号链接:创建虚拟文件系统,让程序以为模型是完整的
2. 分层处理流水线
-
逐层处理:不是一次性加载整个模型,而是按层顺序处理
-
内存复用:处理完一层后立即释放该层内存,加载下一层
-
CPU-GPU协作:在CPU内存中预加载下一层权重,减少等待时间
3. 多GPU并行计算
-
数据并行:将测试数据分割到多个GPU上同时处理
-
权重共享:多个GPU共享同一套模型权重,避免重复存储
-
线程安全:使用同步机制确保权重加载的协调性
4. 内存优化策略
-
半精度计算:使用float16而非float32,减少50%内存占用
-
注意力优化:使用Flash Attention减少内存使用
-
缓存管理:及时清理GPU和CPU内存碎片
2. 三阶段流程
准备阶段
-
建立虚拟模型文件系统
-
初始化空模型结构(不占用实质内存)
-
准备多GPU协调机制
推理阶段(逐样本逐层处理)
-
输入处理:将问题+选项+上下文转换为token序列
-
分层计算:每层依次 加载权重到GPU + 计算输出 + 释放内存
-
结果收集:逐层传递中间结果,最终得到预测分数
后处理阶段
-
对每个选项计算置信度分数
-
排序得到Top-3预测
-
生成提交格式
3. 模型权重分片存储+符号链接虚拟文件系统
-
分片存储:模型权重被预先分割存储在多个文件中
-
虚拟整合:通过符号链接创建统一的文件系统视图
权重分别在这三个文件
# 创建符号链接虚拟文件系统checkpoint_path = Path("/root/.cache/")
checkpoint_path.mkdir(exist_ok=True, parents=True)for part in [1, 2, 3]:source_dir = Path(f'/kaggle/input/platypus2-chuhac2-part{part}')for path in source_dir.glob("*"):(checkpoint_path / path.name).symlink_to(path)
4. 权重加载器类 (WeightsLoader)
每次有一部分GPU需要申请参数,给这些GPU分发参数。
要求:所有设备都请求同一层
class WeightsLoader:def __init__(self, checkpoint_path, devices):self.checkpoint_path = Path(checkpoint_path)self.states = {device: None for device in devices} # 设备状态跟踪self.state_dict = None # 当前加载的权重字典self.condition = Condition() # 线程同步条件变量def get_state_dict(self, device):# 等待权重加载完成,然后获取权重字典with self.condition:while self.states[device] is not None: # 等待当前加载完成self.condition.wait()result = self.state_dictself.states[device] = Noneif not any(self.states.values()): # 所有设备都获取完毕self.condition.notify_all()return resultdef set_state_dict(self, layer_name, device):# 请求加载指定层的权重with self.condition:self.states[device] = layer_name # 标记设备需要该层if all(self.states.values()): # 所有设备都请求同一层assert len(set(self.states.values())) == 1 # 确保请求一致# 实际加载权重到CPU内存self.state_dict = load_file(self.checkpoint_path / (layer_name + ".safetensors"), device="cpu")for d in self.states: # 重置所有设备状态self.states[d] = Noneself.condition.notify_all() # 通知所有等待线程
5. 分片LLAMA模型类 (ShardedLlama)
实现:分层初始化机制 + 分层加载与执行
1. 架构设计 参数 + 层的顺序
class ShardedLlama:def __init__(self, checkpoint_path, weights_loader, device="cuda:0", dtype=torch.float16):self.checkpoint_path = Path(checkpoint_path)self.weights_loader = weights_loader # 权重加载器实例self.device = deviceself.dtype = dtype # 半精度节省内存# 初始化空模型结构(几乎不占内存)self.config = AutoConfig.from_pretrained(self.checkpoint_path)self.tokenizer = AutoTokenizer.from_pretrained(checkpoint_path)self.init_model() # 创建模型框架# 定义层处理顺序self.layer_names = ["model.embed_tokens"] + [f"model.layers.{i}" for i in range(len(self.model.model.layers))] + ["model.norm", "value_head"]
2. 分层初始化机制
def init_model(self):# 使用空权重初始化模型结构with init_empty_weights(): # 关键:不分配实际内存self.model = AutoModelForCausalLM.from_config(self.config)self.model.lm_head = torch.nn.Linear(8192, 8, bias=False)self.model.eval()self.model = BetterTransformer.transform(self.model) # 启用Flash Attentionself.model.tie_weights()# 提取层引用以便逐层处理self.layers = [self.model.model.embed_tokens] + list(self.model.model.layers) + [self.model.model.norm, self.model.lm_head]# 只将缓冲区移到设备(占用内存很少)for buffer_name, buffer in self.model.named_buffers():set_module_tensor_to_device(self.model, buffer_name, self.device, value=buffer, dtype=self.dtype)
3. 分层加载与执行
使用线程池实现加载与计算重叠;
预加载下一层 获取权重 + 计算结果(选项顺序)+ 释放空间
def __call__(self, inputs):# 每次调用前清理内存并重新初始化del self.modelclean_memory()self.init_model()# 准备输入数据batch = [(prefix.to(self.device), suffix.to(self.device)) for prefix, suffix in inputs]# 使用线程池实现加载与计算重叠with ThreadPoolExecutor() as executor, torch.inference_mode():# 预加载第一层future = executor.submit(self.load_layer_to_cpu, "model.embed_tokens")# 逐层处理流水线for i, (layer_name, layer) in enumerate(zip(self.layer_names, self.layers)):# 获取当前层权重,并预加载下一层state_dict = future.result()if (i + 1) < len(self.layer_names):future = executor.submit(self.load_layer_to_cpu, self.layer_names[i + 1])# 将权重转移到GPUself.move_layer_to_device(state_dict)# 执行当前层计算for j, (prefix, suffix) in enumerate(batch):if layer_name == "model.embed_tokens":batch[j] = (layer(prefix), layer(suffix)) # 嵌入层elif layer_name == "model.norm":batch[j] = (None, layer(suffix[torch.arange(n_suffixes), suffix_eos[j]][:, None])) # 归一化elif layer_name == "value_head":batch[j] = layer(suffix)[:, 0].mean(1).detach().cpu().numpy() # 输出层else:# Transformer层:使用KV缓存优化len_p, len_s = prefix.shape[1], suffix.shape[1]new_prefix, (k_cache, v_cache) = layer(prefix, use_cache=True, attention_mask=attention_mask[:, :, -len_p:, -len_p:])# 使用缓存计算suffixpos = position_ids[:, len_p:len_p + len_s].expand(n_suffixes, -1)attn = attention_mask[:, :, -len_s:, -len_p - len_s:].expand(n_suffixes, -1, -1, -1)kv_cache = (k_cache.expand(n_suffixes, -1, -1, -1), v_cache.expand(n_suffixes, -1, -1, -1))new_suffix = layer(suffix, past_key_value=kv_cache, position_ids=pos, attention_mask=attn)[0]batch[j] = (new_prefix, new_suffix)# 释放当前层内存layer.to("meta") # 移回元设备(释放GPU内存)clean_memory() # 强制垃圾回收
6. 将原始数据 tokenizer 转换为模型能理解的 标准化输入
系统提示词 + instruction + 选项 + 问题文本 + 拼接context上下文
def get_tokens(row, tokenizer): # 系统提示词模板,定义任务格式system_prefix = "Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.\n\n### Instruction:\n{instruction}\n\n### Input:\nContext:\n{context}"# 具体任务指令,明确模型职责instruction = "Your task is to analyze the question and answer below. If the answer is correct, respond yes, if it is not correct respond no. As a potential aid to your answer, background context from Wikipedia articles is at your disposal, even if they might not always be relevant."# 处理五个选项,生成对应的suffix输入prompt_suffix = [f"{row[letter]}\n\n### Response:\n" for letter in "ABCDE"]suffix = tokenizer(prompt_suffix, return_tensors="pt", return_attention_mask=False, truncation=True, max_length=MAX_LENGTH, padding=True)["input_ids"][:, 1:]# 处理问题文本部分prompt_question = f"\nQuestion: {row['prompt']}\nProposed answer: "question = tokenizer(prompt_question, return_tensors="pt", return_attention_mask=False, truncation=True, max_length=max(0, MAX_LENGTH - suffix.shape[1]))["input_ids"][:, 1:]# 处理上下文信息,整合Wikipedia检索结果prompt_context = system_prefix.format(instruction=instruction, context=row["context"])max_length = min(MAX_CONTEXT, max(0, MAX_LENGTH - question.shape[1] - suffix.shape[1]))context = tokenizer(prompt_context, return_tensors="pt", return_attention_mask=False, truncation=True, max_length=max_length)["input_ids"]# 组合前缀部分(上下文+问题)prefix = torch.cat([context, question], dim=1)return prefix, suffix
7. 模型运行函数
ShardedLlama + input分批次 + 推理得到output
def run_model(device, df, weights_loader):# 初始化分片模型实例model = ShardedLlama(checkpoint_path, weights_loader, device=device)# 创建数据处理函数(部分应用tokenizer)f = partial(get_tokens, tokenizer=model.tokenizer)# 为DataFrame的每一行生成模型输入inputs = df.apply(f, axis=1).values# 将输入数据分割成多个批次batches = np.array_split(inputs, N_BATCHES)outputs = []# 逐批次执行模型推理for i, batch in enumerate(batches):outputs += model(batch) # 调用模型前向传播return outputs # 返回所有推理结果
8. 将之前的函数和类 总流程运行
weights_loader权重加载器 + run_model 跑模型
output 为每个选项的概率;按照概率倒序 前三名
# 仅在测试集模式下执行完整推理流程
if IS_TEST_SET:# 检测所有可用的CUDA设备devices = [f"cuda:{i}" for i in range(torch.cuda.device_count())]# 创建权重加载器实例,用于多设备权重协调weights_loader = WeightsLoader(checkpoint_path, devices)# 创建部分应用函数,固定权重加载器参数f = partial(run_model, weights_loader=weights_loader)# 使用线程池执行器进行并行计算with ThreadPoolExecutor() as executor:# 将数据分割并映射到各个设备执行outputs = list(executor.map(f, devices, np.array_split(df, 2)))outputs = sum(outputs, []) # 扁平化结果列表# 处理模型输出,生成最终预测n = len(df)for i, scores in enumerate(outputs):# 按得分降序排序,获取Top-3选项索引top3 = np.argsort(scores)[::-1]# 将索引转换为选项字母df.loc[i, "prediction"] = " ".join(["ABCDE"[j] for j in top3])# 训练集模式下的性能评估if "answer" in df.columns:# 详细准确率计算 在下一部分中
else:# 非测试集模式生成默认预测df["prediction"] = "A B C"# 保存最终预测结果到提交文件
df[["prediction"]].to_csv("submission.csv")
9. 训练集 结果评估
对于训练集 拿出 prediction的Top3;看百分之多少的问题 answer出现在预测的第 1,2,3 位置
# 仅在训练集模式下执行评估(有标准答案)
if "answer" in df.columns:# 提取每个样本的Top-1, Top-2, Top-3预测for i in range(n):df.loc[i, "top_1"] = df.loc[i, "prediction"][0] # Top-1预测(第一个字符)df.loc[i, "top_2"] = df.loc[i, "prediction"][2] # Top-2预测(第三个字符,跳过空格)df.loc[i, "top_3"] = df.loc[i, "prediction"][4] # Top-3预测(第五个字符,跳过空格)# 计算各Top级别的正确样本数top_i = [(df[f"top_{i}"] == df["answer"]).sum() for i in [1, 2, 3]]# 输出详细性能报告print(f"top1 : {top_i[0]}/{n}, top2 : {top_i[1]}/{n}, top3 : {top_i[2]}/{n} (total={sum(top_i)} / {n})")print(f"Accuracy: {100*top_i[0]/n:.1f}%, map3: {100*(top_i[0] + top_i[1]*1/2 + top_i[2]*1/3).sum()/n:.1f}%")