当前位置: 首页 > news >正文

Paimon MergeTreeWrite、Compaction 和 快照构建

Paimon 写入与合并操作的冲突处理与快照构建机制分析

Paimon 通过​​分离写入(Append)和合并(Compaction)的元数据记录​​,结合​​分步原子提交​​和​​严格冲突检测​​,确保并发操作下数据一致性。即使写入与合并并发执行,也不会产生数据冲突。


写入与 Compaction 分开记录是否冲突?

​不会冲突​​。核心机制由 FileStoreCommitImpl实现,通过以下设计保证:

1. 分离变更类型

  • ​写入阶段​​(MergeTreeWriter):

    • 新写入数据(write())与后台 Compaction 产生的变更分别记录在不同集合:

      • 新数据 → newFiles

      • Compaction 变更 → compactBefore/compactAfter

  • ​提交阶段​​(prepareCommit):

    • 所有变更打包为 CommitMessage,但内部保持分类存储。

2. 分步提交与冲突检测流程

步骤

操作类型

处理逻辑

冲突检测重点

1

​APPEND(新写入)​

- 优先处理
- 解析 CommitMessage中的 APPEND变更

- 检查新增文件与当前快照中受影响分区文件(baseEntries)的冲突(如删除不存在的文件)

2

​生成 APPEND 快照​

- 创建新快照(如快照 N)
- 仅包含新写入数据文件

- 基于原子性文件操作(如 rename)提交快照

3

​COMPACT(合并)​

- 在 APPEND 快照基础上处理 COMPACT变更

- 验证合并前旧文件(compactBefore)必须存在
- 确保合并后新文件(compactAfter)无 LSM 结构冲突(如 Level > 0 的文件 key 范围重叠)

4

​生成 COMPACT 快照​

- 创建新快照(如快照 N+1)
- 记录旧文件删除与新文件添加

- 原子性提交

​关键机制​​:一次写入任务可能原子性生成两个连续快照(APPEND + COMPACT)。任一冲突检测失败将导致整体回滚,避免中间状态或元数据损坏。


快照构建详细流程

通过 FileStoreCommitImpl.javacommit方法展开分析:

