Querying past snapshots and changed data in Paimon with batch and streaming reads
1. Batch queries
Create a table and insert four rows
CREATE TABLE ws_t (
    id INT,
    ts BIGINT,
    vc INT,
    PRIMARY KEY (id) NOT ENFORCED
);
INSERT INTO ws_t VALUES(2,2,2),(3,3,3),(4,4,4),(5,5,5);
-- Switch the execution mode to batch
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM ws_t;
+----+----+----+
| id | ts | vc |
+----+----+----+
| 2 | 2 | 2 |
| 3 | 3 | 3 |
| 4 | 4 | 4 |
| 5 | 5 | 5 |
+----+----+----+
4 rows in set
Modify the data twice so that new snapshots are produced.
First change: delete one row
DELETE FROM ws_t WHERE id = 2;
SELECT * FROM ws_t;
+----+----+----+
| id | ts | vc |
+----+----+----+
| 3 | 3 | 3 |
| 4 | 4 | 4 |
| 5 | 5 | 5 |
+----+----+----+
3 rows in set
----------------------------------------
Second change: update one row
UPDATE ws_t SET ts = 6 WHERE id = 3;
SELECT * FROM ws_t;
+----+----+----+
| id | ts | vc |
+----+----+----+
| 3 | 6 | 3 |
| 4 | 4 | 4 |
| 5 | 5 | 5 |
+----+----+----+
3 rows in set
There are now three snapshots of the table on HDFS, one per commit: the insert, the delete and the update.
1.1 Query data by snapshot ID
SELECT * FROM ws_t /*+ OPTIONS('scan.snapshot-id' = '1') */;
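1.2 Read the snapshot as of a given timestamp
First, list the snapshots and their commit times through the $snapshots system table:
SELECT * FROM ws_t$snapshots;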
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------+--------------------+------------------------+----------------------+
| snapshot_id | schema_id | commit_user | commit_identifier | commit_kind | commit_time | total_record_count | delta_record_count | changelog_record_count | watermark |
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------+--------------------+------------------------+----------------------+
| 1 | 0 | fdf3e8ff-d8e4-47a6-a9b2-759... | 9223372036854775807 | APPEND | 2025-05-13 17:22:18.457 | 4 | 4 | 0 | -9223372036854775808 |
| 2 | 0 | a631b852-19f4-463b-aee8-67a... | 9223372036854775807 | APPEND | 2025-05-13 17:30:21.173 | 5 | 1 | 0 | -9223372036854775808 |
| 3 | 0 | 9940bb36-7e55-40b7-90cf-cc1... | 9223372036854775807 | APPEND | 2025-05-13 17:51:10.679 | 6 | 1 | 0 | -9223372036854775808 |
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------+--------------------+------------------------+----------------------+
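The scan.timestamp-millis option expects a timestamp in epoch milliseconds, so commit_time has to be converted into a number first. Alternatively, you can look the value up directly in the HDFS web UI. The value in the query below, 1688369660841 (3 July 2023 UTC), is only a placeholder. Replace it with the milliseconds of the snapshot you want to read.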
SELECT * FROM ws_t /*+ OPTIONS('scan.timestamp-millis' = '1688369660841') */;
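Converting commit_time by hand is error-prone, so here is a minimal Python sketch of the conversion. It assumes commit_time is displayed in UTC+8 (for example Asia/Shanghai). Change SESSION_TZ if your environment uses another zone.

The helpers `commit_time_to_millis` and `snapshot_for_timestamp` are hypothetical. The second one only models the expectation that a batch read at a timestamp sees the latest snapshot committed at or before it. It is an illustration, not Paimon's code.

```python
from datetime import datetime, timedelta, timezone

# ASSUMPTION: commit_time in ws_t$snapshots is rendered in UTC+8.
SESSION_TZ = timezone(timedelta(hours=8))
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def commit_time_to_millis(commit_time: str, tz: timezone = SESSION_TZ) -> int:
    """Turn a commit_time string such as '2025-05-13 17:30:21.173' into epoch milliseconds."""
    local = datetime.strptime(commit_time, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=tz)
    # Integer division of timedeltas avoids floating-point rounding.
    return (local - EPOCH) // timedelta(milliseconds=1)


def snapshot_for_timestamp(snapshots: dict, millis: int):
    """Hypothetical model: latest snapshot id committed at or before `millis`, else None."""
    candidates = [sid for sid, t in snapshots.items() if t <= millis]
    return max(candidates) if candidates else None


# commit_time values copied from the ws_t$snapshots output above.
snapshots = {
    1: commit_time_to_millis("2025-05-13 17:22:18.457"),
    2: commit_time_to_millis("2025-05-13 17:30:21.173"),
    3: commit_time_to_millis("2025-05-13 17:51:10.679"),
}

for sid, millis in snapshots.items():
    print(f"snapshot {sid}: {millis}")
```

Under the UTC+8 assumption, snapshot 2 maps to 1747128621173. That value could then be used in place of the placeholder in the scan.timestamp-millis hint.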
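1.3 Incremental query (changes between snapshots)
A plain incremental query only returns rows that were added or changed in the range; deleted rows are not shown. '1,3' covers the commits after snapshot 1 up to and including snapshot 3: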
SELECT * FROM ws_t /*+ OPTIONS('incremental-between' = '1,3') */;
+----+----+----+
| id | ts | vc |
+----+----+----+
| 3 | 6 | 3 |
+----+----+----+
1 row in set
The audit_log system table additionally shows rows deleted between snapshots, together with their row kind:
SELECT * FROM ws_t$audit_log /*+ OPTIONS('incremental-between' = '1,2') */;
+---------+----+----+----+
| rowkind | id | ts | vc |
+---------+----+----+----+
| -D | 2 | 2 | 2 |
+---------+----+----+----+
1 row in set
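The two results above are consistent with the following steps:

- read the commits after the start snapshot, up to and including the end snapshot;
- merge them by primary key;
- drop deleted keys for the plain table, or keep the row kind for audit_log.

The Python sketch below is a conceptual model of that behavior only, not Paimon's implementation. `SNAPSHOT_DELTAS` and `incremental_between` are hypothetical names. The row kind recorded for the UPDATE in snapshot 3 is an assumption.

```python
# Conceptual model of 'incremental-between' on ws_t (not Paimon's actual code).
# Each snapshot id maps to the changes committed in it: (row kind, (id, ts, vc)).
SNAPSHOT_DELTAS = {
    1: [("+I", (2, 2, 2)), ("+I", (3, 3, 3)), ("+I", (4, 4, 4)), ("+I", (5, 5, 5))],
    2: [("-D", (2, 2, 2))],
    3: [("+U", (3, 6, 3))],  # ASSUMPTION: row kind stored for the UPDATE
}


def incremental_between(deltas, start, end, audit_log=False):
    """Merge changes of snapshots start+1..end by primary key (start exclusive, end inclusive)."""
    latest = {}
    for snapshot_id in range(start + 1, end + 1):
        for kind, row in deltas.get(snapshot_id, []):
            latest[row[0]] = (kind, row)  # a later change to the same id wins
    result = []
    for key in sorted(latest):
        kind, row = latest[key]
        if audit_log:
            result.append((kind, *row))  # audit_log keeps the row kind, deletes included
        elif kind != "-D":
            result.append(row)  # the plain table only shows rows that still exist
    return result


print(incremental_between(SNAPSHOT_DELTAS, 1, 3))
print(incremental_between(SNAPSHOT_DELTAS, 1, 2, audit_log=True))
```

The two calls reproduce the query results shown above: [(3, 6, 3)] and [('-D', 2, 2, 2)].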
2. Streaming queries
First switch the environment to streaming mode:
SET 'execution.checkpointing.interval'='30s';
SET 'execution.runtime-mode' = 'streaming';
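Meaning of the op values:
+I: insert.
-U: update, retracting the previous content of the updated row.
+U: update, carrying the new content of the updated row.
-D: delete.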
2.1 Read changes starting from a given snapshot ID
SELECT * FROM ws_t /*+ OPTIONS('scan.snapshot-id' = '1') */;
+----+-------------+----------------------+-------------+
| op | id | ts | vc |
+----+-------------+----------------------+-------------+
| +I | 2 | 2 | 2 |
| +I | 3 | 3 | 3 |
| +I | 4 | 4 | 4 |
| +I | 5 | 5 | 5 |
| -D | 2 | 2 | 2 |
| -U | 3 | 3 | 3 |
| +U | 3 | 6 | 3 |
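Replaying such a changelog against a table keyed by id shows how the op values fit together: +I and +U put a row in place, while -U and -D take one out. The Python sketch below feeds in the records printed above. It ends with the same three rows as the batch query after the update. The helper `apply_changelog` is hypothetical and exists only for illustration.

```python
# Hypothetical helper: materialize a changelog of (op, (id, ts, vc)) records
# into the current table contents, keyed by the primary key id.
def apply_changelog(changelog):
    state = {}
    for op, row in changelog:
        key = row[0]  # id is the primary key of ws_t
        if op in ("+I", "+U"):
            state[key] = row  # insert, or new content of an updated row
        elif op in ("-U", "-D"):
            state.pop(key, None)  # retract old content, or delete the row
        else:
            raise ValueError(f"unknown row kind: {op}")
    return [state[k] for k in sorted(state)]


# The records printed by the streaming query above.
CHANGELOG = [
    ("+I", (2, 2, 2)), ("+I", (3, 3, 3)), ("+I", (4, 4, 4)), ("+I", (5, 5, 5)),
    ("-D", (2, 2, 2)),
    ("-U", (3, 3, 3)), ("+U", (3, 6, 3)),
]

print(apply_changelog(CHANGELOG))  # [(3, 6, 3), (4, 4, 4), (5, 5, 5)]
```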
2.2 Read changes starting from a given timestamp
SELECT * FROM ws_t /*+ OPTIONS('scan.timestamp-millis' = '1688369660841') */;
2.3 On first start, read the full data of a given snapshot, then keep reading subsequent changes
SELECT * FROM ws_t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */;
+----+-------------+----------------------+-------------+
| op | id | ts | vc |
+----+-------------+----------------------+-------------+
| +I | 3 | 3 | 3 |
| +I | 4 | 4 | 4 |
| +I | 5 | 5 | 5 |
| -U | 3 | 3 | 3 |
| +U | 3 | 6 | 3 |
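One way to reason about these streaming outputs is to treat each snapshot as a full table state, and the stream as the difference between consecutive states. The Python model below does exactly that. It is a conceptual sketch, not how Paimon or Flink actually produce the records. `STATES`, `diff` and `from_snapshot_full` are hypothetical names, and the states are the batch query results shown earlier.

```python
# Conceptual model only: full table state (id -> row) after each snapshot of ws_t,
# taken from the batch query results earlier in this article.
STATES = {
    1: {2: (2, 2, 2), 3: (3, 3, 3), 4: (4, 4, 4), 5: (5, 5, 5)},
    2: {3: (3, 3, 3), 4: (4, 4, 4), 5: (5, 5, 5)},
    3: {3: (3, 6, 3), 4: (4, 4, 4), 5: (5, 5, 5)},
}


def diff(old, new):
    """Express the change from one state to the next as +I / -U / +U / -D records."""
    out = []
    for key in sorted(old.keys() | new.keys()):
        before, after = old.get(key), new.get(key)
        if before is None:
            out.append(("+I", after))
        elif after is None:
            out.append(("-D", before))
        elif before != after:
            out += [("-U", before), ("+U", after)]
    return out


def from_snapshot_full(states, snapshot_id):
    """Emit the full state of snapshot_id as +I rows, then the change to each later snapshot."""
    out = [("+I", row) for _, row in sorted(states[snapshot_id].items())]
    previous = snapshot_id
    for sid in sorted(s for s in states if s > snapshot_id):
        out += diff(states[previous], states[sid])
        previous = sid
    return out


for op, row in from_snapshot_full(STATES, 2):
    print(op, row)
```

Called with snapshot 2, the model yields the five records of 2.3. Called with snapshot 1, it also yields the records of 2.1, because snapshot 1 only contains the initial insert.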