血缘元数据采集开放标准:OpenLineage Integrations Manually Annotated Lineage
OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。
手动标注血缘关系
Manually Annotated Lineage
本页面主要面向 Airflow 版本 <2.7 的外部集成。
若使用 Airflow 2.7+,请查看原生 Airflow OpenLineage 提供方文档。后续开发与增强将聚焦于
apache-airflow-providers-openlineage
包,而openlineage-airflow
仅做缺陷修复。参见此集成支持的所有 Airflow 版本。此功能仅支持 Airflow 2.1.0 及以上版本。
Airflow 允许通过为运算符指定输入和输出来追踪血缘关系,这些输入输出称为 inlets 和 outlets。OpenLineage 会尝试通过内置或自定义提取器获取 Airflow 作业的输入输出数据集;若失败,则回退到使用 Airflow 作业的 inlets 和 outlets。
OpenLineage 仅对部分运算符支持自动血缘提取。对于其余运算符以及自定义运算符,用户必须编写自己的自定义提取器(实现 extract
/ extract_on_complete
方法),以指明对应任务的输入输出数据集。
通过直接指定运算符的 inlets 与 outlets 可以绕过这一限制;当提取器未成功提取任何输入输出数据集时,OpenLineage 将默认使用 inlets 与 outlets。
在定义 DAG 时,可以为每个运算符提供 inlets 与 outlets 列表,列表元素为 Table。
Airflow 支持 inlets 与 outlets 为 Table、Column、File 或 User 实体,但目前 OpenLineage 仅通过 Table 实体提取血缘关系。
示例
在 Airflow DAG 中,运算符可如下方式标注 inlets 与 outlets:
"""演示通过 Inlets 与 Outlets 提取血缘的示例 DAG。"""import pendulum
import datetimefrom airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.lineage.entities import Table, Filedef create_table(cluster, database, name):return Table(database=database,cluster=cluster,name=name,)t1 = create_table("c1", "d1", "t1")
t2 = create_table("c1", "d1", "t2")
t3 = create_table("c1", "d1", "t3")
t4 = create_table("c1", "d1", "t4")
f1 = File(url = "http://randomfile")with DAG(dag_id='example_operator',schedule_interval='0 0 * * *',start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),dagrun_timeout=datetime.timedelta(minutes=60),params={"example_key": "example_value"},
) as dag:task1 = BashOperator(task_id='task_1_with_inlet_outlet',bash_command='echo "{{ task_instance_key_str }}" && sleep 1',inlets=[t1, t2],outlets=[t3],)task2 = BashOperator(task_id='task_2_with_inlet_outlet',bash_command='echo "{{ task_instance_key_str }}" && sleep 1',inlets=[t3, f1],outlets=[t4],)task1 >> task2if __name__ == "__main__":dag.cli()
对应的血缘图如下:
(该图像由 Marquez UI(OpenLineage 事件元数据收集器)展示。更多信息参见此处)
另请注意,当前血缘事件未捕获 File 实体。
Airflow Table 实体到 OpenLineage Dataset 的转换
采用的命名约定如下:
- Table 实体的
CLUSTER
成为 OpenLineage Dataset 的 namespace - Dataset 的名称由
{{DATABASE}}.{{NAME}}
构成,其中DATABASE
与NAME
为 Airflow Table 实体所指定的属性。
支持的 Airflow 版本
本页介绍的是主要用于 Airflow 版本低于 2.7 的外部集成。如果你使用的是 Airflow 2.7 及以上版本,请查阅原生 Airflow OpenLineage 提供程序文档。
后续的开发与增强将聚焦于
apache-airflow-providers-openlineage
包,而openlineage-airflow
将主要进行缺陷修复。查看 此集成支持的所有 Airflow 版本
支持的 Airflow 版本
Airflow 2.7+
从 Airflow 2.7.0 开始 不应 使用此包,并且在 Airflow 2.8+ 中 无法 使用。
该包设计为面向 Airflow 版本低于 2.7 的外部集成。
对于 Airflow 2.7+,请使用原生 Airflow OpenLineage 提供程序
包 apache-airflow-providers-openlineage
。
Airflow 2.3 - 2.6
注意: 支持 Airflow 2.3-2.4 的最后一个 openlineage-airflow 版本为 1.33.0
从 Airflow 2.3 开始,只要该集成已安装在 Airflow 工作节点的 Python 环境中,它会自动注册。
这意味着除了配置事件发送位置(详见 配置 章节)外,无需额外操作。
Airflow 2.1 - 2.2
注意: 支持 Airflow 2.1-2.2 的最后一个 openlineage-airflow 版本为 1.14.0
这些版本下的集成存在限制:不支持跟踪失败任务,
任务启动仅在任务结束时才被记录(基于 LineageBackend
的方法会在每个任务完成时收集该任务的所有元数据)。
为使 OpenLineage 生效,除安装 openlineage-airflow
外,还需在 airflow.cfg 中或通过环境变量 AIRFLOW__LINEAGE__BACKEND
指定 LineageBackend
为
openlineage.lineage_backend.OpenLineageBackend
OpenLineageBackend 不会考虑手动配置的 inlets 与 outlets。
Airflow <2.1
OpenLineage 不支持低于 Airflow 2.1 的版本。
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。