您好,登錄后才能下訂單哦!
今天小編給大家分享一下Netty分布式ByteBuf怎么使用命中緩存分配的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
回顧上一小節的內容, 我們講到PoolThreadCache中維護了三個緩存數組(實際上是六個, 這里僅僅以Direct為例, heap類型的邏輯是一樣的): tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches分別代表tiny類型, small類型和normal類型的緩存數組
這三個數組保存在PoolThreadCache的成員變量中:
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
其中是在構造方法中進行了初始化:
tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena);
private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0) { @SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null; } }
這里上面的小節已經分析過, 這里創建了一個緩存數組, 這個緩存數組的長度,也就是numCaches, 在不同的類型, 這個長度不一樣, tiny類型長度是32, small類型長度為4, normal類型長度為3
我們知道, 緩存數組中每個節點代表一個緩存對象, 里面維護了一個隊列, 隊列大小由PooledByteBufAllocator類中的tinyCacheSize, smallCacheSize, normalCacheSize屬性決定的, 這里之前小節已經剖析過
其中每個緩存對象, 隊列中緩存的ByteBuf大小是固定的, netty將每種緩沖區類型分成了不同長度規格, 而每個緩存中的隊列緩存的ByteBuf的長度, 都是同一個規格的長度, 而緩沖區數組的長度, 就是規格的數量
比如, 在tiny類型中, netty將其長度分成32個規格, 每個規格都是16的整數倍, 也就是包含0B, 16B, 32B, 48B, 64B, 80B, 96B......496B總共32種規格, 而在其緩存數組tinySubPageDirectCaches中, 這每一種規格代表數組中的一個緩存對象緩存的ByteBuf的大小, 我們以tinySubPageDirectCaches[1]為例(這里下標選擇1是因為下標為0代表的規格是0B, 其實就代表一個空的緩存, 這里不進行舉例), 在tinySubPageDirectCaches[1]的緩存對象中所緩存的ByteBuf的緩沖區長度是16B, 在tinySubPageDirectCaches[2]中緩存的ByteBuf長度都為32B, 以此類推, tinySubPageDirectCaches[31]中緩存的ByteBuf長度為496B
有關類型規則的分配如下:
tiny
:總共32個規格, 均是16的整數倍, 0B, 16B, 32B, 48B, 64B, 80B, 96B......496B
small
:4種規格, 512b, 1k, 2k, 4k
nomal
:3種規格, 8k, 16k, 32k
這樣, PoolThreadCache中緩存數組的數據結構為
大概了解緩存數組的數據結構, 我們再繼續剖析在緩沖中分配內存的邏輯
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { //規格化 final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { int tableIdx; PoolSubpage<T>[] table; //判斷是不是tinty boolean tiny = isTiny(normCapacity); if (tiny) { // < 512 //緩存分配 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { return; } //通過tinyIdx拿到tableIdx tableIdx = tinyIdx(normCapacity); //subpage的數組 table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } //拿到對應的節點 final PoolSubpage<T> head = table[tableIdx]; synchronized (head) { final PoolSubpage<T> s = head.next; //默認情況下, head的next也是自身 if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); if (tiny) { allocationsTiny.increment(); } else { allocationsSmall.increment(); } return; } } allocateNormal(buf, reqCapacity, normCapacity); return; } if (normCapacity <= chunkSize) { //首先在緩存上進行內存分配 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { //分配成功, 返回 return; } //分配不成功, 做實際的內存分配 allocateNormal(buf, reqCapacity, normCapacity); } else { //大于這個值, 就不在緩存上分配 allocateHuge(buf, reqCapacity); } }
首先通過normalizeCapacity方法進行內存規格化
int normalizeCapacity(int reqCapacity) { if (reqCapacity < 0) { throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)"); } if (reqCapacity >= chunkSize) { return reqCapacity; } //如果>tiny if (!isTiny(reqCapacity)) { // >= 512 //找一個2的冪次方的數值, 確保數值大于等于reqCapacity int normalizedCapacity = reqCapacity; normalizedCapacity --; normalizedCapacity |= normalizedCapacity >>> 1; normalizedCapacity |= normalizedCapacity >>> 2; normalizedCapacity |= normalizedCapacity >>> 4; normalizedCapacity |= normalizedCapacity >>> 8; normalizedCapacity |= normalizedCapacity >>> 16; normalizedCapacity ++; if (normalizedCapacity < 0) { normalizedCapacity >>>= 1; } return normalizedCapacity; } //如果是16的倍數 if ((reqCapacity & 15) == 0) { return reqCapacity; } //不是16的倍數, 變成最大小于當前值的值+16 return (reqCapacity & ~15) + 16; }
if (!isTiny(reqCapacity)) 代表如果大于tiny類型的大小, 也就是512, 則會找一個2的冪次方的數值, 確保這個數值大于等于reqCapacity
如果是tiny, 則繼續往下
if ((reqCapacity & 15) == 0) 這里判斷如果是16的倍數, 則直接返回
如果不是16的倍數, 則返回 (reqCapacity & ~15) + 16 , 也就是變成最小大于當前值的16的倍數值
從上面規格化邏輯看出, 這里將緩存大小規格化成固定大小, 確保每個緩存對象緩存的ByteBuf容量統一
if(isTinyOrSmall(normCapacity)) 這里是根據規格化后的大小判斷是否tiny或者small類型, 我們跟到方法中:
boolean isTinyOrSmall(int normCapacity) { return (normCapacity & subpageOverflowMask) == 0; }
這里是判斷如果normCapacity小于一個page的大小, 也就是8k代表其實tiny或者small
繼續看allocate方法:
如果當前大小是tiny或者small, 則isTiny(normCapacity)判斷是否是tiny類型, 跟進去:
static boolean isTiny(int normCapacity) { return (normCapacity & 0xFFFFFE00) == 0; }
這里是判斷如果小于512, 則認為是tiny
再繼續看allocate方法:
如果是tiny, 則通過cache.allocateTiny(this, buf, reqCapacity, normCapacity)在緩存上進行分配
我們就以tiny類型為例, 分析在緩存上分配ByteBuf的流程
我們跟進去, 進入到了PoolThreadCache的allocateTiny方法中:
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); }
這里有個方法cacheForTiny(area, normCapacity), 這個方法的作用是根據normCapacity找到tiny類型緩存數組中的一個緩存對象
我們跟進cacheForTiny:
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { int idx = PoolArena.tinyIdx(normCapacity); if (area.isDirect()) { return cache(tinySubPageDirectCaches, idx); } return cache(tinySubPageHeapCaches, idx); }
PoolArena.tinyIdx(normCapacity)是找到tiny類型緩存數組的下標
繼續跟tinyIdx:
static int tinyIdx(int normCapacity) { return normCapacity >>> 4; }
這里直接將normCapacity除以16, 通過前面的內容我們知道, tiny類型緩存數組中每個元素規格化的數據都是16的倍數, 所以通過這種方式可以找到其下標, 參考圖5-2, 如果是16B會拿到下標為1的元素, 如果是32B則會拿到下標為2的元素
if (area.isDirect()) 這里判斷是否是分配堆外內存, 因為我們是按照堆外內存進行舉例, 所以這里為true
再繼續跟到cache(tinySubPageDirectCaches, idx)方法中:
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) { if (cache == null || idx > cache.length - 1) { return null; } return cache[idx]; }
這里我們看到直接通過下標的方式拿到了緩存數組中的對象
回到PoolThreadCache的allocateTiny方法中:
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); }
拿到了緩存對象之后, 我們跟到allocate(cacheForTiny(area, normCapacity), buf, reqCapacity)方法中:
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) { if (cache == null) { return false; } boolean allocated = cache.allocate(buf, reqCapacity); if (++ allocations >= freeSweepAllocationThreshold) { allocations = 0; trim(); } return allocated; }
這里通過cache.allocate(buf, reqCapacity)進行繼續進行分配
再繼續往里跟, 跟到內部類MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法中:
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { Entry<T> entry = queue.poll(); if (entry == null) { return false; } initBuf(entry.chunk, entry.handle, buf, reqCapacity); entry.recycle(); ++ allocations; return true; }
這里首先通過queue.poll()這種方式彈出一個entry, 我們之前的小節分析過, MemoryRegionCache維護著一個隊列, 而隊列中的每一個值是一個entry
static final class Entry<T> { final Handle<Entry<?>> recyclerHandle; PoolChunk<T> chunk; long handle = -1; //代碼省略 }
這里重點關注chunk和handle的這兩個屬性, chunk代表一塊連續的內存, 我們之前簡單介紹過, netty是通過chunk為單位進行內存分配的, 我們之后會對chunk進行剖析
handle相當于一個指針, 可以唯一定位到chunk里面的一塊連續的內存, 之后也會詳細分析
這樣, 通過chunk和handle就可以定位ByteBuf中指定一塊連續內存, 有關ByteBuf相關的讀寫, 都會在這塊內存中進行
我們回到MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法:
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { Entry<T> entry = queue.poll(); if (entry == null) { return false; } initBuf(entry.chunk, entry.handle, buf, reqCapacity); entry.recycle(); ++ allocations; return true; }
彈出entry之后, 通過initBuf(entry.chunk, entry.handle, buf, reqCapacity)這種方式給ByteBuf初始化, 這里參數傳入我們剛才分析過的當前Entry的chunk和hanle
因為我們分析的tiny類型的緩存對象是SubPageMemoryRegionCache類型,所以我們繼續跟到SubPageMemoryRegionCache類的initBuf(entry.chunk, entry.handle, buf, reqCapacity)方法中:
protected void initBuf( PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) { chunk.initBufWithSubpage(buf, handle, reqCapacity); }
這里的chunk調用了initBufWithSubpage(buf, handle, reqCapacity)方法, 其實就是PoolChunk類中的方法
我們繼續跟initBufWithSubpage:
void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) { initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity); }
這里有關bitmapIdx(handle)相關的邏輯, 會在后續的章節進行剖析, 這里繼續往里跟:
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) { assert bitmapIdx != 0; int memoryMapIdx = memoryMapIdx(handle); PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; assert subpage.doNotDestroy; assert reqCapacity <= subpage.elemSize; buf.init( this, handle, runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize, arena.parent.threadCache()); }
這里我們先關注init方法, 因為我們是以PooledUnsafeDirectByteBuf為例, 所以這里走的是PooledUnsafeDirectByteBuf的init方法
void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { super.init(chunk, handle, offset, length, maxLength, cache); initMemoryAddress(); }
首先調用了父類的init方法, 再跟進去:
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { //初始化 assert handle >= 0; assert chunk != null; //在哪一塊內存上進行分配的 this.chunk = chunk; //這一塊內存上的哪一塊連續內存 this.handle = handle; memory = chunk.memory; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; this.cache = cache; }
這里將PooledUnsafeDirectByteBuf的各個屬性進行了初始化
this.chunk = chunk 這里初始化了chunk, 代表當前的ByteBuf是在哪一塊內存中分配的
this.handle = handle 這里初始化了handle, 代表當前的ByteBuf是這塊內存的哪個連續內存
有關offset和length, 我們會在之后的小節進行分析, 在這里我們只需要知道, 通過緩存分配ByteBuf, 我們只需要通過一個chunk和handle, 就可以確定一塊內存
以上就是通過緩存分配ByteBuf對象的過程
我們回到MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法:
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { Entry<T> entry = queue.poll(); if (entry == null) { return false; } initBuf(entry.chunk, entry.handle, buf, reqCapacity); entry.recycle(); ++ allocations; return true; }
分析完了initBuf方法, 再繼續往下看
entry.recycle()這步是將entry對象進行回收, 因為entry對象彈出之后沒有再被引用, 可能gc會將entry對象回收, netty為了將對象進行循環利用, 就將其放在對象回收站進行回收
我們跟進recycle方法
void recycle() { chunk = null; handle = -1; recyclerHandle.recycle(this); }
chunk = null和handle = -1表示當前Entry不指向任何一塊內存
recyclerHandle.recycle(this) 將當前entry回收, 有關對象回收站, 我們會在后面的章節詳細剖析
以上就是“Netty分布式ByteBuf怎么使用命中緩存分配”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。