1. 收集变更(collectChanges

CommitMessage中的文件变更分类至不同列表:

  • appendTableFiles​:新写入的数据文件

  • compactTableFiles​:Compaction 涉及的文件变更(删除旧文件 + 添加新文件)

  • 其他文件(如 changelog、索引文件等)

2. 提交 APPEND 快照(tryCommitwith CommitKind.APPEND

关键步骤:

  1. ​锁定基线​​:获取当前最新快照(latestSnapshot

  2. ​读取存量文件​​:提取 latestSnapshot中与本次写入相关分区的所有文件列表(baseEntries

  3. ​冲突检查​​(noConflictsOrFail):

    • 模拟将 appendTableFiles应用于 baseEntries

    • 检查目标:删除文件是否存在 + LSM 结构合法性(如 Level 1+ 文件的 key 范围不重叠)

    • 失败时抛出异常并终止提交

  4. ​生成新快照​​:

    • 调用 tryCommitOnce创建新 deltaManifestList 记录 appendTableFiles变更

    • 基于 latestSnapshotbaseManifestList和新 deltaManifestList 构建 APPEND类型快照

    • 通过文件系统原子操作(如 rename)持久化快照文件

3. 提交 COMPACT 快照(tryCommitwith CommitKind.COMPACT

关键步骤:

  1. ​更新基线​​:将上一步 appendTableFiles内容合并至 baseEntries,形成新基线

  2. ​冲突检查​​(noConflictsOrFail):

    • 在新基线上模拟应用 compactTableFiles变更

    • 重复文件存在性与 LSM 结构合法性检查

  3. ​生成新快照​​:

    • 调用 tryCommitOnce基于最新 APPEND 快照创建 COMPACT类型快照

    • 新 deltaManifestList 记录 compactTableFiles变更

    • 通过原子操作持久化快照文件

代码:

public void commit(ManifestCommittable committable, boolean checkAppendFiles) {LOG.info("Ready to commit to table {}, number of commit messages: {}",tableName,committable.fileCommittables().size());if (LOG.isDebugEnabled()) {LOG.debug("Ready to commit\n{}", committable);}long started = System.nanoTime();int generatedSnapshot = 0;int attempts = 0;Snapshot latestSnapshot = null;Long safeLatestSnapshotId = null;List<SimpleFileEntry> baseEntries = new ArrayList<>();List<ManifestEntry> appendTableFiles = new ArrayList<>();List<ManifestEntry> appendChangelog = new ArrayList<>();List<ManifestEntry> compactTableFiles = new ArrayList<>();List<ManifestEntry> compactChangelog = new ArrayList<>();List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();collectChanges(committable.fileCommittables(),appendTableFiles,appendChangelog,compactTableFiles,compactChangelog,appendHashIndexFiles,compactDvIndexFiles);try {List<SimpleFileEntry> appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);if (!ignoreEmptyCommit|| !appendTableFiles.isEmpty()|| !appendChangelog.isEmpty()|| !appendHashIndexFiles.isEmpty()) {// Optimization for common path.// Step 1:// Read manifest entries from changed partitions here and check for conflicts.// If there are no other jobs committing at the same time,// we can skip conflict checking in tryCommit method.// This optimization is mainly used to decrease the number of times we read from// files.latestSnapshot = snapshotManager.latestSnapshot();if (latestSnapshot != null && checkAppendFiles) {// it is possible that some partitions only have compact changes,// so we need to contain all changesbaseEntries.addAll(readAllEntriesFromChangedPartitions(latestSnapshot, appendTableFiles, compactTableFiles));noConflictsOrFail(latestSnapshot.commitUser(),baseEntries,appendSimpleEntries,Snapshot.CommitKind.APPEND);safeLatestSnapshotId = latestSnapshot.id();}attempts +=tryCommit(appendTableFiles,appendChangelog,appendHashIndexFiles,committable.identifier(),committable.watermark(),committable.logOffsets(),committable.properties(),Snapshot.CommitKind.APPEND,noConflictCheck(),null);generatedSnapshot += 1;}if (!compactTableFiles.isEmpty()|| !compactChangelog.isEmpty()|| !compactDvIndexFiles.isEmpty()) {// Optimization for common path.// Step 2:// Add appendChanges to the manifest entries read above and check for conflicts.// If there are no other jobs committing at the same time,// we can skip conflict checking in tryCommit method.// This optimization is mainly used to decrease the number of times we read from// files.if (safeLatestSnapshotId != null) {baseEntries.addAll(appendSimpleEntries);noConflictsOrFail(latestSnapshot.commitUser(),baseEntries,SimpleFileEntry.from(compactTableFiles),Snapshot.CommitKind.COMPACT);// assume this compact commit follows just after the append commit created abovesafeLatestSnapshotId += 1;}attempts +=tryCommit(compactTableFiles,compactChangelog,compactDvIndexFiles,committable.identifier(),committable.watermark(),committable.logOffsets(),committable.properties(),Snapshot.CommitKind.COMPACT,hasConflictChecked(safeLatestSnapshotId),null);generatedSnapshot += 1;}} finally {long commitDuration = (System.nanoTime() - started) / 1_000_000;LOG.info("Finished commit to table {}, duration {} ms", tableName, commitDuration);if (this.commitMetrics != null) {reportCommit(appendTableFiles,appendChangelog,compactTableFiles,compactChangelog,commitDuration,generatedSnapshot,attempts);}}}private int tryCommit(List<ManifestEntry> tableFiles,List<ManifestEntry> changelogFiles,List<IndexManifestEntry> indexFiles,long identifier,@Nullable Long watermark,Map<Integer, Long> logOffsets,Map<String, String> properties,Snapshot.CommitKind commitKind,ConflictCheck conflictCheck,@Nullable String statsFileName) {int retryCount = 0;RetryResult retryResult = null;long startMillis = System.currentTimeMillis();while (true) {Snapshot latestSnapshot = snapshotManager.latestSnapshot();CommitResult result =tryCommitOnce(retryResult,tableFiles,changelogFiles,indexFiles,identifier,watermark,logOffsets,properties,commitKind,latestSnapshot,conflictCheck,statsFileName);if (result.isSuccess()) {break;}retryResult = (RetryResult) result;if (System.currentTimeMillis() - startMillis > commitTimeout|| retryCount >= commitMaxRetries) {String message =String.format("Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.",commitTimeout, retryCount);throw new RuntimeException(message, retryResult.exception);}commitRetryWait(retryCount);retryCount++;}return retryCount + 1;}

为什么 Compact 提交时不重新获取最新快照?

这主要是出于 ​​性能优化​​ 和 ​​单个提交单元的原子性​​ 考虑。

FileStoreCommitImpl.javacommit方法中,逻辑如下:

在提交开始时,代码通过以下方式获取一次最新的快照:

latestSnapshot = snapshotManager.latestSnapshot();

基于这个 latestSnapshot,它为 ​​APPEND​​ 类型的变更(新写入的文件)做了一次冲突检查。如果检查通过,它会记录下:

safeLatestSnapshotId = latestSnapshot.id();

这相当于一个 ​​乐观锁​​,其含义是:

“我已经基于快照 X 检查过 APPEND 了,如果在我提交时,最新快照仍然是 X,那就不用再检查了。”

执行 tryCommit,生成一个新的快照(我们称之为 X+1)。

接下来处理 ​​COMPACT​​ 变更时,​​它并没有去重新请求最新的快照(即 X+1)​​。

相反,它在 ​​内存中模拟了 APPEND 提交成功后的状态​​:

baseEntries.addAll(appendSimpleEntries);

然后,​​基于这个内存中的、模拟的、最新的状态,对 COMPACT 变更进行冲突检查​​。

这同样是 ​​乐观的​​,其假设是:​​APPEND 成功后,没有其他并发的提交插入进来​​。

这种设计避免了在处理 ​​APPEND​​ 和 ​​COMPACT​​ 之间,​​再次从文件系统读取和解析大量的 Manifest 文件​​。

因为 ​​读取 Manifest 是一个相对昂贵的操作​​。

通过在内存中模拟状态的演进,它可以在 ​​一次提交(ManifestCommittable)内部高效地完成对 APPEND 和 COMPACT 两部分变更的连续性校验​​。

tryCommit方法 最终会调用 tryCommitOnce。在 tryCommitOnce中有如下逻辑:

// ...existing code...        
List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
if (latestSnapshot != null && conflictCheck.shouldCheck(latestSnapshot.id())) {// latestSnapshotId is different from the snapshot id we've checked for conflicts,// so we have to check again// ...existing code...noConflictsOrFail(latestSnapshot.commitUser(),baseDataFiles,SimpleFileEntry.from(deltaFiles),commitKind);
}
// ...existing code...

这里的 conflictCheck.shouldCheck(latestSnapshot.id())就是关键。

commit方法中,为 ​​COMPACT​​ 变更调用 tryCommit时,传入的是:

hasConflictChecked(safeLatestSnapshotId)

而此时,safeLatestSnapshotId已经被更新为 ​​APPEND 快照之后的值(即 X+1)​​。

tryCommitOnce为 ​​COMPACT​​ 执行时,它获取的 latestSnapshot确实是 ​​APPEND 提交后产生的那个新快照(X+1)​​。

此时:

  • latestSnapshot.id()的值为 ​​X+1​

  • safeLatestSnapshotId的值也是 ​​X+1​

因此 shouldCheck返回 ​​false​​,从而 ​​跳过了昂贵的冲突检查​​。

如果在 ​​APPEND 提交后​​,另一个 Job 抢先提交了一个快照(X+2),那么当我们的 ​​COMPACT 部分执行 tryCommitOnce​​ 时:

  • 它获取到的 latestSnapshot将是 ​​X+2​

  • latestSnapshot.id()(值为 X+2)​​不等于​safeLatestSnapshotId(值为 X+1)

  • 因此 shouldCheck返回 ​​true​

此时,代码会 ​​回退到慢速路径​​:

  • 重新从文件系统读取从快照 ​​X+1 到 X+2​​ 的所有变更,

  • 应用到 baseDataFiles上,

  • 然后基于这个 ​​真正最新的状态​​,重新进行一次 ​​完整的冲突检查​​。

如果检查失败,​​整个提交会重试​​。


对于只有 Compaction 的任务怎么处理?

这种情况非常常见,例如一个 ​​专门用于合并的 Flink 作业​​。

其处理流程会 ​​简化很多​​:

  1. commit方法中,经过 collectChanges之后,appendTableFiles列表是 ​​空的​​。

  2. 第一个判断:

    if (!ignoreEmptyCommit ...)

    其条件不满足,因此 ​​跳过 APPEND 提交​​。

  3. 直接进入:

    if (!compactTableFiles.isEmpty() ...)
  4. 此时,safeLatestSnapshotId是 ​​null​​,所以 ​​乐观检查的优化不会被触发​​。

  5. 代码直接调用 tryCommit来提交 ​​COMPACT 变更​​。

  6. tryCommitOnce内部,它会 ​​获取最新的快照​​,并 ​​基于它进行完整的冲突检查​​。

所以,对于 ​​只有 Compaction 的任务​​,它会 ​​直接走标准的 “获取最新快照 -> 检查冲突 -> 提交” 流程​​,​​逻辑非常清晰​​。

noConflictsOrFail 方法

FileStoreCommitImpl中 用于检测在提交操作中是否存在冲突,以确保数据一致性。它主要处理以下几种冲突情况:

  1. 总桶数变化冲突

    • 当提交类型不是 OVERWRITE 时,会检查所有分区的总桶数是否保持一致。如果某个分区的总桶数发生变化(且没有使用覆盖写入),则会抛出冲突异常。
    • 这种检查对于 compaction 和直接写入都适用,确保在没有明确覆盖意图的情况下,分区的桶数结构保持稳定。
  2. 文件删除冲突

    • 通过 assertNoDelete 方法检查,确保不会尝试删除之前未添加的文件。
    • 如果检测到尝试删除不存在的文件,会抛出异常。
    • 这种检查可以防止意外删除数据文件,对于 compaction(可能涉及文件合并和删除)和直接写入(可能涉及文件更新)都很重要。
  3. LSM 冲突检测

    • 对于 LSM 表(level >= 1),会检查同一分区、桶和级别的文件之间是否存在键范围重叠。
    • 如果发现键范围重叠,说明存在冲突,会抛出异常。
    • 这种检查主要针对 compaction 操作,确保在合并文件时不会产生键范围冲突。
  4. 分区过期冲突

    • 在 assertConflictForPartitionExpire 方法中,如果启用了分区过期功能且是基于值的过期策略,会检查是否尝试写入已过期的分区。
    • 如果所有被删除的分区都已过期,则会抛出异常,提示用户过滤掉这些数据。

对于 compaction 和直接写入操作,冲突的定义主要包括:

  • 在没有覆盖意图的情况下更改分区结构(如桶数)。
  • 尝试删除不存在的文件。
  • 在 LSM 表中合并文件时产生键范围重叠。
  • 向已过期的分区写入数据。Paimon 允许用户配置分区过期策略,基于分区值的时间戳或分区最后更新时间来判断分区是否过期。过期的分区会被逻辑删除,最新的快照无法查询其数据。

这些冲突检测机制确保了在并发写入或 compaction 过程中数据的一致性和完整性。


总结:Paimon 的一致性保障设计

  1. ​变更类型解耦​

    写入与合并操作通过分离记录(newFilesvs compactBefore/After)实现逻辑隔离。

  2. ​分阶段严格校验​

    • 先提交 APPEND 并生成独立快照,再基于其结果提交 COMPACT

    • 每阶段均执行文件存在性及 LSM 结构冲突检测

  3. ​原子性快照生成​

    • 每个快照仅记录增量变更(delta),依赖文件系统原子操作(如 rename)确保“全有或全无”

    • 避免中间状态,保障系统健壮性

​最终效果​​:并发写入与合并操作通过分步提交与严格校验实现无冲突数据一致性,元数据始终处于有效状态。

MergeTreeWriter

MergeTreeWriter 像一个指挥官,它协调多个组件来完成工作。

每个分区的每个桶 有 且 只有 一个 MergeTreeWriter

构造中通过 newSequenceNumber = maxSequenceNumber + 1; 尽可能维护统一的序列号

public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {// ... 内存缓冲与溢写相关配置 ...private final boolean writeBufferSpillable;private final MemorySize maxDiskSize;private final int sortMaxFan;private final CompressOptions sortCompression;private final IOManager ioManager;// ... 核心功能组件 ...private final CompactManager compactManager;private final MergeFunction<KeyValue> mergeFunction;private final KeyValueFileWriterFactory writerFactory;private final ChangelogProducer changelogProducer;// ... 状态与结果追踪 ...private final LinkedHashSet<DataFileMeta> newFiles;private final LinkedHashSet<DataFileMeta> deletedFiles;private final LinkedHashSet<DataFileMeta> newFilesChangelog;private final LinkedHashMap<String, DataFileMeta> compactBefore;private final LinkedHashSet<DataFileMeta> compactAfter;private final LinkedHashSet<DataFileMeta> compactChangelog;// ... 内存缓冲区实例 ...private WriteBuffer writeBuffer;// ...
}
  • writeBuffer (SortBufferWriteBuffer): 这是它的内存组件。所有新数据首先进入这里进行排序和预合并。setMemoryPool 方法会为其注入内存池。
  • compactManager: 这是它的合并任务管理器。它负责维护该 Bucket 内所有数据文件的层级结构(Levels),并根据策略决定何时、对哪些文件发起 Compaction。
  • mergeFunction: 定义了数据合并的逻辑。例如,对于主键表,它可能是“保留最新的值”(Deduplicate);对于聚合表,它可能是“对值进行累加”。
  • writerFactory (KeyValueFileWriterFactory): 文件写入工厂,负责创建用于写入数据文件(SST)和 Changelog 文件的 RollingFileWriter
  • changelogProducer: 配置项,决定了 Changelog 的生成策略(不生成、从输入生成等)。
  • 各种 Set 和 Map: 这些集合用于追踪文件状态
    • newFiles: 记录从内存 writeBuffer 刷盘后生成的新文件。
    • compactBefore / compactAfter: 记录一次 Compaction 操作中,被合并的旧文件和生成的新文件。
    • *Changelog: 记录相关的 Changelog 文件。
    • 这些集合中的内容最终会在 prepareCommit 时被打包成 CommitIncrement

写入流程 (write 方法)

// ... existing code ...@Overridepublic void write(KeyValue kv) throws Exception {long sequenceNumber = newSequenceNumber();// 1. 尝试将数据写入内存缓冲区boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {// 2. 如果内存缓冲区满了,先执行刷盘flushWriteBuffer(false, false);// 3. 再次尝试写入success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {throw new RuntimeException("Mem table is too small to hold a single element.");}}}private long newSequenceNumber() {return newSequenceNumber++;}
// ... existing code ...
  1. 为每条记录分配一个递增的 sequenceNumber,用于保证时序。
  2. 调用 writeBuffer.put() 尝试将数据写入内存 和 本地磁盘。
  3. 如果 put() 返回 false,说明内存缓冲区已满,无法容纳新数据。
  4. 此时,会触发 flushWriteBuffer(),将buffer数据刷到磁盘。
  5. 刷盘后,再次尝试写入。如果还是失败,说明单条记录的大小超过了整个内存缓冲区,抛出异常。

刷盘流程 (flushWriteBuffer 方法)

这是将内存数据持久化的核心。

// ... existing code ...private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction)throws Exception {if (writeBuffer.size() > 0) {// ...// 1. 创建数据文件和Changelog文件的写入器final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter = ...;final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);try {// 2. 遍历内存缓冲区中的有序数据,应用合并逻辑,并同时写入数据文件和Changelog文件writeBuffer.forEach(keyComparator,mergeFunction,changelogWriter == null ? null : changelogWriter::write,dataWriter::write);} finally {// 3. 清空内存缓冲区,关闭写入器writeBuffer.clear();// ... close writers ...}// 4. 收集新生成的文件元数据List<DataFileMeta> dataMetas = dataWriter.result();// ... handle changelog files ...// 5. 将新文件信息添加到 newFiles 集合,并通知 compactManagerfor (DataFileMeta dataMeta : dataMetas) {newFiles.add(dataMeta);compactManager.addNewFile(dataMeta);}}// 6. 检查并触发新的CompactiontrySyncLatestCompaction(waitForLatestCompaction);compactManager.triggerCompaction(forcedFullCompaction);}
// ... existing code ...
  1. 创建 RollingFileWriter,用于写入 Level-0 的数据文件,并根据配置决定是否创建 Changelog 文件写入器。
  2. 调用 writeBuffer.forEach(),它会提供一个内存中排序好的数据迭代器。MergeTreeWriter 在遍历时,会应用 mergeFunction 对数据进行最终合并,然后分别写入数据文件和 Changelog 文件。
  3. 操作完成后,清空内存缓冲区 writeBuffer,使其可以接收新的数据。
  4. 从写入器中获取新生成文件的元数据 DataFileMeta
  5. 将这些元数据记录在 newFiles 集合中,并通知 compactManager 有新的 Level-0 文件加入。
  6. compactManager 会根据当前文件层级状态,决定是否需要触发一次新的 Compaction。

在 flushWriteBuffer 中创建的这两个 writerdataWriter 和 changelogWriter,分别服务于两个完全不同的目的:一个用于写入表的状态数据,另一个用于生成流式读取的变更日志(Changelog)

dataWriter: 状态数据的写入器

  • 用途dataWriter 的职责是创建正式的 Level-0 数据文件(SST 文件)。这些文件是表的核心组成部分,包含了经过排序和合并后的数据,代表了表在某个时间点的最新状态。
  • 创建方式:
    // ... existing code ...final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
    // ... existing code ...
    
    它调用的是 createRollingMergeTreeFileWriter。这个方法会生成带有 data- 前缀的文件名,例如 data-a9f8b8e8-....parquet。这些文件最终会被 CompactManager 管理,并参与后续的合并(Compaction)过程。

changelogWriter: 变更日志的写入器

  • 用途changelogWriter 的职责是创建变更日志文件。这些文件记录了本次提交中每一条原始的变更记录(比如 INSERTUPDATE_BEFOREUPDATE_AFTERDELETE)。这些文件专门用于支持下游的流式查询任务,让 Flink 等引擎可以像消费 Kafka 一样消费 Paimon 表的增量变化。
  • 创建方式:
    // ... existing code ...final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =(changelogProducer == ChangelogProducer.INPUT && !isInsertOnly)? writerFactory.createRollingChangelogFileWriter(0): null;
    // ... existing code ...
    
    它调用的是 createRollingChangelogFileWriter。这个方法会生成带有 changelog- 前缀的文件名,例如 changelog-b7c6a5e4-....parquet。这些文件不会参与数据合并,它们只是作为本次提交产物的一部分,供流作业消费。

MergeTreeWriter 通过 KeyValue 中的 RowKind 来携带增、删、改的语义。在写数据时,它将这些带有语义的记录先放入缓冲区,然后在刷写时,将原始记录流写入 changelog 文件,将合并后的结果写入数据文件。这样既保证了数据文件的紧凑和高效查询,又通过 changelog 文件提供了完整的变更历史。

SortBufferWriteBuffer 的 forEach 接受 keyComparator

public void forEach(Comparator<InternalRow> keyComparator,MergeFunction<KeyValue> mergeFunction,@Nullable KvConsumer rawConsumer,KvConsumer mergedConsumer)throws IOException {// TODO do not use iteratorMergeIterator mergeIterator =new MergeIterator(rawConsumer, buffer.sortedIterator(), keyComparator, mergeFunction);while (mergeIterator.hasNext()) {mergedConsumer.accept(mergeIterator.next());}}

readOnce()时直接写入了 rawConsumer:

        private boolean readOnce() throws IOException {try {currentRow = kvIter.next(currentRow);} catch (IOException e) {throw new RuntimeException(e);}if (currentRow != null) {current.fromRow(currentRow);if (rawConsumer != null) {rawConsumer.accept(current.getReusedKv());}}return currentRow != null;}

为什么一次写入需要两种 Writer?

这是由 Paimon 的 changelog-producer(变更日志生成策略)配置决定的。

当设置 changelog-producer = 'input' 时,你告诉 Paimon:“我需要你把每次写入的原始输入数据作为变更日志给我”。

MergeTreeWriter 在执行 flushWriteBuffer 时,writeBuffer.forEach(...) 会遍历内存中所有待写入的原始数据。为了同时满足“更新表状态”和“生成变更日志”这两个需求,Paimon 采用了最高效的方式:一次遍历,两次写入

// ... existing code ...try {writeBuffer.forEach(keyComparator,mergeFunction,// 如果 changelogWriter 存在,就把数据喂给它changelogWriter == null ? null : changelogWriter::write,// 同时,也把数据喂给 dataWriterdataWriter::write);} finally {
// ... existing code ...

数据流从 writeBuffer 中出来后,就像一个分叉的水管,被同时送往了 dataWriter 和 changelogWriter。这样就避免了对数据进行两次排序或两次从磁盘读取,用一次计算同时产出了两种不同用途的文件。

代码中还有一个针对 insert-only 场景的优化。如果 changelog-producer 是 input 且写入模式是 insert-only,那么 changelogWriter 就不会被创建。

// ... existing code ...List<DataFileMeta> dataMetas = dataWriter.result();if (changelogWriter != null) {newFilesChangelog.addAll(changelogWriter.result());} else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) {// 对于 insert-only,数据文件本身就是 changelog// 所以直接复制数据文件来作为 changelog 文件,避免了重复写入List<DataFileMeta> changelogMetas = new ArrayList<>();for (DataFileMeta dataMeta : dataMetas) {String newFileName = writerFactory.newChangelogFileName(0);DataFileMeta changelogMeta = dataMeta.rename(newFileName);writerFactory.copyFile(dataMeta, changelogMeta);changelogMetas.add(changelogMeta);}newFilesChangelog.addAll(changelogMetas);}
// ... existing code ...

在这种情况下,数据文件里的记录全是 INSERT,它本身就是一份完美的变更日志。因此 Paimon 只需写入一次数据文件,然后通过成本极低的元数据操作(rename)和文件复制(copyFile)来生成 changelog 文件,从而避免了双份的写入开销。

总结

  • dataWriter -> 写状态 -> 用于批查询和后续合并。
  • changelogWriter -> 写变更 -> 用于流式查询。

这两种 writer 的并存,是 Paimon 实现 Streaming & Batch Unification(流批一体)架构的关键设计,它使得同一份数据在写入时就能同时产出满足两种不同查询模式的物理文件。

提交准备流程 (prepareCommit 方法)

这是在 Flink Checkpoint 时被调用的关键方法。

// ... existing code ...@Overridepublic CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {// 1. 确保内存中的数据全部刷盘flushWriteBuffer(waitCompaction, false);// ...// 2. 同步等待可能正在进行的Compaction任务完成trySyncLatestCompaction(waitCompaction);// 3. 打包并返回所有文件变更return drainIncrement();}
// ... existing code ...
  1. 首先调用 flushWriteBuffer(),确保内存中所有数据都被持久化。
  2. 调用 trySyncLatestCompaction(),等待当前正在进行的 Compaction 任务结束,并将其结果(哪些文件被合并,生成了哪些新文件)更新到 compactBefore 和 compactAfter 集合中。
  3. 调用 drainIncrement(),将 newFilescompactBeforecompactAfter 等集合中的文件元数据打包成一个 CommitIncrement 对象返回。同时清空这些集合,为下一个 Checkpoint 做准备。

这个 CommitIncrement 对象最终会被上层的 Committer 用来生成 Manifest 文件和 Snapshot。

产生的元数据被怎么使用

newFiles.add(dataMeta) 和 compactManager.addNewFile(dataMeta) 这两行代码是新生成的数据文件(dataMeta)生命周期的起点,它们分别将 dataMeta 送上了两条不同的但最终会汇合的处理路径:提交路径合并路径

newFiles 是一个 LinkedHashSet,它的作用是暂存当前批次写入产生的所有新数据文件

可以把它理解成一个“待提交”清单。当一个事务(commit)准备完成时,MergeTreeWriter 需要告诉 Paimon 的 Table/Snapshot 管理器,这个事务到底产生了哪些新的变化。

这个过程发生在 prepareCommit 方法中,它会调用 drainIncrement 方法:

// ... existing code ...private CommitIncrement drainIncrement() {DataIncrement dataIncrement =new DataIncrement(new ArrayList<>(newFiles), // newFiles 在这里被使用new ArrayList<>(deletedFiles),new ArrayList<>(newFilesChangelog));CompactIncrement compactIncrement =new CompactIncrement(new ArrayList<>(compactBefore.values()),new ArrayList<>(compactAfter),new ArrayList<>(compactChangelog));// ... existing code ...newFiles.clear(); // 清空 newFiles,为下一批次做准备deletedFiles.clear();// ... existing code ...return new CommitIncrement(dataIncrement, compactIncrement, drainDeletionFile);}
// ... existing code ...

从 drainIncrement 方法可以看出:

  1. newFiles 集合中的所有 DataFileMeta 被打包进一个 DataIncrement 对象。
  2. 这个 DataIncrement 最终成为 CommitIncrement 的一部分,被 MergeTreeWriter 返回。
  3. CommitIncrement 是一个非常重要的数据结构,它完整地描述了一次提交的所有文件变化(新增文件、合并前后文件等)。Paimon 的提交进程会把这个 CommitIncrement 的内容记录到 Manifest 文件中,从而生成一个新的快照(Snapshot)。
  4. 一旦 dataMeta 被打包并清空,它就完成了在 newFiles 中的使命。

小结:newFiles 的职责是确保新文件能够被正确地记录到 Paimon 的快照中,使其对查询可见。

compactManager.addNewFile(dataMeta):进入“合并”路径

compactManager 是 Merge-Tree 的核心组件之一,负责管理 LSM 树的结构和触发 Compaction(合并)操作。

当 compactManager.addNewFile(dataMeta) 被调用时,意味着:

  1. compactManager 得知一个新的 L0 层文件(dataMeta)诞生了。
  2. compactManager 会根据其内部的合并策略(例如,当 L0 层的文数量达到某个阈值时)来决定是否要触发一次 Compaction。
  3. 如果触发了 Compaction,compactManager 会挑选一些文件(可能就包括我们刚加入的这个 dataMeta)作为输入,启动一个后台任务去执行合并。

合并完成后,compactManager 会产生一个 CompactResult,这个结果包含了合并前的文件(before)和合并后的新文件(after)。MergeTreeWriter 会通过 trySyncLatestCompaction 方法获取这个结果,并调用 updateCompactResult 来处理它:

// ... existing code ...private void updateCompactResult(CompactResult result) {// ... existing code ...for (DataFileMeta file : result.before()) {// ...// 将被合并掉的老文件加入 compactBefore 集合compactBefore.put(file.fileName(), file);// ...}// 将合并后产生的新文件加入 compactAfter 集合compactAfter.addAll(result.after());compactChangelog.addAll(result.changelog());updateCompactDeletionFile(result.deletionFile());}
// ... existing code ...

可以看到,dataMeta 在被合并后,它的身份就从一个“活跃的数据文件”变成了“待废弃的老文件”,并被记录在 compactBefore 集合中。同时,合并产生的新文件被记录在 compactAfter 集合里。

最终,compactBefore 和 compactAfter 这两个集合里的信息也会在 drainIncrement 方法中被打包进 CommitIncrement,从而在下一次提交时,通知 Paimon 更新快照:标记 compactBefore 的文件为“已删除”,并添加 compactAfter 的文件为“新文件”。

小结:compactManager 的职责是利用 dataMeta 来维护 LSM 树的健康,通过合并来减少文件数量、消除冗余数据,并把文件从低层级(Level 0)推向高层级。

DataFileMeta 在被创建后,通过 newFiles.add 和 compactManager.addNewFile 被赋予了双重身份:

  1. 作为新数据:它通过 newFiles -> DataIncrement -> CommitIncrement 的路径,被记录到新的快照中,从而对用户可见。
  2. 作为合并候选者:它通过 compactManager 进入 LSM 树的管理体系,未来可能会被选中并合并成更大的文件。当它被合并后,它的状态变化(从被合并 -> 产生新文件)会通过 CompactResult -> compactBefore/compactAfter -> CompactIncrement -> CommitIncrement 的路径,再次被记录到新的快照中。

updateCompactResult 

trySyncLatestCompaction会获取compaction结果,调用这个函数:

private void trySyncLatestCompaction(boolean blocking) throws Exception {Optional<CompactResult> result = compactManager.getCompactionResult(blocking);result.ifPresent(this::updateCompactResult);}

updateCompactResult 方法是 MergeTreeWriter 类中用于处理压缩结果的核心方法,它在每次压缩操作完成后被调用,用于更新文件状态和清理不再需要的文件。

    private void updateCompactResult(CompactResult result) {Set<String> afterFiles =result.after().stream().map(DataFileMeta::fileName).collect(Collectors.toSet());for (DataFileMeta file : result.before()) {if (compactAfter.remove(file)) {// This is an intermediate file (not a new data file), which is no longer needed// after compaction and can be deleted directly, but upgrade file is required by// previous snapshot and following snapshot, so we should ensure:// 1. This file is not the output of upgraded.// 2. This file is not the input of upgraded.if (!compactBefore.containsKey(file.fileName())&& !afterFiles.contains(file.fileName())) {writerFactory.deleteFile(file);}} else {compactBefore.put(file.fileName(), file);}}compactAfter.addAll(result.after());compactChangelog.addAll(result.changelog());updateCompactDeletionFile(result.deletionFile());}

删除逻辑是为了在合并树(MergeTree)的压缩(compaction)过程中,正确处理中间文件。当 compactAfter.remove(file) 返回 true 时,表示 file 是这次被压缩的文件(因此先移出之前的compactAfter)。但是,为了确保数据的一致性和可恢复性,需要进行额外的检查:

  • !compactBefore.containsKey(file.fileName()):检查该文件是否不是压缩操作的输入文件。如果是输入文件,则不应直接删除,因为可能需要保留其历史版本或用于回滚。
  • !afterFiles.contains(file.fileName()):检查该文件是否不是压缩操作的输出文件。如果是输出文件,则不应删除,因为它是新生成的数据文件。

只有当一个文件既不是压缩的输入文件,也不是压缩的输出文件,并且在压缩后不再需要时,writerFactory.deleteFile(file) 才会执行,将其安全删除。这确保了只有真正的中间文件才会被删除,而不会影响到历史快照或未来的数据恢复。

启动独立Compaction任务 能不能取消写入任务的Compaction

可以通过设置 write-only 配置项为 true 来取消写入任务的 compaction。这将跳过 compaction 和快照过期操作。这个选项通常与专用的 compaction 作业一起使用。

可以在创建表或提交作业时设置该参数,例如:

tableConf.set(CoreOptions.WRITE_ONLY, true);

或者在 SQL 中设置:

SET 'write-only' = 'true';

相关的代码在 CoreOptions.java 中定义:

public static final ConfigOption<Boolean> WRITE_ONLY =key("write-only").booleanType().defaultValue(false).withFallbackKeys("write.compaction-skip").withDescription("If set to true, compactions and snapshot expiration will be skipped. "+ "This option is used along with dedicated compact jobs.");

并且在 BucketedAppendFileStoreWrite.java 中使用:

if (options.writeOnly()) {return new NoopCompactManager();
} else {// ... 
}

KeyValueFileStoreWrite 也同样进行了配置

private CompactManager createCompactManager(BinaryRow partition,int bucket,CompactStrategy compactStrategy,ExecutorService compactExecutor,Levels levels,@Nullable DeletionVectorsMaintainer dvMaintainer) {if (options.writeOnly()) {return new NoopCompactManager();} else {Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();@Nullable FieldsComparator userDefinedSeqComparator = udsComparatorSupplier.get();CompactRewriter rewriter =createRewriter(partition,bucket,keyComparator,userDefinedSeqComparator,levels,dvMaintainer);return new MergeTreeCompactManager(compactExecutor,levels,compactStrategy,keyComparator,options.compactionFileSize(true),options.numSortedRunStopTrigger(),rewriter,compactionMetrics == null? null: compactionMetrics.createReporter(partition, bucket),dvMaintainer,options.prepareCommitWaitCompaction(),options.needLookup(),recordLevelExpire,options.forceRewriteAllFiles());}}

NoopCompactManager 是一个不执行任何 compaction 操作的管理器。

http://www.xdnf.cn/news/1424089.html

相关文章:

  • 甲烷浓度时空演变趋势分析与异常值计算(附下载脚本)
  • 基于docker-compose搭建EFK(Elasticsearch+fluentd+kibana)的日志平台
  • 2025年工作后值得考的财会行业证书推荐,尤其是第二个!
  • 从网络层接入控制过渡到应用层身份认证的过程
  • 如何在SptingBoot项目中引入swagger生成API文档
  • HarvardX TinyML小笔记2(番外3:数据工程)
  • 技术速递|构建你的第一个 MCP 服务器:如何使用自定义功能扩展 AI 工具
  • Linux之Shell编程(四)函数、数组、正则
  • PostgreSQL备份指南:逻辑与物理备份详解
  • EPLAN如何添加接触器辅助触头 | 解决触点不足问题详解4----使用部件组
  • 三、Gitee平台使用指南
  • 在Lumerical FDTD中,磁偶极子通常用于激发TE模式,而电偶极子用于激发TM模式(文心一言)
  • chrome好用的浏览器插件
  • 51.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--新增功能--登录注册扩展
  • UE角色取消被Decal影响
  • 在 PHP 应用中处理限流和 API 节流:扩展、防滥用的最佳实践
  • 【数据可视化-102】苏州大学招生计划全解析:数据可视化的五大维度
  • 预告:AI赋能IT服务管理实践 |2025 “数字化时代的IT服务管理“Meetup-深圳站(9月20日)
  • [吾爱出品] PDF文件加密解密工作,附带源码。
  • GitHub CLI (gh) 全面指南:终端中的 GitHub 工作流革命
  • ServBay 是一款集成式、图形化的本地 Web 开发环境工具,专为 macOS 和 Windows 系统设计
  • 什么是最大熵强化学习?
  • Linux笔记---计算机网络概述
  • Python上下文管理器与资源管理
  • WEEX:从某DEX代币暴涨看加密交易选择
  • 【Linux】模拟实现Shell(下)
  • 快递地址归类排序实现(Java Python)
  • 查看服务器设备是否为物理机
  • Linux内核进程管理子系统有什么第三十九回 —— 进程主结构详解(35)
  • 算法练习——169.多数元素