当前位置: 首页 > backend >正文

血缘元数据采集开放标准:OpenLineage Integrations Manually Annotated Lineage

OpenLineage

OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。

手动标注血缘关系

Manually Annotated Lineage

本页面主要面向 Airflow 版本 <2.7 的外部集成。
若使用 Airflow 2.7+,请查看原生 Airflow OpenLineage 提供方文档。

后续开发与增强将聚焦于 apache-airflow-providers-openlineage 包,而 openlineage-airflow 仅做缺陷修复。参见此集成支持的所有 Airflow 版本。

此功能仅支持 Airflow 2.1.0 及以上版本。

Airflow 允许通过为运算符指定输入和输出来追踪血缘关系,这些输入输出称为 inlets 和 outlets。OpenLineage 会尝试通过内置或自定义提取器获取 Airflow 作业的输入输出数据集;若失败,则回退到使用 Airflow 作业的 inlets 和 outlets。

OpenLineage 仅对部分运算符支持自动血缘提取。对于其余运算符以及自定义运算符,用户必须编写自己的自定义提取器(实现 extract / extract_on_complete 方法),以指明对应任务的输入输出数据集。
通过直接指定运算符的 inlets 与 outlets 可以绕过这一限制;当提取器未成功提取任何输入输出数据集时,OpenLineage 将默认使用 inlets 与 outlets。

在定义 DAG 时,可以为每个运算符提供 inlets 与 outlets 列表,列表元素为 Table。

Airflow 支持 inlets 与 outlets 为 Table、Column、File 或 User 实体,但目前 OpenLineage 仅通过 Table 实体提取血缘关系。

示例

在 Airflow DAG 中,运算符可如下方式标注 inlets 与 outlets:

"""演示通过 Inlets 与 Outlets 提取血缘的示例 DAG。"""import pendulum
import datetimefrom airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.lineage.entities import Table, Filedef create_table(cluster, database, name):return Table(database=database,cluster=cluster,name=name,)t1 = create_table("c1", "d1", "t1")
t2 = create_table("c1", "d1", "t2")
t3 = create_table("c1", "d1", "t3")
t4 = create_table("c1", "d1", "t4")
f1 = File(url = "http://randomfile")with DAG(dag_id='example_operator',schedule_interval='0 0 * * *',start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),dagrun_timeout=datetime.timedelta(minutes=60),params={"example_key": "example_value"},
) as dag:task1 = BashOperator(task_id='task_1_with_inlet_outlet',bash_command='echo "{{ task_instance_key_str }}" && sleep 1',inlets=[t1, t2],outlets=[t3],)task2 = BashOperator(task_id='task_2_with_inlet_outlet',bash_command='echo "{{ task_instance_key_str }}" && sleep 1',inlets=[t3, f1],outlets=[t4],)task1 >> task2if __name__ == "__main__":dag.cli()

对应的血缘图如下:

marquez_lineage

(该图像由 Marquez UI(OpenLineage 事件元数据收集器)展示。更多信息参见此处)

另请注意,当前血缘事件未捕获 File 实体。

Airflow Table 实体到 OpenLineage Dataset 的转换

采用的命名约定如下:

  1. Table 实体的 CLUSTER 成为 OpenLineage Dataset 的 namespace
  2. Dataset 的名称由 {{DATABASE}}.{{NAME}} 构成,其中 DATABASENAME 为 Airflow Table 实体所指定的属性。

支持的 Airflow 版本

本页介绍的是主要用于 Airflow 版本低于 2.7 的外部集成。如果你使用的是 Airflow 2.7 及以上版本,请查阅原生 Airflow OpenLineage 提供程序文档。

后续的开发与增强将聚焦于 apache-airflow-providers-openlineage 包,而 openlineage-airflow 将主要进行缺陷修复。查看 此集成支持的所有 Airflow 版本

支持的 Airflow 版本
Airflow 2.7+

从 Airflow 2.7.0 开始 不应 使用此包,并且在 Airflow 2.8+ 中 无法 使用。
该包设计为面向 Airflow 版本低于 2.7 的外部集成。
对于 Airflow 2.7+,请使用原生 Airflow OpenLineage 提供程序
apache-airflow-providers-openlineage

Airflow 2.3 - 2.6

注意: 支持 Airflow 2.3-2.4 的最后一个 openlineage-airflow 版本为 1.33.0

从 Airflow 2.3 开始,只要该集成已安装在 Airflow 工作节点的 Python 环境中,它会自动注册。
这意味着除了配置事件发送位置(详见 配置 章节)外,无需额外操作。

Airflow 2.1 - 2.2

注意: 支持 Airflow 2.1-2.2 的最后一个 openlineage-airflow 版本为 1.14.0

这些版本下的集成存在限制:不支持跟踪失败任务,
任务启动仅在任务结束时才被记录(基于 LineageBackend 的方法会在每个任务完成时收集该任务的所有元数据)。

为使 OpenLineage 生效,除安装 openlineage-airflow 外,还需在 airflow.cfg 中或通过环境变量 AIRFLOW__LINEAGE__BACKEND 指定 LineageBackend

openlineage.lineage_backend.OpenLineageBackend

OpenLineageBackend 不会考虑手动配置的 inlets 与 outlets。

Airflow <2.1

OpenLineage 不支持低于 Airflow 2.1 的版本。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

http://www.xdnf.cn/news/18988.html

相关文章:

  • 企业级数据库管理实战(二):数据库权限最小化原则的落地方法
  • 【分治法 BFS 质因数分解】P12255 [蓝桥杯 2024 国 Java B] 园丁|普及+
  • 智慧养老建设方案(PPT)
  • 开源大语言模型(Qwen3)
  • 深入探讨可视化技术如何实现安全监测
  • 【小白笔记】Visual Studio 在 2025年7月更新的功能说明(英文单词记忆)
  • 智慧工地系统:基于Java微服务与信创国产化的建筑施工数字化管理平台
  • 171-178CSS3新增
  • NullPointerException 空指针异常,为什么老是遇到?
  • 评价指标FID/R Precision
  • vscode编辑器中设置断点,不会自动启动调试器
  • 介绍⼀下Llama的结构
  • Spring Boot 整合 MongoDB:CRUD 与聚合查询实战
  • Jenkins 全方位指南:安装、配置、部署与实战应用(含图解)
  • 如何规划一年、三年、五年的IP发展路线图?
  • 01.<<基础入门:了解网络的基本概念>>
  • Leetcode 深度优先搜索 (15)
  • WINTRUST!_ExplodeMessag函数中的pCatAdd
  • Yolov8 pose 推理部署笔记
  • Vue开发避坑:箭头函数与普通函数的正确使用指南
  • LeetCode 刷题【55. 跳跃游戏】
  • 从协作机器人到智能协作机器人:工业革命的下一跳
  • 【JavaScript】递归的问题以及优化方法
  • 安宝特方案丨安宝特工业AR全链路解决方案
  • Unity游戏打包——iOS打包基础、上传
  • java后端的各种注解
  • Linux 禁止 su 的几种限制手段:从 NoNewPrivileges 到 PAM 配置
  • GitHub 宕机自救指南:确保开发工作不间断
  • 大数据毕业设计选题推荐-基于大数据的存量房网上签约月统计信息可视化分析系统-Hadoop-Spark-数据可视化-BigData
  • 学习嵌入式之驱动——I2C子系统