当前位置: 首页 > java >正文

基于LangChain + Milvus 实现RAG

使用 LangChain + Milvus(Lite 本地部署),并用 Hugging Face 的 thenlper/gte-large-zh 做 embedding。方案覆盖:环境准备、Milvus Lite 用法、embedding 实现、文本切片(chunking)、向量和文本入库、检索示例,以及常见注意点与扩展建议。


一、总结(要实现的目标)

用户输入一份大文本 → LangChain 把文本切片成多个 chunk → 用 thenlper/gte-large-zh 对每个 chunk 产生 embedding → 把 embedding 与对应 chunk(和元数据)写入 Milvus Lite(本地文件形式) → 支持后续相似检索 / RAG 等。


二、环境与先决条件

  • 操作系统:Ubuntu / macOS(Milvus Lite 支持)。

  • Python:建议 Python 3.10+。

  • 建议有 GPU(可选),否则在 CPU 上也能跑但较慢(亲测真的很慢)。

  • 需要网络以下载 HuggingFace 模型文件。

关键安装包(下面命令在虚拟环境中执行):

python -m venv venv && source venv/bin/activate
pip install -U pip# 必要包
pip install langchain pymilvus>=2.4.2 transformers torch sentencepiece# 可选:如果想用 langchain 社区的 Milvus 集成(取决于 langchain 版本)
pip install langchain-milvus langchain-community

说明:pymilvus >= 2.4.2 包含 Milvus Lite;使用本地文件 URI 会自动启用 Milvus Lite。(Milvus)


三、设计要点(简要)

  1. Chunk 切分:因为 gte-large-zh 的最大序列长度是 512 tokens(会截断更长的输入),所以 chunk 的长度要小于或等于 512 tokens。中文中 tokens 与字符近似但不完全等价,建议字符级 chunk_size 约 400–500 字符,并使用例如 50–100 的 overlap。这样能兼顾上下文与不超长截断。(Hugging Face)

  2. Embedding 实现:可直接用 transformersAutoTokenizer + AutoModel,从 outputs.last_hidden_state[:,0](CLS)或 mean-pooling 得到向量。thenlper/gte-large-zh 官方示例使用 outputs.last_hidden_state[:,0](CLS token)并做 L2 归一化作为 embedding。embedding 维度为 1024(gte-large-zh)。(Hugging Face)

  3. 向量库:使用 Milvus Lite(本地文件存储)。Milvus Lite 支持 FLAT 索引(受限),适用于原型 / 本地测试;生产上应使用 Milvus Standalone/Cluster。(Milvus)

  4. LangChain 集成:使用 LangChain 的 Milvus 向量存储适配器(示例使用 Milvus.from_documents 或构造器),把 Embeddings 对象传入。(python.langchain.com)


四、完整可运行示例代码

把下面内容保存为 ingest_and_query.py。它包含:读取文本文件、切分、embedding(基于 transformers + CLS pooling)、写入 Milvus Lite(通过 LangChain),并提供一个查询示例。

