使用Dagster定义数据资产:从入门到实践
本文将介绍如何使用Dagster框架通过Python函数定义数据资产,涵盖单资产、多资产、复杂流程等场景,并解析关键概念如依赖关系、上下文管理和版本控制,帮助您高效构建数据管道。
一、Dagster资产定义基础
在Dagster中,数据资产通过装饰器标记Python函数实现。核心组件包括:
- AssetKey:资产的唯一标识符
- 依赖关系:上游资产列表
- 计算逻辑:包含业务处理的Python函数
示例:单资产定义
from dagster import asset@asset
def daily_sales():# 计算每日销售数据的逻辑pass@asset(deps=[daily_sales], group_name="sales")
def weekly_sales():# 基于每日销售聚合周数据的逻辑pass
二、多资产场景处理
当需要一次操作生成多个资产时,使用@multi_asset
:
from dagster import multi_asset, AssetSpec@multi_asset(specs=[AssetSpec("asset_one"),AssetSpec("asset_two")]
)
def my_multi_asset():yield MaterializeResult(asset_key="asset_one", metadata={"num_rows": 10})yield MaterializeResult(asset_key="asset_two", metadata={"num_rows": 24})
适用场景:
- 单次API调用返回多张表
- 同一内存对象生成多个结果
三、复杂流程抽象
对于需要多步骤处理的资产,使用@graph_asset
:
from dagster import op, graph_asset, RetryPolicy@op(retry_policy=RetryPolicy(max_retries=5))
def step_one() -> int:# 可能失败的步骤return 42@op
def step_two(num: int):return num ** 2@graph_asset
def complex_asset():return step_two(step_one())
优势:
- 隐藏中间过程
- 统一错误处理
- 简化依赖管理
四、高级功能实践
-
上下文管理
通过AssetExecutionContext
访问运行时信息:@asset def logged_asset(context: AssetExecutionContext):context.log.info(f"Running in {context.run.run_id}")
-
版本控制
标记代码变更避免重复计算:@asset(code_version="1.2.0") def versioned_asset():# 处理逻辑pass
-
多级键命名
使用key_prefix
组织层级结构:@asset(key_prefix=["region", "us-west"]) def regional_data():pass
@asset和
@dg.asset 区别
在 Dagster 中,@asset
和 @dg.asset
都是用于定义资产的装饰器(decorator),但它们的来源和用法略有不同:
1. @asset
-
来源:直接来自
dagster
模块(from dagster import asset
)。 -
用途:用于定义一个 Dagster 资产(Asset),表示一个可计算的数据集或计算结果。
-
示例:
from dagster import asset@asset def my_asset():return [1, 2, 3]
-
特点:
- 是 Dagster 的核心 API,适用于大多数场景。
- 可以单独使用,不需要额外导入
dagster
的子模块。
2. @dg.asset
-
来源:通常来自
dagster._core.decorator
或某些内部/实验性模块(dg
可能是dagster
的别名或内部命名空间)。 -
用途:在某些情况下,Dagster 的内部实现可能会使用
@dg.asset
来定义资产,但 官方推荐使用@asset
。 -
示例:
import dagster as dg@dg.asset def my_asset():return [1, 2, 3]
-
特点:
- 不是官方推荐的方式,可能是内部实现或旧版 API。
- 在较新的 Dagster 版本中,
@dg.asset
可能已被弃用或移除。 - 如果你看到
@dg.asset
,建议检查是否使用了正确的导入方式(from dagster import asset
)。
主要区别
特性 | @asset | @dg.asset |
---|---|---|
来源 | dagster 官方 API | 可能是内部/实验性 API |
推荐使用 | ✅ 推荐 | ❌ 不推荐 |
稳定性 | 稳定 | 可能不稳定或已弃用 |
常见场景 | 标准资产定义 | 内部实现或旧代码 |
- 始终使用
@asset
(从dagster
导入),这是 Dagster 官方推荐的方式。 - 如果你看到
@dg.asset
,可能是旧代码或内部实现,建议迁移到@asset
。
如果你在某个教程或代码库中看到 @dg.asset
,可以检查:
- 是否使用了正确的导入方式(
from dagster import asset
)。 - 是否使用了较旧的 Dagster 版本(建议升级到最新版)。
- 是否是自定义封装(某些团队可能会重命名装饰器)。
最后总结
Dagster通过装饰器模式提供了灵活的数据资产定义方式:
- 简单场景:
@asset
快速定义单资产 - 批量处理:
@multi_asset
高效管理多结果 - 复杂流程:
@graph_asset
封装多步骤操作
掌握这些工具后,您可以:
- 清晰表达数据依赖关系
- 实现可复用的数据处理逻辑
- 获得完整的执行监控能力
建议从简单资产开始,逐步引入多资产和上下文功能,最终构建可维护的数据管道系统。