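Introduction:
The previous article, "HBase 1.x Mastery: The HBase Read Cache BlockCache in Detail (Part 1)", covered the block cache mechanism and its three policies. Our production environment currently runs HBase 1.2.5, whose default read cache policy is LruBlockCache. This article walks through the LruBlockCache implementation using the HBase 1.2.5 source code.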
1. BlockCache initialization
When an HRegionServer thread starts through run(), it calls handleReportForDutyResponse() to perform initialization, which sets up the WAL and starts all server threads.
<code>
/**
 * The HRegionServer sticks in this loop until closed.
 */
@Override
public void run() {
  try {
    // Pre-registration initialization: ZooKeeper registration, lease threads, etc.
    preRegistrationInitialization();
  } catch (Throwable e) {
    abort("Fatal exception during initialization", e);
  }

  try {
    if (!isStopped() && !isAborted()) {
      ShutdownHook.install(conf, fs, this, Thread.currentThread());
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();
      // Initialize the RegionServerCoprocessorHost now that the ephemeral node exists,
      // in case any coprocessor wants to use ZooKeeper
      this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
    }

    // Register with the HMaster.
    // Stop if the server is stopped, the clusterup flag is down, or HDFS has a problem.
    while (keepLooping()) {
      RegionServerStartupResponse w = reportForDuty();
      if (w == null) {
        LOG.warn("reportForDuty failed; sleeping and then retrying.");
        this.sleeper.sleep();
      } else {
        // LruBlockCache initialization starts from this call
        handleReportForDutyResponse(w);
        break;
      }
    }
    .....................................
}
</code>
handleReportForDutyResponse() then calls startHeapMemoryManager(), which calls HeapMemoryManager.create(), and the cache is finally initialized through CacheConfig.instantiateBlockCache(conf).
HRegionServer has a member variable of type HeapMemoryManager that manages the heap memory of the RegionServer process. The blockCache held by HeapMemoryManager is the RegionServer's read cache, and it is initialized in the instantiateBlockCache method of CacheConfig.
<code>
/**
 * Returns the block cache or <code>null</code> in case none should be used.
 * Sets GLOBAL_BLOCK_CACHE_INSTANCE
 *
 * @param conf The current configuration.
 * @return The block cache or <code>null</code>.
 */
public static synchronized BlockCache instantiateBlockCache(Configuration conf) {
  if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE;
  if (blockCacheDisabled) return null;
  // L1 cache
  LruBlockCache l1 = getL1(conf);
  // blockCacheDisabled is set as a side-effect of getL1Internal(), so check it again after the call.
  if (blockCacheDisabled) return null;
  // L2 cache
  BlockCache l2 = getL2(conf);
  if (l2 == null) {
    GLOBAL_BLOCK_CACHE_INSTANCE = l1;
  } else {
    boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
    boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
      DEFAULT_BUCKET_CACHE_COMBINED);
    // Check whether the external L2 cache is enabled (hbase.blockcache.use.external=true)
    if (useExternal) {
      GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2);
    } else {
      if (combinedWithLru) {
        GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
      } else {
        // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler
        // mechanism. It is a little ugly but works according to the following: when the
        // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get
        // a block from the L1 cache, if not in L1, we will search L2.
        GLOBAL_BLOCK_CACHE_INSTANCE = l1;
      }
    }
    l1.setVictimCache(l2);
  }
  return GLOBAL_BLOCK_CACHE_INSTANCE;
}
</code>
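Two lines in the code above matter most:
LruBlockCache l1 = getL1(conf);
// Gets the L1 cache. LruBlockCache defaults to 40% of the HRegionServer heap, controlled by the parameter hfile.block.cache.size
BlockCache l2 = getL2(conf);
// Gets the L2 cache. So what is this?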
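Starting with HBase 1.1.0, LruBlockCache can be used on its own or combined with other caches as a multi-level cache. LruBlockCache is the L1 cache, and BucketCache or an external block cache is the L2 cache. HBase can use memcached as an external BlockCache, which is a useful feature because a device failure or an upgrade no longer leaves the cache completely cold. Put plainly, the cached data is not easily lost when HBase fails or is upgraded.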
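In the code above, the checks on useExternal and combinedWithLru decide how L1 and L2 are wired together. If useExternal is true, an external cache such as memcached is used together with LruBlockCache through InclusiveCombinedBlockCache. If combinedWithLru is true, BucketCache (typically off-heap memory) is used together with LruBlockCache through CombinedBlockCache. In the combined case, LruBlockCache holds metadata such as index blocks, while the actual data blocks are cached in BucketCache. The external wrapper is inclusive, as its name indicates, so it does not split metadata and data in the same way.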
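The external L2 cache is enabled with the following two settings:
Set hbase.blockcache.use.external to true to turn on the external L2 cache,
and set hbase.cache.memcached.servers to list the memcached servers.
When hbase.bucketcache.combinedcache.enable is false, L1 and L2 are not combined. In this mode, blocks evicted from L1 are moved into L2. A block is always cached in L1 first. A lookup checks L1 first and searches L2 only on a miss. This deployment is called Raw L1+L2. Note that the Raw L1+L2 mode has been removed in HBase 2.0.0 and later.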
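The three wiring modes can be summarized in a small decision function. The following is only a sketch of the branch structure in instantiateBlockCache(): the class `CacheLayoutChooser`, the `Layout` enum, and the use of a plain `Map` instead of a Hadoop `Configuration` are hypothetical, and the default values ("false" for the external flag, "true" for the combined flag) are assumptions that should be checked against your HBase version.

```java
import java.util.Map;

/** Illustrative only: mirrors the branch structure of instantiateBlockCache(). */
class CacheLayoutChooser {
    /** Hypothetical names for the four possible outcomes. */
    enum Layout { L1_ONLY, INCLUSIVE_COMBINED, COMBINED, RAW_L1_L2 }

    // Key names as quoted in the article.
    static final String USE_EXTERNAL = "hbase.blockcache.use.external";
    static final String COMBINED = "hbase.bucketcache.combinedcache.enable";

    /**
     * @param conf         plain key/value settings (stand-in for a Hadoop Configuration)
     * @param l2Configured whether an L2 cache could be created at all
     */
    static Layout choose(Map<String, String> conf, boolean l2Configured) {
        if (!l2Configured) {
            return Layout.L1_ONLY; // no L2: LruBlockCache is used alone
        }
        // Assumed defaults: external cache off, combined mode on.
        boolean useExternal = Boolean.parseBoolean(conf.getOrDefault(USE_EXTERNAL, "false"));
        boolean combined = Boolean.parseBoolean(conf.getOrDefault(COMBINED, "true"));
        if (useExternal) {
            return Layout.INCLUSIVE_COMBINED; // the external flag is checked first
        }
        return combined ? Layout.COMBINED : Layout.RAW_L1_L2;
    }
}
```

Note that the external flag takes precedence: when it is set, the combined flag is never consulted.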
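2. BlockCache implementation
BlockCache defines three priorities based on how often clients access the data:
SINGLE: a block is placed in this priority queue when it is accessed for the first time;
MULTI: a block is moved from SINGLE to MULTI once it has been accessed more than once;
MEMORY: the memory priority is specified by the user. It is generally not recommended, and normally only system tables use it;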
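The priority transitions can be illustrated with a small stand-alone class. This is a simplified sketch, not the HBase `LruCachedBlock` class: the name `PriorityDemoBlock` and its fields are hypothetical, and only the promotion rule described above is modeled.

```java
/** Simplified stand-in for a cached block; not the HBase LruCachedBlock class. */
class PriorityDemoBlock {
    enum Priority { SINGLE, MULTI, MEMORY }

    private Priority priority;
    private long accessTime;

    /** A new block starts as MEMORY if flagged in-memory, otherwise as SINGLE. */
    PriorityDemoBlock(long accessTime, boolean inMemory) {
        this.accessTime = accessTime;
        this.priority = inMemory ? Priority.MEMORY : Priority.SINGLE;
    }

    /** Records an access; only a SINGLE block is promoted, and only to MULTI. */
    void access(long accessTime) {
        this.accessTime = accessTime;
        if (priority == Priority.SINGLE) {
            priority = Priority.MULTI;
        }
    }

    Priority getPriority() { return priority; }

    long getAccessTime() { return accessTime; }
}
```

A block created with `inMemory = false` is SINGLE until its first `access()` call and MULTI afterwards, while a MEMORY block keeps its priority regardless of how often it is read.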
Internally, LruBlockCache keeps all cached blocks in a ConcurrentHashMap:
<code>
/** Concurrent map (the cache) */
private final Map<BlockCacheKey,LruCachedBlock> map;
</code>
Adding a block to the cache is implemented in cacheBlock():
<code>
/**
 * Cache the block with the specified name and buffer.
 *
 * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
 * this can happen, for which we compare the buffer contents.
 * @param cacheKey block's cache key
 * @param buf block buffer
 * @param inMemory if block is in-memory
 * @param cacheDataInL1
 */
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
    final boolean cacheDataInL1) {

  if (buf.heapSize() > maxBlockSize) {
    // If there are a lot of blocks that are too
    // big this can make the logs way too noisy.
    // So we log 2%
    if (stats.failInsert() % 50 == 0) {
      LOG.warn("Trying to cache too large a block "
          + cacheKey.getHfileName() + " @ "
          + cacheKey.getOffset()
          + " is " + buf.heapSize()
          + " which is larger than " + maxBlockSize);
    }
    return;
  }

  LruCachedBlock cb = map.get(cacheKey);
  if (cb != null) {
    // compare the contents, if they are not equal, we are in big trouble
    if (compare(buf, cb.getBuffer()) != 0) {
      throw new RuntimeException("Cached block contents differ, which should not have happened."
        + "cacheKey:" + cacheKey);
    }
    String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
    msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
    LOG.warn(msg);
    return;
  }
  long currentSize = size.get();
  long currentAcceptableSize = acceptableSize();
  long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
  if (currentSize >= hardLimitSize) {
    stats.failInsert();
    if (LOG.isTraceEnabled()) {
      LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
        + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + " too many."
        + " the hard limit size is " + StringUtils.byteDesc(hardLimitSize) + ", failed to put cacheKey:"
        + cacheKey + " into LruBlockCache.");
    }
    if (!evictionInProgress) {
      runEviction();
    }
    return;
  }
  cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
  long newSize = updateSizeMetrics(cb, false);
  map.put(cacheKey, cb);
  long val = elements.incrementAndGet();
  if (LOG.isTraceEnabled()) {
    long size = map.size();
    assertCounterSanity(size, val);
  }
  if (newSize > currentAcceptableSize && !evictionInProgress) {
    runEviction();
  }
}
</code>
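The overall logic is as follows:
1). The code assumes that a BlockCacheKey that is already cached will not be put into the cache again;
2). A CachedBlock object is created according to the inMemory flag: BlockPriority.MEMORY if inMemory is true, otherwise BlockPriority.SINGLE. Only these two types are created here, because a block becomes BlockPriority.MULTI only when it is accessed again;
3). The BlockCacheKey and the new CachedBlock object are added to the ConcurrentHashMap mentioned earlier, and the log and metrics counters are updated;
4). Finally, if the cache size after adding the new block exceeds the configured threshold and no eviction is currently running, runEviction() is called to wake up the LRU eviction thread.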
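The size checks in cacheBlock() can be reduced to a few lines of arithmetic. The sketch below is an assumption-laden simplification: the class `AdmissionSketch` and its `Outcome` enum are hypothetical, the factors are written as integer percentages (99% for the acceptable size and 120% for the hard limit, which I believe match the 1.x defaults but should be verified), and the too-large-block check and the duplicate-key check are left out.

```java
/** Illustrative admission logic; sizes are in arbitrary units (bytes in the real cache). */
class AdmissionSketch {
    enum Outcome { REJECTED_OVER_HARD_LIMIT, CACHED, CACHED_AND_EVICTION_TRIGGERED }

    final long acceptableSize; // size above which an eviction run is requested
    final long hardLimitSize;  // size at or above which new blocks are refused
    long currentSize;

    AdmissionSketch(long maxSize) {
        // Assumed factors, expressed as integer percentages to avoid float rounding.
        this.acceptableSize = maxSize * 99 / 100;
        this.hardLimitSize = acceptableSize * 120 / 100;
    }

    Outcome offer(long blockSize) {
        if (currentSize >= hardLimitSize) {
            // The real method also requests an eviction here if none is in progress.
            return Outcome.REJECTED_OVER_HARD_LIMIT;
        }
        currentSize += blockSize;
        return currentSize > acceptableSize
            ? Outcome.CACHED_AND_EVICTION_TRIGGERED
            : Outcome.CACHED;
    }
}
```

With `maxSize = 1000` the acceptable size is 990 and the hard limit is 1188, so the cache may temporarily grow past its acceptable size while the eviction thread catches up, but it refuses inserts once it reaches the hard limit.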
Reading cached data is implemented in getBlock():
<code>
/**
 * Get the buffer of the block with the specified name.
 * @param cacheKey block's cache key
 * @param caching true if the caller caches blocks on cache misses
 * @param repeat Whether this is a repeat lookup for the same block
 *        (used to avoid double counting cache misses when doing double-check locking)
 * @param updateCacheMetrics Whether to update cache metrics or not
 * @return buffer of specified cache key, or null if not in cache
 */
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
  LruCachedBlock cb = map.get(cacheKey);
  if (cb == null) {
    if (!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());
    // If there is another block cache then try and read there.
    // However if this is a retry ( second time in double checked locking )
    // And it's already a miss then the l2 will also be a miss.
    if (victimHandler != null && !repeat) {
      Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);

      // Promote this to L1.
      if (result != null && caching) {
        cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
      }
      return result;
    }
    return null;
  }
  if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());
  cb.access(count.incrementAndGet());
  return cb.getBuffer();
}
</code>
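The overall logic is as follows:
1. First, look the block up directly in the LruBlockCache map;
2. If the map does not contain it, and victimHandler exists and this is not a repeat lookup, fetch the block through victimHandler.getBlock(). When caching is true, the result is also cached in LruBlockCache. In other words, the lookup falls back to the L2 cache and promotes the block into L1;
3. If step 1 finds the block, the hit statistics are updated and the block's access() method refreshes accessTime and promotes a BlockPriority.SINGLE block to BlockPriority.MULTI. A block found in step 2 is returned directly;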
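The lookup order can be sketched with two plain maps. This is a minimal illustration under stated assumptions: `TwoLevelLookupSketch`, the `String` keys, and the `byte[]` values are hypothetical stand-ins for LruBlockCache, BlockCacheKey, and Cacheable, and metrics and priority handling are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative L1/L2 lookup with promotion; not the HBase implementation. */
class TwoLevelLookupSketch {
    final Map<String, byte[]> l1 = new ConcurrentHashMap<>();
    final Map<String, byte[]> l2 = new ConcurrentHashMap<>(); // plays the victim cache

    /**
     * @param caching whether the caller wants misses to populate the cache
     * @param repeat  whether this is the second lookup of a double-checked read
     */
    byte[] getBlock(String key, boolean caching, boolean repeat) {
        byte[] hit = l1.get(key);
        if (hit != null) {
            return hit;            // step 1: served from L1
        }
        if (repeat) {
            return null;           // a repeat lookup does not consult L2 again
        }
        byte[] fromL2 = l2.get(key);
        if (fromL2 != null && caching) {
            l1.put(key, fromL2);   // step 2: promote into L1
        }
        return fromL2;
    }
}
```

A block that lives only in `l2` is returned by the first non-repeat lookup, and it appears in `l1` afterwards only if that lookup was made with `caching = true`.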
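The LRU eviction in BlockCache is carried out mainly by the EvictionThread.
After it starts, the thread blocks in wait() until the thread's own evict() method, invoked by runEviction(), calls notifyAll(). It then runs the cache's evict() method, whose code is shown below: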
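The wait/notify handshake can be sketched as follows. This is a simplified illustration, not the HBase EvictionThread: the class `EvictionWorkerSketch`, the `requested` flag, and the `runOnce()` helper are hypothetical, and the flag is added here so that a request issued before the worker starts waiting is not lost.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustrative background worker that sleeps until an eviction is requested. */
class EvictionWorkerSketch extends Thread {
    private final Runnable evictAction;
    private boolean requested = false;       // guarded by this
    private volatile boolean running = true;

    EvictionWorkerSketch(Runnable evictAction) {
        super("eviction-worker-sketch");
        setDaemon(true);
        this.evictAction = evictAction;
    }

    @Override
    public void run() {
        while (running) {
            synchronized (this) {
                while (running && !requested) {
                    try {
                        wait();              // blocks until requestEviction() or shutdown()
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                requested = false;
            }
            if (running) {
                evictAction.run();           // the actual eviction runs outside the monitor
            }
        }
    }

    /** Called by the caching path when the cache has grown too large. */
    void requestEviction() {
        synchronized (this) {
            requested = true;
            notifyAll();
        }
    }

    void shutdown() {
        synchronized (this) {
            running = false;
            notifyAll();
        }
    }

    /** Small self-check: returns true if one request leads to one eviction run. */
    static boolean runOnce() {
        CountDownLatch done = new CountDownLatch(1);
        EvictionWorkerSketch worker = new EvictionWorkerSketch(done::countDown);
        worker.start();
        worker.requestEviction();
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.shutdown();
        }
    }
}
```

Keeping the eviction on a separate thread means the thread that inserts a block only pays for a notification, not for the scan over the whole cache.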
<code>
/**
 * Eviction method.
 */
void evict() {

  // Ensure only one eviction at a time
  if(!evictionLock.tryLock()) return;

  try {
    evictionInProgress = true;
    long currentSize = this.size.get();
    long bytesToFree = currentSize - minSize();

    if (LOG.isTraceEnabled()) {
      LOG.trace("Block cache LRU eviction started; Attempting to free " +
        StringUtils.byteDesc(bytesToFree) + " of total=" +
        StringUtils.byteDesc(currentSize));
    }

    if(bytesToFree <= 0) return;

    // Instantiate priority buckets
    BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
        singleSize());
    BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
        multiSize());
    BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
        memorySize());

    // Scan entire map putting into appropriate buckets
    for(LruCachedBlock cachedBlock : map.values()) {
      switch(cachedBlock.getPriority()) {
        case SINGLE: {
          bucketSingle.add(cachedBlock);
          break;
        }
        case MULTI: {
          bucketMulti.add(cachedBlock);
          break;
        }
        case MEMORY: {
          bucketMemory.add(cachedBlock);
          break;
        }
      }
    }

    long bytesFreed = 0;
    if (forceInMemory || memoryFactor > 0.999f) {
      long s = bucketSingle.totalSize();
      long m = bucketMulti.totalSize();
      if (bytesToFree > (s + m)) {
        // this means we need to evict blocks in memory bucket to make room,
        // so the single and multi buckets will be emptied
        bytesFreed = bucketSingle.free(s);
        bytesFreed += bucketMulti.free(m);
        if (LOG.isTraceEnabled()) {
          LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
            " from single and multi buckets");
        }
        bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
        if (LOG.isTraceEnabled()) {
          LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
            " total from all three buckets ");
        }
      } else {
        // this means no need to evict block in memory bucket,
        // and we try best to make the ratio between single-bucket and
        // multi-bucket is 1:2
        long bytesRemain = s + m - bytesToFree;
        if (3 * s <= bytesRemain) {
          // single-bucket is small enough that no eviction happens for it
          // hence all eviction goes from multi-bucket
          bytesFreed = bucketMulti.free(bytesToFree);
        } else if (3 * m <= 2 * bytesRemain) {
          // multi-bucket is small enough that no eviction happens for it
          // hence all eviction goes from single-bucket
          bytesFreed = bucketSingle.free(bytesToFree);
        } else {
          // both buckets need to evict some blocks
          bytesFreed = bucketSingle.free(s - bytesRemain / 3);
          if (bytesFreed < bytesToFree) {
            bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
          }
        }
      }
    } else {
      PriorityQueue<BlockBucket> bucketQueue =
        new PriorityQueue<BlockBucket>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = 3;

      BlockBucket bucket;
      while((bucket = bucketQueue.poll()) != null) {
        long overflow = bucket.overflow();
        if(overflow > 0) {
          long bucketBytesToFree = Math.min(overflow,
              (bytesToFree - bytesFreed) / remainingBuckets);
          bytesFreed += bucket.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }
    }

    if (LOG.isTraceEnabled()) {
      long single = bucketSingle.totalSize();
      long multi = bucketMulti.totalSize();
      long memory = bucketMemory.totalSize();
      LOG.trace("Block cache LRU eviction completed; " +
        "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
        "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
        "single=" + StringUtils.byteDesc(single) + ", " +
        "multi=" + StringUtils.byteDesc(multi) + ", " +
        "memory=" + StringUtils.byteDesc(memory));
    }
  } finally {
    stats.evict();
    evictionInProgress = false;
    evictionLock.unlock();
  }
}
</code>
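The detailed logic is as follows:
1). Acquire the lock first, so that only one eviction runs at a time;
2). Compute the current total cache size currentSize and the number of bytes to free, bytesToFree. If bytesToFree is not positive, return immediately;
3). Create three buckets for the Single, Multi, and InMemory blocks. Each BlockBucket maintains a CachedBlockQueue that keeps all CachedBlock objects of that bucket in LRU order;
4). Iterate over the global ConcurrentHashMap that records every cached block and add each block to the matching BlockBucket;
5). In the default branch (forceInMemory is not set), add the three BlockBucket objects to a priority queue, ordered by how far each BlockBucket exceeds its bucketSize (see the compareTo method of BlockBucket);
6). Iterate over the priority queue. For each BlockBucket, the number of bytes to release is computed as Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets), which spreads the required space as evenly as possible across the three buckets. The actual release happens in the free method of BlockBucket, which takes the CachedBlock objects to be evicted from its CachedBlockQueue.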
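Steps 5 and 6 can be tried out on plain numbers. The sketch below is a simplification under stated assumptions: `EvictionSplitSketch` and its `Bucket` class are hypothetical, a bucket tracks only byte totals rather than individual blocks, `free()` releases exactly the requested amount (the real method evicts whole blocks), and the bucket target sizes in the usage note are made-up values.

```java
import java.util.PriorityQueue;

/** Illustrative split of bytesToFree across priority buckets. */
class EvictionSplitSketch {
    static final class Bucket implements Comparable<Bucket> {
        final String name;
        final long targetSize; // the share of the cache this bucket is allowed
        long totalSize;        // what the bucket currently holds

        Bucket(String name, long totalSize, long targetSize) {
            this.name = name;
            this.totalSize = totalSize;
            this.targetSize = targetSize;
        }

        long overflow() { return totalSize - targetSize; }

        /** Simplified: frees exactly toFree bytes, capped at the bucket's content. */
        long free(long toFree) {
            long freed = Math.min(toFree, totalSize);
            totalSize -= freed;
            return freed;
        }

        /** Smaller overflow first, so the worst offender is handled last. */
        @Override
        public int compareTo(Bucket other) {
            return Long.compare(overflow(), other.overflow());
        }
    }

    static long evict(long bytesToFree, Bucket... buckets) {
        PriorityQueue<Bucket> queue = new PriorityQueue<>();
        for (Bucket b : buckets) {
            queue.add(b);
        }
        int remainingBuckets = buckets.length;
        long bytesFreed = 0;
        Bucket bucket;
        while ((bucket = queue.poll()) != null) {
            long overflow = bucket.overflow();
            if (overflow > 0) {
                // An even share of what is still owed, never more than the overflow.
                long share = Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
                bytesFreed += bucket.free(share);
            }
            remainingBuckets--;
        }
        return bytesFreed;
    }
}
```

For example, with buckets single (holds 400, target 250), multi (holds 560, target 500), and memory (holds 40, target 250) and 50 bytes to free, the memory bucket is polled first and skipped because it has no overflow, then multi gives up 25 bytes and single gives up the remaining 25.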