当前位置: 首页 > backend >正文

深入解析Paimon的RowKind数据变更机制 和 KeyValue存储

RowKind

RowKind 是 Paimon 中一个基础但极其重要的枚举。如果说一条数据记录(Row)是“名词”,那么 RowKind 就是描述这条记录的“动词”,它定义了这条记录所代表的变更类型。这对于实现数据湖的增量更新、流式消费以及与外部计算引擎(如 Flink)的交互至关重要。

RowKind.java 文件中定义了四种变更类型,它们共同构成了对数据变更的完整描述。

// ... existing code ...
@Public
public enum RowKind {// Note: Enums have no stable hash code across different JVMs, use toByteValue() for// this purpose./** Insertion operation. */INSERT("+I", (byte) 0),/*** Update operation with the previous content of the updated row.** <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that* needs to retract the previous row first. It is useful in cases of a non-idempotent update,* i.e., an update of a row that is not uniquely identifiable by a key.*/UPDATE_BEFORE("-U", (byte) 1),/*** Update operation with new content of the updated row.** <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that* needs to retract the previous row first. OR it describes an idempotent update, i.e., an* update of a row that is uniquely identifiable by a key.*/UPDATE_AFTER("+U", (byte) 2),/** Deletion operation. */DELETE("-D", (byte) 3);
// ... existing code ...
  • INSERT ("+I")插入。代表这是一条全新的数据。
  • DELETE ("-D")删除。代表这条记录对应的主键被删除了。在 Paimon 内部,这通常会生成一条“墓碑”记录。
  • UPDATE_BEFORE ("-U")更新前。这代表一个更新操作发生前,记录的“旧值”(前镜像)。它的作用是撤回 (Retract) 旧的状态。
  • UPDATE_AFTER ("+U")更新后。这代表一个更新操作发生后,记录的“新值”(后镜像)。它的作用是追加 (Accrete) 新的状态。

UPDATE_BEFORE 和 UPDATE_AFTER 总是成对出现,用于精确地描述一次更新操作,这对于流式计算中的有状态算子(如聚合、关联)至关重要。

RowKind 的设计兼顾了可读性和序列化性能。

  • 构造函数与字段:

    • private final String shortString;: 每个枚举值都有一个简短的字符串表示(如 "+I"),方便人类阅读和调试。
    • private final byte value;: 每个枚举值都有一个唯一的 byte 值(0 到 3)。这个字节值是其主要的序列化形式,因为它比字符串更紧凑,并且在不同的 JVM 实例之间保持稳定,避免了使用 Java 枚举默认 hashCode 或 ordinal() 可能带来的问题。
  • 核心工具方法:

    • isRetract() 和 isAdd():

      /** Is {@link #UPDATE_BEFORE} or {@link #DELETE}. */
      public boolean isRetract() {return this == RowKind.UPDATE_BEFORE || this == RowKind.DELETE;
      }/** Is {@link #INSERT} or {@link #UPDATE_AFTER}. */
      public boolean isAdd() {return this == RowKind.INSERT || this == RowKind.UPDATE_AFTER;
      }
      

      这两个方法非常有价值,它们将四种 RowKind 归纳为两种基本操作:撤回追加。所有带负号的(-U-D)都是撤回操作,所有带正号的(+I+U)都是追加操作。这简化了下游算子的处理逻辑。

    • fromByteValue(byte value) 和 fromShortString(String value): 这两个静态工厂方法分别用于从 byte 值和短字符串反序列化出 RowKind 对象,是 toByteValue() 和 shortString() 的逆操作,构成了完整的序列化/反序列化闭环。

在 Paimon 生态中的应用

RowKind 的设计并非孤立的,它在整个 Paimon 生态中扮演着“通用语言”的角色。

  1. 与计算引擎无缝对接: Paimon 的 RowKind 设计与 Flink 的 org.apache.flink.types.RowKind 几乎完全一致。从 FlinkRowData.toFlinkRowKind 方法可以看出,它们之间可以进行一一映射。这使得 Paimon 可以作为 Flink 的原生流式 Sink 和 Source,无损地传输和理解 Changelog 数据流。

