当前位置: 首页 > web >正文

Paimon 建表常用属性分析

Paimon 建表常用属性分析

大家好,我是FP,一名在读硕士研究生,最近在调研湖仓一体时,发现Paimon建表属性存在诸多难点,因此形成如下总结文字,以供学习交流,若有错误之处,欢迎批评指正。

Paimon常用建表属性指定

-- Flink 建表语句
CREATE TABLE `paimon-06-1`.`ods`.`demo_paimon` (id INT COMMENT '用户ID,唯一字段',name STRING COMMENT '用户姓名',age INT COMMENT '用户年龄',gender INT COMMENT '用户性别',dt STRING COMMENT '日期分区字段',PRIMARY KEY (id, dt) NOT ENFORCED -- 添加逗号分隔字段
) COMMENT '用户信息表'
PARTITIONED BY (dt)
WITH (-- 1. 桶相关优化'bucket' = '4', 'bucket-key' = 'id', -- 2. 合并压缩(Compaction)优化,合并原始的的ORC文件'compaction.min.file-num' = '5','compaction.max.file-num' = '20',-- 3. 日志系统与变更捕获 'log.changelog-mode' = 'auto','log.consistency' = 'transactional','changelog-producer' = 'input',-- 4. 合并引擎'merge-engine' = 'deduplicate',-- 5. 写入优化'write-mode' = 'change-log','write-buffer-size' = '256mb',-- 6. 读取优化'scan.mode' = 'latest',-- 7. 快照管理 - 防止元数据膨胀'snapshot.time-retained' = '24h','snapshot.num-retained.min' = '10','snapshot.num-retained.max' = '30',-- 8. 持续发现分区 (可选,根据需求)'continuous.discovery-interval' = '5s'
);

分区分桶

数据的管理和过滤方式,将数据按照分区、分桶组织成相应的目录,能够显著加速按分区键过滤的查询速度,也便于按分区进行过期数据清理。

同时,DataWorks里面有设置如果是分区表,则必须指定分区字段,否则将无法查询。


合并压缩优化

Compaction负责将桶内的小文件合并成大文件,可以有效防止小文件产生,优化查询性能并管理文件数量,节省存储空间。

-- 桶内文件大于5时才可能触发compaction
'compaction.min.file-num' = '5',
-- 桶内文件大于20时必须触发compaction
'compaction.max.file-num' = '20',

日志系统与变更捕获

changelog 变更日志,记录表数据变化(+I/-D/+U/-U)的日志流。

update_before:表示一条记录在更新操作之前的状态(旧值)

-- 为表指定日志变更模式
'log.changelog-mode' = 'full',
-- 指定日志一致性保证
'log.consistency' = 'transactional',
-- 是否重复写入变更日志文件/日志文件产生方式
'changelog-producer' = 'input',
日志变更模式

如何追加日志?

模式行为描述
Auto有主键的表自动设置为Upsert,无主键的表则为All
Upsert日志系统不存储update_before的更改,日志消费作业将自动添加规范化节点,根据状态生成所需的update_before(只会存储+U)
All保存包括update_before在内的所有修改信息(强制生成包含所有变更类型的完整changelog,会存储+U、-U)
日志一致性保证模式

怎么保证日志一致性?

模式行为描述
transactional保证原子提交和changelog顺序,提供exactly-once语义
eventual保证最终一致,延迟较低但可能会乱序/重复
增量数据产生机制

何时写入日志?

模式行为描述
none无变更日志文件(纯批处理场景,无需实时获增量消费数据)
input在刷新内存表时对变更日志文件进行两次写入(直接将Paimon表的输入流作为changelog原样写入Log System)
full-compaction在每次全量压缩时生成变更日志文件
lookup每次commit snapshot前产生完整的增量数据(查询时生成日志文件),时效性更高,但资源消耗更高

合并引擎/数据合并机制

定义如何合并相同主键的数据

-- 4. 合并引擎
'merge-engine' = 'deduplicate',
合并方式:
模式适用场景
deduplicate仅保留相同主键的最新记录(对于具有多条具有相同主键的数据,Paimon结果表仅会保留最新一条数据,并丢弃其它具有主键的数据),适用于主键更新模型
partial-update将相同主键的多行记录按字段合并,适用于宽表更新,部分更新非空字段
aggregation对相同主键的记录进行预聚合,适用于分组聚合的场景
first-row仅保留相同主键的第一条记录,适用于去重但不更新的场景

写入优化

如何写入数据文件

-- 写入方式
'write-mode' = 'change-log',
-- 单个写入器的内存缓存大小,超过此值将会执行Flush操作
'write-buffer-size' = '256mb',
写入方式:
模式行为描述
auto有主键的表为change-log,没有主键的表则为append-only
append-only直接追加,不会进行主键去重,不会执行数据重复删除或任何主键约束,所以效率会非常高
change-log进行主键去重,可以接受插入/删除/更新操作
其它常用参数:
-- 指定Flink Sink 作业的并行度,与桶数匹配或成倍数关系
'sink.parallelism' = '4'
-- 是否忽略delete操作
'write.ignore-delete' = 'false'
-- 是否溢写到磁盘
'write-buffer-spillable' = 'true'

