当前位置: 首页 > java >正文

NLP大语言模型数据准备

NLP大语言模型数据准备完整指南

1. 概述

大语言模型(LLM)的性能很大程度上取决于训练数据的质量和规模。数据准备是整个模型训练流程中最关键且耗时的环节,通常占据整个项目60-80%的时间。

1.1 数据准备流程概览

原始数据收集 → 数据清洗 → 数据预处理 → 分词/编码 → 数据集构建 → 质量验证

2. 数据收集

2.1 数据源类型

公开数据集
  • Common Crawl: 网页爬虫数据,规模达PB级
  • Wikipedia: 高质量百科全书数据
  • BookCorpus: 图书文本数据
  • OpenWebText: Reddit高质量链接内容
  • C4 (Colossal Clean Crawled Corpus): Google清洗后的网页数据
  • The Pile: 825GB多源混合数据集
专有数据
  • 企业内部文档
  • 领域专业资料
  • 用户生成内容(需遵守隐私法规)

2.2 数据收集工具

# 常用爬虫框架
- Scrapy: 强大的爬虫框架
- BeautifulSoup: HTML/XML解析
- Selenium: 动态网页爬取
- Requests: HTTP请求库# 数据下载工具
- wget/curl: 命令行下载工具
- Hugging Face Datasets: 数据集管理库
- TorchData: PyTorch数据加载工具

3. 数据清洗

3.1 去重处理

精确去重
# 使用哈希算法进行文档级去重
import hashlib
from collections import defaultdictdef exact_dedup(documents):seen_hashes = set()unique_docs = []for doc in documents:doc_hash = hashlib.sha256(doc.encode()).hexdigest()if doc_hash not in seen_hashes:seen_hashes.add(doc_hash)unique_docs.append(doc)return unique_docs
模糊去重
# 使用MinHash进行近似去重
from datasketch import MinHash, MinHashLSHdef fuzzy_dedup(documents, threshold=0.9):lsh = MinHashLSH(threshold=threshold)unique_docs = []for idx, doc in enumerate(documents):minhash = MinHash()for word in doc.split():minhash.update(word.encode('utf8'))if not lsh.query(minhash):lsh.insert(f"doc_{idx}", minhash)unique_docs.append(doc)return unique_docs

3.2 质量过滤

语言检测
from langdetect import detect_langs
import fasttext# FastText语言检测模型
model = fasttext.load_model('lid.176.bin')def filter_by_language(text, target_lang='zh'):predictions = model.predict(text)lang = predictions[0][0].replace('__label__', '')confidence = predictions[1][0]return lang == target_lang and confidence > 0.8
内容质量评估
  • 困惑度过滤: 移除困惑度异常高的文本
  • 长度过滤: 移除过短或过长的文档
  • 特殊字符比例: 控制特殊字符占比
  • 重复性检测: 检测文档内部重复
def quality_filter(text):# 长度过滤if len(text) < 100 or len(text) > 100000:return False# 特殊字符比例special_char_ratio = len([c for c in text if not c.isalnum()]) / len(text)if special_char_ratio > 0.5:return False# 重复行检测lines = text.split('\n')if len(set(lines)) / len(lines) < 0.7:  # 超过30%重复return Falsereturn True

3.3 隐私和敏感信息处理

import re
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine# 使用正则表达式清理PII
def remove_pii(text):# 邮箱text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '[EMAIL]', text)# 电话号码text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)# 身份证号text = re.sub(r'\b\d{17}[\dXx]\b', '[ID]', text)# IP地址text = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', '[IP]', text)return text# 使用Presidio进行高级PII检测
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()def anonymize_text(text):results = analyzer.analyze(text=text, language='en')anonymized = anonymizer.anonymize(text=text, analyzer_results=results)return anonymized.text

4. 数据预处理

4.1 文本规范化

import unicodedata
import ftfydef normalize_text(text):# 修复编码问题text = ftfy.fix_text(text)# Unicode规范化text = unicodedata.normalize('NFKC', text)# 空白字符规范化text = ' '.join(text.split())# 大小写处理(根据需求)# text = text.lower()return text

4.2 文本清理

