Paimon 建表常用属性分析
Paimon 建表常用属性分析
大家好,我是FP,一名在读硕士研究生,最近在调研湖仓一体时,发现Paimon建表属性存在诸多难点,因此形成如下总结文字,以供学习交流,若有错误之处,欢迎批评指正。
Paimon常用建表属性指定
-- Flink 建表语句
CREATE TABLE `paimon-06-1`.`ods`.`demo_paimon` (id INT COMMENT '用户ID,唯一字段',name STRING COMMENT '用户姓名',age INT COMMENT '用户年龄',gender INT COMMENT '用户性别',dt STRING COMMENT '日期分区字段',PRIMARY KEY (id, dt) NOT ENFORCED -- 添加逗号分隔字段
) COMMENT '用户信息表'
PARTITIONED BY (dt)
WITH (-- 1. 桶相关优化'bucket' = '4', 'bucket-key' = 'id', -- 2. 合并压缩(Compaction)优化,合并原始的的ORC文件'compaction.min.file-num' = '5','compaction.max.file-num' = '20',-- 3. 日志系统与变更捕获 'log.changelog-mode' = 'auto','log.consistency' = 'transactional','changelog-producer' = 'input',-- 4. 合并引擎'merge-engine' = 'deduplicate',-- 5. 写入优化'write-mode' = 'change-log','write-buffer-size' = '256mb',-- 6. 读取优化'scan.mode' = 'latest',-- 7. 快照管理 - 防止元数据膨胀'snapshot.time-retained' = '24h','snapshot.num-retained.min' = '10','snapshot.num-retained.max' = '30',-- 8. 持续发现分区 (可选,根据需求)'continuous.discovery-interval' = '5s'
);
分区分桶
数据的管理和过滤方式,将数据按照分区、分桶组织成相应的目录,能够显著加速按分区键过滤的查询速度,也便于按分区进行过期数据清理。
同时,DataWorks里面有设置如果是分区表,则必须指定分区字段,否则将无法查询。
合并压缩优化
Compaction负责将桶内的小文件合并成大文件,可以有效防止小文件产生,优化查询性能并管理文件数量,节省存储空间。
-- 桶内文件大于5时才可能触发compaction
'compaction.min.file-num' = '5',
-- 桶内文件大于20时必须触发compaction
'compaction.max.file-num' = '20',
日志系统与变更捕获
changelog 变更日志,记录表数据变化(+I/-D/+U/-U)的日志流。
update_before:表示一条记录在更新操作之前的状态(旧值)
-- 为表指定日志变更模式
'log.changelog-mode' = 'full',
-- 指定日志一致性保证
'log.consistency' = 'transactional',
-- 是否重复写入变更日志文件/日志文件产生方式
'changelog-producer' = 'input',
日志变更模式
如何追加日志?
模式 | 行为描述 |
---|---|
Auto | 有主键的表自动设置为Upsert,无主键的表则为All |
Upsert | 日志系统不存储update_before的更改,日志消费作业将自动添加规范化节点,根据状态生成所需的update_before(只会存储+U) |
All | 保存包括update_before在内的所有修改信息(强制生成包含所有变更类型的完整changelog,会存储+U、-U) |
日志一致性保证模式
怎么保证日志一致性?
模式 | 行为描述 |
---|---|
transactional | 保证原子提交和changelog顺序,提供exactly-once语义 |
eventual | 保证最终一致,延迟较低但可能会乱序/重复 |
增量数据产生机制
何时写入日志?
模式 | 行为描述 |
---|---|
none | 无变更日志文件(纯批处理场景,无需实时获增量消费数据) |
input | 在刷新内存表时对变更日志文件进行两次写入(直接将Paimon表的输入流作为changelog原样写入Log System) |
full-compaction | 在每次全量压缩时生成变更日志文件 |
lookup | 每次commit snapshot前产生完整的增量数据(查询时生成日志文件),时效性更高,但资源消耗更高 |
合并引擎/数据合并机制
定义如何合并相同主键的数据
-- 4. 合并引擎
'merge-engine' = 'deduplicate',
合并方式:
模式 | 适用场景 |
---|---|
deduplicate | 仅保留相同主键的最新记录(对于具有多条具有相同主键的数据,Paimon结果表仅会保留最新一条数据,并丢弃其它具有主键的数据),适用于主键更新模型 |
partial-update | 将相同主键的多行记录按字段合并,适用于宽表更新,部分更新非空字段 |
aggregation | 对相同主键的记录进行预聚合,适用于分组聚合的场景 |
first-row | 仅保留相同主键的第一条记录,适用于去重但不更新的场景 |
写入优化
如何写入数据文件
-- 写入方式
'write-mode' = 'change-log',
-- 单个写入器的内存缓存大小,超过此值将会执行Flush操作
'write-buffer-size' = '256mb',
写入方式:
模式 | 行为描述 |
---|---|
auto | 有主键的表为change-log,没有主键的表则为append-only |
append-only | 直接追加,不会进行主键去重,不会执行数据重复删除或任何主键约束,所以效率会非常高 |
change-log | 进行主键去重,可以接受插入/删除/更新操作 |
其它常用参数:
-- 指定Flink Sink 作业的并行度,与桶数匹配或成倍数关系
'sink.parallelism' = '4'
-- 是否忽略delete操作
'write.ignore-delete' = 'false'
-- 是否溢写到磁盘
'write-buffer-spillable' = 'true'
CDAS语法
CDAS(create datastream as select),阿里云Flink引擎会将CDAS语句自动转换成一个一个的CTAS语法进行执行,从而实现整库同步,避免对每个表都进行单独插入、调度,产生大量的insert into操作,占用大量的数据库连接,对业务数据库和网络IO造成压力。
前置知识:CTAS(create table as select),传统方法,通过insert into的形式将MySQL数据导入到Paimon中,如果是多张表,那么就编写多个insert into语句。
-- CDAS语法
CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]<target_database>:[catalog_name.]db_name<source_database>:[catalog_name.]db_name
实际示例:将MySQL某个库中的数据同步到kafka中
BEGIN STATEMENT SET;-- 同步A库。
CREATE DATABASE IF NOT EXISTS fp_kafka.kafka
WITH ('cdas.topic.pattern' = 'ods_{table-name}')
AS DATABASE mysqlcdc.`order` INCLUDING ALL TABLES
/*+ OPTIONS('scan.startup.mode' = 'initial') */ ;-- 同步B库。
CREATE DATABASE IF NOT EXISTS fp_kafka.kafka
WITH ('cdas.topic.pattern' = 'ods_{table-name}')
AS DATABASE mysqlcdc.driver INCLUDING ALL TABLES
/*+ OPTIONS('scan.startup.mode' = 'initial') */ ;END;
读取优化/消费位点
指定表的扫描方式/读取的起始位置
'scan.mode' = 'latest',
读取/扫描方式:
模式 | 行为描述 |
---|---|
latest | 流读时读取最新快照,符合大多数实时消费场景 |
latest-full | 流读时,首次启动读取当前全量快照,之后消费增量changelog,用于初始化状态或需要全量+增量的场景。批读时,只生成最新的快照,但不读取新的更改 |
from-snapshot/timestamp | 流读时,从指定快照ID或时间戳开始读取,用于时间旅行或回溯。批读时,读取该快照节点/时间节点之前的最新数据,但不读取新的更改 |
full-compaction | 流读时,最近一次压缩后生成快照时的最新数据,后面的数据将会增量读取。批读时,最近一次压缩后生成快照时的最新数据,后面的修改数据将不会被读取到 |
hint语法
-- 从快照 N 开始消费增量变更 (Changelog)
SELECT * FROM my_table /*+ OPTIONS('scan.mode'='latest-full') */;-- 查询快照 ID=123 时的数据
SELECT * FROM my_table /*+ OPTIONS('scan.snapshot-id'='123') */;-- 查询 1 小时前的数据 (基于时间戳)
SELECT * FROM my_table /*+ OPTIONS('scan.timestamp-millis'='1698300000000') */;
快照管理
快照元数据的管理,Paimon依赖快照实现MVCC和时间旅行,对快照文件的自动过期处理,可以防止快照元数据膨胀
MVCC(Multi-Version Concurrency Control,多版本并发控制)
-- 保留至少最近多长时间内的快照
'snapshot.time-retained' = '24h',
-- 最少保留的快照数量,即时超过了过期期限,仍然要保留下来
'snapshot.num-retained.min' = '10',
-- 最多保留的快照数量
'snapshot.num-retained.max' = '30',
持续发现分区
流读时,自动检查新分区的时间间隔
'continuous.discovery-interval' = '5s'