当前位置: 首页 > ds >正文

Dagster Pipes系列-1:调用外部Python脚本

本文是"Dagster Pipes教程"的第一部分,介绍如何通过Dagster资产调用外部Python脚本并集成到数据管道中。首先,创建Dagster资产subprocess_asset,利用PipesSubprocessClient资源执行外部脚本external_code.py,实现跨进程的数据处理。通过dagster dev启动UI,可在Dagster界面中监控子进程的执行状态和日志输出,包括标准输出(stdout)内容。本文详细讲解了资产定义、资源注入及命令执行的完整流程,为后续修改外部代码以支持Dagster Pipes通信奠定基础。此方法适用于需要将现有脚本集成到Dagster数据管道的场景,提升自动化与可观测性。完成本部分后,读者可继续学习第二部分,掌握如何增强外部脚本与Dagster的交互能力。

教程概述

本教程将指导你完成以下步骤:

  1. 创建一个调用外部Python脚本的Dagster资产
  2. 定义必要的Dagster资源(resources)
  3. 在Dagster UI中运行并查看结果
    在这里插入图片描述

前提条件

在开始之前,请确保你已经:

  • 安装了Dagster
  • 创建了一个名为external_code.py的独立Python脚本,内容如下:
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2],"item_id": [432, 878]})total_orders = len(orders_df)print(f"processing total {total_orders} orders")

第一步:定义Dagster资产

首先,在与external_code.py相同的目录下创建一个名为dagster_code.py的新文件。

1.1 创建资产定义

将以下代码复制到dagster_code.py中:

import shutil
import dagster as dg@dg.asset
def subprocess_asset(context: dg.AssetExecutionContext,pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py")]return pipes_subprocess_client.run(command=cmd,context=context).get_materialize_result()

代码解析:

  • 我们创建了一个名为subprocess_asset的资产
  • 使用AssetExecutionContext作为上下文参数,它提供了系统信息如资源、配置和日志记录
  • 指定了PipesSubprocessClient资源
  • 构建了一个命令列表来执行外部脚本
  • 使用pipes_subprocess_client.run()方法在管道会话中同步执行子进程

1.2 从资产调用外部代码

上述代码中的关键部分是:

pipes_subprocess_client.run(command=cmd,context=context
).get_materialize_result()

这段代码做了什么:

  • PipesSubprocessClient资源暴露了一个run方法
  • 当资产执行时,这个方法会在管道会话中同步执行子进程
  • 返回一个PipesClientCompletedInvocation对象
  • 可以使用get_materialize_result()方法访问子进程报告的MaterializeResult事件

第二步:定义Definitions对象

为了让Dagster工具(如CLI、UI和Dagster+)能够加载和访问资产及子进程资源,我们需要创建一个Definitions对象。

dagster_code.py文件末尾添加以下代码:

from dagster import Definitionsdefs = Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
)

此时,dagster_code.py文件应该如下所示:

import shutil
import dagster as dg@dg.asset
def subprocess_asset(context: dg.AssetExecutionContext,pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py")]return pipes_subprocess_client.run(command=cmd,context=context).get_materialize_result()from dagster import Definitionsdefs = Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
)

第三步:从Dagster UI运行子进程

现在,让我们在Dagster UI中执行我们创建的子进程资产。

  1. 在新的命令行会话中运行以下命令启动UI:

    dagster dev -f dagster_code.py
    
  2. 点击右上角的"Materialize"按钮来运行你的代码

  3. 导航到"Run details"页面,在这里你可以看到运行的日志

  4. external_code.py中,我们有一个打印语句将输出到stdout。Dagster会在UI的原始计算日志视图中显示这些内容。

  5. 要查看stdout日志,切换日志部分到stdout:

在这里插入图片描述

下一步

到目前为止,你已经创建了一个调用外部Python脚本的Dagster资产,在子进程中执行了代码,并在Dagster UI中查看了结果。接下来,你将学习如何修改外部代码以与Dagster Pipes配合工作,将信息发送回Dagster。

总结

通过本教程的第一部分,我们实现了:

  • 创建了一个Dagster资产来调用外部Python脚本
  • 配置了必要的资源来支持子进程执行
  • 在Dagster UI中成功运行并查看了结果

这个基础设置为你在后续步骤中实现更复杂的管道通信打下了良好的基础。

http://www.xdnf.cn/news/5715.html

相关文章:

  • 【CF】Day57——Codeforces Round 955 (Div. 2, with prizes from NEAR!) BCD
  • 利用散点图探索宇航员特征与太空任务之间的关系
  • BUUCTF 大流量分析(三) 1
  • 开源链动2+1模式AI智能名片S2B2C商城小程序赋能新微商服务能力升级研究
  • 主从架构:技术原理与实现
  • python实现usb热插拔检测(linux)
  • 【Nova UI】十三、打造组件库之按钮组件(中):样式雕琢全攻略
  • 【学习笔记】机器学习(Machine Learning) | 第六章(2)| 过拟合问题
  • 编程题 02-线性结构3 Reversing Linked List【PAT】
  • WebFlux vs WebMVC vs Servlet 对比
  • spark的处理过程-转换算子和行动算子
  • Spark,RDD中的转换算子
  • NVMe-oF(NVMe over Fabrics)
  • 车联网大数据:从数据到场景的闭环实践
  • Linux 软件包|服务管理
  • 极狐GitLab 通用软件包存储库功能介绍
  • Excel-to-JSON插件专业版功能详解:让Excel数据转换更灵活
  • 什么是内存刷新
  • 中国黄土高原中部XF剖面磁化率和粒度数据
  • 鸿蒙HarmonyOS list优化一: list 结合 lazyforeach用法
  • dp自动化登陆之hCaptcha 验证码
  • http接口性能优化方案
  • uniapp|实现手机通讯录、首字母快捷导航功能、多端兼容(H5、微信小程序、APP)
  • 键盘输出希腊字符方法
  • .net 公共变量 线程安全
  • 高并发内存池(三):TLS无锁访问以及Central Cache结构设计
  • Python文字转语音TTS库示例(edge-tts)
  • keil 解决 Error: CreateProcess failed, Command: ‘XXX\ARM\ARMCC\bin\fromelf.exe
  • 精益数据分析(55/126):双边市场模式的挑战、策略与创业阶段关联
  • Leetcode (力扣)做题记录 hot100(34,215,912,121)