HBase Compaction HFile 可见性和并发安全性分析
HStore
HStore
本身不直接记录一个“目录”的概念,但它管理着与特定列族相关的所有 StoreFiles。这些 StoreFiles 存储在 HDFS 上的特定目录中,通常位于 Region 目录下的列族目录中。HStore
通过 StoreFileTracker
(如 StoreFileTrackerFactory.create
) 来跟踪和管理这些文件。StoreFileTracker
负责记录和更新 StoreFiles 的元数据,包括它们在文件系统中的位置。
- 在
HStore
的构造函数中,会初始化StoreFileManager
,它负责管理 StoreFiles 的生命周期,包括添加、删除和获取 StoreFiles。StoreFileManager
会与StoreFileTracker
交互,以确保文件列表是最新的。 HStore
还通过archiveLock
和相关方法(如closeAndArchiveCompactedFiles
和removeCompactedfiles
)来管理已压缩文件的归档,确保这些文件被正确地移动到归档目录并从活动文件列表中移除。
Compaction 如何与 HStore 交互?
- Compaction 触发:
HStore
通过requestCompaction
方法来触发 compaction。该方法会选择合适的 StoreFiles 进行 compaction,并将这些文件添加到filesCompacting
列表中,以防止它们被其他操作干扰。 - Compaction 执行:实际的 compaction 逻辑由
CompactionPipeline
和Compactor
实现。HStore
通过compact
方法来协调整个 compaction 过程。该方法会创建一个临时文件来存储 compaction 结果,然后将输入文件移动到归档目录,并将临时文件移动到 Store 的目录中。 - Compaction 完成:在 compaction 完成后,
HStore
会调用replaceStoreFiles
方法来更新 StoreFiles 列表,将旧的输入文件替换为新的输出文件。同时,它会更新相关的统计信息和指标,如compactedCellsCount
和compactedOutputFileSize
。 - Compaction 失败处理:如果 compaction 过程中发生错误,
HStore
会尝试回滚操作,确保数据一致性。例如,在compact
方法中,如果写入 WAL 记录失败,它会删除临时文件并恢复原始文件。 - Compaction 监控:
HStore
提供了getCompactionProgress
方法来获取当前 compaction 的进度,并通过finishCompactionRequest
和cancelRequestedCompaction
方法来管理 compaction 请求的完成和取消。
总结来说,HStore
通过 StoreFileManager
和 StoreFileTracker
来管理 StoreFiles 的目录和元数据,并通过一系列方法(如 requestCompaction
, compact
, replaceStoreFiles
等)来协调和执行 compaction 过程。
HStore 如何利用 storeEngine
的锁机制保证可见性和安全性
1. 读操作方面
在 HStore
类中,读操作(如 getScanners
方法)通过获取 storeEngine
的读锁来保证可见性和安全性:
public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)throws IOException {Collection<HStoreFile> storeFilesToScan;List<KeyValueScanner> memStoreScanners;this.storeEngine.readLock();try {storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,includeStartRow, stopRow, includeStopRow, onlyLatestVersion);memStoreScanners = this.memstore.getScanners(readPt);// NOTE: here we must increase the refCount for storeFiles because we would open the// storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here,// HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the// storeFiles after a concurrent compaction.Because HStore.requestCompaction is under// storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484// for more details.HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);} finally {this.storeEngine.readUnlock();}try {// First the store file scanners// TODO this used to get the store files in descending order,// but now we get them in ascending order, which I think is// actually more correct, since memstore get put at the end.List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);scanners.addAll(sfScanners);// Then the memstore scannersscanners.addAll(memStoreScanners);return scanners;} catch (Throwable t) {clearAndClose(memStoreScanners);throw t instanceof IOException ? (IOException) t : new IOException(t);} finally {HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);}
}
在读操作中,storeEngine.readLock()
被调用以获取读锁,这确保了在读取 StoreFile 列表时不会发生并发修改。读锁允许多个读操作同时进行,但会阻止写操作。在读取完成后,通过 storeEngine.readUnlock()
释放读锁。
2. 写操作方面
在 HStore
类中,写操作(如 replaceStoreFiles
方法)通过获取 storeEngine
的写锁来保证可见性和安全性:
@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",allowedOnPath = ".*/(HStore|TestHStore).java")
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,boolean writeCompactionMarker) throws IOException {storeEngine.replaceStoreFiles(compactedFiles, result, () -> {if (writeCompactionMarker) {writeCompactionWalRecord(compactedFiles, result);}}, () -> {synchronized (filesCompacting) {filesCompacting.removeAll(compactedFiles);}});// These may be null when the RS is shutting down. The space quota Chores will fix the Region// sizes later so it's not super-critical if we miss these.RegionServerServices rsServices = region.getRegionServerServices();if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {updateSpaceQuotaAfterFileReplacement(rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),compactedFiles, result);}
}
replaceStoreFiles
方法调用了 storeEngine.replaceStoreFiles
,其实现如下:
public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)throws IOException {storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),StoreUtils.toStoreFileInfo(newFiles));walMarkerWriter.run();writeLock();try {storeFileManager.addCompactionResults(compactedFiles, newFiles);actionUnderLock.run();} finally {writeUnlock();}
}
在写操作中,storeEngine.writeLock()
被调用以获取写锁,这确保了在修改 StoreFile 列表时不会有其他读或写操作同时进行。写锁是独占的,它会阻止所有其他读和写操作。在修改完成后,通过 storeEngine.writeUnlock()
释放写锁。
StoreEngine 增加文件也是使用写锁
/*** Add the store files to store file manager, and also record it in the store file tracker.* <p/>* The {@code actionAfterAdding} will be executed after the insertion to store file manager, under* the lock protection. Usually this is for clear the memstore snapshot.*/public void addStoreFiles(Collection<HStoreFile> storeFiles,IOExceptionRunnable actionAfterAdding) throws IOException {storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));writeLock();try {storeFileManager.insertNewFiles(storeFiles);actionAfterAdding.run();} finally {// We need the lock, as long as we are updating the storeFiles// or changing the memstore. Let us release it before calling// notifyChangeReadersObservers. See HBASE-4485 for a possible// deadlock scenario that could have happened if continue to hold// the lock.writeUnlock();}}
总结
HStore
通过 storeEngine
的读写锁机制来保证并发安全和可见性:
- 读操作:使用读锁 (
readLock
),允许多个读操作并发执行,但阻止写操作。 - 写操作:使用写锁 (
writeLock
),确保写操作的独占性,阻止所有其他读和写操作。
这种机制确保了在并发环境下,StoreFile 列表的读取和修改操作是安全的,并且能够保证数据的一致性和可见性。
Compaction 执行层级
1. HStore 级别触发
核心机制:Compaction 主要在 HStore 级别进行触发和管理
存储关联:每个列族(Column Family)对应一个 HStore 实例
触发判断:通过
HStore.needsCompaction()
方法决定是否需要执行 compaction
2. Region 级别协调
多 Store 管理:单个 HRegion 包含多个 HStore,由 Region 层级协调各 Store 的 compaction
启动检查:在 HRegionServer 中,region 打开时自动检查 compaction 需求:
if (!r.isReadOnly()) {for (HStore s : r.stores.values()) {if (s.hasReferences() || s.needsCompaction()) {this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");}} }
3. Region Server 级别调度
定期检测:
CompactionChecker
定期扫描所有 region 的 stores,触发必要 compaction任务执行:通过
CompactSplit
线程池异步执行 compaction 任务
Compaction 协调机制
核心检测逻辑
// CompactionChecker 定期检查实现
private static class CompactionChecker extends ScheduledChore {private final HRegionServer instance;@Overrideprotected void chore() {for (HRegion hr : this.instance.onlineRegions.values()) {if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {continue;}for (HStore s : hr.stores.values()) {if (s.needsCompaction()) { // 检查 compaction 必要性this.instance.compactSplitThread.requestSystemCompaction(hr, s, "Periodic compaction" // 提交系统级 compaction 请求);}}}}
}
可能存在的问题及解决方案
问题类型 | 具体表现 | 解决方案 |
---|---|---|
并发控制 | Compaction 与 region 关闭操作冲突 | 使用 |
资源竞争 | 多 compaction 任务争抢系统资源 | 通过 |
跨 Region 协调 | 各 Region 独立管理导致资源分配不均 | 设计上保持 Region 独立性(简化实现),但可能引发局部资源竞争 |
总结
HBase Compaction 采用 多层级协同设计:
触发检测:Store 级别实时监测 compaction 需求
执行调度:Region Server 级别统一协调任务分配与执行
资源管理:通过线程池并发控制和优先级机制优化资源利用率
该设计通过分层解耦实现了 灵活性 与 可靠性 的平衡,避免了单点故障风险,是经过验证的高效架构方案。