当前位置: 首页 > java >正文

Paimon的部分更新以及DeleteVector实现

背景

本文基于 Paimon 0.9
出于对与Paimon内部的DeleteVctor的实现以及部分更新的实现进行的源码阅读。
关于 DeleteVector的介绍可以看这里

说明

对于Paimon来说无论是Spark中使用还是Flink使用,后面的逻辑都是一样的,所以我们以Spark为例来说。所以我们会参考类 org.apache.paimon.spark.SparkSource,
对于Flink可以参考org.apache.paimon.flink.FlinkTableFactory
如没特别说明,这里都是以主键表来进行说明。

paimon的部分字段更新

这里主要的场景更多的是多流或者多批写同一个表字段的场景,且每个流或批只更新某几个字段(同样的主键),具体的配置或说明参考Partial Update
这里涉及到的方法为 SparkTableWrite.write,最终会到MergeTreeWriter.write:

 @Overridepublic void write(KeyValue kv) throws Exception {long sequenceNumber = newSequenceNumber();boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {flushWriteBuffer(false, false);success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {throw new RuntimeException("Mem table is too small to hold a single element.");}}}
  • writeBuffer.put 主要是往buffer中写数据
    这里的writeBufferSortBufferWriteBuffer类实例。
    这里会 主键+sequenceNumber+valueKind + value 的形式写入数据
  • flushWriteBuffer 这里就会涉及到数据落盘以及部分更新的逻辑:
      writeBuffer.forEach(keyComparator,mergeFunction,changelogWriter == null ? null : changelogWriter::write,dataWriter::write);
    
    • mergeFunction 这里的函数就是会在MergeTreeWriter初始化,也就是会初始化为PartialUpdateMergeFunction
    • 对于forEach的实现会构建一个 MergeIterator,在这里面会调用 PartialUpdateMergeFunction.add方法
      这里就会涉及到部分更新的逻辑,主要就是:把按照 主键+sequenceNumber 排序好的数据传给PartialUpdateMergeFunction
      这样PartialUpdateMergeFunction只需要判断前后两个的数据的主键是否一致来进行更新。
      具体的更新逻辑见: Partial Update
      new MergeIterator(awConsumer, buffer.sortedIterator(), keyComparator, mergeFunction);
      
      这里的buffer.sortedIterator主要看SortBufferWriteBuffer构造方法(也就是为什么会按照主键+sequenceNumber排序):
       public SortBufferWriteBuffer(RowType keyType,RowType valueType,@Nullable FieldsComparator userDefinedSeqComparator,MemorySegmentPool memoryPool,boolean spillable,MemorySize maxDiskSize,int sortMaxFan,CompressOptions compression,IOManager ioManager) {...// key fieldsIntStream sortFields = IntStream.range(0, keyType.getFieldCount());// user define sequence fieldsif (userDefinedSeqComparator != null) {IntStream udsFields =IntStream.of(userDefinedSeqComparator.compareFields()).map(operand -> operand + keyType.getFieldCount() + 2);sortFields = IntStream.concat(sortFields, udsFields);}// sequence fieldsortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));int[] sortFieldArray = sortFields.toArray();// row typeList<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());fieldTypes.add(new BigIntType(false));fieldTypes.add(new TinyIntType(false));fieldTypes.addAll(valueType.getFieldTypes());NormalizedKeyComputer normalizedKeyComputer =CodeGenUtils.newNormalizedKeyComputer(fieldTypes, sortFieldArray);RecordComparator keyComparator =CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray);...InternalRowSerializer serializer =InternalSerializers.create(KeyValue.schema(keyType, valueType));BinaryInMemorySortBuffer inMemorySortBuffer =BinaryInMemorySortBuffer.createBuffer(normalizedKeyComputer, serializer, keyComparator, memoryPool);
      
      其中IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount())) 就会会把sequenceNumber这个字段带入到排序中去,
      也就是在buffer.sortedIterato方法中调用。
      如果有定义sequence.field,那这里面的字段也会参与排序,见:udsFields 字段

DeleteVector的实现

关于deleteVector的实现,可以参考Introduce deletion vectors for primary key table
大概的思想是: 基于Compaction + lookup的机制产生 DeleteVector:

  • 当一个记录不属于 level0层的话,就不会产生DelectVector
  • 当一个记录只属于需要进行compaction的level的话,就不会产生DeleteVector
  • 当一个记录只属于 level0层的话,就要去查询不包含 Compaction的层的文件数据,从而产生DeleteVector
    注意: deleteVector只支持主键表, 是属于bucket级别的,一个bucket一个DeleteVector。