CDAS语法

CDAS(create datastream as select),阿里云Flink引擎会将CDAS语句自动转换成一个一个的CTAS语法进行执行,从而实现整库同步,避免对每个表都进行单独插入、调度,产生大量的insert into操作,占用大量的数据库连接,对业务数据库和网络IO造成压力。

前置知识:CTAS(create table as select),传统方法,通过insert into的形式将MySQL数据导入到Paimon中,如果是多张表,那么就编写多个insert into语句。

-- CDAS语法
CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]<target_database>:[catalog_name.]db_name<source_database>:[catalog_name.]db_name

实际示例:将MySQL某个库中的数据同步到kafka中

BEGIN STATEMENT SET;-- 同步A库。
CREATE DATABASE IF NOT EXISTS fp_kafka.kafka
WITH ('cdas.topic.pattern' = 'ods_{table-name}')
AS DATABASE mysqlcdc.`order` INCLUDING ALL TABLES
/*+ OPTIONS('scan.startup.mode' = 'initial') */ ;-- 同步B库。
CREATE DATABASE IF NOT EXISTS fp_kafka.kafka
WITH ('cdas.topic.pattern' = 'ods_{table-name}')
AS DATABASE mysqlcdc.driver INCLUDING ALL TABLES
/*+ OPTIONS('scan.startup.mode' = 'initial') */ ;END;

读取优化/消费位点

指定表的扫描方式/读取的起始位置

'scan.mode' = 'latest',
读取/扫描方式:
模式行为描述
latest流读时读取最新快照,符合大多数实时消费场景
latest-full流读时,首次启动读取当前全量快照,之后消费增量changelog,用于初始化状态或需要全量+增量的场景。批读时,只生成最新的快照,但不读取新的更改
from-snapshot/timestamp流读时,从指定快照ID或时间戳开始读取,用于时间旅行或回溯。批读时,读取该快照节点/时间节点之前的最新数据,但不读取新的更改
full-compaction流读时,最近一次压缩后生成快照时的最新数据,后面的数据将会增量读取。批读时,最近一次压缩后生成快照时的最新数据,后面的修改数据将不会被读取到
hint语法
-- 从快照 N 开始消费增量变更 (Changelog)
SELECT * FROM my_table /*+ OPTIONS('scan.mode'='latest-full') */;-- 查询快照 ID=123 时的数据
SELECT * FROM my_table /*+ OPTIONS('scan.snapshot-id'='123') */;-- 查询 1 小时前的数据 (基于时间戳)
SELECT * FROM my_table /*+ OPTIONS('scan.timestamp-millis'='1698300000000') */;

快照管理

快照元数据的管理,Paimon依赖快照实现MVCC和时间旅行,对快照文件的自动过期处理,可以防止快照元数据膨胀

MVCC(Multi-Version Concurrency Control,多版本并发控制)

-- 保留至少最近多长时间内的快照
'snapshot.time-retained' = '24h',
-- 最少保留的快照数量,即时超过了过期期限,仍然要保留下来
'snapshot.num-retained.min' = '10',
-- 最多保留的快照数量
'snapshot.num-retained.max' = '30',

持续发现分区

流读时,自动检查新分区的时间间隔

'continuous.discovery-interval' = '5s'
http://www.xdnf.cn/news/10348.html

相关文章:

  • simulink mask的使用技巧
  • Windows下编译zlib
  • LangGraph 快速入门
  • Ubuntu设置之初始化
  • 利用Dify创建一个公司产品知识问答
  • DeepSeek部署实战:常见问题与高效解决方案全解析
  • 【Java基础05】面向对象01
  • leetcode动态规划—买卖股票系列
  • Python案例解析 : 函数模块化编程的实践应用
  • CTFHub-RCE 命令注入-过滤目录分隔符
  • 解决8080端口被占问题
  • python学习day34
  • 学习海康VisionMaster之表面缺陷滤波
  • Cesium快速入门到精通系列教程
  • 【KWDB 创作者计划】_探秘浪潮KWDB数据库:从时间索引到前沿技术
  • 用户认证的魔法配方:从模型设计到密码安全的奇幻之旅
  • ApiHug 1.3.9 支持 Spring 3.5.0 + Plugin 0.7.4 内置小插件升级!儿童节快乐!!!
  • vue-08(使用slot进行灵活的组件渲染)
  • Java Spring 之监听器(Listener)详解与实战
  • 如何查看电脑电池性能
  • 对蚁群算法的理解和实例详解
  • [笔记]一般小信号测量方法
  • 企业微信接入说明
  • proteus美观与偏好设置
  • Qq空间照片视频批量下载工具
  • TomSolver 库 | 入门及使用
  • docker安装和镜像源替换
  • Python训练营---Day41
  • GoogLeNet网络模型
  • 【求A类B类月】2022-2-9