【经验分享】Dify+GraphRAG实现知识图谱智能问答
文章目录
- 前情提要
- 先看结果
- 1、问题
- 2、思路
- (1)获取GraphRAG的上下文
- (2)将上下文通过api方式传递给dify
- 3、尝试
- (1)正常执行查询步骤,从中截断上下文并且输出
- a.查询步骤
- b.输出上下文
- c.将执行的查询命令封装成api,并且截取特定部分输出
- (2)添加LocalSearch API模块
- a. 核心思想
- b. 修改search.py
- c. 添加LocalSearchAPI
- d. 封装为api
- 4、接入Dify
- (1)各节点设计
- a. HTTP请求节点
- b. 代码节点
- 5、结果
- 运行
- 至此,问题解决
前情提要
相信各位点进这篇博客的朋友或多或少有与题目相关的需求,目前我的需求是需要将graphrag和dify集成,实现一个大模型根据知识图谱生成回答的功能,那么本篇要求读者拥有以下基础:
- GraphRAG:本篇使用Graph-Local-Ollama,项目较轻,适合测试
- Dify
- FastAPI
先看结果
1、问题
最近在研究知识图谱方面,正好目前有个需求是如何将GraphRAG与Dify集成,能让大模型根据知识图谱返回答案,那么就开始在网上搜集资料,在B站上看到一个UP主讲解了一下,感觉还不错,但是因为GraphRAG更新迭代太快,项目结构一直在变,导致不能适配当前我的项目,评论区也有跟我同样诉求的,但是这位UP主的思路可以借鉴,源码地址 ↓
https://github.com/brightwang/graphrag-dify
2、思路
(1)获取GraphRAG的上下文
因为GraphRAG的查询整体流程是:首先从构建好的parquent文件中索引出社区报告、节点、边、以及原文分块,然后将这些上下文内容传给大模型,让它生成答案。
那么,我们可以不要生成答案的部分,只要graphrag检索出 的上下文。
(2)将上下文通过api方式传递给dify
在第一步生成的上下文,用fastapi写一个接口,在dify里创建工作流,去使用http请求节点去返回上下文,再走大模型回答步骤。
3、尝试
(1)正常执行查询步骤,从中截断上下文并且输出
a.查询步骤
正常的查询命令是这样的,生成的答案也是只有大模型返回的答案,所以需要去源码里找到输出答案的部分,让它把上下文也输出。
python -m graphrag.query --root /to/your/path --method local "your question"
b.输出上下文
在下面的py文件中添加一行,输出上下文,并且手动添加分隔符
# graphrag-local-ollama\graphrag\query\cli.pysearch_engine = get_local_search_engine(config,reports=read_indexer_reports(final_community_reports, final_nodes, community_level),text_units=read_indexer_text_units(final_text_units),entities=entities,relationships=read_indexer_relationships(final_relationships),covariates={"claims": covariates},description_embedding_store=description_embedding_store,response_type=response_type,)result = search_engine.search(query=query)#添加上下文输出,并且手动添加分隔符,方便后期分割reporter.success("---datasets---"+result.context_text + "---END---")reporter.success(f"Local Search Response: {result.response}")return result.response
c.将执行的查询命令封装成api,并且截取特定部分输出
try:# 执行命令并捕获输出result = subprocess.run(command, capture_output=True, text=True, encoding="utf-8",check=True)# 检查输出是否为 Noneif result.stdout is None:print("No output from the command.")else:# 使用分隔符截取需要的内容start_delimiter = "---datasets---"end_delimiter = "---END---"output = result.stdout.strip()# 查找分隔符的位置start_index = output.find(start_delimiter)end_index = output.find(end_delimiter)if start_index != -1 and end_index != -1:# 截取分隔符之间的内容output = output[start_index + len(start_delimiter):end_index].strip()else:output = "Delimiter not found in the output."return {"output": output}
(2)添加LocalSearch API模块
方法1的话,我们会发现,他还是要走大模型那一步,只是最后截取了需要的部分,响应时间还是受影响,没有达到我们的最终目的。
因为这个项目的api模块作者为了轻量化全部删除,所以我们单独写一个LocalSearchAPI模块即可。
a. 核心思想
resopnse_type添加一个search_reponse选项,如果命令走了这个选项,那么大模型的回答设置为空(不走生成回复流程),直接返回上下文。
b. 修改search.py
修改 graphrag/query/structured_search/local_search/search.py
async def asearch(self,query: str,conversation_history: ConversationHistory | None = None,**kwargs,) -> SearchResult:"""Build local search context that fits a single context window and generate answer for the user query."""start_time = time.time()search_prompt = ""context_text, context_records = self.context_builder.build_context(query=query,conversation_history=conversation_history,**kwargs,**self.context_builder_params,)log.info("GENERATE ANSWER: %s. QUERY: %s", start_time, query)try:if self.response_type == "search_prompt":# 如果 response_type 是 "search_prompt",直接返回上下文内容return SearchResult(response="",context_data=context_records,context_text=context_text,completion_time=time.time() - start_time,llm_calls=0,prompt_tokens=0,)else:search_prompt = self.system_prompt.format(context_data=context_text,response_type=self.response_type)search_messages = [{"role": "system","content": search_prompt},{"role": "user","content": query},]if self.response_type == "search_prompt":return SearchResult(response=search_prompt,context_data=context_records,context_text=context_text,completion_time=time.time() - start_time,llm_calls=1,prompt_tokens=num_tokens(search_prompt,self.token_encoder),)response = await self.llm.agenerate(messages=search_messages,streaming=True,callbacks=self.callbacks,**self.llm_params,)return SearchResult(response=response,context_data=context_records,context_text=context_text,completion_time=time.time() - start_time,llm_calls=1,prompt_tokens=num_tokens(search_prompt, self.token_encoder),)
c. 添加LocalSearchAPI
localsearchapi.py
参考最新GraphRAG版本的api模块修改,作为一个平替,核心代码:
def __init__(self, data_dir: Union[str, None], root_dir: Union[str, None]):self.data_dir, self.root_dir, self.config = self._configure_paths_and_settings(data_dir, root_dir)self.description_embedding_store = self._get_embedding_description_store()self.agent = self.search_agent(community_level=2, response_type="search_prompt")def run_search(self, query: str):result = self.agent.search(query=query)if self.agent.response_type == "search_prompt":try:# 尝试直接打印print(result.context_text)print(type(result.context_text))except UnicodeEncodeError:# 如果直接打印失败,显式处理编码print(result.context_text.encode("utf-8", errors="replace").decode("gbk", errors="replace"))return result.context_textelse:print(result.response)return result.response
d. 封装为api
from fastapi import FastAPI, Query, HTTPException
from typing import Optional
import os
from pathlib import Path# 导入你的 LocalSearchEngine 类
from localsearchapi import LocalSearchEngine # 替换为你的模块路径app = FastAPI()# 初始化 LocalSearchEngine 实例
local_search_engine = LocalSearchEngine(data_dir=None, root_dir=os.path.dirname(__file__))@app.get("/search")
async def search(query: Optional[str] = Query(None, description="搜索查询")):if not query:raise HTTPException(status_code=400, detail="Query parameter is required")result = local_search_engine.run_search(query=query)return {"response": result}# 启动 FastAPI 应用
if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
4、接入Dify
工作室-创建空白应用-选择工作流
在这里创建了四个节点,开始、结束、HTTP请求节点以及代码执行节点
(1)各节点设计
开始、结束节点就不介绍了,简单介绍中间两个节点
a. HTTP请求节点
此节点的作用主要是调用外部api获取graphrag查询的上下文。
b. 代码节点
主要是获取json信息,提取出response的内容
5、结果
运行
在Dify运行工作流,工作流会自动执行,最后生成结果如下:
至此,问题解决
目前是可以解决我的需求,那么这只是一个初步探索,目前发现传回来的上下文很长,可能会超出大模型的上下文窗口,预计后期会将上下文根据模块再拆分,交给多智能体去处理,优化结果。
本篇博客也是分享一个思路以及对应的实践,可以根据这个结果继续拓展,比如智能体接入工作流;后面自然会有更好的解决办法,我也希望和大家交流心得,共同进步,欢迎随时与我联系。