当前位置: 首页 > news >正文

HBase Compaction HFile 可见性和并发安全性分析

HStore

HStore 本身不直接记录一个“目录”的概念,但它管理着与特定列族相关的所有 StoreFiles。这些 StoreFiles 存储在 HDFS 上的特定目录中,通常位于 Region 目录下的列族目录中。HStore 通过 StoreFileTracker (如 StoreFileTrackerFactory.create) 来跟踪和管理这些文件。StoreFileTracker 负责记录和更新 StoreFiles 的元数据,包括它们在文件系统中的位置。

  • 在 HStore 的构造函数中,会初始化 StoreFileManager,它负责管理 StoreFiles 的生命周期,包括添加、删除和获取 StoreFiles。StoreFileManager 会与 StoreFileTracker 交互,以确保文件列表是最新的。
  • HStore 还通过 archiveLock 和相关方法(如 closeAndArchiveCompactedFiles 和 removeCompactedfiles)来管理已压缩文件的归档,确保这些文件被正确地移动到归档目录并从活动文件列表中移除。

Compaction 如何与 HStore 交互?

  • Compaction 触发HStore 通过 requestCompaction 方法来触发 compaction。该方法会选择合适的 StoreFiles 进行 compaction,并将这些文件添加到 filesCompacting 列表中,以防止它们被其他操作干扰。
  • Compaction 执行:实际的 compaction 逻辑由 CompactionPipeline 和 Compactor 实现。HStore 通过 compact 方法来协调整个 compaction 过程。该方法会创建一个临时文件来存储 compaction 结果,然后将输入文件移动到归档目录,并将临时文件移动到 Store 的目录中。
  • Compaction 完成:在 compaction 完成后,HStore 会调用 replaceStoreFiles 方法来更新 StoreFiles 列表,将旧的输入文件替换为新的输出文件。同时,它会更新相关的统计信息和指标,如 compactedCellsCount 和 compactedOutputFileSize
  • Compaction 失败处理:如果 compaction 过程中发生错误,HStore 会尝试回滚操作,确保数据一致性。例如,在 compact 方法中,如果写入 WAL 记录失败,它会删除临时文件并恢复原始文件。
  • Compaction 监控HStore 提供了 getCompactionProgress 方法来获取当前 compaction 的进度,并通过 finishCompactionRequest 和 cancelRequestedCompaction 方法来管理 compaction 请求的完成和取消。

总结来说,HStore 通过 StoreFileManager 和 StoreFileTracker 来管理 StoreFiles 的目录和元数据,并通过一系列方法(如 requestCompactioncompactreplaceStoreFiles 等)来协调和执行 compaction 过程。

HStore 如何利用 storeEngine 的锁机制保证可见性和安全性

1. 读操作方面

在 HStore 类中,读操作(如 getScanners 方法)通过获取 storeEngine 的读锁来保证可见性和安全性:

public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)throws IOException {Collection<HStoreFile> storeFilesToScan;List<KeyValueScanner> memStoreScanners;this.storeEngine.readLock();try {storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,includeStartRow, stopRow, includeStopRow, onlyLatestVersion);memStoreScanners = this.memstore.getScanners(readPt);// NOTE: here we must increase the refCount for storeFiles because we would open the// storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here,// HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the// storeFiles after a concurrent compaction.Because HStore.requestCompaction is under// storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484// for more details.HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);} finally {this.storeEngine.readUnlock();}try {// First the store file scanners// TODO this used to get the store files in descending order,// but now we get them in ascending order, which I think is// actually more correct, since memstore get put at the end.List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);scanners.addAll(sfScanners);// Then the memstore scannersscanners.addAll(memStoreScanners);return scanners;} catch (Throwable t) {clearAndClose(memStoreScanners);throw t instanceof IOException ? (IOException) t : new IOException(t);} finally {HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);}
}

在读操作中,storeEngine.readLock() 被调用以获取读锁,这确保了在读取 StoreFile 列表时不会发生并发修改。读锁允许多个读操作同时进行,但会阻止写操作。在读取完成后,通过 storeEngine.readUnlock() 释放读锁。

2. 写操作方面

在 HStore 类中,写操作(如 replaceStoreFiles 方法)通过获取 storeEngine 的写锁来保证可见性和安全性:

@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",allowedOnPath = ".*/(HStore|TestHStore).java")
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,boolean writeCompactionMarker) throws IOException {storeEngine.replaceStoreFiles(compactedFiles, result, () -> {if (writeCompactionMarker) {writeCompactionWalRecord(compactedFiles, result);}}, () -> {synchronized (filesCompacting) {filesCompacting.removeAll(compactedFiles);}});// These may be null when the RS is shutting down. The space quota Chores will fix the Region// sizes later so it's not super-critical if we miss these.RegionServerServices rsServices = region.getRegionServerServices();if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {updateSpaceQuotaAfterFileReplacement(rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),compactedFiles, result);}
}

replaceStoreFiles 方法调用了 storeEngine.replaceStoreFiles,其实现如下:

public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)throws IOException {storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),StoreUtils.toStoreFileInfo(newFiles));walMarkerWriter.run();writeLock();try {storeFileManager.addCompactionResults(compactedFiles, newFiles);actionUnderLock.run();} finally {writeUnlock();}
}

在写操作中,storeEngine.writeLock() 被调用以获取写锁,这确保了在修改 StoreFile 列表时不会有其他读或写操作同时进行。写锁是独占的,它会阻止所有其他读和写操作。在修改完成后,通过 storeEngine.writeUnlock() 释放写锁。

