当前位置: 首页 > java >正文

使用Dagster定义数据资产:从入门到实践

本文将介绍如何使用Dagster框架通过Python函数定义数据资产,涵盖单资产、多资产、复杂流程等场景,并解析关键概念如依赖关系、上下文管理和版本控制,帮助您高效构建数据管道。

一、Dagster资产定义基础

在Dagster中,数据资产通过装饰器标记Python函数实现。核心组件包括:

  • AssetKey:资产的唯一标识符
  • 依赖关系:上游资产列表
  • 计算逻辑:包含业务处理的Python函数
    在这里插入图片描述
示例:单资产定义
from dagster import asset@asset
def daily_sales():# 计算每日销售数据的逻辑pass@asset(deps=[daily_sales], group_name="sales")
def weekly_sales():# 基于每日销售聚合周数据的逻辑pass

二、多资产场景处理

当需要一次操作生成多个资产时,使用@multi_asset

from dagster import multi_asset, AssetSpec@multi_asset(specs=[AssetSpec("asset_one"),AssetSpec("asset_two")]
)
def my_multi_asset():yield MaterializeResult(asset_key="asset_one", metadata={"num_rows": 10})yield MaterializeResult(asset_key="asset_two", metadata={"num_rows": 24})

适用场景

  • 单次API调用返回多张表
  • 同一内存对象生成多个结果

三、复杂流程抽象

对于需要多步骤处理的资产,使用@graph_asset

from dagster import op, graph_asset, RetryPolicy@op(retry_policy=RetryPolicy(max_retries=5))
def step_one() -> int:# 可能失败的步骤return 42@op
def step_two(num: int):return num ** 2@graph_asset
def complex_asset():return step_two(step_one())

优势

  • 隐藏中间过程
  • 统一错误处理
  • 简化依赖管理

四、高级功能实践

  1. 上下文管理
    通过AssetExecutionContext访问运行时信息:

    @asset
    def logged_asset(context: AssetExecutionContext):context.log.info(f"Running in {context.run.run_id}")
    
  2. 版本控制
    标记代码变更避免重复计算:

    @asset(code_version="1.2.0")
    def versioned_asset():# 处理逻辑pass
    
  3. 多级键命名
    使用key_prefix组织层级结构:

    @asset(key_prefix=["region", "us-west"])
    def regional_data():pass
    

@asset@dg.asset 区别

Dagster 中,@asset@dg.asset 都是用于定义资产的装饰器(decorator),但它们的来源和用法略有不同:

1. @asset

  • 来源:直接来自 dagster 模块(from dagster import asset)。

  • 用途:用于定义一个 Dagster 资产(Asset),表示一个可计算的数据集或计算结果。

  • 示例:

    from dagster import asset@asset
    def my_asset():return [1, 2, 3]
    
  • 特点:

    • 是 Dagster 的核心 API,适用于大多数场景。
    • 可以单独使用,不需要额外导入 dagster 的子模块。

2. @dg.asset

  • 来源:通常来自 dagster._core.decorator 或某些内部/实验性模块(dg 可能是 dagster 的别名或内部命名空间)。

  • 用途:在某些情况下,Dagster 的内部实现可能会使用 @dg.asset 来定义资产,但 官方推荐使用 @asset

  • 示例:

    import dagster as dg@dg.asset
    def my_asset():return [1, 2, 3]
    
  • 特点:

    • 不是官方推荐的方式,可能是内部实现或旧版 API。
    • 在较新的 Dagster 版本中,@dg.asset 可能已被弃用或移除。
    • 如果你看到 @dg.asset,建议检查是否使用了正确的导入方式(from dagster import asset)。

主要区别

特性@asset@dg.asset
来源dagster 官方 API可能是内部/实验性 API
推荐使用✅ 推荐❌ 不推荐
稳定性稳定可能不稳定或已弃用
常见场景标准资产定义内部实现或旧代码
  • 始终使用 @asset(从 dagster 导入),这是 Dagster 官方推荐的方式。
  • 如果你看到 @dg.asset,可能是旧代码或内部实现,建议迁移到 @asset

如果你在某个教程或代码库中看到 @dg.asset,可以检查:

  1. 是否使用了正确的导入方式(from dagster import asset)。
  2. 是否使用了较旧的 Dagster 版本(建议升级到最新版)。
  3. 是否是自定义封装(某些团队可能会重命名装饰器)。

最后总结

Dagster通过装饰器模式提供了灵活的数据资产定义方式:

  • 简单场景@asset快速定义单资产
  • 批量处理@multi_asset高效管理多结果
  • 复杂流程@graph_asset封装多步骤操作

掌握这些工具后,您可以:

  1. 清晰表达数据依赖关系
  2. 实现可复用的数据处理逻辑
  3. 获得完整的执行监控能力

建议从简单资产开始,逐步引入多资产和上下文功能,最终构建可维护的数据管道系统。

http://www.xdnf.cn/news/3231.html

相关文章:

  • Unity编辑器扩展之导出项目中所有预制体中文本组件文字内容
  • 提示词工程(GOT)把思维链推理过程图结构化
  • 移动端akamai风控分析
  • 【阿里云大模型高级工程师ACP习题集】2.7 通过微调增强模型能力 (下篇)(⭐️⭐️⭐️ 重点章节!!!)
  • 【LLM】基于 Ollama 部署 DeepSeek-R1 本地大模型
  • 2025 Java八股文深度解读版:原理+场景+高频追问答案
  • 【Unity】如何解决UI中的Button无法绑定带参数方法的问题
  • 【网工第6版】第6章 网络安全②
  • JESD204B 探究
  • VS Code技巧2:识别FreeCAD对象
  • Spring的源码Spring的上下文怎么存储
  • Electron Forge【实战】自定义菜单 -- 顶部菜单 vs 右键快捷菜单
  • 百度网盘golang实习面经
  • HTML from表单中只有一个input时,按回车键后表单自动提交(form表单的一个小坑)
  • 【C++】频繁分配和释放会产生内存碎片
  • Win下的Kafka安装配置
  • Tauri v1 与 v2 配置对比
  • 全面解析SimHash算法:原理、对比与Spring Boot实践指南
  • transformer-实现解码器Decoder
  • DIT(Diffusion In Transformer)学习笔记
  • Java继承中super的使用方法
  • SI5338-EVB Usage Guide(LVPECL、LVDS、HCSL、CMOS、SSTL、HSTL)
  • 电子病历高质量语料库构建方法与架构项目(智能数据目录篇)
  • SD - WAN 跨境网络专线部署方式介绍
  • 大数据在远程医疗中的创新应用:如何重塑医疗行业的未来
  • python + segno 生成个人二维码
  • 全球气象站点年平均降水数据(1929-2024)
  • 大连理工大学选修课——机器学习笔记(4):NBM的原理及应用
  • 大连理工大学选修课——机器学习笔记(9):线性判别式与逻辑回归
  • 使用 ossutil 上传文件到阿里云 OSS