  2. 支持灵活的数据源: 在实际业务中,Changelog 的格式多种多样。Paimon 允许用户通过 rowkind.field 表属性,从源数据的一个字段中直接提取 RowKind 信息。

    • 如文档 sequence-rowkind.md 所述,用户可以在建表时指定 'rowkind.field' = 'your_op_column'
    • RowKindGenerator.java 类就是这个功能的具体实现。它会读取指定列的字符串值(如 "+I""-D"),并使用 RowKind.fromShortString() 将其转换为 RowKind 枚举。
  3. 驱动内部写入和合并逻辑: 在 Paimon 内部,RowKind 决定了数据的最终去向。

    • 在 MergeIntoPaimonTable.scala 的 MERGE INTO 命令实现中,可以看到代码根据 ROW_KIND_COL 的值来分别处理数据:DELETE 或 UPDATE_AFTER 的行可能会被用于更新删除向量(Deletion Vector),而 INSERT 或 UPDATE_AFTER 的行会被写入新的数据文件。
    • 在 KeyValue 对象中,valueKind 字段存储的就是 RowKind,它在 Compaction 过程中指导 MergeFunction 如何合并数据:是保留新值、还是将一个主键标记为删除。

更新操作的两种模型

在数据处理中,“更新(Update)”这个操作可以被理解为两种不同的模型:

  • 模型A:幂等更新 (Idempotent Update)

    • 定义:对于一个有主键的表,一次更新操作可以直接用新值覆盖旧值。无论这个更新操作执行多少次,结果都是一样的。
    • 例子UPDATE T SET value = 'B' WHERE pk = 1。只要主键是 1,value 最终就是 'B'。
    • Paimon 内部处理:在 Paimon 的 Merge-Tree 结构中,默认就是采用这种模型。一次更新操作,本质上就是写入一条新的 KeyValue 记录,其 RowKind 可以是 INSERT (+I) 或 DELETE (-D)。比如,更新可以被看作是“删除旧值,插入新值”,但为了效率,Paimon 直接写入一条带新值的 +I 记录,利用序列号(Sequence Number)来覆盖旧记录即可。在这种模型下,确实不需要 UPDATE_* 标记。
  • 模型B:非幂等更新 (Non-idempotent Update) / Changelog 模型

    • 定义:当处理的数据流没有唯一键,或者需要精确追踪每一次变更的“前镜像”和“后镜像”时,一次更新操作必须被拆解为两个步骤:撤回旧值 (-U) 和 追加新值 (+U)
    • 例子:假设有一个流任务,需要计算一个无主键表中所有行的 value 字段的总和。如果一条 (value=10) 的记录被更新为 (value=15),你不能直接加上 15,否则结果就错了。你必须先减去 10,再- 加上 15。
    • 与 Flink CDC 和 Changelog Stream 的关系:这正是 Flink CDC(Change Data Capture)和 Flink 的 Changelog Stream 所采用的模型。当从 MySQL Binlog 等数据源捕获变更时,一次 UPDATE 操作会产生两条流数据:一条 UPDATE_BEFORE(包含旧值),紧跟着一条 UPDATE_AFTER(包含新值)。

总结

RowKind 是 Paimon 数据模型中一个优雅而强大的设计。

  • 职责清晰: 它只做一件事——定义数据行的变更类型。
  • 表达力强: 四个枚举值完整地覆盖了 INSERTDELETE 和 UPDATE (前镜像+后镜像) 这三种核心的数据库变更操作,能够精确描述 Changelog。
  • 设计高效: 使用 byte 作为核心序列化格式,保证了性能和跨平台稳定性。
  • 生态友好: 与 Flink 等主流计算引擎的原生 RowKind 对齐,并提供 rowkind.field 选项以兼容各类数据源,展现了其作为通用数据湖存储的开放性和灵活性。

通过 RowKind,Paimon 成功地将批处理和流处理的世界在数据表示层面上统一起来,是其成为一个真正的流式数据湖(Streaming Lakehouse)的关键基石之一。

KeyValue

KeyValue 是 Paimon 数据模型的核心,是数据在内存、文件以及处理流程中流转的基本单元。可以把它理解为 Paimon 世界里的“原子”,它封装了一次数据变更所需的所有信息。

首先,我们来看 KeyValue.java 中定义的几个核心成员变量,它们共同构成了 KeyValue 的完整信息。

// ... existing code ...
public class KeyValue {public static final long UNKNOWN_SEQUENCE = -1;public static final int UNKNOWN_LEVEL = -1;private InternalRow key;// determined after written into memory table or read from fileprivate long sequenceNumber;private RowKind valueKind;private InternalRow value;// determined after read from fileprivate int level;
// ... existing code ...
  • private InternalRow key;

