dagster的etl实现
本文展示了如何使用Dagster框架实现一个动态ETL(Extract, Transform, Load)流程。通过定义多个操作(op),包括生成动态任务、处理单个任务、收集结果和汇总结果,构建了一个动态任务处理流程。generate_tasks操作生成多个动态任务,process_task对每个任务进行处理,collect_results收集所有处理结果,summarize_results汇总结果并生成资产。最后,通过Definitions将流程定义为可执行的作业(job),并提供了直接运行流程的示例代码
注意dagster版本
dagster, version 1.10.14
"""
实现dagster的etl实践案例。
"""from dagster import op, graph, job
from dagster import DynamicOut, DynamicOutput, AssetMaterialization, Out, Output
from dagster import Definitions
from typing import List@op(out=DynamicOut(int))
def generate_tasks(context):"""生成动态任务,每个任务对应一个整数值"""context.log.info("开始生成动态任务...")for i in range(3):yield DynamicOutput(value=i, mapping_key=f"task_{i}")@op
def process_task(context, num: int) -> int:"""处理单个任务,将输入值乘以2"""context.log.info(f"处理任务 {num}")return num * 2@op(out=Out(List[int]))
def collect_results(context, results: List[int]) -> List[int]:"""收集所有处理结果并返回列表"""context.log.info(f"收集到 {len(results)} 个处理结果")context.log.info(f"数据细节:{results}")return results@op(out = Out(int))
def summarize_results(context, results: list):"""汇总处理结果,计算总和"""total = sum(results)total = int(total)context.log.info(f"所有任务处理完成,总和为: {total}")yield AssetMaterialization(asset_key="final_result",description="所有任务处理结果的总和",metadata={"total": total})yield Output(total, output_name="result")@graph
def dynamic_pipeline():"""定义动态任务处理流程"""results = generate_tasks().map(process_task)collected = collect_results(results.collect())summarize_results(collected)# 将graph转换为可执行的job
@job
def run_dynamic_pipeline():dynamic_pipeline()# 定义可执行实体
defs = Definitions(jobs=[run_dynamic_pipeline]
)# 示例:直接运行pipeline(用于测试)
if __name__ == "__main__":result = run_dynamic_pipeline.execute_in_process()print("执行结果:", result.success)