DeleteVector的写

按照以上的说法,只有在Compaction的时候,才会产生DeleteVector,所以 我们直接到达 MergeTreeWriter.flushWriteBuffer,这里涉及到DeleteVector的数据流如下:

compactManager.triggerCompaction(forcedFullCompaction)||\/
submitCompaction||\/
MergeTreeCompactTask.doCompact||\/rewrite  ||\/
rewriteImpl ||\/
LookupMergeTreeCompactRewriter.rewrite ||\/
rewriteOrProduceChangelog||\/
createMergeWrapper||\/
iterator.next()||\/
RecordReaderIterator.next()||\/
advanceIfNeeded||\/
currentIterator.next() ||\/
SortMergeIterator.next()||\/
LookupChangelogMergeFunctionWrapper.add(winner)||\/
LookupChangelogMergeFunctionWrapper.getResult()
  • 这里MergeTreeCompactTask.doCompact写完之后,会有result.setDeletionFile(compactDfSupplier.get())
    compactDfSupplier 这里的源自submitCompaction方法中的compactDfSupplier构造:

     if (dvMaintainer != null) {compactDfSupplier =lazyGenDeletionFile? () -> CompactDeletionFile.lazyGeneration(dvMaintainer): () -> CompactDeletionFile.generateFiles(dvMaintainer);}
    

    而这里的deleteVector的产生来自LookupChangelogMergeFunctionWrapper.getResult(),见以下说明

  • 这里的LookupMergeTreeCompactRewriter.rewriteLookupMergeTreeCompactRewriter实例是在创建MergeTreeWriter

     CompactManager compactManager =createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer)
    

    这里会调用createRewriter方法创建LookupMergeTreeCompactRewriter实例,
    其中会根据lookupStrategy来创建该实例:

     public LookupStrategy lookupStrategy() {return LookupStrategy.from(mergeEngine().equals(MergeEngine.FIRST_ROW),changelogProducer().equals(ChangelogProducer.LOOKUP),deletionVectorsEnabled(),options.get(FORCE_LOOKUP));
    
  • 这里 currentIterator.next() 是 通过调用currentIterator = SortMergeReaderWithLoserTree.readBatch获取的,而SortMergeReaderWithLoserTree 是通过readerForMergeTree方法获取的

  • 这里LookupChangelogMergeFunctionWrapper.getResult()才是重点

     @Overridepublic ChangelogResult getResult() {// 1. Compute the latest high level record and containLevel0 of candidatesLinkedList<KeyValue> candidates = mergeFunction.candidates();Iterator<KeyValue> descending = candidates.descendingIterator();KeyValue highLevel = null;boolean containLevel0 = false;while (descending.hasNext()) {KeyValue kv = descending.next();if (kv.level() > 0) {descending.remove();if (highLevel == null) {highLevel = kv;}} else {containLevel0 = true;}}// 2. Lookup if latest high level record is absentif (highLevel == null) {InternalRow lookupKey = candidates.get(0).key();T lookupResult = lookup.apply(lookupKey);if (lookupResult != null) {if (lookupStrategy.deletionVector) {PositionedKeyValue positionedKeyValue = (PositionedKeyValue) lookupResult;highLevel = positionedKeyValue.keyValue();deletionVectorsMaintainer.notifyNewDeletion(positionedKeyValue.fileName(), positionedKeyValue.rowPosition());} else {highLevel = (KeyValue) lookupResult;}}}// 3. Calculate resultKeyValue result = calculateResult(candidates, highLevel);// 4. Set changelog when there's level-0 recordsreusedResult.reset();if (containLevel0 && lookupStrategy.produceChangelog) {setChangelog(highLevel, result);}return reusedResult.setResult(result);} 
    
  • 这里主要说明 lookup.apply的方法,其中 lookup的 构造是在createLookupChangelogMergeFunctionWrapper构造中:

       @Overridepublic MergeFunctionWrapper<ChangelogResult> create(MergeFunctionFactory<KeyValue> mfFactory,int outputLevel,LookupLevels<T> lookupLevels,@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {return new LookupChangelogMergeFunctionWrapper<>(mfFactory,key -> {try {return lookupLevels.lookup(key, outputLevel + 1);} catch (IOException e) {throw new UncheckedIOException(e);}},valueEqualiser,changelogRowDeduplicate,lookupStrategy,deletionVectorsMaintainer,userDefinedSeqComparator);}}
    

    这里的lookupLevels.lookup 会最终调用createLookupFile 方法构造LookupFile 实例,
    其中会调用 valueProcessor.persistToDisk(kv, batch.returnedPosition()方法,持久化 行号到对应的文件,
    这样就能获取到对应的行号。

  • 获取到对应的结果 lookupResult 后
    调用 deletionVectorsMaintainer.notifyNewDeletion(positionedKeyValue.fileName(), positionedKeyValue.rowPosition()方法去构造
    DeletionVector.
    上面提到的result.setDeletionFile(compactDfSupplier.get())会调用 CompactDeletionFile.generateFiles(dvMaintainer) 方法
    从而调用maintainer.writeDeletionVectorsIndex方法,从而写如到DeleteVector文件中。

DeleteVector的读

DeleteVector的读取主要在以下方法中构造:PrimaryKeyFileStoreTable.newRead:
最终会调用RawFileSplitRead.createReader从而调用 ApplyDeletionVectorReader(fileRecordReader, deletionVector)方法构造ApplyDeletionVectorReader实例:

 public RecordIterator<InternalRow> readBatch() throws IOException {RecordIterator<InternalRow> batch = reader.readBatch();if (batch == null) {return null;}checkArgument(batch instanceof FileRecordIterator,"There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");return new ApplyDeletionFileRecordIterator((FileRecordIterator<InternalRow>) batch, deletionVector);}

该处的readBatch方法会构造一个ApplyDeletionFileRecordIterator迭代器,可见在next()方法会对每一个记录调用deletionVector.isDeleted是否删除的判断.

 @Overridepublic InternalRow next() throws IOException {while (true) {InternalRow next = iterator.next();if (next == null) {return null;}if (!deletionVector.isDeleted(returnedPosition())) {return next;}}}

FAQ

写入文件的时候,怎么记录行号和主键的关系?

这里不会写入的时候记录行号,会在调用createLookupFile 在构建 LookupFile这个文件的时候(初始化),从parquet文件读取过来的时候,就会获取行号。

http://www.xdnf.cn/news/16136.html

相关文章:

  • 使用阿里云 ESA 边缘函数转发代理 docker registry
  • Vue TodoList案例
  • day060-zabbix监控各种客户端
  • Android网络请求,Retrofit,OKHttp学习
  • 在AI深度嵌入企业业务的当下——AI时代的融合数据库
  • 【Vue3】ECharts图表案例
  • 跟著Qcadoo MES系统学习产品设计001
  • [CH582M入门第十步]蓝牙从机
  • Redis的key过期策略
  • 基于多种机器学习的水质污染及安全预测分析系统的设计与实现【随机森林、XGBoost、LightGBM、SMOTE、贝叶斯优化】
  • 【前沿技术动态】【AI总结】RustFS:从 0 到 1 打造下一代分布式对象存储
  • Linux网络-------1.socket编程基础---(UDP-socket)
  • 基于Tornado的WebSocket实时聊天系统:从零到一构建与解析
  • Zookeeper学习专栏(八):使用高级客户端库Apache Curator
  • 《计算机网络》实验报告七 HTTP协议分析与测量
  • Qwen3-Code-480B-A35B-instruct模型开源当天“舆情分析”
  • @Repository与@Mapper核心区别详解
  • OpenCV 图像预处理:颜色操作与灰度、二值化处理详解
  • Modbus TCP转Devicenet:水泥厂PLC与多类仪表的自动化通信实践
  • javaSE(List集合ArrayList实现类与LinkedList实现类)day15
  • 如何Visual Studio 的配置从 Qt-Debug 切换到 x64-Debug
  • 本地运行C++版StableDiffusion!开源应用StableVerce发布
  • 垃圾回收介绍
  • (LeetCode 面试经典 150 题 ) 228. 汇总区间 (数组)
  • Ubuntu 1804 编译ffmpeg qsv MediaSDK libva 遇到的问题记录
  • 计算机网络学习----域名解析
  • 牛油果褐变的成因与食用安全
  • 棱镜技术在光谱相机中应用
  • JVM、Dalvik、ART区别
  • JVM、Dalvik、ART垃圾回收机制