Mastering HBase 1.x: A Deep Dive into the HBase Read Cache, BlockCache (Part 2: Source Code Analysis)

If you find "大数据开发运维架构" (Big Data Development, Operations & Architecture) helpful, feel free to share it on WeChat Moments.


Introduction:

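The previous article, "Mastering HBase 1.x: A Deep Dive into the HBase Read Cache, BlockCache (Part 1)", covered how the HBase block cache works and its three strategies. Our production environment currently runs HBase 1.2.5, whose default read-cache strategy is LruBlockCache. Below, I analyze the LruBlockCache implementation in depth, based on the HBase 1.2.5 source code.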


1. BlockCache Initialization

When each HRegionServer thread starts through run(), it calls handleReportForDutyResponse() to perform initialization, which sets up the WAL and starts all server threads.

```java
  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    try {
      // Pre-registration initialization of the RegionServer: ZooKeeper registration,
      // lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, fs, this, Thread.currentThread());
        // Set our ephemeral znode up in zookeeper now we have a name.
        createMyEphemeralNode();
        // Now that the ephemeral node exists, initialize RegionServerCoprocessorHost,
        // in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
      }

      // Register with the HMaster.
      // Stop if the server is stopped, the clusterup flag is down, or HDFS has a problem
      while (keepLooping()) {
        RegionServerStartupResponse w = reportForDuty();
        if (w == null) {
          LOG.warn("reportForDuty failed; sleeping and then retrying.");
          this.sleeper.sleep();
        } else {
          // This is where the LruBlockCache initialization is kicked off
          handleReportForDutyResponse(w);
          break;
        }
      }
      // ...... (rest of run() omitted)
```

Next, handleReportForDutyResponse() calls startHeapMemoryManager(), which in turn calls HeapMemoryManager.create(); the cache is finally initialized by CacheConfig.instantiateBlockCache(conf).

HRegionServer has a member variable of type HeapMemoryManager that manages the heap memory of the RegionServer process. The blockCache held by HeapMemoryManager is the RegionServer's read cache, and it is initialized in CacheConfig.instantiateBlockCache().

```java
  /**
   * Returns the block cache or <code>null</code> in case none should be used.
   * Sets GLOBAL_BLOCK_CACHE_INSTANCE
   *
   * @param conf The current configuration.
   * @return The block cache or <code>null</code>.
   */
  public static synchronized BlockCache instantiateBlockCache(Configuration conf) {
    if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE;
    if (blockCacheDisabled) return null;
    // L1 cache
    LruBlockCache l1 = getL1(conf);
    // blockCacheDisabled is set as a side-effect of getL1Internal(), so check it again after the call.
    if (blockCacheDisabled) return null;
    // L2 cache
    BlockCache l2 = getL2(conf);
    if (l2 == null) {
      GLOBAL_BLOCK_CACHE_INSTANCE = l1;
    } else {
      boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
      boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
        DEFAULT_BUCKET_CACHE_COMBINED);
      // Is an external L2 cache enabled? (hbase.blockcache.use.external=true)
      if (useExternal) {
        GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2);
      } else {
        if (combinedWithLru) {
          GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
        } else {
          // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler
          // mechanism. It is a little ugly but works according to the following: when the
          // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get
          // a block from the L1 cache, if not in L1, we will search L2.
          GLOBAL_BLOCK_CACHE_INSTANCE = l1;
        }
      }
      l1.setVictimCache(l2);
    }
    return GLOBAL_BLOCK_CACHE_INSTANCE;
  }
```

Two lines in the code above are particularly important:


LruBlockCache l1 = getL1(conf);

// Gets the L1 cache, LruBlockCache, which by default takes 40% of the RegionServer heap (parameter: hfile.block.cache.size)

BlockCache l2 = getL2(conf);

// And what is this one?

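Since HBase 1.1.0, LruBlockCache can be used on its own or combined with another cache in a multi-level setup: LruBlockCache acts as the L1 cache, and BucketCache or an ExternalBlockCache acts as the L2 cache. HBase can use memcached as an external BlockCache, a useful feature because the cache does not go completely cold when a device fails or is upgraded. Put simply, when an HBase node fails or is upgraded, the cached data is not easily lost.

In instantiateBlockCache(), the checks on useExternal and combinedWithLru decide how the two levels are wired together. If useExternal is true, an external cache such as memcached is used together with the LruBlockCache through InclusiveCombinedBlockCache. If combinedWithLru is true, BucketCache (typically off-heap memory) is combined with the LruBlockCache through CombinedBlockCache. With CombinedBlockCache, the LruBlockCache holds metadata such as index blocks, while the actual data blocks are cached in the BucketCache; InclusiveCombinedBlockCache, as its name suggests, is inclusive and places every block into both L1 and the external cache.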
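The branching in instantiateBlockCache() can be condensed into a small decision function. This is a hypothetical sketch: the enum DeploymentMode and the class BlockCacheModes are invented names, not HBase APIs, and the boolean inputs stand for "getL2(conf) returned a cache", hbase.blockcache.use.external and hbase.bucketcache.combinedcache.enable. It deliberately ignores blockCacheDisabled and the GLOBAL_BLOCK_CACHE_INSTANCE singleton.

```java
// Hypothetical sketch: models only the branching of CacheConfig.instantiateBlockCache()
// in HBase 1.2.x. DeploymentMode and BlockCacheModes are illustrative names, not HBase APIs.
enum DeploymentMode {
    L1_ONLY,             // no L2 configured: the LruBlockCache alone
    INCLUSIVE_COMBINED,  // external L2 (e.g. memcached): InclusiveCombinedBlockCache
    COMBINED,            // BucketCache combined with LRU: CombinedBlockCache
    RAW_L1_L2            // L1 returned alone; L2 reached only through the victim cache
}

final class BlockCacheModes {
    private BlockCacheModes() {}

    /**
     * @param hasL2           whether getL2(conf) returned a non-null cache
     * @param useExternal     value of hbase.blockcache.use.external
     * @param combinedWithLru value of hbase.bucketcache.combinedcache.enable
     */
    static DeploymentMode chooseMode(boolean hasL2, boolean useExternal, boolean combinedWithLru) {
        if (!hasL2) {
            return DeploymentMode.L1_ONLY;
        }
        if (useExternal) {
            return DeploymentMode.INCLUSIVE_COMBINED;   // checked before combinedWithLru
        }
        return combinedWithLru ? DeploymentMode.COMBINED : DeploymentMode.RAW_L1_L2;
    }

    /** Whenever an L2 exists, the original code also calls l1.setVictimCache(l2). */
    static boolean l1HasVictimCache(DeploymentMode mode) {
        return mode != DeploymentMode.L1_ONLY;
    }
}
```

Note that useExternal wins over combinedWithLru, and that the victim-cache link is installed in every mode that has an L2, not only in the Raw L1+L2 mode.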

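The external (memcached) L2 cache is enabled by configuring the following two parameters:

set hbase.blockcache.use.external to true to enable the L2 cache,

and set hbase.cache.memcached.servers to specify the memcached servers.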


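When hbase.bucketcache.combinedcache.enable is false, L1 and L2 are not combined. In this mode, blocks evicted (replaced) from L1 are put into L2. A newly cached block always goes into L1 first. When looking up a cached block, L1 is searched first, and L2 only if the block is not found there. We call this deployment Raw L1+L2. Note that this L1+L2 mode was removed as of HBase 2.0.0.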
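A toy model of the Raw L1+L2 arrangement may make the victim-cache idea concrete. This is a hypothetical sketch (the class RawL1L2Cache and its methods are invented for illustration): L1 is a tiny LRU map that hands every block it evicts to an L2 map instead of dropping it, and a lookup checks L1 first, then L2. It swaps in java.util.LinkedHashMap's access-order mode for LRU; HBase itself uses the bucketed eviction analyzed later in this article.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the "Raw L1+L2" (victim cache) mode; not HBase code.
// L1 is a small LRU map; entries it evicts are handed to L2 instead of being dropped.
final class RawL1L2Cache<K, V> {
    private final Map<K, V> l2 = new HashMap<>();   // stands in for BucketCache / external cache
    private final Map<K, V> l1;

    RawL1L2Cache(final int l1Capacity) {
        // accessOrder = true: iteration order is least-recently-accessed first (LRU)
        this.l1 = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > l1Capacity) {
                    l2.put(eldest.getKey(), eldest.getValue()); // evicted from L1 -> goes to L2
                    return true;
                }
                return false;
            }
        };
    }

    /** New blocks are always cached in L1 first. */
    void cacheBlock(K key, V value) {
        l1.put(key, value);
    }

    /** Look in L1; on a miss, search L2. */
    V getBlock(K key) {
        V v = l1.get(key);
        return v != null ? v : l2.get(key);
    }

    boolean inL1(K key) { return l1.containsKey(key); }

    boolean inL2(K key) { return l2.containsKey(key); }
}
```

For example, with an L1 capacity of 2, caching "a", "b" and then "c" pushes "a" (the least recently used entry) into L2, where getBlock("a") still finds it.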


2. BlockCache Implementation


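Based on how often clients access the data, BlockCache defines three priorities:

SINGLE: a block accessed for the first time is placed at this priority;

MULTI: a block accessed more than once is moved from SINGLE to MULTI;

MEMORY: the MEMORY priority is specified by the user. It is generally not recommended; normally only system tables use it.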
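The priority rules above can be sketched as a tiny state machine. This is a hypothetical illustration (the enum Priority and class CachedBlockSketch are invented names) that mirrors the idea behind HBase's BlockPriority and LruCachedBlock.access(), not their source. In HBase the "user-specified" MEMORY priority typically comes from the column family's IN_MEMORY attribute.

```java
// Hypothetical sketch of block priorities; mirrors the idea of HBase's BlockPriority
// and LruCachedBlock.access(), but is not the HBase source.
enum Priority { SINGLE, MULTI, MEMORY }

final class CachedBlockSketch {
    private Priority priority;
    private long accessTime;

    CachedBlockSketch(boolean inMemory, long accessTime) {
        // A newly cached block is MEMORY if requested, otherwise SINGLE (never MULTI).
        this.priority = inMemory ? Priority.MEMORY : Priority.SINGLE;
        this.accessTime = accessTime;
    }

    /** Called on every cache hit: refresh the access time and promote SINGLE to MULTI. */
    void access(long newAccessTime) {
        this.accessTime = newAccessTime;
        if (priority == Priority.SINGLE) {
            priority = Priority.MULTI;
        }
    }

    Priority getPriority() { return priority; }

    long getAccessTime() { return accessTime; }
}
```

A MEMORY block stays MEMORY no matter how often it is accessed; only SINGLE blocks are ever promoted.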


Internally, LruBlockCache keeps all cached blocks in a ConcurrentHashMap:

```java
  /** Concurrent map (the cache) */
  private final Map<BlockCacheKey, LruCachedBlock> map;
```

Adding a block to the cache is implemented in cacheBlock():

```java
  /**
   * Cache the block with the specified name and buffer.
   * <p>
   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
   * this can happen, for which we compare the buffer contents.
   * @param cacheKey block's cache key
   * @param buf block buffer
   * @param inMemory if block is in-memory
   * @param cacheDataInL1
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
      final boolean cacheDataInL1) {
    if (buf.heapSize() > maxBlockSize) {
      // If there are a lot of blocks that are too
      // big this can make the logs way too noisy.
      // So we log 2%
      if (stats.failInsert() % 50 == 0) {
        LOG.warn("Trying to cache too large a block "
            + cacheKey.getHfileName() + " @ "
            + cacheKey.getOffset()
            + " is " + buf.heapSize()
            + " which is larger than " + maxBlockSize);
      }
      return;
    }

    LruCachedBlock cb = map.get(cacheKey);
    if (cb != null) {
      // compare the contents, if they are not equal, we are in big trouble
      if (compare(buf, cb.getBuffer()) != 0) {
        throw new RuntimeException("Cached block contents differ, which should not have happened."
          + "cacheKey:" + cacheKey);
      }
      String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
      msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
      LOG.warn(msg);
      return;
    }
    long currentSize = size.get();
    long currentAcceptableSize = acceptableSize();
    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
    if (currentSize >= hardLimitSize) {
      stats.failInsert();
      if (LOG.isTraceEnabled()) {
        LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + " too many."
          + " the hard limit size is " + StringUtils.byteDesc(hardLimitSize)
          + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
      }
      if (!evictionInProgress) {
        runEviction();
      }
      return;
    }
    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
    long newSize = updateSizeMetrics(cb, false);
    map.put(cacheKey, cb);
    long val = elements.incrementAndGet();
    if (LOG.isTraceEnabled()) {
      long size = map.size();
      assertCounterSanity(size, val);
    }
    if (newSize > currentAcceptableSize && !evictionInProgress) {
      runEviction();
    }
  }
```

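The overall logic is as follows:

1) Blocks larger than maxBlockSize are rejected up front. It is assumed that a block already cached under the same BlockCacheKey is not cached again; if that does happen in rare cases (HBASE-8547), the contents are compared: identical contents just log a warning and return, while different contents throw a RuntimeException. If the cache size has already reached the hard limit (hardCapacityLimitFactor × acceptable size), the insert fails and eviction is triggered instead;

2) A new LruCachedBlock is created whose priority depends on the inMemory flag: BlockPriority.MEMORY if inMemory is true, otherwise BlockPriority.SINGLE. Only these two priorities can occur here, because BlockPriority.MULTI is assigned only when a cached block is accessed again;

3) The BlockCacheKey and the new LruCachedBlock are put into the ConcurrentHashMap mentioned earlier, and the size and element counters used for logging and metrics are updated;

4) Finally, if the cache size after adding the block exceeds the acceptable size and no eviction is in progress, runEviction() is called to start LRU eviction.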
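The admission checks in cacheBlock() can be replayed with plain numbers. This is a simplified, hypothetical model (the class AdmissionSketch and its Result enum are invented): blocks are reduced to their size in bytes, "eviction" is just a counter of how often runEviction() would have been called, the evictionInProgress flag is ignored, and it is meant for single-threaded demonstration only. The thresholds follow the code above: acceptable size = maxSize × acceptableFactor and hard limit = hardCapacityLimitFactor × acceptable size; the factor values used in the example are illustrative, not HBase defaults.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the LruBlockCache.cacheBlock() admission logic
// (not HBase code). Blocks are represented only by their size in bytes.
final class AdmissionSketch {
    enum Result { CACHED, TOO_LARGE, ALREADY_CACHED, OVER_HARD_LIMIT }

    private final ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
    private final AtomicLong size = new AtomicLong();
    private final long maxBlockSize;
    private final long acceptableSize;
    private final double hardCapacityLimitFactor;
    int evictionRequests = 0;   // how often runEviction() would have been called (demo only)

    AdmissionSketch(long maxSize, long maxBlockSize, double acceptableFactor,
                    double hardCapacityLimitFactor) {
        this.maxBlockSize = maxBlockSize;
        this.acceptableSize = (long) (maxSize * acceptableFactor);
        this.hardCapacityLimitFactor = hardCapacityLimitFactor;
    }

    Result cacheBlock(String key, long blockSize) {
        if (blockSize > maxBlockSize) {
            return Result.TOO_LARGE;                // oversized blocks are never cached
        }
        if (map.containsKey(key)) {
            return Result.ALREADY_CACHED;           // real code compares contents and warns
        }
        long hardLimit = (long) (hardCapacityLimitFactor * acceptableSize);
        if (size.get() >= hardLimit) {
            evictionRequests++;                     // insert fails; eviction is kicked off
            return Result.OVER_HARD_LIMIT;
        }
        map.put(key, blockSize);
        long newSize = size.addAndGet(blockSize);
        if (newSize > acceptableSize) {
            evictionRequests++;                     // above the acceptable size: start eviction
        }
        return Result.CACHED;
    }

    long size() { return size.get(); }
}
```

With maxSize = 1000, acceptableFactor = 0.75 and hardCapacityLimitFactor = 1.25, the acceptable size is 750 and the hard limit 937: the cache can temporarily grow past 750 (each such insert requests an eviction), but once it reaches 937 further inserts are refused until eviction catches up.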


Fetching a cached block is implemented in getBlock():


```java
  /**
   * Get the buffer of the block with the specified name.
   * @param cacheKey block's cache key
   * @param caching true if the caller caches blocks on cache misses
   * @param repeat Whether this is a repeat lookup for the same block
   *        (used to avoid double counting cache misses when doing double-check locking)
   * @param updateCacheMetrics Whether to update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    LruCachedBlock cb = map.get(cacheKey);
    if (cb == null) {
      if (!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());
      // If there is another block cache then try and read there.
      // However if this is a retry ( second time in double checked locking )
      // And it's already a miss then the l2 will also be a miss.
      if (victimHandler != null && !repeat) {
        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);

        // Promote this to L1.
        if (result != null && caching) {
          cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
        }
        return result;
      }
      return null;
    }
    if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());
    cb.access(count.incrementAndGet());
    return cb.getBuffer();
  }
```

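The overall logic is as follows:

1. First, look the block up directly in the LruBlockCache's map;

2. If it is not in the map, and a victimHandler exists and this is not a repeat lookup (!repeat), fetch the block through victimHandler.getBlock(), i.e. from the L2 cache; if it is found and caching is true, promote it into the LruBlockCache (L1) via cacheBlock();

3. On an L1 hit (step 1), the hit is recorded in the statistics, and cb.access() updates the block's access time and upgrades BlockPriority.SINGLE to BlockPriority.MULTI. A block promoted from L2 in step 2 instead enters L1 through cacheBlock() as a new SINGLE block, since inMemory is passed as false.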
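The control flow of getBlock() can be traced with a small stand-in. This hypothetical sketch (LookupSketch, CachedEntry and Prio are invented names) uses plain HashMaps for L1 and the victim cache and simple counters for stats.miss()/stats.hit(). Unlike the real cacheBlock(), promotion here always succeeds, because size limits are not modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the LruBlockCache.getBlock() control flow (not HBase code):
// an L1 map, an optional victim (L2) map, hit/miss counters and promotion into L1.
final class LookupSketch {
    enum Prio { SINGLE, MULTI }

    static final class CachedEntry {
        final String value;
        Prio prio = Prio.SINGLE;          // blocks promoted from L2 also start as SINGLE
        CachedEntry(String value) { this.value = value; }
    }

    final Map<String, CachedEntry> l1 = new HashMap<>();
    final Map<String, String> victim;     // null means "no victim handler"
    int hits = 0;
    int misses = 0;

    LookupSketch(Map<String, String> victim) { this.victim = victim; }

    String getBlock(String key, boolean caching, boolean repeat) {
        CachedEntry e = l1.get(key);
        if (e == null) {
            if (!repeat) misses++;                       // do not double-count a retry
            if (victim != null && !repeat) {
                String result = victim.get(key);         // try the L2 cache
                if (result != null && caching) {
                    l1.put(key, new CachedEntry(result)); // promote into L1 as a fresh SINGLE block
                }
                return result;
            }
            return null;
        }
        hits++;
        e.prio = Prio.MULTI;                             // access(): SINGLE -> MULTI
        return e.value;
    }
}
```

The first lookup of a key that lives only in L2 counts as an L1 miss and leaves a SINGLE copy in L1; the next lookup is an L1 hit that upgrades it to MULTI.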


LRU eviction in BlockCache is carried out mainly by the EvictionThread.
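The hand-off between callers and the eviction thread is a classic wait()/notifyAll() pattern. The sketch below is hypothetical (EvictionWorker is an invented name, and incrementing a counter stands in for LruBlockCache.evict()). Unlike a bare wait(), it adds a pending flag so that a notification sent before the worker starts waiting is not lost; that flag is a design choice of this sketch, not something taken from the HBase source.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the EvictionThread hand-off (not HBase code): the worker waits on
// its own monitor and runs one eviction pass each time evict() wakes it with notifyAll().
final class EvictionWorker implements Runnable {
    private final AtomicInteger passes = new AtomicInteger();
    private volatile boolean go = true;
    private boolean pending = false;      // guarded by this; prevents a lost wake-up

    @Override
    public void run() {
        while (go) {
            synchronized (this) {
                while (!pending && go) {
                    try {
                        wait();           // blocked until evict() or shutdown() notifies
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (!go) return;
                pending = false;
            }
            passes.incrementAndGet();     // stands in for LruBlockCache.evict()
        }
    }

    /** Wakes the worker; the caller does not run the eviction itself. */
    synchronized void evict() {
        pending = true;
        notifyAll();
    }

    synchronized void shutdown() {
        go = false;
        notifyAll();
    }

    int passes() { return passes.get(); }
}
```

The point of the pattern is that cacheBlock() only signals and returns immediately, while the expensive scan of the whole map happens on the background thread.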

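Once started, the thread blocks in wait() until EvictionThread.evict() wakes it up with notifyAll(); it then calls LruBlockCache.evict(), whose code is as follows: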

```java
  /**
   * Eviction method.
   */
  void evict() {
    // Ensure only one eviction at a time
    if (!evictionLock.tryLock()) return;

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      long bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free "
          + StringUtils.byteDesc(bytesToFree) + " of total=" + StringUtils.byteDesc(currentSize));
      }

      if (bytesToFree <= 0) return;

      // Instantiate priority buckets
      BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
      BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
      BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());

      // Scan entire map putting into appropriate buckets
      for (LruCachedBlock cachedBlock : map.values()) {
        switch (cachedBlock.getPriority()) {
          case SINGLE: { bucketSingle.add(cachedBlock); break; }
          case MULTI: { bucketMulti.add(cachedBlock); break; }
          case MEMORY: { bucketMemory.add(cachedBlock); break; }
        }
      }

      long bytesFreed = 0;
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + " from single and multi buckets");
          }
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + " total from all three buckets ");
          }
        } else {
          // this means no need to evict block in memory bucket,
          // and we try best to make the ratio between single-bucket and
          // multi-bucket is 1:2
          long bytesRemain = s + m - bytesToFree;
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = 3;

        BlockBucket bucket;
        while ((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if (overflow > 0) {
            long bucketBytesToFree = Math.min(overflow,
                (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }

      if (LOG.isTraceEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace("Block cache LRU eviction completed; "
          + "freed=" + StringUtils.byteDesc(bytesFreed) + ", "
          + "total=" + StringUtils.byteDesc(this.size.get()) + ", "
          + "single=" + StringUtils.byteDesc(single) + ", "
          + "multi=" + StringUtils.byteDesc(multi) + ", "
          + "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }
```

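The logic of evict() is as follows:

1) Acquire the eviction lock with tryLock(), so that only one eviction runs at a time; if another eviction already holds the lock, return immediately;

2) Compute the current total cache size, currentSize, and the number of bytes to free, bytesToFree = currentSize - minSize(); if bytesToFree is 0 or less, return;

3) Create three BlockBuckets for Single, Multi and Memory (in-memory) blocks. Each BlockBucket maintains a CachedBlockQueue that orders its LruCachedBlock objects for LRU eviction;

4) Iterate over the global ConcurrentHashMap that holds all cached blocks, adding each block to the BlockBucket matching its priority;

5) In the usual case (forceInMemory is false and memoryFactor is not above 0.999), add the three BlockBuckets to a PriorityQueue ordered by how far each bucket exceeds its bucketSize, i.e. its overflow (see BlockBucket.compareTo), so the bucket with the smallest overflow is polled first. Otherwise, the code spares the memory bucket whenever possible: if the single and multi buckets together hold less than bytesToFree, both are emptied and the rest is taken from the memory bucket; if not, eviction is split between single and multi so that what remains approaches a 1:2 ratio;

6) Poll the buckets from the priority queue. For each BlockBucket, the amount to free is Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets): a bucket never gives up more than its overflow, nor more than an equal share of what is still left to free, so space is reclaimed as evenly as possible across the three buckets. The actual freeing happens in BlockBucket.free(), which takes the LruCachedBlock objects to be evicted from its CachedBlockQueue.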
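The arithmetic of both branches in the steps above can be checked with a byte-level model. This is a hypothetical sketch (EvictionPlanner and its nested Bucket are invented names): each bucket is reduced to totalSize and bucketSize, where bucketSize stands for the value the real code gets from singleSize(), multiSize() or memorySize(). Real BlockBuckets free whole blocks in LRU order and may therefore free slightly more than requested; this model frees exact byte counts. compareTo() orders buckets by ascending overflow, matching the "smallest overflow first" reading described above.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

// Hypothetical byte-level model of LruBlockCache.evict() (not HBase code).
// Real buckets free whole blocks in LRU order; this model frees exact byte counts.
final class EvictionPlanner {
    private EvictionPlanner() {}

    static final class Bucket implements Comparable<Bucket> {
        final String name;
        long totalSize;           // bytes currently held by blocks of this priority
        final long bucketSize;    // this priority's share of the cache

        Bucket(String name, long totalSize, long bucketSize) {
            this.name = name;
            this.totalSize = totalSize;
            this.bucketSize = bucketSize;
        }

        long overflow() { return totalSize - bucketSize; }

        /** Frees up to toFree bytes and returns the number of bytes actually freed. */
        long free(long toFree) {
            long freed = Math.max(0, Math.min(toFree, totalSize));
            totalSize -= freed;
            return freed;
        }

        @Override
        public int compareTo(Bucket that) {   // smallest overflow is polled first
            return Long.compare(this.overflow(), that.overflow());
        }
    }

    /** Usual branch: share the work through a PriorityQueue ordered by overflow. */
    static long evictFair(long bytesToFree, Bucket... buckets) {
        PriorityQueue<Bucket> queue = new PriorityQueue<>(Arrays.asList(buckets));
        int remainingBuckets = buckets.length;
        long bytesFreed = 0;
        Bucket bucket;
        while ((bucket = queue.poll()) != null) {
            long overflow = bucket.overflow();
            if (overflow > 0) {
                // at most the overflow, and at most an equal share of what is still left
                long share = Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
                bytesFreed += bucket.free(share);
            }
            remainingBuckets--;
        }
        return bytesFreed;
    }

    /** forceInMemory branch: spare MEMORY if possible, steer single:multi toward 1:2. */
    static long evictForceInMemory(long bytesToFree, Bucket single, Bucket multi, Bucket memory) {
        long s = single.totalSize;
        long m = multi.totalSize;
        if (bytesToFree > s + m) {
            long freed = single.free(s) + multi.free(m);   // empty single and multi first
            return freed + memory.free(bytesToFree - freed);
        }
        long bytesRemain = s + m - bytesToFree;
        if (3 * s <= bytesRemain) {
            return multi.free(bytesToFree);                // single already small enough
        } else if (3 * m <= 2 * bytesRemain) {
            return single.free(bytesToFree);               // multi already small enough
        }
        long freed = single.free(s - bytesRemain / 3);     // leave single about 1/3 of the rest
        if (freed < bytesToFree) {
            freed += multi.free(bytesToFree - freed);
        }
        return freed;
    }
}
```

For example, with single = 500/250, multi = 400/500 and memory = 300/250 (totalSize/bucketSize) and 200 bytes to free, evictFair skips multi (no overflow), takes memory's full 50-byte overflow, and takes the remaining 150 bytes from single. In the forceInMemory branch, freeing 200 bytes from single = 300 and multi = 300 leaves 133 and 267 bytes, close to 1:2, without touching the memory bucket.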