"""
ingest_and_query.py
Usage:python ingest_and_query.py ingest /path/to/large_text.txtpython ingest_and_query.py query "你的检索问题"
"""import sys
import os
from typing import List, Optional
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from langchain.text_splitter import CharacterTextSplitter
from langchain_core.documents import Document
# 如果你的 langchain 版本使用不同命名空间,请调整导入(下面两种导入形式二选一)
try:# newer: langchain-community / langchain_milvusfrom langchain_milvus import Milvus
except Exception:try:from langchain_community.vectorstores import Milvusexcept Exception:# 最后兜底:从 langchain.vectorstores 导入(视版本可能不可用)from langchain.vectorstores import Milvus  # type: ignorefrom langchain.embeddings.base import Embeddings# -------------------------
# Embeddings wrapper using thenlper/gte-large-zh
# -------------------------
class GTEEmbeddings(Embeddings):def __init__(self, model_name: str = "thenlper/gte-large-zh", device: Optional[str] = None, batch_size: int = 8):self.model_name = model_nameself.device = device or ("cuda" if torch.cuda.is_available() else "cpu")self.batch_size = batch_size# load tokenizer + modelself.tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)self.model = AutoModel.from_pretrained(model_name).to(self.device)self.model.eval()# model dimension: checked on HF model card (gte-large-zh -> 1024)# self.dim = 1024  # optionaldef _embed_batch(self, texts: List[str]) -> List[List[float]]:all_embs = []with torch.no_grad():for i in range(0, len(texts), self.batch_size):batch = texts[i:i + self.batch_size]batch_inputs = self.tokenizer(batch, padding=True, truncation=True, max_length=512, return_tensors="pt").to(self.device)outputs = self.model(**batch_inputs)# official example uses CLS token as embedding:embs = outputs.last_hidden_state[:, 0, :]  # shape (B, D)# L2 normalizeembs = F.normalize(embs, p=2, dim=1).cpu()all_embs.extend(embs.tolist())return all_embsdef embed_documents(self, texts: List[str]) -> List[List[float]]:return self._embed_batch(texts)def embed_query(self, text: str) -> List[float]:return self._embed_batch([text])[0]# -------------------------
# Helper: chunk text -> documents
# -------------------------
def chunk_text_to_documents(text: str, chunk_size: int = 400, chunk_overlap: int = 50, source: str = "input") -> List[Document]:splitter = CharacterTextSplitter(separator="\n", chunk_size=chunk_size, chunk_overlap=chunk_overlap)chunks = splitter.split_text(text)docs = []for i, c in enumerate(chunks):docs.append(Document(page_content=c, metadata={"source": source, "chunk": i}))return docs# -------------------------
# Ingest pipeline: read, chunk, embed, write to Milvus Lite via LangChain
# -------------------------
def ingest_file_to_milvus(txt_path: str, collection_name: str = "docs_collection", uri: str = "./milvus_demo.db"):assert os.path.exists(txt_path), f"{txt_path} not found"with open(txt_path, "r", encoding="utf-8") as f:text = f.read()print("Splitting text...")docs = chunk_text_to_documents(text, chunk_size=400, chunk_overlap=80, source=os.path.basename(txt_path))print(f"Created {len(docs)} chunks.")print("Loading embedding model (this may take a while)...")embeddings = GTEEmbeddings(model_name="thenlper/gte-large-zh")print("Writing to Milvus Lite (via LangChain)...")# from_documents 会对 documents 使用 embedding 计算向量并写入 Milvusvectorstore = Milvus.from_documents(documents=docs,embeddings=embeddings,collection_name=collection_name,connection_args={"uri": uri},drop_old=True,  # 如果希望覆盖旧 collection,设为 True)print("Ingest finished.")return vectorstore# -------------------------
# Query demo
# -------------------------
def query_milvus(query: str, k: int = 3, collection_name: str = "docs_collection", uri: str = "./milvus_demo.db"):print("Loading embeddings and vectorstore handle...")embeddings = GTEEmbeddings(model_name="thenlper/gte-large-zh")vs = Milvus(embeddings=embeddings, collection_name=collection_name, connection_args={"uri": uri})print("Searching...")docs = vs.similarity_search(query, k=k)for i, d in enumerate(docs):print(f"=== Result {i+1} ===")print("Score/metadata:", d.metadata)print(d.page_content[:600].rstrip())print()# -------------------------
# CLI
# -------------------------
def main():if len(sys.argv) < 2:print(__doc__)returncmd = sys.argv[1]if cmd == "ingest":if len(sys.argv) < 3:print("Usage: ingest /path/to/large_text.txt")returningest_file_to_milvus(sys.argv[2])elif cmd == "query":if len(sys.argv) < 3:print('Usage: query "你的检索问题"')returnquery = sys.argv[2]query_milvus(query)else:print("Unknown command:", cmd)if __name__ == "__main__":main()

五、如何运行(示例)

  1. 启动虚拟环境并安装依赖(见上文)。

  2. 保存上面的脚本 ingest_and_query.py

  3. 准备一个大文本文件 large_text.txt(UTF-8)。

  4. 执行入库:

python ingest_and_query.py ingest ./large_text.txt
# 这会创建本地 Milvus Lite 文件: ./milvus_demo.db
  1. 运行检索示例:

python ingest_and_query.py query "中国的首都是哪里?"

六、设计与工程注意点(重要)

  1. Chunk 大小与模型截断:gte-large-zh 最大 512 tokens,会截断更长文本,所以请确保每个 chunk 在这个长度内(示例用 400 字符 + overlap)。若你想在 chunk 中保留更多语义,可做层次切分(先大段拆为若干段,再在需要时再二次细拆)。(Hugging Face)

  2. 向量维度 & 集合 schema:gte-large-zh 输出维度 1024。Milvus collection 的 dimension 要与之对应(LangChain 会处理此细节,但如果你用 pymilvus 原生接口建 collection,要设置 dimension=1024)。(Hugging Face)

  3. Milvus Lite 限制:Milvus Lite 只适合原型/本地测试,只支持 FLAT 索引、不支持 partition 的一些高级特性(比如用于多租户的 partition_key)。若数据量大或需要 partition、高级索引,请部署 Milvus Standalone/Cluster。(Milvus)

  4. 持久化:Milvus Lite 使用本地文件(e.g. ./milvus_demo.db),数据会持久化到该文件。你可以用 milvus-lite dump 导出数据迁移到集群。(Milvus)

  5. 性能 / 批量:对大量文本要批量计算 embedding 并批量 insert(脚本里用 LangChain.from_documents,内部会分批),如果数据量很大,考虑使用 pymilvus[bulk_writer] 的 bulk 插入或把 embedding 计算分布到多台机器上。(Milvus)


