当前位置: 首页 > news >正文

使用 NV‑Ingest、Unstructured 和 Elasticsearch 处理非结构化数据

作者:来自 Elastic Ajay Krishnan Gopalan

了解如何使用 NV-Ingest、Unstructured Platform 和 Elasticsearch 为 RAG 应用构建可扩展的非结构化文档数据管道。

Elasticsearch 原生集成了行业领先的生成式 AI 工具和提供商。查看我们的网络研讨会,了解如何超越 RAG 基础,或使用 Elastic 向量数据库构建可投入生产的应用。

为了为你的用例构建最佳搜索解决方案,现在就开始免费云试用,或在本地机器上试用 Elastic。


在这篇博客中,我们将讨论如何使用 NV-Ingest、Unstructured Platform 和 Elasticsearch 实现一个可扩展的数据处理流水线。该流水线将来自数据源的非结构化数据转换为结构化、可搜索的内容,为下游的 AI 应用(如 RAG)做好准备。检索增强生成(RAG)是一种 AI 技术,它为大语言模型(LLMs)提供外部知识,以生成对用户查询的响应。这使得 LLM 的回答能够根据特定上下文进行定制,从而使答案更准确、更相关。

在开始之前,让我们先了解一下实现该流水线的关键组件,以及它们各自的作用。

流水线组件

NV-Ingest 是一组微服务,用于将非结构化文档转换为结构化内容和元数据。它可以大规模处理文档解析、视觉结构识别和 OCR 处理。

Unstructured 是一个 ETL+ 平台,用于协调整个非结构化数据处理流程:从从多个数据源中摄取非结构化数据、通过可配置的工作流引擎将原始非结构化文件转换为结构化数据、使用附加转换丰富数据,一直到将结果上传到向量存储、数据库和搜索引擎。它提供了可视化 UI、API 和可扩展的后端基础设施,在一个工作流中协调文档解析、数据丰富和嵌入处理。

Elasticsearch 是业界领先的搜索和分析引擎,现在具备原生的向量搜索能力。它既可以作为传统的文本数据库,也可以作为向量数据库,支持像 k-NN 相似度搜索这样的功能,实现大规模语义搜索。

现在我们已经介绍了核心组件,接下来让我们看看它们在典型工作流程中是如何协同工作的,然后再深入了解具体实现。

使用 NV-Ingest - Unstructured - Elasticsearch 实现 RAG

虽然这里我们只提供关键要点,你可以在此处查看完整的 notebook。

本博客分为三个部分:

  1. 设置源和目标连接器

  2. 使用 Unstructured API 设置工作流

  3. 基于处理后的数据进行 RAG

Unstructured 的工作流以 DAG(Directed Acyclic Graph - 有向无环图)的形式表示,节点称为连接器,用于控制数据的摄取来源以及处理结果的上传目标。这些节点在任何工作流中都是必需的。源连接器配置原始数据从数据源的摄取,目标连接器配置处理后数据上传到向量存储、搜索引擎或数据库。

在本博客中,我们将研究论文存储在 Amazon S3 中,并希望将处理后的数据传送到 Elasticsearch 用于下游用途。这意味着,在构建数据处理工作流之前,我们需要通过 Unstructured API 创建一个 Amazon S3 的源连接器和一个 Elasticsearch 的目标连接器。

步骤 1:设置 S3 源连接器

在创建源连接器时,你需要为其指定一个唯一名称,明确其类型(例如 S3 或 Google Drive),并提供配置,通常包括你要连接的数据源的位置(例如 S3 bucket 的 URI 或 Google Drive 文件夹)以及身份验证信息。