    • 含义主键 (Primary Key)。这是这条记录的唯一标识符,由一个或多个字段组成。Paimon 根据 key 来对数据进行排序、合并和去重。InternalRow 是 Paimon 内部使用的高效二进制行格式。
    • 作用: 所有操作(查询、更新、删除)都围绕 key 进行。在数据文件中,记录会按 key 排序。
  • private long sequenceNumber;

    • 含义序列号。这是一个全局单调递增的长整型数字。每当有一次写入操作(增、删、改),Paimon 都会分配一个新的、更大的序列号。
    • 作用: 这是实现多版本并发控制(MVCC)和决定数据新旧的唯一标准。当存在多条记录拥有相同 key 时,拥有最大 sequenceNumber 的那条记录将胜出,代表了这个 key 的最新状态。
  • private RowKind valueKind;

    • 含义值类型 (Value Kind)。这是一个枚举类型,用于标记这条记录的操作意图。
    • 作用: 这是 KeyValue 的“动词”,告诉 Paimon 应该如何解释这条记录。RowKind 主要有两个核心值被 Paimon 的 Merge-Tree 存储引擎直接使用:
      • RowKind.ADD (+I): 代表增加或更新。它告诉合并引擎,这条记录是一个有效值。
      • RowKind.DELETE (-D): 代表删除。它告诉合并引擎,这是一个“墓碑”,所有比它旧的、关于同一个 key 的记录都应该被视为已删除。
    • 正如我们之前讨论的,RowKind 还包含 UPDATE_BEFORE (-U) 和 UPDATE_AFTER (+U),它们主要用于与 Flink 等流计算引擎对接,以精确表示数据变更流(Changelog),但在 Paimon 内部的合并逻辑中,最终都会被转换为 ADD 或 DELETE 来看待。
  • private InternalRow value;

    • 含义值 (Value)。这部分包含了记录中除了主键之外的所有其他字段。
    • 作用: 存储记录的实际业务数据。
  • private int level;

    • 含义层级 (Level)。表示这条记录是从哪个层级的数据文件中读取出来的。
    • 作用: 主要用于 Compaction(合并)和读取逻辑中。Level 0 是最新的层级,数字越大层级越老。这个字段在数据写入时通常是未知的 (UNKNOWN_LEVEL),只有在从文件中读取时才会被填充。

KeyValue 的物理存储结构

KeyValue 是一个逻辑上的概念。当它被序列化并写入数据文件时,它的字段会被“展平”成一个完整的行。createKeyValueFields 方法清晰地定义了这种物理结构。

// ... existing code ...public static List<DataField> createKeyValueFields(List<DataField> keyFields, List<DataField> valueFields) {List<DataField> fields = new ArrayList<>(keyFields.size() + valueFields.size() + 2);fields.addAll(keyFields);fields.add(SEQUENCE_NUMBER);fields.add(VALUE_KIND);fields.addAll(valueFields);return fields;}
// ... existing code ...

从这个方法可以看出,一条记录在文件中的物理列顺序是: [...主键字段..., _SEQUENCE_NUMBER_VALUE_KIND, ...值字段...]

_SEQUENCE_NUMBER 和 _VALUE_KIND 是 Paimon 自动添加的两个系统字段,它们是 KeyValue 核心元数据的物理体现,对于保证数据一致性至关重要。

KeyValue 的生命周期和使用方式

KeyValue 对象在 Paimon 系统中被广泛使用,并且为了性能考虑,它被设计为可重用 (Reusable) 的。

  • 创建与填充:

    • 当用户数据通过 MergeTreeWriter 写入时,Writer 会创建一个 KeyValue 对象,并用用户的 key 和 value、新分配的 sequenceNumber 以及对应的 valueKind 来填充它。
    • replace() 方法是其核心的填充方法,它允许用一套新的数据重用同一个 KeyValue 对象实例,避免了频繁创建对象的开销。
  • 序列化与反序列化:

    • KeyValueSerializer 负责将 KeyValue 对象和物理的 InternalRow 格式进行相互转换。它精确地按照 createKeyValueFields 定义的顺序来组装和解析行数据。
  • 在合并与读取中使用:

    • 当 MergeTreeCompactRewriter 执行合并时,它会从多个数据文件中读取记录,并将它们反序列化成 KeyValue 对象。
    • 然后,它会比较这些 KeyValue 对象的 key 来排序,比较 sequenceNumber 来决定胜出者,并根据 valueKind 来处理删除逻辑。
    • 最终,胜出的 KeyValue 会被再次序列化,写入新的数据文件。

level 字段

会发现在 KeyValue 类中,除了上面提到的四个部分,还有一个 level 字段。

// ... existing code ...
private InternalRow key;
// determined after written into memory table or read from file
private long sequenceNumber;
private RowKind valueKind;
private InternalRow value;
// determined after read from file
private int level;
// ... existing code ...

请注意 level 字段上面的注释:// determined after read from file

  • level 代表这条数据所在的LSM树的层级。
  • 这个信息不是每一行记录都必须持久化在数据文件里的。它更多的是一个运行时元数据
  • 当 Paimon 的 Reader 从一个数据文件(DataFileMeta)中读取数据时,它知道这个文件属于哪个 level,于是在创建 KeyValue 对象时,会把这个 level 信息一并填充进去。
  • 这个 level 信息对于后续的合并(Compaction)等操作非常重要,因为合并策略会根据数据所在的层级来决定行为。

代码证明:

KeyValue.java 类中的 createKeyValueFields 静态方法最能说明问题。这个方法定义了写入物理文件时,一行完整记录应该包含哪些字段。

// ... existing code .../*** Create key-value fields.** @param keyFields the key fields* @param valueFields the value fields* @return the table fields*/public static List<DataField> createKeyValueFields(List<DataField> keyFields, List<DataField> valueFields) {List<DataField> fields = new ArrayList<>(keyFields.size() + valueFields.size() + 2);fields.addAll(keyFields);fields.add(SEQUENCE_NUMBER);fields.add(VALUE_KIND);fields.addAll(valueFields);return fields;}
// ... existing code ...

可以看到,这个方法构建出的字段列表精确地包含了:

  1. keyFields (主键字段)
  2. SEQUENCE_NUMBER (序列号)
  3. VALUE_KIND (行类型)
  4. valueFields (值字段)

这里面并没有 LEVEL 字段。这证明了物理文件存储的核心内容就是上面描述的四个部分。

  • 物理存储:磁盘上的 Parquet/ORC 文件存储的是 (key, sequenceNumber, valueKind, value) 的扁平化结构。
  • 内存对象 KeyValue:是上述物理结构的运行时载体,它在内存中持有这四部分数据。同时,为了方便后续处理(如Compaction),它还额外携带了一些运行时元数据,比如 level

总结

KeyValue 类是 Paimon 数据组织和处理的基石。它通过将主键 (key)序列号 (sequenceNumber)操作类型 (valueKind) 和业务数据 (value) 这四个核心元素绑定在一起,构建了一个功能完备、信息全面的数据单元。

  • 逻辑上,它清晰地表达了一次数据变更的完整语义。
  • 物理上,它通过 _SEQUENCE_NUMBER 和 _VALUE_KIND 这两个系统字段,将核心元数据持久化到数据文件中。
  • 设计上,它的可重用性体现了 Paimon 对性能的极致追求。

BinaryRow 

public final class BinaryRow extends BinarySection implements InternalRow, DataSetters 这行定义包含了几个关键信息:

  • public final class BinaryRow: 这是一个公共的、最终的类,意味着它不能被其他类继承。
  • extends BinarySection: 它继承自 BinarySectionBinarySection 封装了对底层内存段(MemorySegment[])的引用、偏移量(offset)和大小(sizeInBytes),为 BinaryRow 提供了操作二进制数据的基础。
  • implements InternalRow: 这是最重要的部分。它实现了 InternalRow 接口,表明 BinaryRow 是 Paimon 内部行数据格式的一种具体实现。
  • implements DataSetters: 它实现了 DataSetters 接口,这个接口定义了一系列 setXX 方法(如 setIntsetLongsetString 等),表明 BinaryRow 具备写入各种类型字段的能力。

BinaryRow 的序列化结构

BinaryRow 的核心设计思想是将一行数据紧凑地存储在一块连续的内存中,以避免Java对象的开销,并实现高效的序列化和反序列化。其内存布局(也就是序列化结构)分为两个部分:定长部分 (Fixed-length part) 和 变长部分 (Variable-length part)

+------------------------------------------------------------------+
|                     Fixed-length Part                            |
+----------------+------------------------+------------------------+
| 1-byte Header  |  Null Bits (BitSet)    |      Field Values      |
+----------------+------------------------+------------------------+||v
+------------------------------------------------------------------+
|                     Variable-length Part                         |
+------------------------------------------------------------------+
|  (concatenated binary data for variable-length fields like String, Binary, etc.) |
+------------------------------------------------------------------+

我们来详细解析每个部分:

1. 定长部分 (Fixed-length Part)

这部分总是位于 BinaryRow 所指向的内存区域的开头,并且其自身长度是固定的。它包含三个子部分:

  • Header (1 byte):

    • 目前主要用于存储 RowKind(行类型,如 +I-D 等)。RowKind 用一个字节表示,这使得在处理数据时能快速判断该行的状态。
    • 代码依据:
// ... existing code ...
@Override
public RowKind getRowKind() {byte kindValue = segments[0].get(offset);return RowKind.fromByteValue(kindValue);
}@Override
public void setRowKind(RowKind kind) {segments[0].put(offset, kind.toByteValue());
}
// ... existing code ...
  • Null Bits (BitSet):

    • 这是一个位图(bitmask),用来紧凑地记录哪些字段是 NULL。每个字段对应一个 bit,如果 bit 为1,表示对应字段为 NULL,为0则表示非 NULL
    • 这种方式非常节省空间,比如64个字段的 NULL 信息只需要8个字节就能存储。
    • 它的总字节数会按8字节(64位)对齐。
    • 代码依据:
      // ... existing code ...
      public static int calculateBitSetWidthInBytes(int arity) {return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
      }
      // ... existing code ...
      @Override
      public boolean isNullAt(int pos) {assertIndexIsValid(pos);return MemorySegmentUtils.bitGet(segments[0], offset, pos + HEADER_SIZE_IN_BITS);
      }@Override
      public void setNullAt(int i) {assertIndexIsValid(i);MemorySegmentUtils.bitSet(segments[0], offset, i + HEADER_SIZE_IN_BITS);// ...
      }
      // ... existing code ...
      
  • Field Values (字段值区域):

    • 这是定长部分的主体,紧跟在 Null Bits 后面。它为每一个字段都预留了固定的8个字节(64位)的空间。
    • 对于定长类型 (如 intlongdoubleboolean 等),它们的值会直接存储在这8个字节中。例如,一个 int 会占用4个字节,剩下的4个字节是空的。
    • 对于变长类型 (如 Stringbyte[]ArrayMapRow 等),这8个字节存储的不是数据本身,而是一个指针,格式为 (offset << 32) | length
      • offset: 指向变长部分中该字段实际数据的起始位置。
      • length: 该字段实际数据的长度。
    • 代码依据:

      // ... existing code ...
      private int getFieldOffset(int pos) {return offset + nullBitsSizeInBytes + pos * 8;
      }
      // ... existing code ...
      @Override
      public void setInt(int pos, int value) {assertIndexIsValid(pos);setNotNullAt(pos);segments[0].putInt(getFieldOffset(pos), value);
      }
      // ... existing code ...
      @Override
      public BinaryString getString(int pos) {assertIndexIsValid(pos);int fieldOffset = getFieldOffset(pos);final long offsetAndLen = segments[0].getLong(fieldOffset);return MemorySegmentUtils.readBinaryString(segments, offset, fieldOffset, offsetAndLen);
      }
      // ... existing code ...
      

2. 变长部分 (Variable-length Part)