七、常见坑与排查

  • 如果 huggingface 下载模型太慢,可先预先 transformers 下载并缓存,或使用镜像/代理。

  • 若模型加载报错 trust_remote_code,可在 from_pretrained 中传 trust_remote_code=True(若模型包含自定义架构)。但 gte-large-zh 通常可以直接加载。(Hugging Face)

  • 若 LangChain 的 Milvus 类导入失败,请检查你安装的包名(langchain-milvus / langchain-community / langchain_milvus 的 API 随版本变动)。如果出问题,可直接使用 pymilvus 的原生 API:MilvusClient("./milvus_demo.db") 创建、create_collectioninsert 等操作(官方文档给有示例)。(Milvus, python.langchain.com)


八、扩展建议(当你准备从本地原型走向生产)

  1. 切换到 Milvus Standalone/Distributed:支持更高级索引(IVF_PQ、HNSW 等)、partition、并发等。(python.langchain.com)

  2. 索引类型调优:FLAT 适用于小规模,数据变多后换更快的近似索引并调参(nlist / nprobe 等)。

  3. 多租户 / 权限:Milvus Lite 不支持用户/角色;生产环境通过 Milvus 服务或 Zilliz Cloud 管理。(Milvus)

  4. Embedding 服务化:若多人并发或需要降低模型部署成本,可把 embedding 模型放到一个推理服务(如 xinference、TorchServe、HF Inference、ONNX或自建 FastAPI 服务),LangChain 侧只调用该服务拿 embedding。

  5. 缓存 & 重用:对重复 chunk 做去重/缓存,避免重复计算 embedding。可以基于 chunk 的 hash 来判断是否已存在。


九、参考(关键文档)

  • Milvus Lite 文档(如何本地运行、API 示例):官方 Run Milvus Lite。(Milvus)

  • Milvus + LangChain 集成文档(示例、from_documents、connection_args 用法):Milvus 与 LangChain 集成。(python.langchain.com)

  • thenlper/gte-large-zh 模型页(max seq len=512, dimension=1024,示例用 CLS token):Hugging Face 模型页面。(Hugging Face)

http://www.xdnf.cn/news/18453.html

相关文章:

  • Linux学习-网络编程2
  • Zynq开发实践(fpga高频使用的两个场景)
  • Elasticsearch Rails 实战全指南(elasticsearch-rails / elasticsearch-model)
  • VLLM部署gpt-oss-20b踩坑记录
  • chrome driver在Mac上运行时提示安全问题怎么解决
  • STM32 - Embedded IDE - GCC - 重定向printf到串口
  • jmeter
  • [docker/大数据]Spark快速入门
  • DS 0 | 数据结构学习:前言
  • MySQL的事务
  • 24.解构赋值
  • 3 种无误的方式删除 Itel 手机上的短信
  • K8S - NetworkPolicy的使用
  • 【小白笔记】 MNN 移动端大模型部署
  • 【普通地质学】构造运动与地质构造
  • unbuntu 20.04 docker 部署wordpress
  • 一体化伺服电机在特种机器人(炉管爬行器)中的应用案例
  • LLM实践系列:利用LLM重构数据科学流程03- LLM驱动的数据探索与清洗
  • 微服务介绍及Nacos中间件
  • 算法 之 拓 扑 排 序
  • Pycharm SSH连接
  • Android15 AndroidV冻结和解冻的场景
  • 学习Linux嵌入式(正点原子imx课程)开发到底是在学什么
  • 【Linux | 网络】多路转接IO之select
  • Python 面向对象编程入门:从思想到属性操作
  • 图(Graph):关系网络的数学抽象
  • 3维模型导入到3Dmax中的修改色彩简单用法----第二讲
  • 零成本加速:EdgeOne免费套餐3分钟接入指南
  • MYSQL库及表的操作
  • 奈飞工厂:算法优化实战 —— 从推荐系统到内容分发