source_connector_response = unstructured_client.sources.create_source(request=CreateSourceRequest(create_source_connector=CreateSourceConnector(name="demo_source1",type=SourceConnectorType.S3,config=S3SourceConnectorConfigInput(key=os.environ['S3_AWS_KEY'],secret=os.environ['S3_AWS_SECRET'],remote_url=os.environ["S3_REMOTE_URL"],recursive=False #True/False)))
)pretty_print_model(source_connector_response.source_connector_information)

步骤 2:设置 Elasticsearch 目标连接器

接下来,我们来设置 Elasticsearch 目标连接器。你使用的 Elasticsearch 索引必须具有与 Unstructured 为你生成的文档架构兼容的架构 —— 你可以在文档中找到所有详细信息。

destination_connector_response = unstructured_client.destinations.create_destination(request=CreateDestinationRequest(create_destination_connector=CreateDestinationConnector(name="demo_dest-3",type=DestinationConnectorType.ELASTICSEARCH,config=ElasticsearchConnectorConfigInput(hosts=[os.environ['es_host']],es_api_key=os.environ['es_api_key'],index_name="demo-index")))
)

步骤 3:使用 Unstructured 创建工作流

一旦你拥有了源连接器和目标连接器,就可以创建一个新的数据处理工作流。我们将通过以下节点构建工作流 DAG:

  • NV-Ingest 用于文档分区

  • Unstructured 的 Image Summarizer、Table Summarizer 和 Named Entity Recognition 节点用于内容丰富

  • Chunker 和 Embedder 节点用于使内容准备好进行相似性搜索

from unstructured_client.models.shared import (WorkflowNode,WorkflowNodeType,WorkflowType,Schedule
)# Partition the content by using NV-Ingest
parition_node = WorkflowNode(name="Ingest",subtype="nvingest",type="partition",settings={"nvingest_host":  userdata.get('NV-Ingest-host-address')},)# Summarize each detected image.
image_summarizer_node = WorkflowNode(name="Image summarizer",subtype="openai_image_description",type=WorkflowNodeType.PROMPTER,settings={}
)# Summarize each detected table.
table_summarizer_node = WorkflowNode(name="Table summarizer",subtype="anthropic_table_description",type=WorkflowNodeType.PROMPTER,settings={}
)# Label each recognized named entity.
named_entity_recognizer_node = WorkflowNode(name="Named entity recognizer",subtype="openai_ner",type=WorkflowNodeType.PROMPTER,settings={"prompt_interface_overrides": None}
)# Chunk the partitioned content.
chunk_node = WorkflowNode(name="Chunker",subtype="chunk_by_title",type=WorkflowNodeType.CHUNK,settings={"unstructured_api_url": None,"unstructured_api_key": None,"multipage_sections": False,"combine_text_under_n_chars": 0,"include_orig_elements": True,"max_characters": 1537,"overlap": 160,"overlap_all": False,"contextual_chunking_strategy": None}
)# Generate vector embeddings.
embed_node = WorkflowNode(name="Embedder",subtype="azure_openai",type=WorkflowNodeType.EMBED,settings={"model_name": "text-embedding-3-large"}
)response = unstructured_client.workflows.create_workflow(request={"create_workflow": {"name": f"s3-to-es-NV-Ingest-custom-workflow","source_id": source_connector_response.source_connector_information.id,"destination_id": "a72838a4-bb72-4e93-972d-22dc0403ae9e","workflow_type": WorkflowType.CUSTOM,"workflow_nodes": [parition_node,image_summarizer_node,table_summarizer_node,named_entity_recognizer_node,chunk_node,embed_node],}}
)workflow_id = response.workflow_information.id
pretty_print_model(response.workflow_information)job = unstructured_client.workflows.run_workflow(request={"workflow_id": workflow_id,}
)pretty_print_model(job.job_information)

一旦这个工作流的任务完成,数据将被上传到 Elasticsearch,我们就可以继续构建一个基础的 RAG 应用程序。

步骤 4:RAG 设置

让我们继续设置一个简单的检索器,它将连接到数据,接收用户查询,使用与原始数据嵌入相同的模型对其进行嵌入,并计算余弦相似度以检索前 3 个文档。

from langchain_elasticsearch import ElasticsearchStore
from langchain.embeddings import OpenAIEmbeddings
import osembeddings = OpenAIEmbeddings(model="text-embedding-3-large",openai_api_key=os.environ['OPENAI_API_KEY'])vector_store = ElasticsearchStore(es_url=os.environ['es_host'],index_name="demo-index",embedding=embeddings,es_api_key=os.environ['es_api_key'],query_field="text",vector_query_field="embeddings",distance_strategy="COSINE"
)retriever = vector_store.as_retriever(search_type="similarity",search_kwargs={"k": 3}  # Number of results to return
)

然后,让我们设置一个工作流来接收用户查询,从 Elasticsearch 中获取相似文档,并使用这些文档作为上下文来回答用户的问题。

from openai import OpenAIclient = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))def generate_answer(question: str, documents: str):prompt = """You are an assistant that can answer user questions given provided context.Your answer should be thorough and technical.If you don't know the answer, or no documents are provided, say 'I do not have enough context to answer the question.'"""augmented_prompt = (f"{prompt}"f"User question: {question}\n\n"f"{documents}")response = client.chat.completions.create(messages=[{'role': 'system', 'content': 'You answer users questions.'},{'role': 'user', 'content': augmented_prompt},],model="gpt-4o-2024-11-20",temperature=0,)return response.choices[0].message.contentdef format_docs(docs):seen_texts = set()useful_content = [doc.page_content for doc in docs]return  "\nRetrieved documents:\n" + "".join([f"\n\n===== Document {str(i)} =====\n" + docfor i, doc in enumerate(useful_content)])
def rag(query):docs = retriever.invoke(query)documents = format_docs(docs)answer = generate_answer(query, documents)return documents, answer

将所有内容组合在一起,我们得到:

query = "How did the response lengths change with training?"docs, answer = rag(query)print(answer)

和一个响应:

Based on the provided context, the response lengths during training for the DeepSeek-R1-Zero model showed a clear trend of increasing as the number of training steps progressed. This is evident from the graphs described in Document 0 and Document 1, which both depict the "average length per response" on the y-axis and training steps on the x-axis.### Key Observations:
1. **Increasing Trend**: The average response length consistently increased as training steps advanced. This suggests that the model naturally learned to allocate more "thinking time" (i.e., generate longer responses) as it improved its reasoning capabilities during the reinforcement learning (RL) process.2. **Variability**: Both graphs include a shaded area around the average response length, indicating some variability in response lengths during training. However, the overall trend remained upward.3. **Quantitative Range**: The y-axis for response length ranged from 0 to 12,000 tokens, and the graphs show a steady increase in the average response length over the course of training, though specific numerical values at different steps are not provided in the descriptions.### Implications:
The increase in response length aligns with the model's goal of solving reasoning tasks more effectively. Longer responses likely reflect the model's ability to provide more detailed and comprehensive reasoning, which is critical for tasks requiring complex problem-solving.In summary, the response lengths increased during training, indicating that the model adapted to allocate more resources (in terms of response length) to improve its reasoning performance.

Elasticsearch 提供了多种增强搜索的策略,包括混合搜索,这是近似语义搜索和基于关键字的搜索的结合。

这种方法可以提高作为上下文使用的 RAG 架构中的 top 文档的相关性。要启用此功能,您需要按照以下方式修改 vector_store 初始化:

from langchain_elasticsearch import DenseVectorStrategyvector_store = ElasticsearchStore(es_url=os.environ['es_host'],index_name="demo-index",embedding=embeddings,es_api_key=os.environ['es_api_key'],query_field="text",vector_query_field="embeddings",strategy=DenseVectorStrategy(hybrid=True) // <-- here the change
)

结论

良好的 RAG 从准备充分的数据开始,而 Unstructured 简化了这一关键的第一步。通过 NV-Ingest 启用文档分区、对非结构化数据进行元数据丰富并高效地将其摄取到 Elasticsearch,它确保了您的 RAG 管道建立在坚实的基础上,为所有下游任务释放其全部潜力。

原文:Unstructured data processing with NV‑Ingest, Unstructured, and Elasticsearch - Elasticsearch Labs

http://www.xdnf.cn/news/353683.html

相关文章:

  • 利用GPT实现油猴脚本—网页滚动(优化版)
  • 豆包:基于多模态交互的智能心理咨询机器人系统设计与效果评估——情感计算框架下的对话机制创新
  • Spark,在shell中运行RDD程序
  • 【SQL系列】多表关联更新
  • 手持气象仪:能够实时测量多种气象参数,保数据采集的准确性与实时性
  • 掌握Multi-Agent实践(三):ReAct Agent集成Bing和Google搜索功能,采用推理与执行交替策略,增强处理复杂任务能力
  • Spring Boot 框架概述
  • 【计算机视觉】Car-Plate-Detection-OpenCV-TesseractOCR:车牌检测与识别
  • 【css】css统一设置变量
  • 更新 / 安装 Nvidia Driver 驱动 - Ubuntu - 2
  • 数据类型详解(布尔值、整型、浮点型、字符串等)-《Go语言实战指南》
  • istio in action之Gateway流量入口与安全
  • 分析NVIDIA的股价和业绩暴涨的原因
  • Zabbix监控 RabbitMQ 指定消息队列名称(pull_alarms )的消费者
  • 富乐德传感技术盘古信息 | 锚定“未来工厂”新坐标,开启传感器制造行业数字化转型新征程
  • IC解析之TPS92682-Q1(汽车LED灯控制IC)
  • 【C/C++】C语⾔内存函数
  • [Errno 122] Disk quota exceeded
  • Linux59 SSH配置前瞻 JumpServer双网卡ping通
  • 金仓数据库永久增量备份技术原理与操作
  • 电商平台如何做好DDoS 攻防战?
  • 物流基础知识-术语 | 医药物流(1)
  • OpenHarmony平台驱动开发(十),MMC
  • k8s监控方案实践(二):集成Alertmanager告警与钉钉Webhook通知
  • C23 与 MISRA C:2025:嵌入式 C 语言的进化之路
  • 4.3【LLaMA-Factory实战】教育大模型:个性化学习路径生成系统全解析
  • 微服务中 本地启动 springboot 无法找到nacos配置 启动报错
  • 第十六章,网络型攻击防范技术
  • Python 常用内置函数详解(十):help()函数——查看对象的帮助信息
  • 【论文阅读27】-TCN–BiLSTM -滑坡预测