langchain源码概览
概述
LangChain 是一个用于构建大语言模型应用的强大框架,它通过可组合的组件和第三方集成简化AI应用开发,同时在底层技术发展时保持决策的前瞻性。
🌟 核心特点
🧩 模块化设计
独立的抽象组件,不绑定特定模型提供商
🔗 统一接口
通过Runnable接口提供一致的调用方式
📝 声明式编程
LCEL表达式语言简化组件组合
🚀 生产就绪
内置流式处理、异步支持、跟踪等企业级特性
项目架构
🏗️ 分层架构设计
LangChain 生态系统
├── langchain-core (核心抽象层)
│ ├── Runnable (通用调用接口)
│ ├── Language Models (语言模型抽象)
│ ├── Prompts (提示模板)
│ ├── Output Parsers (输出解析器)
│ ├── Retrievers (检索器接口)
│ ├── Vectorstores (向量存储)
│ ├── Messages (消息类型)
│ └── Tools (工具接口)
├── langchain (主要实现)
│ ├── Agents (智能代理)
│ ├── Chains (调用链)
│ ├── LLMs (大语言模型)
│ ├── Chat Models (对话模型)
│ ├── Document Loaders (文档加载器)
│ ├── Embeddings (嵌入模型)
│ ├── Memory (记忆机制)
│ ├── Retrievers (检索器实现)
│ └── Callbacks (回调系统)
├── partners/ (第三方集成)
│ ├── OpenAI
│ ├── Anthropic
│ ├── Hugging Face
│ ├── Chroma │
└── Ollama
├── langchain-cli (命令行工具)
└── text-splitters (文本分割器)
📁 目录结构解析
/libs/core/ - 核心抽象层
- 职责: 定义所有基础接口和抽象
- 特点: 轻量级依赖,无第三方集成
- 关键模块:
runnables/
- Runnable接口实现language_models/
- 语言模型基类prompts/
- 提示模板系统output_parsers/
- 输出解析器
/libs/langchain/ - 主要实现层
- 职责: 具体功能实现和高级抽象
- 关键模块:
agents/
- 智能代理系统chains/
- 复合调用链tools/
- 外部工具集成memory/
- 对话记忆机制
/libs/partners/ - 第三方集成层
- 职责: 各种服务商和工具的集成
- 包含: OpenAI、Anthropic、HuggingFace等
核心概念详解
🔑 1. Runnable 接口 - 统一调用协议
Runnable 是整个LangChain的核心接口,为所有组件提供统一的调用方式:
# 基础调用方法
.invoke(input) # 单次同步调用
.ainvoke(input) # 单次异步调用
.batch(inputs) # 批量处理
.stream(input) # 流式输出
.astream(input) # 异步流式输出# 高级功能
.with_config(config) # 配置管理
.with_fallbacks([...]) # 故障转移
.with_retry() # 重试机制
Runnable 的优势:
- ✅ 统一接口 - 所有组件都有相同的调用方式
- ✅ 自动优化 - 内置并行化和流式处理
- ✅ 可组合性 - 轻松链接不同组件
- ✅ 可观测性 - 自动跟踪和监控
🔗 2. LCEL (LangChain Expression Language)
LCEL 是一种声明式语言,用于组合LangChain组件:
# 基础语法
chain = prompt | model | output_parser# 并行处理
chain = {"context": retriever,"question": RunnablePassthrough()
} | prompt | model | output_parser# 条件分支
chain = RunnableBranch((lambda x: "math" in x["topic"], math_chain),(lambda x: "science" in x["topic"], science_chain),general_chain
)
LCEL 的特性:
- 🔄 自动并行化 - 自动识别可并行的操作
- 🌊 内置流式支持 - 实时输出中间结果
- 📊 自动跟踪 - 完整的执行链路跟踪
- ⚡ 异步支持 - 原生支持异步操作
🧩 3. 组件抽象层
语言模型层次结构
BaseLanguageModel (抽象基类)
├── LLM (文本输入输出)
│ ├── OpenAI
│ ├── Anthropic
│ └── HuggingFace
└── ChatModel (消息格式)
├── ChatOpenAI
├── ChatAnthropic
└── ChatOllama
提示模板层次结构
BasePromptTemplate
├── StringPromptTemplate
│ ├── PromptTemplate
│ └── FewShotPromptTemplate
└── BaseChatPromptTemplate
├── ChatPromptTemplate
└── MessagesPlaceholder
主要模块分析
🤖 1. Agents (智能代理)
位置: libs/langchain/langchain/agents/
智能代理是LangChain的高级功能,能够根据输入动态决定执行哪些操作:
核心组件:
- Agent - 决策制定者
- AgentExecutor - 执行器和安全控制
- Tools - 可用工具集合
- Memory - 对话历史记忆
常见代理类型:
# ReAct代理 - 推理和行动
agent = create_react_agent(llm, tools, prompt)# OpenAI Functions代理 - 函数调用
agent = create_openai_functions_agent(llm, tools, prompt)# Tool Calling代理 - 工具调用
agent = create_tool_calling_agent(llm, tools, prompt)
⛓️ 2. Chains (调用链)
位置: libs/langchain/langchain/chains/
Chains将多个组件组合成更复杂的应用:
核心链类型:
LLMChain - 基础链
from langchain.chains import LLMChainchain = LLMChain(llm=model,prompt=prompt,output_parser=parser
)
RetrievalQA - 检索问答链
from langchain.chains import RetrievalQAqa_chain = RetrievalQA.from_chain_type(llm=model,chain_type="stuff", # map_reduce, refine, map_rerankretriever=vectorstore.as_retriever()
)
ConversationalRetrievalChain - 对话检索链
from langchain.chains import ConversationalRetrievalChainconversation_chain = ConversationalRetrievalChain.from_llm(llm=model,retriever=vectorstore.as_retriever(),memory=memory
)
🧠 3. Memory (记忆机制)
位置: libs/langchain/langchain/memory/
为对话提供上下文记忆:
记忆类型:
# 对话缓冲记忆
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory()# 对话摘要记忆
from langchain.memory import ConversationSummaryMemory
memory = ConversationSummaryMemory(llm=model)# 对话知识图谱记忆
from langchain.memory import ConversationKGMemory
memory = ConversationKGMemory(llm=model)
🔍 4. Retrievers (检索器)
位置: libs/langchain/langchain/retrievers/
用于从外部数据源检索相关信息:
检索器类型:
# 向量存储检索器
retriever = vectorstore.as_retriever(search_type="similarity",search_kwargs={"k": 4}
)# 多查询检索器
from langchain.retrievers import MultiQueryRetriever
retriever = MultiQueryRetriever.from_llm(retriever=base_retriever,llm=model
)# 压缩检索器
from langchain.retrievers import ContextualCompressionRetriever
retriever = ContextualCompressionRetriever(base_compressor=compressor,base_retriever=base_retriever
)
代码示例与最佳实践
🚀 1. 基础链式调用
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic# 组件定义
prompt = ChatPromptTemplate.from_template("你是一个专业的{domain}专家。请回答:{question}"
)
model = ChatAnthropic(model="claude-3-sonnet")
parser = StrOutputParser()# LCEL链式组合
chain = prompt | model | parser# 调用
result = chain.invoke({"domain": "机器学习","question": "什么是梯度下降?"
})
print(result)
🛠️ 2. 自定义工具和代理
from langchain_core.tools import tool
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate# 定义自定义工具
@tool
def calculate_area(length: float, width: float) -> float:"""计算矩形面积"""return length * width@tool
def get_weather(city: str) -> str:"""获取指定城市的天气信息"""# 实际应用中这里会调用天气APIreturn f"{city}今天晴,温度25°C"# 创建代理提示
prompt = ChatPromptTemplate.from_messages([("system", "你是一个有用的助手,可以使用提供的工具回答问题。"),("human", "{input}"),("placeholder", "{agent_scratchpad}"),
])# 工具列表
tools = [calculate_area, get_weather]# 创建并运行代理
agent = create_tool_calling_agent(model, tools, prompt)
agent_executor = AgentExecutor(agent=agent,tools=tools,verbose=True,handle_parsing_errors=True
)# 测试代理
result = agent_executor.invoke({"input": "请帮我计算一个长5米、宽3米的房间面积,然后告诉我北京的天气"
})
📚 3. RAG检索增强生成系统
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA# 1. 文档加载和分割
loader = TextLoader("knowledge_base.txt")
documents = loader.load()text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000,chunk_overlap=200
)
texts = text_splitter.split_documents(documents)# 2. 向量化存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(documents=texts,embedding=embeddings,persist_directory="./chroma_db"
)# 3. 创建检索QA链
qa_chain = RetrievalQA.from_chain_type(llm=model,chain_type="stuff",retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),return_source_documents=True
)# 4. 查询
query = "LangChain的核心特性是什么?"
result = qa_chain.invoke({"query": query})print("答案:", result["result"])
print("来源文档:", result["source_documents"])
💬 4. 对话式应用
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain# 设置记忆
memory = ConversationBufferMemory(memory_key="chat_history",return_messages=True
)# 创建对话检索链
conversation_chain = ConversationalRetrievalChain.from_llm(llm=model,retriever=vectorstore.as_retriever(),memory=memory,verbose=True
)# 多轮对话
print("开始对话(输入'quit'退出):")
while True:question = input("你: ")if question.lower() == 'quit':breakresponse = conversation_chain.invoke({"question": question})print(f"AI: {response['answer']}")
🔄 5. 流式处理示例
async def streaming_example():"""展示流式处理能力"""# 创建流式链chain = prompt | model | StrOutputParser()print("流式输出:")async for chunk in chain.astream({"topic": "人工智能的未来"}):print(chunk, end="", flush=True)print("\n")# 运行流式示例
import asyncio
asyncio.run(streaming_example())