当前位置: 首页 > news >正文

Dagster中的Ops与Assets:数据管道构建的两种选择

Dagster是一个强大的数据编排平台,它提供了多种工具来帮助数据工程师构建可靠的数据管道。在Dagster中,Ops和Assets是两种核心概念,用于定义数据处理逻辑。本文将全面介绍Ops的概念、特性及其使用方法,特别补充了Op上下文和Op工厂等重要内容,并解释为什么对于新用户我们推荐优先使用Assets。

在这里插入图片描述

什么是Ops?

Ops是Dagster中的基本计算单元,代表一个独立的数据处理任务。每个Op应该执行相对简单的任务,例如:

  • 从其他数据集派生新数据集
  • 执行数据库查询
  • 在远程集群中启动Spark作业
  • 查询API并将结果存储到数据仓库
  • 发送电子邮件或Slack消息

Ops的核心特性

1. 灵活的执行策略

Ops是独立于执行策略的逻辑单元,这使得它们可以在开发和生产环境之间无缝转换。Ops可以组合成图(graphs),并通过jobs绑定到适当的执行器上,实现单机执行或在集群中分布式执行。

2. 可插拔的外部系统集成

对于需要与外部系统交互的数据管道,Dagster提供了资源(resources)抽象层。你可以针对抽象资源(如数据库)编写Op逻辑,然后在job级别绑定具体的资源定义。这样,开发阶段可以使用本地替代方案,而生产环境则使用云服务。

3. 输入和输出管理

Ops具有明确的输入和输出,类似于Python函数的参数和返回值。这些输入输出可以附加Dagster类型进行运行时验证,并可以通过IO Manager管理数据存储,实现不同执行环境间的I/O策略切换和中间数据的高效缓存。

4. 配置能力

数据管道中的操作通常需要参数化配置。Ops允许通过配置模式(config schema)定义这些参数,使Ops更加灵活和可重用。例如,可以通过配置指定API端点:

from dagster import Configclass MyOpConfig(Config):api_endpoint: str@op
def my_configurable_op(config: MyOpConfig):data = requests.get(f"{config.api_endpoint}/data").json()return data

5. 事件流

Ops在执行过程中会发出一系列事件,包括默认事件(如开始执行)和通过事件API报告的自定义事件(如数据资产创建、数据质量检查结果等)。这些事件流可以在Dagster UI中可视化,便于调试、检查和实时监控。

6. 可测试性

Ops的设计使其易于测试,可以单独测试或在管道中测试。资源API还允许在需要时替换外部系统(如数据库)的存根(stub)。

定义和使用Ops

使用@op装饰器定义Ops:

@op
def my_op():return "hello"

输入和输出

Ops通过参数接收输入,通过返回值产生输出:

@op
def add(a: int, b: int) -> int:return a + b

对于多输出,可以使用Out对象:

@op(out={"sum": Out(), "product": Out()})
def math_ops(a: int, b: int):yield Output(a + b, "sum")yield Output(a * b, "product")

配置

为Ops添加配置:

from dagster import Config, opclass GreetingConfig(Config):name: str@op(config_schema=GreetingConfig)
def greet(context, config: GreetingConfig):context.log.info(f"Hello, {config.name}!")return f"Hello, {config.name}!"

Op上下文(Op Context)

在编写Op时,用户可以可选地提供一个上下文参数(通常命名为context)。当这个参数被提供时,Dagster会在Op执行时自动注入一个上下文对象,该对象提供了访问系统信息的能力,如日志记录器、当前运行ID等。

上下文对象的作用

  1. 日志记录:通过context.log记录不同级别的日志信息
  2. 访问运行信息:获取当前运行的ID、作业名称等信息
  3. 资源访问:在某些情况下可以访问配置的资源
  4. 错误处理:提供更丰富的错误报告能力

使用示例

from dagster import op, OpExecutionContext@op
def context_op(context: OpExecutionContext):# 记录info级别日志context.log.info(f"My run ID is {context.run_id}")# 记录debug级别日志(默认可能不显示)context.log.debug("This is a debug message")# 在实际业务逻辑中使用上下文try:result = do_something()return resultexcept Exception as e:context.log.error(f"Operation failed: {str(e)}")raise

Op工厂模式

在实际项目中,我们经常需要创建多个相似的Ops,或者需要动态生成Ops。这时,Op工厂模式就非常有用。Op工厂允许我们通过函数来生成Ops,而不是为每个Op手动编写装饰器。

