您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關怎樣深度剖析Kafka Producer的緩沖池機制,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
在新版的 Kafka Producer 中,設計了一個消息緩沖池,在創建 Producer 時會默認創建一個大小為 32M 的緩沖池,也可以通過 buffer.memory 參數指定緩沖池的大小,同時緩沖池被切分成多個內存塊,內存塊的大小就是我們創建 Producer 時傳的 batch.size 大小,默認大小 16384,而每個 Batch 都會包含一個 batch.size 大小的內存塊,消息就是存放在內存塊當中。整個緩沖池的結構如下圖所示:
客戶端將消息追加到對應主題分區的某個 Batch 中,如果 Batch 已經滿了,則會新建一個 Batch,同時向緩沖池(RecordAccumulator)申請一塊大小為 batch.size 的內存塊用于存儲消息。
當 Batch 的消息被發到了 Broker 后,Kafka Producer 就會移除該 Batch,既然 Batch 持有某個內存塊,那必然就會涉及到 GC 問題。
頻繁的申請內存,用完后就丟棄,必然導致頻繁的 GC,造成嚴重的性能問題。那么,Kafka 是怎么做到避免頻繁 GC 的呢?
前面說過了,緩沖池在設計邏輯上面被切分成一個個大小相等的內存塊,當消息發送完畢,歸還給緩沖池不就可以避免被回收了嗎?
緩沖池的內存持有類是 BufferPool,我們先來看下 BufferPool 都有哪些成員:
public class BufferPool { // 總的內存大小 private final long totalMemory; // 每個內存塊大小,即 batch.size private final int poolableSize; // 申請、歸還內存的方法的同步鎖 private final ReentrantLock lock; // 空閑的內存塊 private final Deque<ByteBuffer> free; // 需要等待空閑內存塊的事件 private final Deque<Condition> waiters; /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */ // 緩沖池還未分配的空閑內存,新申請的內存塊就是從這里獲取內存值 private long nonPooledAvailableMemory; // ...}
從 BufferPool 的成員可看出,緩沖池實際上由一個個 ByteBuffer 組成的,BufferPool 持有這些內存塊,并保存在成員 free 中,free 的總大小由 totalMemory 作限制,而 nonPooledAvailableMemory 則表示還剩下緩沖池還剩下多少內存還未被分配。
當 Batch 的消息發送完畢后,就會將它持有的內存塊歸還到 free 中,以便后面的 Batch 申請內存塊時不再創建新的 ByteBuffer,從 free 中取就可以了,從而避免了內存塊被 JVM 回收的問題。
接下來跟大家一起分析申請內存和歸還內存是如何實現的。
1、申請內存
申請內存的入口:
org.apache.kafka.clients.producer.internals.BufferPool#allocate
1)內存足夠的情況
當用戶請求申請內存時,如果發現 free 中有空閑的內存,則直接從中取:
if (size == poolableSize && !this.free.isEmpty()){ return this.free.pollFirst(); }
這里的 size 即申請的內存大小,它等于 Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
即如果你的消息大小小于 batchSize,則申請的內存大小為 batchSize,那么上面的邏輯就是如果申請的內存大小等于 batchSize 并且 free 不空閑,則直接從 free 中獲取。
我們不妨想一下,為什么 Kafka 一定要申請內存大小等于 batchSize,才能從 free 獲取空閑的內存塊呢?
前面也說過,緩沖池的內存塊大小是固定的,它等于 batchSize,如果申請的內存比 batchSize 還大,說明一條消息所需要存放的內存空間比內存塊的內存空間還要大,因此不滿足需求,不滿組需求怎么辦呢?我們接著往下分析:
// now check if the request is immediately satisfiable with the// memory on hand or if we need to blockint freeListSize = freeSize() * this.poolableSize;if (this.nonPooledAvailableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request, but need to allocate the buffer freeUp(size); this.nonPooledAvailableMemory -= size;}
freeListSize:指的是 free 中已經分配好并且已經回收的空閑內存塊總大小;
nonPooledAvailableMemory:緩沖池還未分配的空閑內存,新申請的內存塊就是從這里獲取內存值;
this.nonPooledAvailableMemory + freeListSize:即緩沖池中總的空閑內存空間。
如果緩沖池的內存空間比申請內存大小要大,則調用 freeUp(size); 方法,接著將空閑的內存大小減去申請的內存大小。
private void freeUp(int size) { while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size) this.nonPooledAvailableMemory += this.free.pollLast().capacity(); }
freeUp 這個方法很有趣,它的思想是這樣的:
如果未分配的內存大小比申請的內存還要小,那只能從已分配的內存列表 free 中將內存空間要回來,直到 nonPooledAvailableMemory 比申請內存大為止。
2)內存不足的情況
在我的「Kafka Producer 異步發送消息居然也會阻塞?」這篇文章當中也提到了,當緩沖池的內存塊用完后,消息追加調用將會被阻塞,直到有空閑的內存塊。
阻塞等待的邏輯是怎么實現的呢?
// we are out of memory and will have to block int accumulated = 0; Condition moreMemory = this.lock.newCondition();try { long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // enough memory to allocate one while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); recordWaitTime(timeNs); } if (waitingTimeElapsed) { throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; // check if we can satisfy this request from the free list, // otherwise allocate memory if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { // just grab a buffer from the free list buffer = this.free.pollFirst(); accumulated = size; } else { // we'll need to allocate memory, but we may only get // part of what we need on this iteration freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory); this.nonPooledAvailableMemory -= got; accumulated += got; } }
以上源碼的大致邏輯:
首先創建一個本次等待 Condition,并且把它添加到類型為 Deque 的 waiters 中(后面在歸還內存中會喚醒),while 循環不斷收集空閑的內存,直到內存比申請內存大時退出,在 while 循環過程中,調用 Condition#await 方法進行阻塞等待,歸還內存時會被喚醒,喚醒后會判斷當前申請內存是否大于 batchSize,如果等與 batchSize 則直接將歸還的內存返回即可,如果當前申請的內存大于 大于 batchSize,則需要調用 freeUp 方法從 free 中釋放空閑的內存出來,然后進行累加,直到大于申請的內存為止。
2、歸還內存
申請內存的入口:
org.apache.kafka.clients.producer.internals.BufferPool#deallocate(java.nio.ByteBuffer, int)
public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { this.nonPooledAvailableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); }}
歸還內存塊的邏輯比較簡單:
如果歸還的內存塊大小等于 batchSize,則將其清空后添加到緩沖池的 free 中,即將其歸還給緩沖池,避免了 JVM GC 回收該內存塊。如果不等于呢?直接將內存大小累加到未分配并且空閑的內存大小值中即可,內存就無需歸還了,等待 JVM GC 回收掉,最后喚醒正在等待空閑內存的線程。
經過以上的源碼分析之后,給大家指出需要注意的一個問題,這會給 Producer 端帶來嚴重的性能影響:
如果你的消息大小比 batchSize 還要大,則不會從 free 中循環獲取已分配好的內存塊,而是重新創建一個新的 ByteBuffer,并且該 ByteBuffer 不會被歸還到緩沖池中(JVM GC 回收),如果此時 nonPooledAvailableMemory 比消息體還要小,還會將 free 中空閑的內存塊銷毀(JVM GC 回收),以便緩沖池中有足夠的內存空間提供給用戶申請,這些動作都會導致頻繁 GC 的問題出現。
因此,需要根據業務消息的大小,適當調整 batch.size 的大小,避免頻繁 GC。
上述就是小編為大家分享的怎樣深度剖析Kafka Producer的緩沖池機制了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。