import re
from bs4 import BeautifulSoupdef clean_text(text):# 移除HTML标签text = BeautifulSoup(text, 'html.parser').get_text()# 移除URLstext = re.sub(r'http[s]?://\S+', '', text)# 移除多余空白text = re.sub(r'\s+', ' ', text)# 移除控制字符text = ''.join(char for char in text if ord(char) >= 32)return text.strip()

5. 分词与编码

5.1 分词器类型

BPE (Byte Pair Encoding)
from tokenizers import ByteLevelBPETokenizer# 训练BPE分词器
tokenizer = ByteLevelBPETokenizer()
tokenizer.train(files=['corpus.txt'],vocab_size=50000,min_frequency=2,special_tokens=['<s>', '</s>', '<pad>', '<unk>', '<mask>']
)
WordPiece
from tokenizers import BertWordPieceTokenizertokenizer = BertWordPieceTokenizer()
tokenizer.train(files=['corpus.txt'],vocab_size=30000,min_frequency=2,limit_alphabet=1000,wordpieces_prefix='##'
)
SentencePiece
import sentencepiece as spm# 训练SentencePiece模型
spm.SentencePieceTrainer.train(input='corpus.txt',model_prefix='model',vocab_size=32000,character_coverage=0.9995,model_type='unigram',  # 或 'bpe'pad_id=3,unk_id=0,bos_id=1,eos_id=2
)

5.2 中文分词特殊处理

import jieba
from LAC import LAC# Jieba分词
def chinese_tokenize_jieba(text):return list(jieba.cut(text))# 百度LAC分词
lac = LAC(mode='seg')
def chinese_tokenize_lac(text):return lac.run(text)[0]# 使用预训练模型的分词器
from transformers import AutoTokenizertokenizer = AutoTokenizer.from_pretrained('bert-base-chinese')
tokens = tokenizer.tokenize(text)

6. 数据集构建

6.1 预训练数据格式

文本文件格式
# 每行一个文档,空行分隔
def create_pretraining_data(documents, output_file):with open(output_file, 'w', encoding='utf-8') as f:for doc in documents:f.write(doc + '\n\n')
JSONL格式
import jsondef create_jsonl_dataset(documents, output_file):with open(output_file, 'w', encoding='utf-8') as f:for doc in documents:json_line = json.dumps({'text': doc}, ensure_ascii=False)f.write(json_line + '\n')

6.2 微调数据格式

指令微调格式
def create_instruction_data(instructions, inputs, outputs):dataset = []for inst, inp, out in zip(instructions, inputs, outputs):sample = {'instruction': inst,'input': inp,'output': out}dataset.append(sample)return dataset
对话格式
def create_dialogue_data(conversations):dataset = []for conv in conversations:formatted_conv = {'messages': [{'role': 'system', 'content': conv['system']},{'role': 'user', 'content': conv['user']},{'role': 'assistant', 'content': conv['assistant']}]}dataset.append(formatted_conv)return dataset

7. 数据增强

7.1 回译增强

from transformers import MarianMTModel, MarianTokenizerdef back_translation(text, src_lang='zh', tgt_lang='en'):# 中译英model_name = f'Helsinki-NLP/opus-mt-{src_lang}-{tgt_lang}'tokenizer = MarianTokenizer.from_pretrained(model_name)model = MarianMTModel.from_pretrained(model_name)inputs = tokenizer(text, return_tensors="pt", padding=True)translated = model.generate(**inputs)en_text = tokenizer.decode(translated[0], skip_special_tokens=True)# 英译中model_name = f'Helsinki-NLP/opus-mt-{tgt_lang}-{src_lang}'tokenizer = MarianTokenizer.from_pretrained(model_name)model = MarianMTModel.from_pretrained(model_name)inputs = tokenizer(en_text, return_tensors="pt", padding=True)translated = model.generate(**inputs)augmented_text = tokenizer.decode(translated[0], skip_special_tokens=True)return augmented_text

7.2 同义词替换

import random
from synonyms import synonymsdef synonym_replacement(text, n=2):words = text.split()new_words = words.copy()random_word_list = list(set([word for word in words if word.isalpha()]))random.shuffle(random_word_list)num_replaced = 0for random_word in random_word_list:synonyms_list = synonyms(random_word)if len(synonyms_list) > 0:synonym = random.choice(synonyms_list)new_words = [synonym if word == random_word else word for word in new_words]num_replaced += 1if num_replaced >= n:breakreturn ' '.join(new_words)

