当前位置: 首页 > news >正文

SmartETL函数式组件的设计与应用

SmartETL框架主要采用了面向对象的设计思想,将ETL过程中的处理逻辑抽象为LoaderProcessor(对应loader模块和iterator模块),所有流程组件需要继承或实现DataProvider(iter方法)或JsonIteratoron_data__process__方法)。

例如以下代码实现将论文结构中的摘要和正文拼接为一个字符串字段,方便后续对论文建立全文索引。

class ConcatPaperContent(JsonIterator):"""arxiv html页面数据处理类"""def on_data(self, data: Any, *args):paper = data['paper']content = ""if paper:abstract = paper.get('abstract')content += f"{abstract}\n"sections = paper.get('sections')for section in sections:content += f"{section['content']}\n"data['content'] = contentreturn data

然而,业务中很多处理逻辑比较简单,以往开发时用少数几行代码就可以搞定,而在SmartETL框架中,则必须实现一个类,正如上面的例子所示。虽然SmartETL支持加载外部包的组件(只要在sys.path中),但如果是需要定制开发则相对繁琐。

此前,在过滤组件(Filter)中考虑到这种情况,解决办法是在流程中定义Lambda表达式。例如以下流程定义中,filter节点通过Lambda表达式abnormal_time实现过滤publish_time字段值小于当前时间的记录的功能,即,对于经过filter节点的记录,仅当其publish_time字段值大于等于当前时间current时才会输出给后续节点。

nodes:current: util.dates.current_ts(True)abnormal_time: "=lambda t, current=current: t >= current "filter: Filter(abnormal_time, key='publish_time')

为了简化业务代码编写,SmartETL新增实现函数式组件,即以函数形式提供核心处理逻辑,而不需要封装成类。Lambda表达式就是一种特殊的函数。

跟C/C++、Java不同,Python语言中函数是一等公民,即开发者可以直接访问和操作函数,支持将函数作为一个对象进行加载、传递和管理,这对于开发一些高级功能,提高程序扩展性非常方便。

SmartETL函数式组件是指将任意编写的数据处理函数作为ETL流程组件,加入到流程处理中。唯一的限制是:除了作为Loader组件的函数外(框架无法提供输入),函数应该以流程数据作为输入参数,并将需要向后续流程传递的数据作为输出参数。以下表格说明了函数的参数与节点类型作用的对应关系:

节点类型是否支持输入是否要求有输出
Loader节点否(可通过配置提供)
Processor节点是(流程数据作为第一个参数)均可

为了使用函数对象,框架设计了函数式Loader组件Function如下:

class Function(DataProvider):"""函数调用包装器 提供调用函数的结果"""def __init__(self, function, *args, **kwargs):""":param function 函数对象或函数对象的完整限定名(如wikidata_filter.util.files.get_lines)"""assert function is not None, "function is None!"if isinstance(function, str):from wikidata_filter.util.mod_util import load_clsfunction = load_cls(function)[0]self.function = functionself.args = argsself.kwargs = kwargsdef iter(self):"""DataProvider的主要API,对提供函数进行调用"""# 注意,使用了组件构造参数res = self.function(*self.args, **self.kwargs)if isinstance(res, GeneratorType):for item in res:yield itemelse:yield res

类似的,框架实现了Function(JsonIterator)。常用的Map组件也支持提供函数对象或函数对象完整限定名。

基于函数式组件对本文开头的示例进行改写,代码如下:

def concat_paper_content(paper: dict):paper = paper or {}abstract = paper.get('abstract')content = f"{abstract}\n"sections = paper.get('sections')for section in sections:content += f"{section['content']}\n"return content

在yaml流程中进行引用,如下所示:

nodes:concat: Map('gestata.arxiv.concat_paper_content')

或者:

nodes:concat: Function('wikidata_filter.gestata.arxiv.concat_paper_content')

流程说明:通过yaml流程文件,将concat_content函数与Map进行绑定(假设该函数定义在wikidata_filter.gestata.arxiv模块中),实现对基于paper的处理,并将函数调用返回值作为content字段值。

注意,为了支持Function使用自定义组件(可能在任意sys.path可访问模块),需要提供完整的函数对象限定名,本示例中包括顶层模块wikidata_filter

那么,MapFunction有什么区别呢?主要区别是Map主要是为了支持wikidata_filter.gestatawikidata_filter.util模块中定义的函数,且支持指定要处理的字段(通过key参数)和目标字段(通过target_key参数)。

从示例中可以看出,使用函数式组件至少有几点好处:

  • 代码更简洁:只需要实现一个提供核心处理逻辑的函数即可。
  • 配置更加灵活:通过流程指定输入字段和输出字段,可以灵活适配不同业务数据。
  • 复用性更好:可以通过代码或yaml配置进行复用。

在此前arXiv论文数据处理应用流程中,大量采用了函数式组件。具体可查看https://github.com/ictchenbo/SmartETL/blob/main/wikidata_filter/gestata/arxiv.py了解详情。

http://www.xdnf.cn/news/510553.html

相关文章:

  • 【大模型面试每日一题】Day 22:若训练中发现Loss突然剧烈波动(Spike),可能有哪些原因?如何定位和修复?
  • nginx模块使用、过滤器模块以及handler模块
  • 自适应Prompt技术:让LLM精准理解用户意图的进阶策略
  • JMeter 教程:使用 HTTP 请求的参数列表发送 POST 请求(form 表单格式)
  • 贝塞尔曲线原理
  • Android studio Could not move temporary workspace
  • 使用AI 生成PPT 最佳实践方案对比
  • ChatGPT:OpenAI Codex—一款基于云的软件工程 AI 代理,赋能 ChatGPT,革新软件开发模式
  • window自带截图快捷键
  • C++学习:六个月从基础到就业——C++20:范围(Ranges)基础
  • 【OpenCV基础 1】几何变换、形态学处理、阈值分割、区域提取和脱敏处理
  • MLLM常见概念通俗解析(一)
  • 【基于Spring Boot 的图书购买系统】深度讲解 用户注册的前后端交互,Mapper操作MySQL数据库进行用户持久化
  • 如何利用内网穿透实现Cursor对私有化部署大模型的跨网络访问实践
  • 【图像生成大模型】CogVideoX-5b:开启文本到视频生成的新纪元
  • lvs-dr部署
  • c++学习之--- list
  • C语言链表的操作
  • 数字人技术的核心:AI与动作捕捉的双引擎驱动(210)
  • defer关键字:延迟调用机制-《Go语言实战指南》
  • 8.1UDP点对点聊天小项目
  • 软件架构之--论微服务的开发方法1
  • 软件工程各种图总结
  • 数据库MySQL基础2
  • 【回溯 剪支 状态压缩】# P10419 [蓝桥杯 2023 国 A] 01 游戏|普及+
  • Java大厂面试:从Web框架到微服务技术的场景化提问与解析
  • FAST-DDS源码分析PDP(一)
  • NoSQL实战指南:MongoDB与Redis企业级开发实战
  • Vue 3 动态 ref 的使用方式(表格)
  • 【Linux高级全栈开发】2.1.3 http服务器的实现