hbase-memstore flush剖析
memstore是hbase中一个非常重要的组件,对于hbase的读写操作的性能起到举足轻重的作用,下面讲从memstore概述、memstore flush触发条件、memstore flush流程、memstore flush流程源码四个方面对memstore进行说明
memstore概述
一个RegionServer上对应多个Region,一个Region对应多个Store,一个Store对应一个Memstore和多个HFile,几者之间的关系如下图
Regionserve框架图
hbase写操作首先写入wal,然后写入Memstore,当达到Memstore flush的条件之后将Memstore的数据批量写入磁盘,生成一个新的Hfile文件。这种先写Memstore内存后批量写入磁盘的方式大大提升了hbase的写入性能。hbase读操作首先检查数据是否在Memstore中,未命中然后再到blockcache中查找,如果还没有命中则到Hfile中去查找,最后merge一个结果返回。由于新写入的数据会在Memstore中,而新写入的数据被读取的概率在大多场景中是比较频繁的。可见Memstore在hbase的读写中扮演着多么重要的角色。
memstore flush触发条件
Memstore flush是以Region作为单位,而不是单个Memstore,当满足条件需要进行Memstore flush时会获取该Region上满足条件的store进行Memstore flush,这就是为什么官网建议一个表不要定义太多列簇(一个列簇对应一个store),当多个列簇中的一个列簇对应的store中的Memstore达到了flush条件,会导致该Region上其他store中的Memstore也会进行flush,从而导致flush之后生成Hfile小文件。
以下6个条件满足其一就会触发Memstore flush
1)Region上的所有memstore总和大于的值(默认128M)。具体源码可以查看:()方法。
2)如果region上总的memstore的大小大于blockingMemStoreSize = *,写入操作会报错RegionTooBusyException。具体源码可以查看:()方法
3)周期性(周期值:)线程检查Regionserver上所有的store,当store中memestore的最后一个操作时间与当前时间相差(默认1小时)则需要进行flush 。具体源码可以查看:()
4)RegionServer上的所有memstore的总内存大于globalMemStoreLimitLowMark,则会进行flush,并阻塞,直到所有memstore的总内存小于globalMemStoreLimitLowMark。具体源码可以查看:()方法。
globalMemStoreLimit 和globalMemStoreLimitLowMark 需要明确:
globalMemStoreLimit = heap*(老版本)
globalMemStoreLimitLowMark = globalMemStoreLimit*globalMemStoreLimitLowMarkPercent(.lower.limit或 老版本)
5)当RegionServer中Hlog的数量达到上限,则会选取最早一个Hlog对应的一个或多个Region进行flush。具体源码可以查看:()方法.
6)手动执行flush,通过shell命令 flush ‘tablename’或者flush ‘region name’分别对一个表或者一个Region进行flush。
memstore flush流程
Memstore flush使用的是生产-消费模式,当达到memstore flush的条件会生产一条flush request到DelayQueue队列(DelayQueue详情请看“多线程并发编程15-DelayQueue源码剖析”),消费线程(默认2个,可以通过参数进行修改)持续从DelayQueue中获取flush request请求进行flush。flush的流程如图所示:
Memstore flush 流程图
1)达到“Memstoreflush触发条件”这节所描述的6个条件之一,则发送一个flush request到并发队列DelayQueue中。
2)消费线程持续从并发队列DelayQueue中获取flush request进行flush。
3)获取Region证据拍每个需要flush的store,如果为强制flush则获取所有的store,否则根据选择策略选取满足条件的store。
两种store选择策略:
(默认),选取store中Memstore大于指定阈值(默认16M)的store,如果没有大于指定阈值的store则返回所有的store
,选取所有的store。
4)prepare阶段,创建一个Memstore的快照snapshot,将当前状态下的Memstore的CellSkipListSet赋值为snapshot,并创建一个新的CellSkipListSet,之后写入Memstore的数据就存放在新的CellSkipListSet中。在整个flush阶段读操作会分别遍历snapshot和新的CellSkipListSet。prepare阶段需要获取锁,对写请求阻塞,由于该阶段并没有耗时操作,阻塞的时间很短。
5)flush阶段,遍历所有Memstore,将prepare阶段生成的snapshot持久化为临时文件,临时文件会统一放到目录.tmp下。这个过程因为涉及到磁盘IO操作,因此相对比较耗时。
6)commit阶段,遍历所有的Memstore,将flush阶段生成的临时文件移到指定的ColumnFamily目录下,针对HFile生成对应的storefile和Reader,把storefile添加到HStore的storefiles列表中,最后再清空prepare阶段生成的snapshot。
memstore flush流程源码
下面只贴出memstore flush主要流程的源码,具体的请到源码中进行查看。
flushcache:
Memstore flush入口函数。
public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
throws IOException {
......
try {
// (1)获取需要flush的store,如果为强制flush则获取所有的store,否则根据选择策略选取满足条件的store
// 两种store选择策略
//(默认),选取store中memstore大于指定阈值(默认16M)的store,如果没有大于指定阈值的store则返回所有的store
//,选取所有的store
Collection<Store> specificStoresToFlush =
forceFlushAllStores ? () : ();
//(2)将选取出的store进行flush。
FlushResult fs = internalFlushcache(specificStoresToFlush,
status, writeFlushRequestWalMarker);
......
}
internalFlushcache:
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
throws IOException {
//(1)进行Prepare阶段
PrepareFlushResult result
= internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
//(2)进行flush阶段和commit阶段
if (result.result == null) {
return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
} else {
return result.result; // early exit due to failure from prepare stage
}
}
snapshot:
准备阶段最主要的就是进行快照的生成。
public MemStoreSnapshot snapshot() {
......
this.snapshotId = EnvironmentEdgeManager.currentTime();
this.snapshotSize = keySize();
if (!()) {
//(1)将 当前状态的Memstore的cellset赋值给snapshot
this.snapshot = this.cellSet;
//(2)创建一个新的CellSkipListSet赋值给Memstore的cellSet
this.cellSet = new CellSkipListSet();
.......
}
internalFlushCacheAndCommit
flush阶段和commit阶段。
protected FlushResult internalFlushCacheAndCommit(
final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
final Collection<Store> storesToFlush)
throws IOException {
......
// (1)在tmp目录下创建文件,将快照中数据写入该文件中
for (StoreFlushContext flush : ()) {
(status);
}
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
Iterator<Store> it = ();
// () and storeFlushCtxs have same order
for (StoreFlushContext flush : ()) {
//(2)将tmp目录下的临时文件移动到指定cf目录下,将Hfile添加到StoreFileManager中,并清除快照
boolean needsCompaction = (status);
if (needsCompaction) {
compactionRequested = true;
}
byte[] storeName = ().getFamily().getName();
List<Path> storeCommittedFiles = ();
(storeName, storeCommittedFiles);
// Flush committed no files, indicating flush is empty or flush was canceled
if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
totalFlushableSizeOfFlushableStores -= (storeName);
}
flushedOutputFileSize += ();
}
();
......
}