8. 技术栈与工具

8.1 数据处理框架

工具用途特点
Apache Spark大规模数据处理分布式计算,支持PB级数据
DaskPython并行计算扩展pandas/numpy到大数据
Ray分布式AI工作负载高性能,易于扩展
Apache Beam批流一体处理统一的编程模型

8.2 NLP专用工具

工具用途特点
spaCy工业级NLP快速、准确、易用
NLTKNLP教学和研究功能全面,文档丰富
Stanza多语言NLPStanford出品,学术质量
TextBlob简单NLP任务API简洁,适合初学者

8.3 深度学习框架集成

# PyTorch数据加载
from torch.utils.data import Dataset, DataLoaderclass TextDataset(Dataset):def __init__(self, texts, tokenizer, max_length=512):self.texts = textsself.tokenizer = tokenizerself.max_length = max_lengthdef __len__(self):return len(self.texts)def __getitem__(self, idx):text = self.texts[idx]encoding = self.tokenizer(text,truncation=True,padding='max_length',max_length=self.max_length,return_tensors='pt')return {'input_ids': encoding['input_ids'].flatten(),'attention_mask': encoding['attention_mask'].flatten()}# 创建数据加载器
dataset = TextDataset(texts, tokenizer)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

8.4 数据版本管理

# DVC (Data Version Control)
dvc init
dvc add data/corpus.txt
dvc remote add -d storage s3://mybucket/path
dvc push# Git LFS (Large File Storage)
git lfs track "*.bin"
git add .gitattributes
git add model.bin
git commit -m "Add model file"

9. 质量控制与验证

9.1 数据质量指标

import numpy as np
from collections import Counterdef calculate_data_statistics(texts):stats = {'total_documents': len(texts),'total_tokens': sum(len(text.split()) for text in texts),'avg_length': np.mean([len(text) for text in texts]),'std_length': np.std([len(text) for text in texts]),'vocabulary_size': len(set(' '.join(texts).split()))}# 词频分布all_words = ' '.join(texts).split()word_freq = Counter(all_words)stats['top_10_words'] = word_freq.most_common(10)return stats

9.2 数据集分割

from sklearn.model_selection import train_test_splitdef split_dataset(data, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):assert train_ratio + val_ratio + test_ratio == 1.0# 第一次分割:训练集和临时集train_data, temp_data = train_test_split(data, test_size=1-train_ratio, random_state=42)# 第二次分割:验证集和测试集val_data, test_data = train_test_split(temp_data, test_size=test_ratio/(val_ratio+test_ratio), random_state=42)return {'train': train_data,'validation': val_data,'test': test_data}

10. 最佳实践

10.1 数据处理Pipeline

class DataProcessingPipeline:def __init__(self, config):self.config = configself.tokenizer = self._load_tokenizer()def _load_tokenizer(self):return AutoTokenizer.from_pretrained(self.config['tokenizer'])def process(self, raw_data):# 1. 清洗cleaned_data = self.clean(raw_data)# 2. 去重deduped_data = self.deduplicate(cleaned_data)# 3. 过滤filtered_data = self.filter(deduped_data)# 4. 规范化normalized_data = self.normalize(filtered_data)# 5. 分词tokenized_data = self.tokenize(normalized_data)return tokenized_datadef clean(self, data):return [clean_text(text) for text in data]def deduplicate(self, data):return exact_dedup(data)def filter(self, data):return [text for text in data if quality_filter(text)]def normalize(self, data):return [normalize_text(text) for text in data]def tokenize(self, data):return [self.tokenizer(text) for text in data]

10.2 性能优化建议

  1. 并行处理: 使用multiprocessing或Ray进行并行数据处理
  2. 批处理: 批量处理数据而非逐条处理
  3. 缓存机制: 缓存中间结果避免重复计算
  4. 内存管理: 使用生成器处理大文件,避免内存溢出
  5. 分布式存储: 使用HDFS或对象存储处理海量数据

10.3 常见问题与解决方案

问题解决方案
内存不足使用流式处理,分批加载数据
处理速度慢并行化处理,使用更高效的算法
数据不平衡采样策略,数据增强
编码问题统一使用UTF-8,使用ftfy修复
分词不准确使用领域特定词典,自定义分词规则