  • 这部分紧跟在定长部分的末尾。
  • 它包含了所有变长类型字段的实际二进制数据,这些数据被紧凑地拼接在一起。
  • 定长部分的指针就是指向这个区域的相应位置。

总结

BinaryRow 的序列化结构是一种非常经典且高效的设计:

  • 快速访问:通过 (基地址 + 字段偏移) 的方式,可以实现对任何定长字段的 O(1) 随机访问。对于变长字段,也只需要一次解引用即可定位到数据。
  • 空间高效:通过 Null BitSet 和紧凑的变长区,极大地节省了存储空间。
  • CPU 友好:二进制的紧凑布局有利于 CPU 缓存。比较、哈希等操作可以直接在字节层面进行,非常快。
  • 零拷贝/序列化:当数据需要在网络间传输或写入磁盘时,可以直接拷贝 BinaryRow 底层的字节数组,几乎没有额外的序列化开销。

这种设计是现代大数据处理引擎(如 Flink, Spark, Paimon)实现高性能的关键技术之一。

KeyValue的RowKind 和 BinaryRow的RowKind

这两者并不矛盾,它们在数据处理的不同阶段扮演着不同的角色。

核心区别

  • KeyValue.valueKind: 这是逻辑上的行类型。它代表了这个 KeyValue 键值对作为一个整体的操作类型(是新增、删除还是更新)。这个信息是持久化的,当数据被写入文件时,valueKind 会作为一个特殊的列(_VALUE_KIND)和数据一起存储。

  • InternalRow Header 中的 RowKind: 这是运行时的行类型。它是 InternalRow 这个具体数据载体自身的一个属性。它主要用于在算子之间传递数据时,让接收方能快速知道这个二进制数据块的类型,而不需要解析内部的字段。这个信息通常是瞬时的。

让我们通过数据写入和读取的流程来理解它们是如何协同工作的:

  1. 数据进入 (Write):

    • 上游(如 Flink)传来一条数据,通常是一个 InternalRow 对象,其 Header 中可能已经包含了 RowKind(比如 +I)。
    • Paimon 的 TableWriteImpl 接收到这个 InternalRow
    • 它会从这个 InternalRow 中提取出 RowKind (通过 row.getRowKind() 或者 RowKindGenerator)。
    • 然后,它会创建一个 KeyValue 对象,并将提取出的 RowKind 赋值给 KeyValue 的 valueKind 字段。
    • 此时,KeyValue 对象完整地表达了这条记录的逻辑含义:{key, sequenceNumber, valueKind, value}
  2. 数据持久化 (Write to File):

    • 当 KeyValue 对象需要被写入 Parquet/ORC 文件时,它会被“压平”成一个宽的 InternalRow
    • 这个压平的结构由 createKeyValueFields 方法定义,它明确地包含了 VALUE_KIND 这个特殊字段。
      // ... existing code ...
      public static List<DataField> createKeyValueFields(List<DataField> keyFields, List<DataField> valueFields) {List<DataField> fields = new ArrayList<>(keyFields.size() + valueFields.size() + 2);fields.addAll(keyFields);fields.add(SEQUENCE_NUMBER);fields.add(VALUE_KIND); // KeyValue.valueKind 的值被写入这个字段fields.addAll(valueFields);return fields;
      }
      // ... existing code ...
      
    • 所以,最终持久化到文件里的是 KeyValue.valueKind 的值,而不是 InternalRow Header 里的值。
  3. 数据读取 (Read):

