当前位置: 首页 > ai >正文

血缘元数据采集开放标准:OpenLineage Dataset Facets

OpenLineage

OpenLineage 是一个用于元数据和血缘采集的开放标准,专为在作业运行时动态采集数据而设计。它通过统一的命名策略定义了由作业(Job)、运行实例(Run)和数据集(Dataset) 组成的通用模型,并通过可扩展的Facets机制对这些实体进行元数据增强。
该项目是 LF AI & Data 基金会的毕业级项目,处于活跃开发阶段,欢迎社区贡献。

数据集 Facets

Dataset Facets

数据集 Facets 一般由在 OpenLineage 事件的 inputsoutputs 中共同使用的通用 facet 组成,也存在专为输入或输出数据集设计的 facet。

{..."inputs": [{"namespace": "postgres://workshop-db:None","name": "workshop.public.taxes-in","facets": {# 通用数据集 facets 位于此处},"inputFacets": {# 输入数据集 facets 位于此处}}],"outputs": [{"namespace": "postgres://workshop-db:None","name": "workshop.public.taxes-out","facets": {# 通用数据集 facets 位于此处},"outputFacets": {# 输出数据集 facets 位于此处}}],...
}

在上述示例中,请注意区分适用于输入与输出数据集的通用 facets,以及仅针对输入或输出的特定 facets。通用 facets 均位于 facets 属性下;而输入或输出特定的 facets 则分别位于 inputFacetsoutputFacets 属性中。

目录数据集 Facet

Catalog Dataset Facet

该 facet 包含处理引擎在访问此数据集时所使用的目录信息。

字段说明:

  • framework:为该目录配置的存储框架(例如:iceberg、delta、hive)。
  • type:目录的类型(例如:jdbc、glue、polaris)。
  • name:在源系统中配置的目录名称(例如:my_iceberg_catalog)。
  • metadataUri:指向目录的 URI 或连接字符串(如适用)(例如:jdbc:mysql://host:3306/iceberg_database)。
  • warehouseUri:指向该目录所描述数据的物理位置的 URI 或连接字符串(例如:s3://bucket/path/to/iceberg/warehouse)。
  • source:配置目录的源系统(例如:spark、flink、hive)。

frameworktypename 为必填字段。

示例:

{..."inputs": {"facets": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/CatalogDatasetFacet.json","framework": "iceberg","type": "polaris","name": "my_iceberg_catalog","metadataUri": "http://host:1234/iceberg_database","warehouseUri": "s3://bucket/path/to/iceberg/warehouse","source": "spark"}}...
}

该 facet 的规范可在此处找到。

数据质量断言 Facet

Data Quality Assertions Facet

示例:

{..."inputs": {"facets": {"dataQualityAssertions": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DataQualityAssertionsDatasetFacet.json","assertions": [{"assertion": "not_null","success": true,"column": "user_name"},{"assertion": "is_string","success": true,"column": "user_name"}]}}}...
}

该 facet 的规范可在此处找到。

列级血缘数据集 Facet

Column Level Lineage Dataset Facet

列级血缘(Column level lineage)提供了数据集依赖关系的细粒度信息。我们不仅能知道依赖存在,还能了解哪些输入列被用于生成哪些输出列,以及具体的转换方式。由此可以回答诸如 构建列 x 的源输入列有哪些? 的问题。

例如,某作业执行以下 SQL:

INSERT INTO top_delivery_times (order_id,order_placed_on,order_delivered_on,order_delivery_time
)
SELECTorder_id,order_placed_on,order_delivered_on,DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time
FROM delivery_7_days
ORDER BY order_delivery_time DESC
LIMIT 1;

这将在 delivery_7_days 表与 top_delivery_times 表之间建立如下关系:

在这里插入图片描述

使用列级血缘 facet 表示该查询的 OpenLineage run state update 示例如下:

{"eventType": "START","eventTime": "2020-02-22T22:42:42.000Z","run": ...,"job": ...,"inputs": [{"namespace": "food_delivery","name": "public.delivery_7_days"}],"outputs": [{"namespace": "food_delivery","name": "public.top_delivery_times","facets": {"columnLineage": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json","fields": {"order_id": {"inputFields": [{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_id","transformations": [{"type": "DIRECT","subtype": "IDENTITY","description": "","masking": false}]}]},"order_placed_on": {"inputFields": [{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_placed_on","transformations": [{"type": "DIRECT","subtype": "IDENTITY","description": "","masking": false}]}]},"order_delivered_on": {"inputFields": [{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_delivered_on","transformations": [{"type": "DIRECT","subtype": "IDENTITY","description": "","masking": false}]}]},"order_delivery_time": {"inputFields": [{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_placed_on","transformations": [{"type": "DIRECT","subtype": "TRANSFORMATION","description": "","masking": false}]},{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_delivered_on","transformations": [{"type": "DIRECT","subtype": "TRANSFORMATION","description": "","masking": false}]}]}},"dataset": [{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_placed_on","transformations": [{"type": "INDIRECT","subtype": "SORT","description": "","masking": false}]},{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_delivered_on","transformations": [{"type": "INDIRECT","subtype": "SORT","description": "","masking": false}]}]}}}],...
}

该 facet 的规范可在此处找到。

Transformation Type

为尽可能提供每个字段血缘的详细信息,每个输出的 inputField 可包含 transformations 字段。该字段描述输入列与输出列之间的关系性质。
每个转换由 4 个字段描述:typesubtypedescriptionmasking

Type

指示关系的直接程度,例如在查询

SELECTsource AS result
FROM TAB
WHERE pred = true;

中:

  1. DIRECT — 输出列值以某种方式源自 inputField 值。上例中 result 的值源自 source
  2. INDIRECT — 输出列值受 inputField 列值影响,但并非由其派生。上例中 result 的值没有任何部分源自 pred,但 pred 影响输出数据集中 result 的值。

Subtype

包含更具体的转换信息

Direct:

  • IDENTITY — 输出值直接取自输入
  • TRANSFORMATION — 输出值是对输入行源值的转换
  • AGGREGATION — 输出值是对多行源值的聚合

Indirect:

  • JOIN — 输入用于连接条件
  • GROUP_BY — 输出基于输入进行聚合(如 GROUP BY 子句)
  • FILTER — 输入作为过滤条件(如 WHERE 子句)
  • SORT — 输出基于输入字段排序(如 ORDER BY 子句)
  • WINDOW — 输出基于输入字段开窗
  • CONDITIONAL — 输入值用于 IFCASE WHENCOALESCE 语句

Masking

布尔值,指示在转换过程中输入值是否被脱敏。
示例:TRANSFORMATIONhashAGGREGATIONcount
具体哪些方法被视为脱敏,取决于源系统。

Legacy representation

对于 Spark,上述结果通过配置选项
spark.openlineage.columnLineage.datasetLineageEnabled=True 产生。
默认值为 False,此时所有列从 "dataset" 字段移至 "fields"

{"columnLineage": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json","fields": {"order_id": {"inputFields": [{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_id","transformations": [{"type": "DIRECT","subtype": "IDENTITY","description": "","masking": false}]},{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_placed_on","transformations": [{"type": "INDIRECT","subtype": "SORT","description": "","masking": false}]},{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_delivered_on","transformations": [{"type": "INDIRECT","subtype": "SORT","description": "","masking": false}]}]},"order_placed_on": {"inputFields": [{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_placed_on","transformations": [{"type": "DIRECT","subtype": "IDENTITY","description": "","masking": false}]},{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_placed_on","transformations": [{"type": "INDIRECT","subtype": "SORT","description": "","masking": false}]},{"namespace": "food_delivery","name": "public.delivery_7_days","field": "order_delivered_on","transformations": [{"type": "INDIRECT","subtype": "SORT","description": "","masking": false}]}]}// ... other fields},"dataset": [] // empty}
}

