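An Open Standard for Lineage Metadata Collection: OpenLineage Dataset Facets
OpenLineage is an open standard for metadata and lineage collection, designed to instrument jobs as they run. It defines a generic model of jobs, runs, and datasets, identified through consistent naming strategies, and enriches these entities with metadata through the extensible facets mechanism.
The project is a Graduated project of the LF AI & Data Foundation. It is under active development and welcomes community contributions.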
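Dataset Facets
Dataset facets are mostly common facets that can be used in both the inputs and the outputs of an OpenLineage event. There are also facets designed specifically for input datasets or for output datasets.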
{
  ...
  "inputs": [{
    "namespace": "postgres://workshop-db:None",
    "name": "workshop.public.taxes-in",
    "facets": {
      # common dataset facets go here
    },
    "inputFacets": {
      # input dataset facets go here
    }
  }],
  "outputs": [{
    "namespace": "postgres://workshop-db:None",
    "name": "workshop.public.taxes-out",
    "facets": {
      # common dataset facets go here
    },
    "outputFacets": {
      # output dataset facets go here
    }
  }],
  ...
}
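In the example above, note the distinction between common facets, which apply to both input and output datasets, and facets that are specific to inputs or to outputs. Common facets all live under the facets property, while input-specific and output-specific facets live under the inputFacets and outputFacets properties, respectively.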
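As a minimal sketch of that layout, the Python snippet below assembles the dataset part of an event from plain dictionaries. The helper name make_dataset and the abbreviated facet contents are assumptions for illustration; this is not the OpenLineage client API, and real facets also carry _producer and _schemaURL.

```python
import json

def make_dataset(namespace, name, role, common=None, specific=None):
    """Build a dataset entry; role selects inputFacets or outputFacets."""
    if role not in ("input", "output"):
        raise ValueError("role must be 'input' or 'output'")
    dataset = {"namespace": namespace, "name": name, "facets": common or {}}
    # Role-specific facets are kept under their own key, apart from common ones.
    dataset[f"{role}Facets"] = specific or {}
    return dataset

# Facet bodies are abbreviated placeholders, not complete facets.
event_fragment = {
    "inputs": [make_dataset(
        "postgres://workshop-db:None", "workshop.public.taxes-in", "input",
        common={"version": {"datasetVersion": "1"}},
        specific={"inputStatistics": {"rowCount": 123}},
    )],
    "outputs": [make_dataset(
        "postgres://workshop-db:None", "workshop.public.taxes-out", "output",
        specific={"outputStatistics": {"rowCount": 123}},
    )],
}

print(json.dumps(event_fragment, indent=2))
```

Keeping the role-specific key separate mirrors the structure shown above: a consumer can read facets on any dataset without knowing whether it was an input or an output.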
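Catalog Dataset Facet
This facet contains information about the catalog that the processing engine used to access this dataset.
Fields description:
- framework: The storage framework for which the catalog is configured (e.g., iceberg, delta, hive).
- type: The type of the catalog (e.g., jdbc, glue, polaris).
- name: The name of the catalog, as configured in the source system (e.g., my_iceberg_catalog).
- metadataUri: URI or connection string to the catalog, if applicable (e.g., jdbc:mysql://host:3306/iceberg_database).
- warehouseUri: URI or connection string to the physical location of the data that the catalog describes (e.g., s3://bucket/path/to/iceberg/warehouse).
- source: The source system where the catalog is configured (e.g., spark, flink, hive).
The framework, type, and name fields are required.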
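The required-field rule can be checked with a few lines of Python. This is only a sketch: the function name missing_catalog_fields is hypothetical, and a production check would validate against the published JSON schema instead.

```python
REQUIRED_CATALOG_FIELDS = ("framework", "type", "name")

def missing_catalog_fields(facet):
    """Return the required catalog fields that are absent or empty."""
    return [field for field in REQUIRED_CATALOG_FIELDS if not facet.get(field)]

catalog = {
    "framework": "iceberg",
    "type": "polaris",
    "name": "my_iceberg_catalog",
    "source": "spark",  # optional field
}

print(missing_catalog_fields(catalog))                   # []
print(missing_catalog_fields({"framework": "iceberg"}))  # ['type', 'name']
```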
Example:
{
  ...
  "inputs": {
    "facets": {
      "catalog": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/CatalogDatasetFacet.json",
        "framework": "iceberg",
        "type": "polaris",
        "name": "my_iceberg_catalog",
        "metadataUri": "http://host:1234/iceberg_database",
        "warehouseUri": "s3://bucket/path/to/iceberg/warehouse",
        "source": "spark"
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
Data Quality Assertions Facet
Example:
{
  ...
  "inputs": {
    "facets": {
      "dataQualityAssertions": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DataQualityAssertionsDatasetFacet.json",
        "assertions": [
          {
            "assertion": "not_null",
            "success": true,
            "column": "user_name"
          },
          {
            "assertion": "is_string",
            "success": true,
            "column": "user_name"
          }
        ]
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
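A consumer of this facet will typically want the assertions that did not pass. The sketch below assumes the facet has already been parsed into a Python dictionary; the function name failed_assertions and the failing is_string entry are made up for illustration.

```python
def failed_assertions(facet):
    """Return (column, assertion) pairs whose success flag is false."""
    return [
        (item.get("column"), item["assertion"])
        for item in facet.get("assertions", [])
        if not item["success"]
    ]

facet = {
    "assertions": [
        {"assertion": "not_null", "success": True, "column": "user_name"},
        # Hypothetical failure, added so the filter has something to return.
        {"assertion": "is_string", "success": False, "column": "user_name"},
    ]
}

print(failed_assertions(facet))  # [('user_name', 'is_string')]
```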
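Column Level Lineage Dataset Facet
Column-level lineage provides fine-grained information about dataset dependencies. We know not only that a dependency exists, but also which input columns are used to produce which output columns, and in what way. This makes it possible to answer questions such as: which source input columns are used to build column x?
For example, a job executes the following SQL: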
INSERT INTO top_delivery_times (
    order_id,
    order_placed_on,
    order_delivered_on,
    order_delivery_time
)
SELECT
    order_id,
    order_placed_on,
    order_delivered_on,
    DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time
FROM delivery_7_days
ORDER BY order_delivery_time DESC
LIMIT 1;
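This creates a column-level relationship between the delivery_7_days table and the top_delivery_times table.
An OpenLineage run state update that represents this query using the column-level lineage facet could look like this: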
{
  "eventType": "START",
  "eventTime": "2020-02-22T22:42:42.000Z",
  "run": ...,
  "job": ...,
  "inputs": [
    {
      "namespace": "food_delivery",
      "name": "public.delivery_7_days"
    }
  ],
  "outputs": [
    {
      "namespace": "food_delivery",
      "name": "public.top_delivery_times",
      "facets": {
        "columnLineage": {
          "_producer": "https://some.producer.com/version/1.0",
          "_schemaURL": "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json",
          "fields": {
            "order_id": {
              "inputFields": [
                {
                  "namespace": "food_delivery",
                  "name": "public.delivery_7_days",
                  "field": "order_id",
                  "transformations": [
                    {"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}
                  ]
                }
              ]
            },
            "order_placed_on": {
              "inputFields": [
                {
                  "namespace": "food_delivery",
                  "name": "public.delivery_7_days",
                  "field": "order_placed_on",
                  "transformations": [
                    {"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}
                  ]
                }
              ]
            },
            "order_delivered_on": {
              "inputFields": [
                {
                  "namespace": "food_delivery",
                  "name": "public.delivery_7_days",
                  "field": "order_delivered_on",
                  "transformations": [
                    {"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}
                  ]
                }
              ]
            },
            "order_delivery_time": {
              "inputFields": [
                {
                  "namespace": "food_delivery",
                  "name": "public.delivery_7_days",
                  "field": "order_placed_on",
                  "transformations": [
                    {"type": "DIRECT", "subtype": "TRANSFORMATION", "description": "", "masking": false}
                  ]
                },
                {
                  "namespace": "food_delivery",
                  "name": "public.delivery_7_days",
                  "field": "order_delivered_on",
                  "transformations": [
                    {"type": "DIRECT", "subtype": "TRANSFORMATION", "description": "", "masking": false}
                  ]
                }
              ]
            }
          },
          "dataset": [
            {
              "namespace": "food_delivery",
              "name": "public.delivery_7_days",
              "field": "order_placed_on",
              "transformations": [
                {"type": "INDIRECT", "subtype": "SORT", "description": "", "masking": false}
              ]
            },
            {
              "namespace": "food_delivery",
              "name": "public.delivery_7_days",
              "field": "order_delivered_on",
              "transformations": [
                {"type": "INDIRECT", "subtype": "SORT", "description": "", "masking": false}
              ]
            }
          ]
        }
      }
    }
  ],
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
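To illustrate how the facet answers the question of which input columns build a given output column, here is a small Python sketch. It assumes the columnLineage facet has been parsed into a dictionary; the function name source_columns is hypothetical, and the sample data is a trimmed copy of the facet above with a single output column.

```python
def source_columns(column_lineage, output_column):
    """Return (input field, type, subtype) tuples recorded for one output column."""
    entry = column_lineage.get("fields", {}).get(output_column, {})
    return [
        (input_field["field"], t["type"], t["subtype"])
        for input_field in entry.get("inputFields", [])
        for t in input_field.get("transformations", [])
    ]

def _input(field, subtype):
    # Helper for the sample data only; every sample entry is a DIRECT dependency.
    return {
        "namespace": "food_delivery",
        "name": "public.delivery_7_days",
        "field": field,
        "transformations": [
            {"type": "DIRECT", "subtype": subtype, "description": "", "masking": False}
        ],
    }

column_lineage = {
    "fields": {
        "order_delivery_time": {
            "inputFields": [
                _input("order_placed_on", "TRANSFORMATION"),
                _input("order_delivered_on", "TRANSFORMATION"),
            ]
        }
    }
}

for row in source_columns(column_lineage, "order_delivery_time"):
    print(row)
# ('order_placed_on', 'DIRECT', 'TRANSFORMATION')
# ('order_delivered_on', 'DIRECT', 'TRANSFORMATION')
```

Note that this only reads the per-column fields section. Dependencies recorded under the dataset key apply to the whole output dataset and would need to be added separately.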
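Transformation Type
To provide as much detail as possible about each field's lineage, every inputField of an output can contain a transformations field. This field describes the nature of the relationship between the input column and the output column.
Each transformation is described by 4 fields: type, subtype, description, and masking.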
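Type
Indicates how direct the relationship is. For example, in the query
SELECT source AS result
FROM TAB
WHERE pred = true;
the two types apply as follows:
- DIRECT: the output column value is derived in some way from the inputField value. In the example, the value of result is derived from source.
- INDIRECT: the output column value is influenced by the value of the inputField column, but is not derived from it. In the example, no part of result is derived from pred, yet pred affects which values of result appear in the output dataset.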
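Subtype
Contains more specific information about the transformation.
Direct:
- IDENTITY: the output value is taken as-is from the input
- TRANSFORMATION: the output value is a transformation of the source value from the input row
- AGGREGATION: the output value is an aggregation of source values from multiple input rows
Indirect:
- JOIN: the input is used in a join condition
- GROUP_BY: the output is aggregated based on the input (e.g. a GROUP BY clause)
- FILTER: the input is used as a filtering condition (e.g. a WHERE clause)
- SORT: the output is sorted based on the input field (e.g. an ORDER BY clause)
- WINDOW: the output is windowed based on the input field
- CONDITIONAL: the input value is used in an IF, CASE WHEN, or COALESCE statement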
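Masking
A boolean value indicating whether the input value was obfuscated during the transformation.
Examples are hash for TRANSFORMATION and count for AGGREGATION.
Which methods are considered masking depends on the source system.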
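The four fields and the type/subtype pairing listed above can be modelled as a small Python dataclass. This is a sketch, not part of any OpenLineage client: the class name Transformation and the strict pairing check are assumptions, and the spec itself may be more permissive about which subtypes it accepts.

```python
from dataclasses import dataclass

# Subtypes grouped by type, as listed in this section.
SUBTYPES = {
    "DIRECT": {"IDENTITY", "TRANSFORMATION", "AGGREGATION"},
    "INDIRECT": {"JOIN", "GROUP_BY", "FILTER", "SORT", "WINDOW", "CONDITIONAL"},
}

@dataclass(frozen=True)
class Transformation:
    type: str
    subtype: str
    description: str = ""
    masking: bool = False

    def __post_init__(self):
        # Reject pairs such as DIRECT/SORT that do not appear in the lists above.
        if self.type not in SUBTYPES:
            raise ValueError(f"unknown type: {self.type}")
        if self.subtype not in SUBTYPES[self.type]:
            raise ValueError(f"{self.subtype} is not a {self.type} subtype")

# Lineage of `result` for: SELECT source AS result FROM TAB WHERE pred = true;
result_lineage = {
    "source": Transformation("DIRECT", "IDENTITY"),
    "pred": Transformation("INDIRECT", "FILTER"),
}

for column, transformation in result_lineage.items():
    print(column, transformation.type, transformation.subtype)
```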
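Legacy representation
For Spark, the result shown above is produced by setting the configuration option spark.openlineage.columnLineage.datasetLineageEnabled=True.
The default value is False, in which case every column listed under "dataset" is moved into "fields" and repeated for each output field: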
{
  "columnLineage": {
    "_producer": "https://some.producer.com/version/1.0",
    "_schemaURL": "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json",
    "fields": {
      "order_id": {
        "inputFields": [
          {
            "namespace": "food_delivery",
            "name": "public.delivery_7_days",
            "field": "order_id",
            "transformations": [
              {"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}
            ]
          },
          {
            "namespace": "food_delivery",
            "name": "public.delivery_7_days",
            "field": "order_placed_on",
            "transformations": [
              {"type": "INDIRECT", "subtype": "SORT", "description": "", "masking": false}
            ]
          },
          {
            "namespace": "food_delivery",
            "name": "public.delivery_7_days",
            "field": "order_delivered_on",
            "transformations": [
              {"type": "INDIRECT", "subtype": "SORT", "description": "", "masking": false}
            ]
          }
        ]
      },
      "order_placed_on": {
        "inputFields": [
          {
            "namespace": "food_delivery",
            "name": "public.delivery_7_days",
            "field": "order_placed_on",
            "transformations": [
              {"type": "DIRECT", "subtype": "IDENTITY", "description": "", "masking": false}
            ]
          },
          {
            "namespace": "food_delivery",
            "name": "public.delivery_7_days",
            "field": "order_placed_on",
            "transformations": [
              {"type": "INDIRECT", "subtype": "SORT", "description": "", "masking": false}
            ]
          },
          {
            "namespace": "food_delivery",
            "name": "public.delivery_7_days",
            "field": "order_delivered_on",
            "transformations": [
              {"type": "INDIRECT", "subtype": "SORT", "description": "", "masking": false}
            ]
          }
        ]
      }
      // ... other fields
    },
    "dataset": [] // empty
  }
}
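As a result, every target dataset field depends on every source dataset field through INDIRECT column lineage. This produces close to a Cartesian product of all dataset fields, which is very inefficient.
It is recommended to always set spark.openlineage.columnLineage.datasetLineageEnabled=True to get a more compact column lineage representation. A future version of OpenLineage may change the default.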
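The difference between the two representations can be made concrete with a sketch that folds a legacy facet into the compact form. This is not how the Spark integration is implemented; it assumes that every INDIRECT entry repeated under "fields" applies to the whole dataset and that the legacy "dataset" list is empty. The names compact_column_lineage and make_input are hypothetical.

```python
def compact_column_lineage(facet):
    """Move INDIRECT transformations from "fields" into a deduplicated "dataset" list."""
    fields, dataset, seen = {}, [], set()
    for column, entry in facet["fields"].items():
        kept = []
        for input_field in entry["inputFields"]:
            transformations = input_field["transformations"]
            direct = [t for t in transformations if t["type"] == "DIRECT"]
            indirect = [t for t in transformations if t["type"] == "INDIRECT"]
            if direct:
                kept.append({**input_field, "transformations": direct})
            # Identify an indirect dependency by its source column and subtypes.
            key = (input_field["namespace"], input_field["name"], input_field["field"],
                   tuple(t["subtype"] for t in indirect))
            if indirect and key not in seen:
                seen.add(key)
                dataset.append({**input_field, "transformations": indirect})
        fields[column] = {"inputFields": kept}
    return {**facet, "fields": fields, "dataset": dataset}

def make_input(field, type_, subtype):
    # Sample-data helper mirroring the entries in the example above.
    return {"namespace": "food_delivery", "name": "public.delivery_7_days", "field": field,
            "transformations": [{"type": type_, "subtype": subtype,
                                 "description": "", "masking": False}]}

sort_inputs = [make_input("order_placed_on", "INDIRECT", "SORT"),
               make_input("order_delivered_on", "INDIRECT", "SORT")]
legacy = {
    "fields": {
        "order_id": {"inputFields": [make_input("order_id", "DIRECT", "IDENTITY")] + sort_inputs},
        "order_placed_on": {"inputFields": [make_input("order_placed_on", "DIRECT", "IDENTITY")] + sort_inputs},
    },
    "dataset": [],
}

compact = compact_column_lineage(legacy)
legacy_entries = sum(len(e["inputFields"]) for e in legacy["fields"].values())
compact_entries = sum(len(e["inputFields"]) for e in compact["fields"].values()) + len(compact["dataset"])
print(legacy_entries, compact_entries)  # 6 4
```

With only two output fields the saving is small, but the legacy form grows with the product of output fields and indirect sources, while the compact form grows with their sum.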
Datasource Facet
Example:
{
  ...
  "inputs": {
    "facets": {
      "dataSource": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json",
        "name": "datasource_one",
        "url": "https://some.location.com/datsource/one"
      }
    }
  }
  ...
}
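The specification for this facet can be found at the _schemaURL given in the example above.
Dataset Documentation Facet
Contains the documentation or description of the dataset.
Example: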
{
  ...
  "inputs": {
    "facets": {
      "documentation": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/DocumentationDatasetFacet.json",
        "description": "This is the documentation of something.",
        "contentType": "text/markdown"
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
Lifecycle State Change Facet
Example:
{
  ...
  "outputs": {
    "facets": {
      "lifecycleStateChange": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/LifecycleStateChangeDatasetFacet.json",
        "lifecycleStateChange": "CREATE"
      }
    }
  }
  ...
}
{
  ...
  "outputs": {
    "facets": {
      "lifecycleStateChange": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/LifecycleStateChangeDatasetFacet.json",
        "lifecycleStateChange": "RENAME",
        "previousIdentifier": {
          "namespace": "example_namespace",
          "name": "example_table_1"
        }
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
Ownership Dataset Facet
Example:
{
  ...
  "inputs": {
    "facets": {
      "ownership": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipDatasetFacet.json",
        "owners": [
          {
            "name": "maintainer_one",
            "type": "MAINTAINER"
          }
        ]
      }
    }
  }
  ...
}
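The specification for this facet can be found at the _schemaURL given in the example above.
Schema Dataset Facet
The schema dataset facet describes the schema of a particular dataset. Besides a name, each field can optionally carry a type and a description. Nested fields are supported.
Example: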
{
  ...
  "inputs": {
    "facets": {
      "schema": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json",
        "fields": [
          {"name": "id", "type": "int", "description": "Customer's identifier"},
          {"name": "name", "type": "string", "description": "Customer's name"},
          {"name": "is_active", "type": "boolean", "description": "Has customer completed activation process"},
          {
            "name": "phones",
            "type": "array",
            "description": "List of phone numbers",
            "fields": [
              {"name": "_element", "type": "string", "description": "Phone number"}
            ]
          },
          {
            "name": "address",
            "type": "struct",
            "description": "Customer address",
            "fields": [
              {"name": "type", "type": "string", "description": "Address type, e.g. home, work, etc."},
              {"name": "country", "type": "string", "description": "Country name"},
              {"name": "zip", "type": "string", "description": "Zip code"},
              {"name": "state", "type": "string", "description": "State name"},
              {"name": "street", "type": "string", "description": "Street name"}
            ]
          },
          {
            "name": "custom_properties",
            "type": "map",
            "fields": [
              {"name": "key", "type": "string"},
              {
                "name": "value",
                "type": "union",
                "fields": [
                  {"name": "_0", "type": "string"},
                  {"name": "_1", "type": "int64"}
                ]
              }
            ]
          }
        ]
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
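Because fields can nest, a consumer often needs to walk the schema recursively. The sketch below flattens a schema facet's fields list into dotted paths. The function name flatten_fields and the dotted-path convention are assumptions for illustration, and the sample is an abbreviated version of the example above.

```python
def flatten_fields(fields, prefix=""):
    """Yield (dotted path, type) for every field, descending into nested fields."""
    for field in fields:
        path = f"{prefix}{field['name']}"
        yield path, field.get("type")  # type is optional, so it may be None
        yield from flatten_fields(field.get("fields", []), prefix=path + ".")

schema_fields = [
    {"name": "id", "type": "int"},
    {"name": "phones", "type": "array",
     "fields": [{"name": "_element", "type": "string"}]},
    {"name": "address", "type": "struct",
     "fields": [{"name": "zip", "type": "string"}]},
]

for path, type_ in flatten_fields(schema_fields):
    print(path, type_)
# id int
# phones array
# phones._element string
# address struct
# address.zip string
```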
Storage Facet
Example:
{
  ...
  "inputs": {
    "facets": {
      "storage": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/StorageDatasetFacet.json",
        "storageLayer": "iceberg",
        "fileFormat": "csv"
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
Symlinks Facet
Example:
{
  ...
  "inputs": {
    "facets": {
      "symlinks": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SymlinksDatasetFacet.json",
        "identifiers": [
          {
            "namespace": "example_namespace",
            "name": "example_dataset_1",
            "type": "table"
          }
        ]
      }
    }
  }
  ...
}
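The specification for this facet can be found at the _schemaURL given in the example above.
Tags Dataset Facet
This facet contains the tags associated with the dataset.
Example: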
{
  ...
  "inputs": {
    "facets": {
      "tags": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/TagsDatasetFacet.json",
        "tags": [
          {
            "key": "environment",
            "value": "production",
            "source": "CONFIG"
          },
          {
            "key": "classification",
            "value": "PII",
            "source": "CONFIG",
            "field": "tax_id"
          }
        ]
      }
    }
  }
  ...
}
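The specification for this facet can be found at the _schemaURL given in the example above.
Dataset Type Facet
This facet describes the type of a dataset within a database.
Fields description:
- datasetType: The dataset type, such as TABLE, VIEW, TOPIC, or MODEL.
- subType: A sub-type within datasetType, such as MATERIALIZED or EXTERNAL.
Example: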
{
  ...
  "inputs": {
    "facets": {
      "datasetType": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetTypeDatasetFacet.json",
        "datasetType": "VIEW",
        "subType": "MATERIALIZED"
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
Version Facet
Example:
{
  ...
  "inputs": {
    "facets": {
      "version": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetVersionDatasetFacet.json",
        "datasetVersion": "1"
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
Data Quality Metrics Facet
Example:
{
  ...
  "inputs": {
    "inputFacets": {
      "dataQualityMetrics": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/DataQualityMetricsInputDatasetFacet.json",
        "rowCount": 123,
        "fileCount": 5,
        "bytes": 35602,
        "columnMetrics": {
          "column_one": {
            "nullCount": 132,
            "distinctCount": 11,
            "sum": 500,
            "count": 234,
            "min": 111,
            "max": 3234,
            "quantiles": {"0.1": 12, "0.5": 22, "1": 123, "2": 11}
          },
          "column_two": {
            "nullCount": 132,
            "distinctCount": 11,
            "sum": 500,
            "count": 234,
            "min": 111,
            "max": 3234,
            "quantiles": {"0.1": 12, "0.5": 22, "1": 123, "2": 11}
          },
          "column_three": {
            "nullCount": 132,
            "distinctCount": 11,
            "sum": 500,
            "count": 234,
            "min": 111,
            "max": 3234,
            "quantiles": {"0.1": 12, "0.5": 22, "1": 123, "2": 11}
          }
        }
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
Input Statistics Facet
Example:
{
  ...
  "inputs": {
    "inputFacets": {
      "inputStatistics": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/InputStatisticsInputDatasetFacet.json",
        "rowCount": 123,
        "fileCount": 5,
        "size": 35602
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.
Output Statistics Facet
Example:
{
  ...
  "outputs": {
    "outputFacets": {
      "outputStatistics": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json",
        "rowCount": 123,
        "fileCount": 5,
        "size": 35602
      }
    }
  }
  ...
}
The specification for this facet can be found at the _schemaURL given in the example above.