StoreEngine 增加文件也是使用写锁

  /*** Add the store files to store file manager, and also record it in the store file tracker.* <p/>* The {@code actionAfterAdding} will be executed after the insertion to store file manager, under* the lock protection. Usually this is for clear the memstore snapshot.*/public void addStoreFiles(Collection<HStoreFile> storeFiles,IOExceptionRunnable actionAfterAdding) throws IOException {storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));writeLock();try {storeFileManager.insertNewFiles(storeFiles);actionAfterAdding.run();} finally {// We need the lock, as long as we are updating the storeFiles// or changing the memstore. Let us release it before calling// notifyChangeReadersObservers. See HBASE-4485 for a possible// deadlock scenario that could have happened if continue to hold// the lock.writeUnlock();}}

总结

HStore 通过 storeEngine 的读写锁机制来保证并发安全和可见性:

  1. 读操作:使用读锁 (readLock),允许多个读操作并发执行,但阻止写操作。
  2. 写操作:使用写锁 (writeLock),确保写操作的独占性,阻止所有其他读和写操作。

这种机制确保了在并发环境下,StoreFile 列表的读取和修改操作是安全的,并且能够保证数据的一致性和可见性。


​Compaction 执行层级​

​1. HStore 级别触发​
  • ​核心机制​​:Compaction 主要在 HStore 级别进行触发和管理

  • ​存储关联​​:每个列族(Column Family)对应一个 HStore 实例

  • ​触发判断​​:通过 HStore.needsCompaction()方法决定是否需要执行 compaction

​2. Region 级别协调​
  • ​多 Store 管理​​:单个 HRegion 包含多个 HStore,由 Region 层级协调各 Store 的 compaction

  • ​启动检查​​:在 HRegionServer 中,region 打开时自动检查 compaction 需求:

    if (!r.isReadOnly()) {for (HStore s : r.stores.values()) {if (s.hasReferences() || s.needsCompaction()) {this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");}}
    }
​3. Region Server 级别调度​
  • ​定期检测​​:CompactionChecker定期扫描所有 region 的 stores,触发必要 compaction

  • ​任务执行​​:通过 CompactSplit线程池异步执行 compaction 任务


​Compaction 协调机制​

​核心检测逻辑​

// CompactionChecker 定期检查实现
private static class CompactionChecker extends ScheduledChore {private final HRegionServer instance;@Overrideprotected void chore() {for (HRegion hr : this.instance.onlineRegions.values()) {if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {continue;}for (HStore s : hr.stores.values()) {if (s.needsCompaction()) {  // 检查 compaction 必要性this.instance.compactSplitThread.requestSystemCompaction(hr, s, "Periodic compaction"  // 提交系统级 compaction 请求);}}}}
}

​可能存在的问题及解决方案​

​问题类型​

​具体表现​

​解决方案​

​并发控制​

Compaction 与 region 关闭操作冲突

使用 writestate.compacting计数器同步状态;region 关闭时等待所有 compaction 完成

​资源竞争​

多 compaction 任务争抢系统资源

通过 CompactSplit线程池限制并发数;支持优先级调度确保关键任务优先执行

​跨 Region 协调​

各 Region 独立管理导致资源分配不均

设计上保持 Region 独立性(简化实现),但可能引发局部资源竞争


​总结​

HBase Compaction 采用 ​​多层级协同设计​​:

  1. ​触发检测​​:Store 级别实时监测 compaction 需求

  2. ​执行调度​​:Region Server 级别统一协调任务分配与执行

  3. ​资源管理​​:通过线程池并发控制和优先级机制优化资源利用率

该设计通过分层解耦实现了 ​​灵活性​​ 与 ​​可靠性​​ 的平衡,避免了单点故障风险,是经过验证的高效架构方案。

http://www.xdnf.cn/news/1381069.html

相关文章:

  • audioMAE模型代码分析
  • 流程控制语句(3)
  • 帕萨特盘式制动器cad+设计说明书
  • 【C语言16天强化训练】从基础入门到进阶:Day 13
  • week5-[一维数组]归并
  • 公共字段自动填充
  • 云计算学习100天-第29天
  • 基于SamOut的音频Token序列生成模型训练指南
  • Linux shell getopts 解析命令行参数
  • 算力沸腾时代,如何保持“冷静”?国鑫液冷SY4108G-G4解锁AI服务器的“绿色空调”!
  • 使用Rag 命中用户feedback提升triage agent 准确率
  • Elasticsearch数据迁移方案深度对比:三种方法的优劣分析
  • linu 网络 :TCP粘包及UDP
  • 【C++】C++11的右值引用和移动语义
  • STAGEWISE实战指南:从集成到使用的完整解决方案
  • vscode pyqt5设置
  • 【ai编辑器】使用cursor-vip获得cursor的pro版 pro plan(mac)
  • uniapp vue3 canvas实现手写签名
  • Flask测试平台开发,登陆重构
  • (二分查找)Leetcode34. 在排序数组中查找元素的第一个和最后一个位置+74. 搜索二维矩阵
  • 并发编程——05 并发锁机制之深入理解synchronized
  • 学习数据结构(13)二叉树链式结构下
  • 线程池及线程池单例模式
  • 带动态条件的模糊查询SQL
  • DINOv2 vs DINOv3 vs CLIP:自监督视觉模型的演进与可视化对比
  • LeetCode 3446. 按对角线进行矩阵排序
  • UE5提升分辨率和帧率的方法
  • 搭建私有云3步法:cpolar简化Puter本地云端配置
  • C# SIMD编程实践:工业数据处理性能优化案例
  • C++ 哈希概念版