因此每个目标数据集字段都通过 INDIRECT 列级血缘依赖于每个源数据集字段,几乎产生所有数据集字段的笛卡尔积,极为低效。

建议始终启用 spark.openlineage.columnLineage.datasetLineageEnabled=True,以获得更紧凑的列级血缘表示。未来版本的 OpenLineage 可能会更改默认值。

数据源 Facet

Datasource Facet

示例:

{..."inputs": {"facets": {"dataSource": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json","name": "datasource_one","url": "https://some.location.com/datsource/one"}}}...
}

该 facet 的规范可在此处找到。

数据集文档 Facet

Dataset Documentation Facet

包含数据集的文档或描述。

示例:

{..."job": {"facets": {"documentation": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/DocumentationDatasetFacet.json","description": "This is the documentation of something.","contentType": "text/markdown"}}}...
}

该 facet 的规范可在此处找到。

生命周期状态变更 Facet

Lifecycle State Change Facet

示例:

{..."outputs": {"facets": {"lifecycleStateChange": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-0/LifecycleStateChangeDatasetFacet.json","lifecycleStateChange": "CREATE"}}}...
}
{..."outputs": {"facets": {"lifecycleStateChange": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-0/LifecycleStateChangeDatasetFacet.json","lifecycleStateChange": "RENAME","previousIdentifier": {"namespace": "example_namespace","name": "example_table_1"}}}}...
}

该 facet 的规范可在此处找到。

所有权数据集 Facet

Ownership Dataset Facet

示例:

{..."inputs": {"facets": {"ownership": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipDatasetFacet.json","owners": [{"name": "maintainer_one","type": "MAINTAINER"}]}}}...
}

该 facet 的规范可在此处找到。

数据集 Schema Facet

Schema Dataset Facet

数据集 Schema Facet 用于描述特定数据集的 schema。除字段名称外,还可为每个字段提供可选的类型与描述。支持嵌套字段。

示例:

{..."inputs": {"facets": {"schema": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json","fields": [{"name": "id","type": "int","description": "Customer's identifier"},{"name": "name","type": "string","description": "Customer's name"},{"name": "is_active","type": "boolean","description": "Has customer completed activation process"},{"name": "phones","type": "array","description": "List of phone numbers","fields": [{"name": "_element","type": "string","description": "Phone number"}]},{"name": "address","type": "struct","description": "Customer address","fields": [{"name": "type","type": "string","description": "Address type, g.e. home, work, etc."},{"name": "country","type": "string","description": "Country name"},{"name": "zip","type": "string","description": "Zip code"},{"name": "state","type": "string","description": "State name"},{"name": "street","type": "string","description": "Street name"}]},{"name": "custom_properties","type": "map","fields": [{"name": "key","type": "string"},{"name": "value","type": "union","fields": [{"name": "_0","type": "string"},{"name": "_1","type": "int64"}]}]}]}}}...
}

该 facet 的规范可在此处找到。

存储 Facet

Storage Facet

示例:

{..."inputs": {"facets": {"storage": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-0/StorageDatasetFacet.json","storageLayer": "iceberg","fileFormat": "csv"}}}...
}

该 facet 的规范可在此处找到。

Symlinks Facet

示例:

{..."inputs": {"facets": {"symlinks": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SymlinksDatasetFacet.json","identifiers": ["namespace": "example_namespace","name": "example_dataset_1","type": "table"]}}}...
}

该 facet 的规范可在此处找到。

标签数据集 Facet

Tags Dataset Facet

该 facet 包含与数据集关联的标签。

示例:

{..."inputs": {"facets": {"tags": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/TagsDatasetFacet.json","tags": [{"key": "environment","value": "production","source": "CONFIG"},{"key": "classification","value": "PII","source": "CONFIG","field": "tax_id"}]}}}...
}

该 facet 的规范可在此处找到。

数据集类型 Facet

Dataset Type Facet

该 facet 描述数据集在数据库中的类型。

字段说明:

  • datasetType:数据集类型,如 TABLEVIEWTOPICMODEL
  • subType:在 datasetType 内的子类型,如 MATERIALIZEDEXTERNAL

示例:

{..."inputs": {"facets": {"datasetType": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetTypeDatasetFacet.json","datasetType": "VIEW","subType": "MATERIALIZED"}}}...
}

该 facet 的规范可在此处找到。

版本 Facet

Version Facet

示例:

{..."inputs": {"facets": {"version": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetVersionDatasetFacet.json","datasetVersion": "1"}}}...
}

该 facet 的规范可在此处找到。

数据质量指标 Facet

Data Quality Metrics Facet

示例:

{..."inputs": {"inputFacets": {"dataQualityMetrics": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsInputDatasetFacet.json","rowCount": 123,"fileCount": 5,"bytes": 35602,"columnMetrics": {"column_one": {"nullCount": 132,"distincCount": 11,"sum": 500,"count": 234,"min": 111,"max": 3234,"quantiles": {"0.1": 12,"0.5": 22,"1": 123,"2": 11}},"column_two": {"nullCount": 132,"distinctCount": 11,"sum": 500,"count": 234,"min": 111,"max": 3234,"quantiles": {"0.1": 12,"0.5": 22,"1": 123,"2": 11}},"column_three": {"nullCount": 132,"distincCount": 11,"sum": 500,"count": 234,"min": 111,"max": 3234,"quantiles": {"0.1": 12,"0.5": 22,"1": 123,"2": 11}}}}}}...
}

该 facet 的规范可在此处找到。

输入统计 Facet

Input Statistics Facet

示例:

{..."inputs": {"inputFacets": {"inputStatistics": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-0/InputStatisticsInputDatasetFacet.json","rowCount": 123,"fileCount": 5,"size": 35602}}}...
}

该 facet 的规范可在此处找到。

输出统计 Facet

Output Statistics Facet

示例:

{..."outputs": {"outputFacets": {"outputStatistics": {"_producer": "https://some.producer.com/version/1.0","_schemaURL": "https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json","rowCount": 123,"fileCount": 5,"size": 35602}}}...
}

该 facet 的规范可在此处找到。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

http://www.xdnf.cn/news/18285.html

相关文章:

  • java开发面试题(提高篇)
  • 大数据毕业设计选题推荐-基于大数据的北京气象站数据可视化分析系统-Hadoop-Spark-数据可视化-BigData
  • JavaScript基础语法five
  • Python学习 -- MySQL数据库的查询及案例
  • 计算两幅图像在特定交点位置的置信度评分。置信度评分反映了该位置特征匹配的可靠性,通常用于图像处理任务(如特征匹配、立体视觉等)
  • redis-缓存-双写一致性
  • git 常用命令整理
  • 【倍增 桶排序】后缀数组
  • 【Java后端】Spring Boot 全局异常处理最佳实践
  • Firefox 142 引入 CRLite 用于私有证书撤销
  • LeetCode热题100--101. 对称二叉树--简单
  • 【clion】visual studio的sln转cmakelist并使用clion构建32位
  • 游戏本不插电源适配器不卡设置教程
  • 数据库架构开发知识库体系
  • Pub/Sub是什么意思
  • 常见的学术文献数据库
  • 好家园房产中介网后台管理完整(python+flask+mysql)
  • 开源的实时 Web 日志分析器GoAccess安装使用指南
  • 【数据结构】快速排序算法精髓解析
  • Vue 3 高性能实践 全面提速剖析!
  • Android 资源替换:静态替换 vs 动态替换
  • [GraphRAG]完全自动化处理任何文档为向量知识图谱:AbutionGraph如何让知识自动“活”起来?
  • sourcetree 拉取代码
  • C++篇(2)C++入门(下)
  • 十二,数据结构-链表
  • Docker Compose命令一览(Docker Compose指令、docker-compose命令)
  • 【基础-判断】@CustomDialog装饰器用于装饰自定义弹窗组件,使得弹窗可以动态设置内容及样式
  • ubuntu下安装vivado2015.2时报错解决方法
  • 1-2前端撸码前的准备,包管理工具和环境搭建
  • SPI 机制深度剖析:Java、Spring、Dubbo 的服务发现哲学与实战指南