    • Paimon 的 Reader 从数据文件中读取一行压平的 InternalRow
    • KeyValueSerializer 会负责将这个宽的 InternalRow 解析回 KeyValue 对象。
    • 它会从 keyArity + 1 的位置读取 _VALUE_KIND 字段的值,并用它来设置 KeyValue 对象的 valueKind 字段。
      // ... existing code ...
      @Override
      public KeyValue fromRow(InternalRow row) {reusedKey.replace(row);reusedValue.replace(row);long sequenceNumber = row.getLong(keyArity);RowKind valueKind = RowKind.fromByteValue(row.getByte(keyArity + 1)); // 从持久化列中读取reusedKv.replace(reusedKey, sequenceNumber, valueKind, reusedValue); // 设置到 KeyValue 对象中return reusedKv;
      }
      // ... existing code ...
      
  4. 运行时使用 (Runtime):

    • 当其他算子需要处理读取上来的数据时,有时为了方便,会把 KeyValue.valueKind 的值再设置回其包含的 InternalRow 的 Header 中。
    • 例如 ValueContentRowDataRecordIterator,它将 KeyValue 转换回 InternalRow 给下游使用,这时它就把逻辑上的 valueKind 设置到了 InternalRow 的运行时 Header 中。
      // ... existing code ...
      @Override
      public InternalRow next() throws IOException {KeyValue kv = nextKeyValue();// ...InternalRow rowData = kv.value();rowData.setRowKind(kv.valueKind()); // 将 KeyValue 的逻辑 valueKind 设置到 InternalRow 的运行时 Header 中return rowData;
      }
      // ... existing code ...
      

总结

  • KeyValue.valueKind 是逻辑层面的、持久化的行类型,是这条记录操作类型的“事实标准”。
  • InternalRow Header 中的 RowKind 是运行时层面的、瞬时的行类型,是一种高效传递行状态的机制。
  • 两者的数据会相互传递:写入时,InternalRow Header 的 RowKind -> KeyValue.valueKind -> 持久化列;读取时,持久化列 -> KeyValue.valueKind -> InternalRow Header。

怎么解析实际类型

BinaryRow 本身不存储每个字段的元数据信息(比如字段类型、字段名)。它只是一块紧凑的、按照特定规则布局的二进制数据。

那么,系统是如何知道该如何反序列化这块二进制数据的呢?

答案是:反序列化操作依赖于外部提供的 Schema (模式),通常是一个 RowType 对象。

这个设计是大数据系统中一种非常常见且高效的模式,可以称之为“读时模式 (Schema-on-Read)”或者更准确地说是“数据与模式分离”。BinaryRow 只负责承载数据,而如何解释这些数据则由使用者(调用方)根据 Schema 来决定。

让我们用一个比喻来解释:

  • BinaryRow 就像是一盘没有标签的录音磁带,上面录制了二进制的音频信号。
  • RowType (Schema) 就像是这盘磁带的封面,上面写着曲目列表:“第一首:整数,4字节;第二首:字符串,变长;第三首:长整数,8字节...”。

没有封面(RowType),你就无法知道磁带(BinaryRow)在哪个位置是什么内容。

工作流程

1. 序列化 (写入时)

当一个逻辑上的行(比如 GenericRow)需要被序列化成 BinaryRow 时,执行序列化的组件(比如 BinaryRowWriter 或 InternalRowSerializer必须知道目标 Schema (RowType)

  • 它会遍历 RowType 中的每个字段。
  • 如果第一个字段类型是 INT,它就调用 writer.writeInt(...)
  • 如果第二个字段类型是 STRING,它就调用 writer.writeString(...)
  • ...以此类推。

BinaryRowWriter 根据不同的数据类型,将值正确地写入到 BinaryRow 的定长区或变长区。

代码佐证: 在测试代码 RowDataTest.java 中,getBinaryRow() 方法展示了这一点。代码的编写者明确知道要写入的数据的顺序和类型,并依次调用了 writer 的相应方法。

// ... existing code ...private BinaryRow getBinaryRow() {BinaryRow row = new BinaryRow(19);BinaryRowWriter writer = new BinaryRowWriter(row);writer.writeBoolean(0, true);writer.writeByte(1, (byte) 1);writer.writeShort(2, (short) 2);writer.writeInt(3, 3);writer.writeLong(4, 4);
// ... existing code ...writer.writeString(8, str);
// ... existing code ...return row;}
// ... existing code ...

2. 反序列化 (读取时)

当需要从 BinaryRow 中读取数据时,读取方同样必须持有写入时所使用的那个 RowType

  • 当需要读取第一个字段时,它会查看 RowType,发现类型是 INT
  • 于是,它就调用 binaryRow.getInt(0)
  • 当需要读取第二个字段时,它再次查看 RowType,发现类型是 STRING
  • 于是,它就调用 binaryRow.getString(1)

BinaryRow 内部的 getXX 方法只负责根据给定的位置索引 pos,按照 BinaryRow 的内存布局规则去解析字节,它本身并不知道这个字段在业务上代表什么。

代码佐证: BinaryRow.java 中的 get 方法完美地诠释了这一点。它们只关心位置 pos 和二进制布局,不关心业务类型。

// ... existing code ...@Overridepublic int getInt(int pos) {assertIndexIsValid(pos);// 直接从计算好的偏移量读取一个整数return segments[0].getInt(getFieldOffset(pos));}@Overridepublic long getLong(int pos) {assertIndexIsValid(pos);// 直接从计算好的偏移量读取一个长整数return segments[0].getLong(getFieldOffset(pos));}
// ... existing code ...@Overridepublic BinaryString getString(int pos) {assertIndexIsValid(pos);int fieldOffset = getFieldOffset(pos);// 从定长区读取 "偏移量和长度" 这个指针final long offsetAndLen = segments[0].getLong(fieldOffset);// 根据指针信息去变长区读取实际的字符串数据return MemorySegmentUtils.readBinaryString(segments, offset, fieldOffset, offsetAndLen);}
// ... existing code ...

总结

  • 不存储类型是为了极致的性能和空间效率。如果每一行都存储自己的 Schema,会带来巨大的空间浪费和序列化/反序列化开销。
  • Paimon 依赖于在处理的上下文(Context)中传递 RowType。无论是表的 Reader、Writer,还是各种函数,在初始化时都会被赋予它们需要处理的数据的 RowType
  • 序列化和反序列化过程是严格对称的。必须使用完全相同的 RowType 来写入和读取,否则就会导致数据解析错误。

这种设计是所有现代高性能数据处理框架(包括 Flink、Spark 等)的共同选择。

http://www.xdnf.cn/news/15416.html

相关文章:

  • vue中使用西瓜播放器xgplayer (封装)+xgplayer-hls 播放.m3u8格式视频
  • 【王树森推荐系统】物品冷启05:流量调控
  • Java-72 深入浅出 RPC Dubbo 上手 生产者模块详解
  • 清除 Android 手机 SIM 卡数据的4 种简单方法
  • 网络准入控制系统的作用解析,2025年保障企业入网安全第一道防线
  • OpenVela之开发自测试框架cmocka
  • 【算法训练营Day12】二叉树part2
  • 量产技巧之RK3588 Android12默认移除导航栏状态栏​
  • google浏览器::-webkit-scrollbar-thumb设置容器滚动条滑块不生效
  • Android 性能优化:启动优化全解析
  • C++-linux 7.文件IO(一)系统调用
  • Linux上基于C/C++头文件查找对应的依赖开发库
  • uni-app 选择国家区号
  • CentOS 7服务器上使用Docker部署Notesnook的详细指导说明
  • Spring Cloud分布式配置中心:架构设计与技术实践
  • 链表算法之【获取链表开始入环的节点】
  • 图生生AI模仿裂变:1分钟批量裂变素材图片!
  • MySQL数据库的基础操作
  • C++后端面试八股文
  • 深入解析Hadoop YARN架构设计:从原理到实践
  • 5、qt系统相关
  • LLM表征工程还有哪些值得做的地方
  • linux打包固件shell脚本
  • FOC算法中SIMULINK一些常用模块(1)(个人留存)
  • 多客户端-服务器(select,poll)
  • 第二章 基于新版Onenet搭建云服务(stm32物联网)
  • elementPlus中的el-table实现合并单元格
  • MMKV 存储json list数据(kotlin)
  • 《Linux篇》自动化构建-make/Makefile
  • 自动润滑系统:从 “盲目养护“ 到智能精注的工业运维革命