11. 监控与日志

import logging
from tqdm import tqdm# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler('data_processing.log'),logging.StreamHandler()]
)def process_with_monitoring(data, process_func):logger = logging.getLogger(__name__)processed_data = []failed_items = []for item in tqdm(data, desc="Processing data"):try:result = process_func(item)processed_data.append(result)except Exception as e:logger.error(f"Failed to process item: {e}")failed_items.append(item)logger.info(f"Successfully processed: {len(processed_data)}")logger.info(f"Failed items: {len(failed_items)}")return processed_data, failed_items

12. 总结

数据准备是大语言模型训练的基础,直接影响模型的最终性能。关键要点:

  1. 数据质量优于数量: 高质量的小数据集往往优于低质量的大数据集

  2. 保持数据多样性: 确保数据覆盖不同领域、风格和难度

  3. 注重隐私保护: 严格处理PII信息,遵守相关法规

  4. 建立可复现的Pipeline: 版本控制、文档化、自动化

  5. 持续迭代优化: 根据模型表现不断改进数据处理策略

  6. 数据收集:介绍了常用的公开数据集(如Common Crawl、Wikipedia等)和数据收集工具

  7. 数据清洗

    • 精确去重和模糊去重技术
    • 质量过滤(语言检测、内容质量评估)
    • 隐私信息处理(PII检测和脱敏)
  8. 数据预处理:文本规范化、HTML清理、编码修复等技术

  9. 分词与编码:详细介绍了BPE、WordPiece、SentencePiece等主流分词器,以及中文分词的特殊处理

  10. 数据集构建:预训练和微调数据的不同格式要求

  11. 数据增强:回译、同义词替换等技术

  12. 技术栈

    • 大数据处理框架(Spark、Dask、Ray)
    • NLP工具(spaCy、NLTK、Stanza)
    • 深度学习框架集成(PyTorch、TensorFlow)
    • 数据版本管理(DVC、Git LFS)
  13. 质量控制:数据统计、数据集分割、监控日志等

  14. 最佳实践:完整的数据处理Pipeline设计和性能优化建议

http://www.xdnf.cn/news/19571.html

相关文章:

  • 基于 DNA 的原核生物与微小真核生物分类学:分子革命下的范式重构​
  • Shell编程(二):正则表达式
  • FastK v1.1 安装与使用-生信工具59
  • Gradle vs. Maven,Java 构建工具该用哪个?
  • 喜讯!华清远见参与制定的《电子产品印制电路板可制造性设计(DFM)和可靠性设计规范》正式发布
  • 【无标题】训练、推理适用的数据类型
  • 专题:2025全球新能源汽车供应链核心领域研究报告|附300+份报告PDF、数据仪表盘汇总下载
  • 关闭页面强制清除所有循环定时器
  • ES6手录02-字符串与函数的扩展
  • Kotlin 协程异步任务工具类:高效处理异步操作与超时控制
  • UE5 为啥原生的NotifyState写逻辑会有问题
  • 开源低代码平台(NocoBase)
  • 20250828的学习笔记
  • 9.1日IO作业
  • 2025年09月01日Github流行趋势
  • 99、23种设计模式之组合模式(8/23)
  • 09.《路由基础知识解析和实践》
  • 基于外部对照数据借用的临床试验统计分析方案设计与仿真研究
  • PitVis-2023挑战赛:内镜下垂体瘤手术视频中的手术流程识别|文献速递-深度学习人工智能医疗图像
  • 如何把指定阿里云文件夹下的所有文件移动到另一个文件夹下,移动文件时把文件名称(不包括文件后缀)进行md5编码
  • 从理论到实践,深入剖析数据库水平拆分的安全平滑落地
  • Spark自定义累加器实现高效WordCount
  • Spark和Spring整合处理离线数据
  • promptoMANIA-AI绘画提示词生成器
  • Electron使用WebAssembly实现CRC-16 CCITT校验
  • macOS中Homebrew安装PHP的详细步骤(五)
  • 深入了解Flink核心:Slot资源管理机制
  • PostgreSQL 索引大全
  • 深入理解Docker容器技术:原理与实践
  • 如何安装CUDA????