LangChain文档加载器自动选择器:支持多种文件格式的统一加载方法
📚 LangChain 入门:详解 Loader 与 Splitter 的作用与使用方法
一、前言
随着大语言模型(LLM)和 RAG(Retrieval-Augmented Generation)技术的发展,LangChain 成为了构建 AI 应用的重要工具链之一。其中,Loader
和 Splitter
是两个基础但非常关键的模块。
本文将从“简单易懂 + 实战示例”的角度出发,详细讲解这两个模块的作用、区别以及常用类的使用方法,帮助你快速上手 LangChain!
二、什么是 Loader?
🔍 定义:
Loader
(加载器)是用于**读取各种格式文件中的内容并转换为统一的数据结构(如 Document 对象)**的组件。
✅ 功能特点:
- 支持多种文件格式(PDF、Word、TXT、CSV 等)
- 输出统一格式的
Document
列表 - 可作为后续文本处理(切分、嵌入、检索)的输入
🧰 常见的 Loader 类型:
类名 | 说明 |
---|---|
PDFLoader | 加载 PDF 文件内容 |
CSVLoader | 加载 CSV 表格数据 |
TextLoader | 加载纯文本文件 |
UnstructuredFileLoader | 通用非结构化文件加载器 |
RapidOCRPDFLoader | OCR 提取 PDF 图片中的文字 |
🧪 示例代码:
from langchain.document_loaders import PDFLoaderloader = PDFLoader("example.pdf")
docs = loader.load()
print(docs[0].page_content) # 打印第一页内容
三、什么是 Splitter?
🔍 定义:
Splitter
(切分器)是用于**将一段长文本按规则切分为多个较小块(chunk)**的组件。
✅ 功能特点:
- 控制每段长度(
chunk_size
) - 设置相邻段重叠部分(
chunk_overlap
),保留上下文 - 支持多种切分策略,适应不同场景(如中文、Markdown、句子边界)
🧰 常见的 Splitter 类型:
类名 | 说明 |
---|---|
CharacterTextSplitter | 按字符数量进行切分 |
RecursiveCharacterTextSplitter | 按递归结构切分,默认顺序:\n\n > \n > 空格 |
MarkdownHeaderTextSplitter | 按 Markdown 标题层级切分 |
SpacyTextSplitter | 基于 SpaCy 分句,适合英文语义切分 |
🧪 示例代码:
from langchain.text_splitter import RecursiveCharacterTextSplittersplitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50)
chunks = splitter.split_documents(docs)for i, chunk in enumerate(chunks):print(f"Chunk {i+1}:\n{chunk.page_content[:200]}...") # 打印前200字预览
四、Loader 与 Splitter 的关系总结
对比项 | Loader | Splitter |
---|---|---|
中文名称 | 文档加载器 | 文本切分器 |
主要功能 | 将文件读取为文本或 Document 对象 | 将长文本切分为小块 |
输入类型 | 文件路径、URL、原始字符串等 | 文本字符串或 Document 对象 |
输出类型 | List[Document] | List[Document](内容更短) |
使用目的 | 获取原始数据 | 准备用于 Embedding 或检索的数据 |
✅ 总结一句话:
Loader 是负责“读进来”,Splitter 是负责“拆开来”。一个管输入,一个管处理。
五、典型应用场景(结合使用)
from langchain.document_loaders import PDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter# Step 1: 加载 PDF 文件
loader = PDFLoader("my_document.pdf")
docs = loader.load()# Step 2: 切分文档
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
chunks = splitter.split_documents(docs)# Step 3: 后续可用于 embedding、存入向量数据库等
六、常见问题与注意事项
问题 | 解答 |
---|---|
如何选择合适的 Chunk Size? | 一般设置为 512~1024,具体视模型最大 token 数而定 |
Chunk Overlap 是否必须? | 推荐设置为 chunk_size 的 10% 左右,保留上下文信息 |
如何处理中文? | 使用 RecursiveCharacterTextSplitter 即可,支持中文切分 |
如何提升切分效率? | 可先加载再切分,避免多次 IO 操作 |
七、结语
在 LangChain 构建的知识问答系统、RAG 系统中,Loader
和 Splitter
是整个流程中最基础也是最关键的两个环节。掌握它们的使用方法,有助于你更好地理解整个系统的构建逻辑。
希望这篇文章能帮助你快速理解 Loader 与 Splitter 的核心概念与实战应用!
八、代码
八一 、路径管理与校验
管理知识库文件夹结构,并列出其中的知识库和文档路径,主要用于构建本地知识库系统。管理知识库根目录、文档目录路径。支持递归列出所有合法文档文件,过滤临时文件与软链接。
def validate_kb_name(knowledge_base_id: str) -> bool:# 检查是否包含预期外的字符或路径攻击关键字if "../" in knowledge_base_id:return Falsereturn Truedef get_kb_path(knowledge_base_name: str):return os.path.join(KB_ROOT_PATH, knowledge_base_name)def get_doc_path(knowledge_base_name: str):return os.path.join(get_kb_path(knowledge_base_name), "content")def get_file_path(knowledge_base_name: str, doc_name: str):return os.path.join(get_doc_path(knowledge_base_name), doc_name)
def list_kbs_from_folder():return [f for f in os.listdir(KB_ROOT_PATH)if os.path.isdir(os.path.join(KB_ROOT_PATH, f))]#列出所有的文件,但是判断,然后zhiqu
def list_files_from_folder(kb_name: str):doc_path = get_doc_path(kb_name)result = []def is_skiped_path(path: str):tail = os.path.basename(path).lower()for x in ["temp", "tmp", ".", "~$"]:if tail.startswith(x):return Truereturn Falsedef process_entry(entry):if is_skiped_path(entry.path):returnif entry.is_symlink():target_path = os.path.realpath(entry.path)with os.scandir(target_path) as target_it:for target_entry in target_it:process_entry(target_entry)elif entry.is_file():file_path = (Path(os.path.relpath(entry.path, doc_path)).as_posix()) # 路径统一为 posix 格式result.append(file_path)elif entry.is_dir():with os.scandir(entry.path) as it:for sub_entry in it:process_entry(sub_entry)with os.scandir(doc_path) as it:for entry in it:process_entry(entry)return result
八二 、文档加载器支持模块
• 根据文件扩展名选择文档加载器。
• 支持 chardet 自动编码识别(如 CSV 文件)。
• 优先使用自定义 document_loaders,否则使用 LangChain 默认加载器。
def get_LoaderClass(file_extension):for LoaderClass, extensions in LOADER_DICT.items():if file_extension in extensions:return LoaderClass# 把一些向量化共用逻辑从KnowledgeFile抽取出来,等langchain支持内存文件的时候,可以将非磁盘文件向量化
def get_loader(loader_name: str, file_path: str, loader_kwargs: Dict = None):'''根据loader_name和文件路径或内容返回文档加载器。'''loader_kwargs = loader_kwargs or {}try:if loader_name in ["RapidOCRPDFLoader", "RapidOCRLoader","FilteredCSVLoader"]:document_loaders_module = importlib.import_module('document_loaders')else:document_loaders_module = importlib.import_module('langchain.document_loaders')#动态倒入模块并获取模块的特定属性DocumentLoader = getattr(document_loaders_module, loader_name)## 获取模块中的 CSVLoader 类except Exception as e:document_loaders_module = importlib.import_module('langchain.document_loaders')DocumentLoader = getattr(document_loaders_module, "UnstructuredFileLoader")if loader_name == "UnstructuredFileLoader":loader_kwargs.setdefault("autodetect_encoding", True)elif loader_name == "CSVLoader":if not loader_kwargs.get("encoding"):# 如果未指定 encoding,自动识别文件编码类型,避免langchain loader 加载文件报编码错误with open(file_path, 'rb') as struct_file:encode_detect = chardet.detect(struct_file.read())if encode_detect is None:encode_detect = {"encoding": "utf-8"}loader_kwargs["encoding"] = encode_detect["encoding"]## TODO:支持更多的自定义CSV读取逻辑elif loader_name == "JSONLoader":loader_kwargs.setdefault("jq_schema", ".")loader_kwargs.setdefault("text_content", False)elif loader_name == "JSONLinesLoader":loader_kwargs.setdefault("jq_schema", ".")loader_kwargs.setdefault("text_content", False)loader = DocumentLoader(file_path, **loader_kwargs)return loader
八三、 文件类封装:KnowledgeFile
封装了一个文件的路径、加载器、分词器、Document 缓存等,用于后续向量化流程。
class KnowledgeFile:def __init__(self,filename: str,knowledge_base_name: str,loader_kwargs: Dict = {},):'''对应知识库目录中的文件,必须是磁盘上存在的才能进行向量化等操作。'''self.kb_name = knowledge_base_nameself.filename = str(Path(filename).as_posix())#使用正斜杠 / 作为路径分隔符,而不管当前操作系统如何。path将会把str转换为路径self.ext = os.path.splitext(filename)[-1].lower()#判断文件的格式if self.ext not in SUPPORTED_EXTS:raise ValueError(f"暂未支持的文件格式 {self.filename}")self.loader_kwargs = loader_kwargsself.filepath = get_file_path(knowledge_base_name, filename)self.docs = Noneself.splited_docs = Noneself.document_loader_name = get_LoaderClass(self.ext)self.text_splitter_name = TEXT_SPLITTER_NAMEdef file2docs(self, refresh: bool = False):#这里代表的是将文件加载为documentif self.docs is None or refresh:#表示是否强制重新加载文档。logger.info(f"{self.document_loader_name} used for {self.filepath}")loader = get_loader(loader_name=self.document_loader_name,file_path=self.filepath,loader_kwargs=self.loader_kwargs)self.docs = loader.load()return self.docs#def file2text(self,zh_title_enhance: bool = ZH_TITLE_ENHANCE,refresh: bool = False,chunk_size: int = CHUNK_SIZE,chunk_overlap: int = OVERLAP_SIZE,text_splitter: TextSplitter = None,):if self.splited_docs is None or refresh:docs = self.file2docs()#加载为Documentreturn docs
八四、并发处理封装
• 将多个 KnowledgeFile 批量转为 LangChain Document,支持线程池与单线程模式。
• 错误日志捕获,结构化返回是否成功和错误信息。
def files2docs_in_thread(files: List[Union[KnowledgeFile, Tuple[str, str], Dict]],chunk_size: int = CHUNK_SIZE,chunk_overlap: int = OVERLAP_SIZE,zh_title_enhance: bool = ZH_TITLE_ENHANCE,
) -> Generator:'''利用多线程批量将磁盘文件转化成langchain Document.如果传入参数是Tuple,形式为(filename, kb_name)生成器返回值为 status, (kb_name, file_name, docs | error)'''def file2docs(*, file: KnowledgeFile, **kwargs) -> Tuple[bool, Tuple[str, str, List[Document]]]:try:return True, (file.kb_name, file.filename, file.file2text(**kwargs))except Exception as e:msg = f"从文件 {file.kb_name}/{file.filename} 加载文档时出错:{e}"logger.error(f'{e.__class__.__name__}: {msg}',exc_info=e if log_verbose else None)return False, (file.kb_name, file.filename, msg)kwargs_list = []for i, file in enumerate(files):kwargs = {}try:if isinstance(file, tuple) and len(file) >= 2:filename = file[0]kb_name = file[1]file = KnowledgeFile(filename=filename, knowledge_base_name=kb_name)elif isinstance(file, dict):filename = file.pop("filename")kb_name = file.pop("kb_name")kwargs.update(file)file = KnowledgeFile(filename=filename, knowledge_base_name=kb_name)kwargs["file"] = filekwargs["chunk_size"] = chunk_sizekwargs["chunk_overlap"] = chunk_overlapkwargs["zh_title_enhance"] = zh_title_enhancekwargs_list.append(kwargs)print('')except Exception as e:yield False, (kb_name, filename, str(e))for result in run_in_thread_pool(func=file2docs, params=kwargs_list):yield result
def files2docs_in_thread_(files: List[Union[KnowledgeFile, Tuple[str, str], Dict]],chunk_size: int = CHUNK_SIZE,chunk_overlap: int = OVERLAP_SIZE,zh_title_enhance: bool = ZH_TITLE_ENHANCE,
) -> Generator:"""单线程操作"""def file2docs(*, file: KnowledgeFile, **kwargs) -> Tuple[bool, Tuple[str, str, List[Document]]]:try:return True, (file.kb_name, file.filename, file.file2text(**kwargs))except Exception as e:msg = f"从文件 {file.kb_name}/{file.filename} 加载文档时出错:{e}"logger.error(f'{e.__class__.__name__}: {msg}',exc_info=e if log_verbose else None)return False, (file.kb_name, file.filename, msg)for i, file in enumerate(files):kwargs = {}try:kwargs.update({"file": file,"chunk_size": chunk_size,"chunk_overlap": chunk_overlap,"zh_title_enhance": zh_title_enhance})yield file2docs(**kwargs)except Exception as e:yield False, (file.kb_name, file.filename, str(e))
def run_in_thread_pool(func: Callable,params: List[Dict] = [],
) -> Generator:'''在线程池中批量运行任务,并将运行结果以生成器的形式返回。请确保任务中的所有操作是线程安全的,任务函数请全部使用关键字参数。'''tasks = []with ThreadPoolExecutor() as pool:for kwargs in params:thread = pool.submit(func, **kwargs)tasks.append(thread)for obj in as_completed(tasks): # TODO: Ctrl+c无法停止yield obj.result()
八五、JSON 行格式加载器扩展
• 继承 LangChain 的 JSONLoader,强制启用 .jsonl 行式读取。
• 通过 monkey-patch 的方式注册到 langchain.document_loaders。
# patch json.dumps to disable ensure_ascii
def _new_json_dumps(obj, **kwargs):kwargs["ensure_ascii"] = Falsereturn _origin_json_dumps(obj, **kwargs)if json.dumps is not _new_json_dumps:_origin_json_dumps = json.dumpsjson.dumps = _new_json_dumpsclass JSONLinesLoader(langchain.document_loaders.JSONLoader):'''行式 Json 加载器,要求文件扩展名为 .jsonl'''def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self._json_lines = Truelangchain.document_loaders.JSONLinesLoader = JSONLinesLoader
八六、文档向量化执行主流程
• folder2db: 遍历知识库文件夹,加载文件→文档→向量化→写入向量库(实际此处尚未实现入库)。
• init_database: 程序入口,调用 folder2db 实现重建 VS。
def folder2db(kb_names: List[str],mode: Literal["recreate_vs", "update_in_db", "increament"],vs_type: Literal["faiss", "milvus", "pg", "chromadb"] = None,#DEFAULT_VS_TYPE = "faiss"embed_model: str = None,#EMBEDDING_MODEL = "qwen-api"chunk_size: int = CHUNK_SIZE,#CHUNK_SIZE = 250chunk_overlap: int = OVERLAP_SIZE,#OVERLAP_SIZE = 50zh_title_enhance: bool = False,#ZH_TITLE_ENHANCE = False
):def files2vs(kb_name: str, kb_files: List[KnowledgeFile]):for success, result in files2docs_in_thread_(kb_files,chunk_size=chunk_size,chunk_overlap=chunk_overlap,zh_title_enhance=zh_title_enhance):if success:_, filename, docs = resultprint(f"正在将 {kb_name}/{filename} 添加到向量库,共包含{len(docs)}条文档")kb_file = KnowledgeFile(filename=filename, knowledge_base_name=kb_name)kb_file.splited_docs = docselse:print(result)kb_names = kb_names or list_kbs_from_folder()#['samples', '论文知识库']for kb_name in kb_names:#vs_type = 'faiss';kb_name = 'kb_name';embed_model = 'qwen-api'# 清除向量库,从本地文件重建if mode == "recreate_vs":kb_files = file_to_kbfile(kb_name, list_files_from_folder(kb_name))files2vs(kb_name, kb_files)else:print(f"unspported migrate mode: {mode}")# 定义主逻辑函数
def init_database(recreate_vs=True,kb_name=[],embed_model=None
):start_time = datetime.now()if recreate_vs:folder2db(kb_names=kb_name, mode="recreate_vs", embed_model=embed_model)end_time = datetime.now()print(f"Total time taken: {end_time - start_time}")# 示例调用
if __name__ == "__main__":# 示例调用,可以在此处调整参数直接运行init_database(recreate_vs=True,kb_name=[],embed_model="qwen-api")
🎯 下期预告:
《LangChain 进阶:Embedding 与 VectorStore 的使用指南》
敬请期待!🚀