当前位置: 首页 > java >正文

dagster的etl实现

本文展示了如何使用Dagster框架实现一个动态ETL(Extract, Transform, Load)流程。通过定义多个操作(op),包括生成动态任务、处理单个任务、收集结果和汇总结果,构建了一个动态任务处理流程。generate_tasks操作生成多个动态任务,process_task对每个任务进行处理,collect_results收集所有处理结果,summarize_results汇总结果并生成资产。最后,通过Definitions将流程定义为可执行的作业(job),并提供了直接运行流程的示例代码

注意dagster版本

dagster, version 1.10.14

"""
实现dagster的etl实践案例。
"""from dagster import op, graph, job
from dagster import DynamicOut, DynamicOutput, AssetMaterialization, Out, Output
from dagster import Definitions
from typing import List@op(out=DynamicOut(int))
def generate_tasks(context):"""生成动态任务,每个任务对应一个整数值"""context.log.info("开始生成动态任务...")for i in range(3):yield DynamicOutput(value=i, mapping_key=f"task_{i}")@op
def process_task(context, num: int) -> int:"""处理单个任务,将输入值乘以2"""context.log.info(f"处理任务 {num}")return num * 2@op(out=Out(List[int]))
def collect_results(context, results: List[int]) -> List[int]:"""收集所有处理结果并返回列表"""context.log.info(f"收集到 {len(results)} 个处理结果")context.log.info(f"数据细节:{results}")return results@op(out = Out(int))
def summarize_results(context, results: list):"""汇总处理结果,计算总和"""total = sum(results)total = int(total)context.log.info(f"所有任务处理完成,总和为: {total}")yield AssetMaterialization(asset_key="final_result",description="所有任务处理结果的总和",metadata={"total": total})yield Output(total, output_name="result")@graph
def dynamic_pipeline():"""定义动态任务处理流程"""results = generate_tasks().map(process_task)collected = collect_results(results.collect())summarize_results(collected)# 将graph转换为可执行的job
@job
def run_dynamic_pipeline():dynamic_pipeline()# 定义可执行实体
defs = Definitions(jobs=[run_dynamic_pipeline]
)# 示例:直接运行pipeline(用于测试)
if __name__ == "__main__":result = run_dynamic_pipeline.execute_in_process()print("执行结果:", result.success)
http://www.xdnf.cn/news/6979.html

相关文章:

  • 硬件工程师笔记——二极管Multisim电路仿真实验汇总
  • 一分钟用 MCP 上线一个 2048 小游戏(CodeBuddy版)
  • 84.评论日记
  • Level2.8蛇与海龟(游戏)
  • 在WSL中的Ubuntu发行版上安装Anaconda、CUDA、CUDNN和TensorRT
  • 校平机:金属板料处理的核心工艺装备​
  • 【软件测试】性能测试 —— 工具篇 LoadRunner 介绍与使用
  • 【HCIA】MUX VLAN
  • 【原创】基于视觉大模型gemma-3-4b实现短视频自动识别内容并生成解说文案
  • 从零开发 1688 数据接口:商品详情页实时采集 API 接入详解
  • facebook的Open Molecules 2025 (OMol25) 数据集、评估与模型开源速读
  • Mysql数据库之集群进阶
  • 从ThreadLocal到Scoped Values:Java高效数据共享机制的革命性演进
  • 代码随想录算法训练营第四十二四十三天
  • (保姆级)Win10 安装Oracle Developer Suite教程
  • OpenCV 特征检测全面解析与实战应用
  • C++学习:六个月从基础到就业——C++11/14:auto类型推导
  • 解读 TypeScript 枚举Enum
  • 深入理解 Java 字节码操作码
  • 数据存储与容灾:构建企业级数据安全的全栈解决方案
  • Springboot构建项目时lombok不生效
  • 【鸿蒙开发避坑】使用全局状态变量控制动画时,动画异常甚至动画方向与预期相反的原因分析以及解决方案
  • 新的节能技术和一体化解决方案,推动工厂智能升级和产业转型
  • BG开发者日志517:demo数据分析与修改方向
  • 【SpringBoot】关于MP使用中配置了数据库表前缀的问题
  • C++类与对象--2 对象的初始化和清理
  • 英汉 “语言” 初印象:符号背后的文化底色​
  • Java中调用外部命令:Runtime.exec() vs ProcessBuilder
  • 【基于栈的 Vue3 路由历史管理:优雅处理多系统间的导航】
  • 磁盘I/O子系统