工厂模式的应用场景

  1. 参数化Op创建:当需要创建多个相似但配置不同的Ops时
  2. 动态Op生成:根据运行时条件或配置动态生成Ops
  3. 代码复用:避免重复的Op定义代码

创建Op工厂

from dagster import op, OpDefinitiondef create_math_op(name: str, operation):"""创建数学运算Op的工厂函数Args:name (str): 新Op的名称operation (callable): 数学运算函数Returns:OpDefinition: 生成的Op定义"""@op(name=name)def math_op(a: float, b: float) -> float:return operation(a, b)return math_op# 使用工厂创建具体的Ops
add_op = create_math_op("add", lambda a, b: a + b)
multiply_op = create_math_op("multiply", lambda a, b: a * b)# 或者更复杂的工厂函数
def advanced_op_factory(config_schema=None, tags=None):"""更高级的Op工厂,支持配置和标签Args:config_schema: Op的配置模式tags: 要附加到Op的标签Returns:函数:接受compute函数并返回OpDefinition"""def decorator(compute_fn):op_def = op(name=compute_fn.__name__,config_schema=config_schema,tags=tags)(compute_fn)return op_defreturn decorator# 使用高级工厂
@advanced_op_factory(config_schema={"precision": int},tags={"team": "analytics"}
)
def divide(a: float, b: float, context) -> float:precision = context.op_config["precision"]result = a / breturn round(result, precision)

工厂模式的注意事项

  1. 性能考虑:工厂模式会引入额外的函数调用层,但在大多数情况下影响可以忽略
  2. 类型提示:使用工厂创建的Ops可能需要额外的类型提示处理
  3. 文档:确保为工厂函数和生成的Ops提供清晰的文档

为什么推荐Assets而非Ops?

虽然Ops功能强大,但对于新用户我们推荐优先使用Assets:

  1. 更高级的抽象:Assets提供了数据版本控制、血缘追踪和自动缓存等高级功能
  2. 声明式API:Assets允许以更声明式的方式定义数据管道
  3. 内置集成:Assets与Dagster的其他功能(如调度、监控)有更好的集成
  4. 简化复杂性:对于大多数用例,Assets可以简化数据管道的定义和维护

结论

Ops是Dagster中强大的计算单元,适合处理复杂的数据处理逻辑。然而,对于新用户或构建标准数据管道的场景,Assets提供了更高级的抽象和更简化的开发体验。随着对Dagster的深入理解,用户可以根据需要选择使用Ops来处理更复杂的场景。

无论选择哪种方式,Dagster都提供了丰富的工具和灵活性来构建可靠、可维护的数据管道。特别是Op上下文和Op工厂模式等高级特性,为复杂的数据工程需求提供了强大的支持。

http://www.xdnf.cn/news/272089.html

相关文章:

  • 主自开发光枪鼠标模拟器实战,使用micro pro板子方式
  • P1537 数字反转(升级版)详解
  • 【C++语法】类和对象(3)
  • 蟋蟀的叫声,大自然的温度计
  • PyTorch学习之张量(Tensor)(一)
  • 【Mytais系列】Datasource模块:数据源连接
  • MCP 探索:browser tools MCP + Cursor 可以实现哪些能力
  • VBA 64位API声明语句第009讲
  • 2025深圳杯(东三省)数学建模竞赛D题完整分析论文(共36页)(含模型、可运行代码、数据结果)
  • (超2万字数详解)C++学习之类与对象
  • SwiftUI-MLX本地大模型开发(二)
  • 射频指标互调与交调简略
  • ubuntu使用apt安装软件
  • python中的yield关键字用法
  • 数据赋能(209)——质量管理——时效性原则
  • 文献分享:抗体治疗癌症综述
  • EMMC存储性能测试方法
  • FramePack部署(从PyCharm解释器创建和使用开始)保姆级教程
  • 基于SpringBoot的篮球竞赛预约平台设计与实现
  • 数据赋能(210)——质量管理——可靠性原则
  • FastAPI系列13:API的安全防护
  • 经典算法 最小生成树(prim算法)
  • QT6 源(70):阅读与注释按钮类 QPushButton,及各种属性验证,
  • 返回倒数第k个节点题解
  • 学习黑客分析案例
  • 山东大学计算机组成与设计第九、十章习题解析
  • 延时启动windows中程序
  • C++Primerplus编程练习 第五章
  • 继V1.5之后,幻方又发布了 DeepSeek-Prover-V2-671B,参数提升100倍
  • 【AI平台】n8n入门6:调用MCP服务(非社区节点)