NLP大语言模型数据准备
NLP大语言模型数据准备完整指南
1. 概述
大语言模型(LLM)的性能很大程度上取决于训练数据的质量和规模。数据准备是整个模型训练流程中最关键且耗时的环节,通常占据整个项目60-80%的时间。
1.1 数据准备流程概览
原始数据收集 → 数据清洗 → 数据预处理 → 分词/编码 → 数据集构建 → 质量验证
2. 数据收集
2.1 数据源类型
公开数据集
- Common Crawl: 网页爬虫数据,规模达PB级
- Wikipedia: 高质量百科全书数据
- BookCorpus: 图书文本数据
- OpenWebText: Reddit高质量链接内容
- C4 (Colossal Clean Crawled Corpus): Google清洗后的网页数据
- The Pile: 825GB多源混合数据集
专有数据
- 企业内部文档
- 领域专业资料
- 用户生成内容(需遵守隐私法规)
2.2 数据收集工具
# 常用爬虫框架
- Scrapy: 强大的爬虫框架
- BeautifulSoup: HTML/XML解析
- Selenium: 动态网页爬取
- Requests: HTTP请求库# 数据下载工具
- wget/curl: 命令行下载工具
- Hugging Face Datasets: 数据集管理库
- TorchData: PyTorch数据加载工具
3. 数据清洗
3.1 去重处理
精确去重
# 使用哈希算法进行文档级去重
import hashlib
from collections import defaultdictdef exact_dedup(documents):seen_hashes = set()unique_docs = []for doc in documents:doc_hash = hashlib.sha256(doc.encode()).hexdigest()if doc_hash not in seen_hashes:seen_hashes.add(doc_hash)unique_docs.append(doc)return unique_docs
模糊去重
# 使用MinHash进行近似去重
from datasketch import MinHash, MinHashLSHdef fuzzy_dedup(documents, threshold=0.9):lsh = MinHashLSH(threshold=threshold)unique_docs = []for idx, doc in enumerate(documents):minhash = MinHash()for word in doc.split():minhash.update(word.encode('utf8'))if not lsh.query(minhash):lsh.insert(f"doc_{idx}", minhash)unique_docs.append(doc)return unique_docs
3.2 质量过滤
语言检测
from langdetect import detect_langs
import fasttext# FastText语言检测模型
model = fasttext.load_model('lid.176.bin')def filter_by_language(text, target_lang='zh'):predictions = model.predict(text)lang = predictions[0][0].replace('__label__', '')confidence = predictions[1][0]return lang == target_lang and confidence > 0.8
内容质量评估
- 困惑度过滤: 移除困惑度异常高的文本
- 长度过滤: 移除过短或过长的文档
- 特殊字符比例: 控制特殊字符占比
- 重复性检测: 检测文档内部重复
def quality_filter(text):# 长度过滤if len(text) < 100 or len(text) > 100000:return False# 特殊字符比例special_char_ratio = len([c for c in text if not c.isalnum()]) / len(text)if special_char_ratio > 0.5:return False# 重复行检测lines = text.split('\n')if len(set(lines)) / len(lines) < 0.7: # 超过30%重复return Falsereturn True
3.3 隐私和敏感信息处理
import re
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine# 使用正则表达式清理PII
def remove_pii(text):# 邮箱text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '[EMAIL]', text)# 电话号码text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)# 身份证号text = re.sub(r'\b\d{17}[\dXx]\b', '[ID]', text)# IP地址text = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', '[IP]', text)return text# 使用Presidio进行高级PII检测
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()def anonymize_text(text):results = analyzer.analyze(text=text, language='en')anonymized = anonymizer.anonymize(text=text, analyzer_results=results)return anonymized.text
4. 数据预处理
4.1 文本规范化
import unicodedata
import ftfydef normalize_text(text):# 修复编码问题text = ftfy.fix_text(text)# Unicode规范化text = unicodedata.normalize('NFKC', text)# 空白字符规范化text = ' '.join(text.split())# 大小写处理(根据需求)# text = text.lower()return text
4.2 文本清理
import re
from bs4 import BeautifulSoupdef clean_text(text):# 移除HTML标签text = BeautifulSoup(text, 'html.parser').get_text()# 移除URLstext = re.sub(r'http[s]?://\S+', '', text)# 移除多余空白text = re.sub(r'\s+', ' ', text)# 移除控制字符text = ''.join(char for char in text if ord(char) >= 32)return text.strip()
5. 分词与编码
5.1 分词器类型
BPE (Byte Pair Encoding)
from tokenizers import ByteLevelBPETokenizer# 训练BPE分词器
tokenizer = ByteLevelBPETokenizer()
tokenizer.train(files=['corpus.txt'],vocab_size=50000,min_frequency=2,special_tokens=['<s>', '</s>', '<pad>', '<unk>', '<mask>']
)
WordPiece
from tokenizers import BertWordPieceTokenizertokenizer = BertWordPieceTokenizer()
tokenizer.train(files=['corpus.txt'],vocab_size=30000,min_frequency=2,limit_alphabet=1000,wordpieces_prefix='##'
)
SentencePiece
import sentencepiece as spm# 训练SentencePiece模型
spm.SentencePieceTrainer.train(input='corpus.txt',model_prefix='model',vocab_size=32000,character_coverage=0.9995,model_type='unigram', # 或 'bpe'pad_id=3,unk_id=0,bos_id=1,eos_id=2
)
5.2 中文分词特殊处理
import jieba
from LAC import LAC# Jieba分词
def chinese_tokenize_jieba(text):return list(jieba.cut(text))# 百度LAC分词
lac = LAC(mode='seg')
def chinese_tokenize_lac(text):return lac.run(text)[0]# 使用预训练模型的分词器
from transformers import AutoTokenizertokenizer = AutoTokenizer.from_pretrained('bert-base-chinese')
tokens = tokenizer.tokenize(text)
6. 数据集构建
6.1 预训练数据格式
文本文件格式
# 每行一个文档,空行分隔
def create_pretraining_data(documents, output_file):with open(output_file, 'w', encoding='utf-8') as f:for doc in documents:f.write(doc + '\n\n')
JSONL格式
import jsondef create_jsonl_dataset(documents, output_file):with open(output_file, 'w', encoding='utf-8') as f:for doc in documents:json_line = json.dumps({'text': doc}, ensure_ascii=False)f.write(json_line + '\n')
6.2 微调数据格式
指令微调格式
def create_instruction_data(instructions, inputs, outputs):dataset = []for inst, inp, out in zip(instructions, inputs, outputs):sample = {'instruction': inst,'input': inp,'output': out}dataset.append(sample)return dataset
对话格式
def create_dialogue_data(conversations):dataset = []for conv in conversations:formatted_conv = {'messages': [{'role': 'system', 'content': conv['system']},{'role': 'user', 'content': conv['user']},{'role': 'assistant', 'content': conv['assistant']}]}dataset.append(formatted_conv)return dataset
7. 数据增强
7.1 回译增强
from transformers import MarianMTModel, MarianTokenizerdef back_translation(text, src_lang='zh', tgt_lang='en'):# 中译英model_name = f'Helsinki-NLP/opus-mt-{src_lang}-{tgt_lang}'tokenizer = MarianTokenizer.from_pretrained(model_name)model = MarianMTModel.from_pretrained(model_name)inputs = tokenizer(text, return_tensors="pt", padding=True)translated = model.generate(**inputs)en_text = tokenizer.decode(translated[0], skip_special_tokens=True)# 英译中model_name = f'Helsinki-NLP/opus-mt-{tgt_lang}-{src_lang}'tokenizer = MarianTokenizer.from_pretrained(model_name)model = MarianMTModel.from_pretrained(model_name)inputs = tokenizer(en_text, return_tensors="pt", padding=True)translated = model.generate(**inputs)augmented_text = tokenizer.decode(translated[0], skip_special_tokens=True)return augmented_text
7.2 同义词替换
import random
from synonyms import synonymsdef synonym_replacement(text, n=2):words = text.split()new_words = words.copy()random_word_list = list(set([word for word in words if word.isalpha()]))random.shuffle(random_word_list)num_replaced = 0for random_word in random_word_list:synonyms_list = synonyms(random_word)if len(synonyms_list) > 0:synonym = random.choice(synonyms_list)new_words = [synonym if word == random_word else word for word in new_words]num_replaced += 1if num_replaced >= n:breakreturn ' '.join(new_words)
8. 技术栈与工具
8.1 数据处理框架
工具 | 用途 | 特点 |
---|---|---|
Apache Spark | 大规模数据处理 | 分布式计算,支持PB级数据 |
Dask | Python并行计算 | 扩展pandas/numpy到大数据 |
Ray | 分布式AI工作负载 | 高性能,易于扩展 |
Apache Beam | 批流一体处理 | 统一的编程模型 |
8.2 NLP专用工具
工具 | 用途 | 特点 |
---|---|---|
spaCy | 工业级NLP | 快速、准确、易用 |
NLTK | NLP教学和研究 | 功能全面,文档丰富 |
Stanza | 多语言NLP | Stanford出品,学术质量 |
TextBlob | 简单NLP任务 | API简洁,适合初学者 |
8.3 深度学习框架集成
# PyTorch数据加载
from torch.utils.data import Dataset, DataLoaderclass TextDataset(Dataset):def __init__(self, texts, tokenizer, max_length=512):self.texts = textsself.tokenizer = tokenizerself.max_length = max_lengthdef __len__(self):return len(self.texts)def __getitem__(self, idx):text = self.texts[idx]encoding = self.tokenizer(text,truncation=True,padding='max_length',max_length=self.max_length,return_tensors='pt')return {'input_ids': encoding['input_ids'].flatten(),'attention_mask': encoding['attention_mask'].flatten()}# 创建数据加载器
dataset = TextDataset(texts, tokenizer)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
8.4 数据版本管理
# DVC (Data Version Control)
dvc init
dvc add data/corpus.txt
dvc remote add -d storage s3://mybucket/path
dvc push# Git LFS (Large File Storage)
git lfs track "*.bin"
git add .gitattributes
git add model.bin
git commit -m "Add model file"
9. 质量控制与验证
9.1 数据质量指标
import numpy as np
from collections import Counterdef calculate_data_statistics(texts):stats = {'total_documents': len(texts),'total_tokens': sum(len(text.split()) for text in texts),'avg_length': np.mean([len(text) for text in texts]),'std_length': np.std([len(text) for text in texts]),'vocabulary_size': len(set(' '.join(texts).split()))}# 词频分布all_words = ' '.join(texts).split()word_freq = Counter(all_words)stats['top_10_words'] = word_freq.most_common(10)return stats
9.2 数据集分割
from sklearn.model_selection import train_test_splitdef split_dataset(data, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):assert train_ratio + val_ratio + test_ratio == 1.0# 第一次分割:训练集和临时集train_data, temp_data = train_test_split(data, test_size=1-train_ratio, random_state=42)# 第二次分割:验证集和测试集val_data, test_data = train_test_split(temp_data, test_size=test_ratio/(val_ratio+test_ratio), random_state=42)return {'train': train_data,'validation': val_data,'test': test_data}
10. 最佳实践
10.1 数据处理Pipeline
class DataProcessingPipeline:def __init__(self, config):self.config = configself.tokenizer = self._load_tokenizer()def _load_tokenizer(self):return AutoTokenizer.from_pretrained(self.config['tokenizer'])def process(self, raw_data):# 1. 清洗cleaned_data = self.clean(raw_data)# 2. 去重deduped_data = self.deduplicate(cleaned_data)# 3. 过滤filtered_data = self.filter(deduped_data)# 4. 规范化normalized_data = self.normalize(filtered_data)# 5. 分词tokenized_data = self.tokenize(normalized_data)return tokenized_datadef clean(self, data):return [clean_text(text) for text in data]def deduplicate(self, data):return exact_dedup(data)def filter(self, data):return [text for text in data if quality_filter(text)]def normalize(self, data):return [normalize_text(text) for text in data]def tokenize(self, data):return [self.tokenizer(text) for text in data]
10.2 性能优化建议
- 并行处理: 使用multiprocessing或Ray进行并行数据处理
- 批处理: 批量处理数据而非逐条处理
- 缓存机制: 缓存中间结果避免重复计算
- 内存管理: 使用生成器处理大文件,避免内存溢出
- 分布式存储: 使用HDFS或对象存储处理海量数据
10.3 常见问题与解决方案
问题 | 解决方案 |
---|---|
内存不足 | 使用流式处理,分批加载数据 |
处理速度慢 | 并行化处理,使用更高效的算法 |
数据不平衡 | 采样策略,数据增强 |
编码问题 | 统一使用UTF-8,使用ftfy修复 |
分词不准确 | 使用领域特定词典,自定义分词规则 |
11. 监控与日志
import logging
from tqdm import tqdm# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler('data_processing.log'),logging.StreamHandler()]
)def process_with_monitoring(data, process_func):logger = logging.getLogger(__name__)processed_data = []failed_items = []for item in tqdm(data, desc="Processing data"):try:result = process_func(item)processed_data.append(result)except Exception as e:logger.error(f"Failed to process item: {e}")failed_items.append(item)logger.info(f"Successfully processed: {len(processed_data)}")logger.info(f"Failed items: {len(failed_items)}")return processed_data, failed_items
12. 总结
数据准备是大语言模型训练的基础,直接影响模型的最终性能。关键要点:
-
数据质量优于数量: 高质量的小数据集往往优于低质量的大数据集
-
保持数据多样性: 确保数据覆盖不同领域、风格和难度
-
注重隐私保护: 严格处理PII信息,遵守相关法规
-
建立可复现的Pipeline: 版本控制、文档化、自动化
-
持续迭代优化: 根据模型表现不断改进数据处理策略
-
数据收集:介绍了常用的公开数据集(如Common Crawl、Wikipedia等)和数据收集工具
-
数据清洗:
- 精确去重和模糊去重技术
- 质量过滤(语言检测、内容质量评估)
- 隐私信息处理(PII检测和脱敏)
-
数据预处理:文本规范化、HTML清理、编码修复等技术
-
分词与编码:详细介绍了BPE、WordPiece、SentencePiece等主流分词器,以及中文分词的特殊处理
-
数据集构建:预训练和微调数据的不同格式要求
-
数据增强:回译、同义词替换等技术
-
技术栈:
- 大数据处理框架(Spark、Dask、Ray)
- NLP工具(spaCy、NLTK、Stanza)
- 深度学习框架集成(PyTorch、TensorFlow)
- 数据版本管理(DVC、Git LFS)
-
质量控制:数据统计、数据集分割、监控日志等
-
最佳实践:完整的数据处理Pipeline设计和性能优化建议