深入解析Paimon的RowKind数据变更机制 和 KeyValue存储
RowKind
RowKind
是 Paimon 中一个基础但极其重要的枚举。如果说一条数据记录(Row)是“名词”,那么 RowKind
就是描述这条记录的“动词”,它定义了这条记录所代表的变更类型。这对于实现数据湖的增量更新、流式消费以及与外部计算引擎(如 Flink)的交互至关重要。
RowKind.java
文件中定义了四种变更类型,它们共同构成了对数据变更的完整描述。
// ... existing code ...
@Public
public enum RowKind {// Note: Enums have no stable hash code across different JVMs, use toByteValue() for// this purpose./** Insertion operation. */INSERT("+I", (byte) 0),/*** Update operation with the previous content of the updated row.** <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that* needs to retract the previous row first. It is useful in cases of a non-idempotent update,* i.e., an update of a row that is not uniquely identifiable by a key.*/UPDATE_BEFORE("-U", (byte) 1),/*** Update operation with new content of the updated row.** <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that* needs to retract the previous row first. OR it describes an idempotent update, i.e., an* update of a row that is uniquely identifiable by a key.*/UPDATE_AFTER("+U", (byte) 2),/** Deletion operation. */DELETE("-D", (byte) 3);
// ... existing code ...
INSERT ("+I")
: 插入。代表这是一条全新的数据。DELETE ("-D")
: 删除。代表这条记录对应的主键被删除了。在 Paimon 内部,这通常会生成一条“墓碑”记录。UPDATE_BEFORE ("-U")
: 更新前。这代表一个更新操作发生前,记录的“旧值”(前镜像)。它的作用是撤回 (Retract) 旧的状态。UPDATE_AFTER ("+U")
: 更新后。这代表一个更新操作发生后,记录的“新值”(后镜像)。它的作用是追加 (Accrete) 新的状态。
UPDATE_BEFORE
和 UPDATE_AFTER
总是成对出现,用于精确地描述一次更新操作,这对于流式计算中的有状态算子(如聚合、关联)至关重要。
RowKind
的设计兼顾了可读性和序列化性能。
构造函数与字段:
private final String shortString;
: 每个枚举值都有一个简短的字符串表示(如"+I"
),方便人类阅读和调试。private final byte value;
: 每个枚举值都有一个唯一的byte
值(0 到 3)。这个字节值是其主要的序列化形式,因为它比字符串更紧凑,并且在不同的 JVM 实例之间保持稳定,避免了使用 Java 枚举默认hashCode
或ordinal()
可能带来的问题。
核心工具方法:
isRetract()
和isAdd()
:/** Is {@link #UPDATE_BEFORE} or {@link #DELETE}. */ public boolean isRetract() {return this == RowKind.UPDATE_BEFORE || this == RowKind.DELETE; }/** Is {@link #INSERT} or {@link #UPDATE_AFTER}. */ public boolean isAdd() {return this == RowKind.INSERT || this == RowKind.UPDATE_AFTER; }
这两个方法非常有价值,它们将四种
RowKind
归纳为两种基本操作:撤回和追加。所有带负号的(-U
,-D
)都是撤回操作,所有带正号的(+I
,+U
)都是追加操作。这简化了下游算子的处理逻辑。fromByteValue(byte value)
和fromShortString(String value)
: 这两个静态工厂方法分别用于从byte
值和短字符串反序列化出RowKind
对象,是toByteValue()
和shortString()
的逆操作,构成了完整的序列化/反序列化闭环。
在 Paimon 生态中的应用
RowKind
的设计并非孤立的,它在整个 Paimon 生态中扮演着“通用语言”的角色。
与计算引擎无缝对接: Paimon 的
RowKind
设计与 Flink 的org.apache.flink.types.RowKind
几乎完全一致。从FlinkRowData.toFlinkRowKind
方法可以看出,它们之间可以进行一一映射。这使得 Paimon 可以作为 Flink 的原生流式 Sink 和 Source,无损地传输和理解 Changelog 数据流。支持灵活的数据源: 在实际业务中,Changelog 的格式多种多样。Paimon 允许用户通过
rowkind.field
表属性,从源数据的一个字段中直接提取RowKind
信息。- 如文档
sequence-rowkind.md
所述,用户可以在建表时指定'rowkind.field' = 'your_op_column'
。 RowKindGenerator.java
类就是这个功能的具体实现。它会读取指定列的字符串值(如"+I"
,"-D"
),并使用RowKind.fromShortString()
将其转换为RowKind
枚举。
- 如文档
驱动内部写入和合并逻辑: 在 Paimon 内部,
RowKind
决定了数据的最终去向。- 在
MergeIntoPaimonTable.scala
的MERGE INTO
命令实现中,可以看到代码根据ROW_KIND_COL
的值来分别处理数据:DELETE
或UPDATE_AFTER
的行可能会被用于更新删除向量(Deletion Vector),而INSERT
或UPDATE_AFTER
的行会被写入新的数据文件。 - 在
KeyValue
对象中,valueKind
字段存储的就是RowKind
,它在 Compaction 过程中指导MergeFunction
如何合并数据:是保留新值、还是将一个主键标记为删除。
- 在
更新操作的两种模型
在数据处理中,“更新(Update)”这个操作可以被理解为两种不同的模型:
模型A:幂等更新 (Idempotent Update)
- 定义:对于一个有主键的表,一次更新操作可以直接用新值覆盖旧值。无论这个更新操作执行多少次,结果都是一样的。
- 例子:
UPDATE T SET value = 'B' WHERE pk = 1
。只要主键是 1,value
最终就是 'B'。 - Paimon 内部处理:在 Paimon 的 Merge-Tree 结构中,默认就是采用这种模型。一次更新操作,本质上就是写入一条新的
KeyValue
记录,其RowKind
可以是INSERT
(+I
) 或DELETE
(-D
)。比如,更新可以被看作是“删除旧值,插入新值”,但为了效率,Paimon 直接写入一条带新值的+I
记录,利用序列号(Sequence Number)来覆盖旧记录即可。在这种模型下,确实不需要UPDATE_*
标记。
模型B:非幂等更新 (Non-idempotent Update) / Changelog 模型
- 定义:当处理的数据流没有唯一键,或者需要精确追踪每一次变更的“前镜像”和“后镜像”时,一次更新操作必须被拆解为两个步骤:撤回旧值 (
-U
) 和 追加新值 (+U
)。 - 例子:假设有一个流任务,需要计算一个无主键表中所有行的
value
字段的总和。如果一条(value=10)
的记录被更新为(value=15)
,你不能直接加上 15,否则结果就错了。你必须先减去 10,再- 加上 15。 - 与 Flink CDC 和 Changelog Stream 的关系:这正是 Flink CDC(Change Data Capture)和 Flink 的 Changelog Stream 所采用的模型。当从 MySQL Binlog 等数据源捕获变更时,一次
UPDATE
操作会产生两条流数据:一条UPDATE_BEFORE
(包含旧值),紧跟着一条UPDATE_AFTER
(包含新值)。
- 定义:当处理的数据流没有唯一键,或者需要精确追踪每一次变更的“前镜像”和“后镜像”时,一次更新操作必须被拆解为两个步骤:撤回旧值 (
总结
RowKind
是 Paimon 数据模型中一个优雅而强大的设计。
- 职责清晰: 它只做一件事——定义数据行的变更类型。
- 表达力强: 四个枚举值完整地覆盖了
INSERT
,DELETE
和UPDATE
(前镜像+后镜像) 这三种核心的数据库变更操作,能够精确描述 Changelog。 - 设计高效: 使用
byte
作为核心序列化格式,保证了性能和跨平台稳定性。 - 生态友好: 与 Flink 等主流计算引擎的原生
RowKind
对齐,并提供rowkind.field
选项以兼容各类数据源,展现了其作为通用数据湖存储的开放性和灵活性。
通过 RowKind
,Paimon 成功地将批处理和流处理的世界在数据表示层面上统一起来,是其成为一个真正的流式数据湖(Streaming Lakehouse)的关键基石之一。
KeyValue
KeyValue
是 Paimon 数据模型的核心,是数据在内存、文件以及处理流程中流转的基本单元。可以把它理解为 Paimon 世界里的“原子”,它封装了一次数据变更所需的所有信息。
首先,我们来看 KeyValue.java
中定义的几个核心成员变量,它们共同构成了 KeyValue
的完整信息。
// ... existing code ...
public class KeyValue {public static final long UNKNOWN_SEQUENCE = -1;public static final int UNKNOWN_LEVEL = -1;private InternalRow key;// determined after written into memory table or read from fileprivate long sequenceNumber;private RowKind valueKind;private InternalRow value;// determined after read from fileprivate int level;
// ... existing code ...
private InternalRow key;
- 含义: 主键 (Primary Key)。这是这条记录的唯一标识符,由一个或多个字段组成。Paimon 根据
key
来对数据进行排序、合并和去重。InternalRow
是 Paimon 内部使用的高效二进制行格式。 - 作用: 所有操作(查询、更新、删除)都围绕
key
进行。在数据文件中,记录会按key
排序。
- 含义: 主键 (Primary Key)。这是这条记录的唯一标识符,由一个或多个字段组成。Paimon 根据
private long sequenceNumber;
- 含义: 序列号。这是一个全局单调递增的长整型数字。每当有一次写入操作(增、删、改),Paimon 都会分配一个新的、更大的序列号。
- 作用: 这是实现多版本并发控制(MVCC)和决定数据新旧的唯一标准。当存在多条记录拥有相同
key
时,拥有最大sequenceNumber
的那条记录将胜出,代表了这个key
的最新状态。
private RowKind valueKind;
- 含义: 值类型 (Value Kind)。这是一个枚举类型,用于标记这条记录的操作意图。
- 作用: 这是
KeyValue
的“动词”,告诉 Paimon 应该如何解释这条记录。RowKind
主要有两个核心值被 Paimon 的 Merge-Tree 存储引擎直接使用:RowKind.ADD
(+I
): 代表增加或更新。它告诉合并引擎,这条记录是一个有效值。RowKind.DELETE
(-D
): 代表删除。它告诉合并引擎,这是一个“墓碑”,所有比它旧的、关于同一个key
的记录都应该被视为已删除。
- 正如我们之前讨论的,
RowKind
还包含UPDATE_BEFORE
(-U
) 和UPDATE_AFTER
(+U
),它们主要用于与 Flink 等流计算引擎对接,以精确表示数据变更流(Changelog),但在 Paimon 内部的合并逻辑中,最终都会被转换为ADD
或DELETE
来看待。
private InternalRow value;
- 含义: 值 (Value)。这部分包含了记录中除了主键之外的所有其他字段。
- 作用: 存储记录的实际业务数据。
private int level;
- 含义: 层级 (Level)。表示这条记录是从哪个层级的数据文件中读取出来的。
- 作用: 主要用于 Compaction(合并)和读取逻辑中。Level 0 是最新的层级,数字越大层级越老。这个字段在数据写入时通常是未知的 (
UNKNOWN_LEVEL
),只有在从文件中读取时才会被填充。
KeyValue
的物理存储结构
KeyValue
是一个逻辑上的概念。当它被序列化并写入数据文件时,它的字段会被“展平”成一个完整的行。createKeyValueFields
方法清晰地定义了这种物理结构。
// ... existing code ...public static List<DataField> createKeyValueFields(List<DataField> keyFields, List<DataField> valueFields) {List<DataField> fields = new ArrayList<>(keyFields.size() + valueFields.size() + 2);fields.addAll(keyFields);fields.add(SEQUENCE_NUMBER);fields.add(VALUE_KIND);fields.addAll(valueFields);return fields;}
// ... existing code ...
从这个方法可以看出,一条记录在文件中的物理列顺序是: [...主键字段..., _SEQUENCE_NUMBER
, _VALUE_KIND
, ...值字段...]
_SEQUENCE_NUMBER
和 _VALUE_KIND
是 Paimon 自动添加的两个系统字段,它们是 KeyValue
核心元数据的物理体现,对于保证数据一致性至关重要。
KeyValue
的生命周期和使用方式
KeyValue
对象在 Paimon 系统中被广泛使用,并且为了性能考虑,它被设计为可重用 (Reusable) 的。
创建与填充:
- 当用户数据通过
MergeTreeWriter
写入时,Writer 会创建一个KeyValue
对象,并用用户的key
和value
、新分配的sequenceNumber
以及对应的valueKind
来填充它。 replace()
方法是其核心的填充方法,它允许用一套新的数据重用同一个KeyValue
对象实例,避免了频繁创建对象的开销。
- 当用户数据通过
序列化与反序列化:
KeyValueSerializer
负责将KeyValue
对象和物理的InternalRow
格式进行相互转换。它精确地按照createKeyValueFields
定义的顺序来组装和解析行数据。
在合并与读取中使用:
- 当
MergeTreeCompactRewriter
执行合并时,它会从多个数据文件中读取记录,并将它们反序列化成KeyValue
对象。 - 然后,它会比较这些
KeyValue
对象的key
来排序,比较sequenceNumber
来决定胜出者,并根据valueKind
来处理删除逻辑。 - 最终,胜出的
KeyValue
会被再次序列化,写入新的数据文件。
- 当
level
字段
会发现在 KeyValue
类中,除了上面提到的四个部分,还有一个 level
字段。
// ... existing code ...
private InternalRow key;
// determined after written into memory table or read from file
private long sequenceNumber;
private RowKind valueKind;
private InternalRow value;
// determined after read from file
private int level;
// ... existing code ...
请注意 level
字段上面的注释:// determined after read from file
。
level
代表这条数据所在的LSM树的层级。- 这个信息不是每一行记录都必须持久化在数据文件里的。它更多的是一个运行时元数据。
- 当 Paimon 的 Reader 从一个数据文件(
DataFileMeta
)中读取数据时,它知道这个文件属于哪个level
,于是在创建KeyValue
对象时,会把这个level
信息一并填充进去。 - 这个
level
信息对于后续的合并(Compaction)等操作非常重要,因为合并策略会根据数据所在的层级来决定行为。
代码证明:
KeyValue.java
类中的 createKeyValueFields
静态方法最能说明问题。这个方法定义了写入物理文件时,一行完整记录应该包含哪些字段。
// ... existing code .../*** Create key-value fields.** @param keyFields the key fields* @param valueFields the value fields* @return the table fields*/public static List<DataField> createKeyValueFields(List<DataField> keyFields, List<DataField> valueFields) {List<DataField> fields = new ArrayList<>(keyFields.size() + valueFields.size() + 2);fields.addAll(keyFields);fields.add(SEQUENCE_NUMBER);fields.add(VALUE_KIND);fields.addAll(valueFields);return fields;}
// ... existing code ...
可以看到,这个方法构建出的字段列表精确地包含了:
keyFields
(主键字段)SEQUENCE_NUMBER
(序列号)VALUE_KIND
(行类型)valueFields
(值字段)
这里面并没有 LEVEL
字段。这证明了物理文件存储的核心内容就是上面描述的四个部分。
- 物理存储:磁盘上的 Parquet/ORC 文件存储的是
(key, sequenceNumber, valueKind, value)
的扁平化结构。 - 内存对象
KeyValue
:是上述物理结构的运行时载体,它在内存中持有这四部分数据。同时,为了方便后续处理(如Compaction),它还额外携带了一些运行时元数据,比如level
。
总结
KeyValue
类是 Paimon 数据组织和处理的基石。它通过将主键 (key
)、序列号 (sequenceNumber
)、操作类型 (valueKind
) 和业务数据 (value
) 这四个核心元素绑定在一起,构建了一个功能完备、信息全面的数据单元。
- 逻辑上,它清晰地表达了一次数据变更的完整语义。
- 物理上,它通过
_SEQUENCE_NUMBER
和_VALUE_KIND
这两个系统字段,将核心元数据持久化到数据文件中。 - 设计上,它的可重用性体现了 Paimon 对性能的极致追求。
BinaryRow
public final class BinaryRow extends BinarySection implements InternalRow, DataSetters
这行定义包含了几个关键信息:
public final class BinaryRow
: 这是一个公共的、最终的类,意味着它不能被其他类继承。extends BinarySection
: 它继承自BinarySection
。BinarySection
封装了对底层内存段(MemorySegment[]
)的引用、偏移量(offset
)和大小(sizeInBytes
),为BinaryRow
提供了操作二进制数据的基础。implements InternalRow
: 这是最重要的部分。它实现了InternalRow
接口,表明BinaryRow
是 Paimon 内部行数据格式的一种具体实现。implements DataSetters
: 它实现了DataSetters
接口,这个接口定义了一系列setXX
方法(如setInt
,setLong
,setString
等),表明BinaryRow
具备写入各种类型字段的能力。
BinaryRow
的序列化结构
BinaryRow
的核心设计思想是将一行数据紧凑地存储在一块连续的内存中,以避免Java对象的开销,并实现高效的序列化和反序列化。其内存布局(也就是序列化结构)分为两个部分:定长部分 (Fixed-length part) 和 变长部分 (Variable-length part)。
+------------------------------------------------------------------+
| Fixed-length Part |
+----------------+------------------------+------------------------+
| 1-byte Header | Null Bits (BitSet) | Field Values |
+----------------+------------------------+------------------------+||v
+------------------------------------------------------------------+
| Variable-length Part |
+------------------------------------------------------------------+
| (concatenated binary data for variable-length fields like String, Binary, etc.) |
+------------------------------------------------------------------+
我们来详细解析每个部分:
1. 定长部分 (Fixed-length Part)
这部分总是位于 BinaryRow
所指向的内存区域的开头,并且其自身长度是固定的。它包含三个子部分:
Header (1 byte):
- 目前主要用于存储
RowKind
(行类型,如+I
,-D
等)。RowKind
用一个字节表示,这使得在处理数据时能快速判断该行的状态。 - 代码依据:
- 目前主要用于存储
// ... existing code ...
@Override
public RowKind getRowKind() {byte kindValue = segments[0].get(offset);return RowKind.fromByteValue(kindValue);
}@Override
public void setRowKind(RowKind kind) {segments[0].put(offset, kind.toByteValue());
}
// ... existing code ...
Null Bits (BitSet):
- 这是一个位图(bitmask),用来紧凑地记录哪些字段是
NULL
。每个字段对应一个 bit,如果 bit 为1,表示对应字段为NULL
,为0则表示非NULL
。 - 这种方式非常节省空间,比如64个字段的
NULL
信息只需要8个字节就能存储。 - 它的总字节数会按8字节(64位)对齐。
- 代码依据:
// ... existing code ... public static int calculateBitSetWidthInBytes(int arity) {return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8; } // ... existing code ... @Override public boolean isNullAt(int pos) {assertIndexIsValid(pos);return MemorySegmentUtils.bitGet(segments[0], offset, pos + HEADER_SIZE_IN_BITS); }@Override public void setNullAt(int i) {assertIndexIsValid(i);MemorySegmentUtils.bitSet(segments[0], offset, i + HEADER_SIZE_IN_BITS);// ... } // ... existing code ...
- 这是一个位图(bitmask),用来紧凑地记录哪些字段是
Field Values (字段值区域):
- 这是定长部分的主体,紧跟在 Null Bits 后面。它为每一个字段都预留了固定的8个字节(64位)的空间。
- 对于定长类型 (如
int
,long
,double
,boolean
等),它们的值会直接存储在这8个字节中。例如,一个int
会占用4个字节,剩下的4个字节是空的。 - 对于变长类型 (如
String
,byte[]
,Array
,Map
,Row
等),这8个字节存储的不是数据本身,而是一个指针,格式为(offset << 32) | length
。offset
: 指向变长部分中该字段实际数据的起始位置。length
: 该字段实际数据的长度。
- 代码依据:
// ... existing code ... private int getFieldOffset(int pos) {return offset + nullBitsSizeInBytes + pos * 8; } // ... existing code ... @Override public void setInt(int pos, int value) {assertIndexIsValid(pos);setNotNullAt(pos);segments[0].putInt(getFieldOffset(pos), value); } // ... existing code ... @Override public BinaryString getString(int pos) {assertIndexIsValid(pos);int fieldOffset = getFieldOffset(pos);final long offsetAndLen = segments[0].getLong(fieldOffset);return MemorySegmentUtils.readBinaryString(segments, offset, fieldOffset, offsetAndLen); } // ... existing code ...
2. 变长部分 (Variable-length Part)
- 这部分紧跟在定长部分的末尾。
- 它包含了所有变长类型字段的实际二进制数据,这些数据被紧凑地拼接在一起。
- 定长部分的指针就是指向这个区域的相应位置。
总结
BinaryRow
的序列化结构是一种非常经典且高效的设计:
- 快速访问:通过
(基地址 + 字段偏移)
的方式,可以实现对任何定长字段的 O(1) 随机访问。对于变长字段,也只需要一次解引用即可定位到数据。 - 空间高效:通过 Null BitSet 和紧凑的变长区,极大地节省了存储空间。
- CPU 友好:二进制的紧凑布局有利于 CPU 缓存。比较、哈希等操作可以直接在字节层面进行,非常快。
- 零拷贝/序列化:当数据需要在网络间传输或写入磁盘时,可以直接拷贝
BinaryRow
底层的字节数组,几乎没有额外的序列化开销。
这种设计是现代大数据处理引擎(如 Flink, Spark, Paimon)实现高性能的关键技术之一。
KeyValue的RowKind 和 BinaryRow的RowKind
这两者并不矛盾,它们在数据处理的不同阶段扮演着不同的角色。
核心区别
KeyValue.valueKind
: 这是逻辑上的行类型。它代表了这个KeyValue
键值对作为一个整体的操作类型(是新增、删除还是更新)。这个信息是持久化的,当数据被写入文件时,valueKind
会作为一个特殊的列(_VALUE_KIND
)和数据一起存储。InternalRow
Header 中的RowKind
: 这是运行时的行类型。它是InternalRow
这个具体数据载体自身的一个属性。它主要用于在算子之间传递数据时,让接收方能快速知道这个二进制数据块的类型,而不需要解析内部的字段。这个信息通常是瞬时的。
让我们通过数据写入和读取的流程来理解它们是如何协同工作的:
数据进入 (Write):
- 上游(如 Flink)传来一条数据,通常是一个
InternalRow
对象,其 Header 中可能已经包含了RowKind
(比如+I
)。 - Paimon 的
TableWriteImpl
接收到这个InternalRow
。 - 它会从这个
InternalRow
中提取出RowKind
(通过row.getRowKind()
或者RowKindGenerator
)。 - 然后,它会创建一个
KeyValue
对象,并将提取出的RowKind
赋值给KeyValue
的valueKind
字段。 - 此时,
KeyValue
对象完整地表达了这条记录的逻辑含义:{key, sequenceNumber, valueKind, value}
。
- 上游(如 Flink)传来一条数据,通常是一个
数据持久化 (Write to File):
- 当
KeyValue
对象需要被写入 Parquet/ORC 文件时,它会被“压平”成一个宽的InternalRow
。 - 这个压平的结构由
createKeyValueFields
方法定义,它明确地包含了VALUE_KIND
这个特殊字段。// ... existing code ... public static List<DataField> createKeyValueFields(List<DataField> keyFields, List<DataField> valueFields) {List<DataField> fields = new ArrayList<>(keyFields.size() + valueFields.size() + 2);fields.addAll(keyFields);fields.add(SEQUENCE_NUMBER);fields.add(VALUE_KIND); // KeyValue.valueKind 的值被写入这个字段fields.addAll(valueFields);return fields; } // ... existing code ...
- 所以,最终持久化到文件里的是
KeyValue.valueKind
的值,而不是InternalRow
Header 里的值。
- 当
数据读取 (Read):
- Paimon 的 Reader 从数据文件中读取一行压平的
InternalRow
。 KeyValueSerializer
会负责将这个宽的InternalRow
解析回KeyValue
对象。- 它会从
keyArity + 1
的位置读取_VALUE_KIND
字段的值,并用它来设置KeyValue
对象的valueKind
字段。// ... existing code ... @Override public KeyValue fromRow(InternalRow row) {reusedKey.replace(row);reusedValue.replace(row);long sequenceNumber = row.getLong(keyArity);RowKind valueKind = RowKind.fromByteValue(row.getByte(keyArity + 1)); // 从持久化列中读取reusedKv.replace(reusedKey, sequenceNumber, valueKind, reusedValue); // 设置到 KeyValue 对象中return reusedKv; } // ... existing code ...
- Paimon 的 Reader 从数据文件中读取一行压平的
运行时使用 (Runtime):
- 当其他算子需要处理读取上来的数据时,有时为了方便,会把
KeyValue.valueKind
的值再设置回其包含的InternalRow
的 Header 中。 - 例如
ValueContentRowDataRecordIterator
,它将KeyValue
转换回InternalRow
给下游使用,这时它就把逻辑上的valueKind
设置到了InternalRow
的运行时 Header 中。// ... existing code ... @Override public InternalRow next() throws IOException {KeyValue kv = nextKeyValue();// ...InternalRow rowData = kv.value();rowData.setRowKind(kv.valueKind()); // 将 KeyValue 的逻辑 valueKind 设置到 InternalRow 的运行时 Header 中return rowData; } // ... existing code ...
- 当其他算子需要处理读取上来的数据时,有时为了方便,会把
总结
KeyValue.valueKind
是逻辑层面的、持久化的行类型,是这条记录操作类型的“事实标准”。InternalRow
Header 中的RowKind
是运行时层面的、瞬时的行类型,是一种高效传递行状态的机制。- 两者的数据会相互传递:写入时,
InternalRow
Header 的RowKind
->KeyValue.valueKind
-> 持久化列;读取时,持久化列 ->KeyValue.valueKind
->InternalRow
Header。
怎么解析实际类型
BinaryRow
本身不存储每个字段的元数据信息(比如字段类型、字段名)。它只是一块紧凑的、按照特定规则布局的二进制数据。
那么,系统是如何知道该如何反序列化这块二进制数据的呢?
答案是:反序列化操作依赖于外部提供的 Schema (模式),通常是一个 RowType
对象。
这个设计是大数据系统中一种非常常见且高效的模式,可以称之为“读时模式 (Schema-on-Read)”或者更准确地说是“数据与模式分离”。BinaryRow
只负责承载数据,而如何解释这些数据则由使用者(调用方)根据 Schema 来决定。
让我们用一个比喻来解释:
BinaryRow
就像是一盘没有标签的录音磁带,上面录制了二进制的音频信号。RowType
(Schema) 就像是这盘磁带的封面,上面写着曲目列表:“第一首:整数,4字节;第二首:字符串,变长;第三首:长整数,8字节...”。
没有封面(RowType
),你就无法知道磁带(BinaryRow
)在哪个位置是什么内容。
工作流程
1. 序列化 (写入时)
当一个逻辑上的行(比如 GenericRow
)需要被序列化成 BinaryRow
时,执行序列化的组件(比如 BinaryRowWriter
或 InternalRowSerializer
)必须知道目标 Schema (RowType
)。
- 它会遍历
RowType
中的每个字段。 - 如果第一个字段类型是
INT
,它就调用writer.writeInt(...)
。 - 如果第二个字段类型是
STRING
,它就调用writer.writeString(...)
。 - ...以此类推。
BinaryRowWriter
根据不同的数据类型,将值正确地写入到 BinaryRow
的定长区或变长区。
代码佐证: 在测试代码 RowDataTest.java
中,getBinaryRow()
方法展示了这一点。代码的编写者明确知道要写入的数据的顺序和类型,并依次调用了 writer
的相应方法。
// ... existing code ...private BinaryRow getBinaryRow() {BinaryRow row = new BinaryRow(19);BinaryRowWriter writer = new BinaryRowWriter(row);writer.writeBoolean(0, true);writer.writeByte(1, (byte) 1);writer.writeShort(2, (short) 2);writer.writeInt(3, 3);writer.writeLong(4, 4);
// ... existing code ...writer.writeString(8, str);
// ... existing code ...return row;}
// ... existing code ...
2. 反序列化 (读取时)
当需要从 BinaryRow
中读取数据时,读取方同样必须持有写入时所使用的那个 RowType
。
- 当需要读取第一个字段时,它会查看
RowType
,发现类型是INT
。 - 于是,它就调用
binaryRow.getInt(0)
。 - 当需要读取第二个字段时,它再次查看
RowType
,发现类型是STRING
。 - 于是,它就调用
binaryRow.getString(1)
。
BinaryRow
内部的 getXX
方法只负责根据给定的位置索引 pos
,按照 BinaryRow
的内存布局规则去解析字节,它本身并不知道这个字段在业务上代表什么。
代码佐证: BinaryRow.java
中的 get
方法完美地诠释了这一点。它们只关心位置 pos
和二进制布局,不关心业务类型。
// ... existing code ...@Overridepublic int getInt(int pos) {assertIndexIsValid(pos);// 直接从计算好的偏移量读取一个整数return segments[0].getInt(getFieldOffset(pos));}@Overridepublic long getLong(int pos) {assertIndexIsValid(pos);// 直接从计算好的偏移量读取一个长整数return segments[0].getLong(getFieldOffset(pos));}
// ... existing code ...@Overridepublic BinaryString getString(int pos) {assertIndexIsValid(pos);int fieldOffset = getFieldOffset(pos);// 从定长区读取 "偏移量和长度" 这个指针final long offsetAndLen = segments[0].getLong(fieldOffset);// 根据指针信息去变长区读取实际的字符串数据return MemorySegmentUtils.readBinaryString(segments, offset, fieldOffset, offsetAndLen);}
// ... existing code ...
总结
- 不存储类型是为了极致的性能和空间效率。如果每一行都存储自己的 Schema,会带来巨大的空间浪费和序列化/反序列化开销。
- Paimon 依赖于在处理的上下文(Context)中传递
RowType
。无论是表的 Reader、Writer,还是各种函数,在初始化时都会被赋予它们需要处理的数据的RowType
。 - 序列化和反序列化过程是严格对称的。必须使用完全相同的
RowType
来写入和读取,否则就会导致数据解析错误。
这种设计是所有现代高性能数据处理框架(包括 Flink、Spark 等)的共同选择。