您好,登錄后才能下訂單哦!
本篇內容介紹了“KAFKA是如何處理延時任務的”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
首先,我們需要了解一下kafka中大概有哪些需要延時的任務,該怎么查看呢?
很簡單,kafka的設計都是基于接口的,那么我們只需要找到延時任務的頂層接口,然后看一下該接口有哪些實現類就知道有哪些延時任務了。
頂層抽象類接口是:DelayedOperation
對應的子類:
DelayedHeartbeat:就是用于做消費者心跳超時檢測的;
DelayedProduce:就是做生產者設置ack=-1時需要等待所有副本確認寫入成功的;
DelayedFetch:就是在消費的時候該分區沒有數據,需要去做延時等待;
DelayedJoin:就是去做消費者加組的時候,在JOIN階段需要延時等待。
這個答案已經在標題中就已經回答了,就是時間輪。
那么時間輪在kafka中是如何實現的呢?
kafka中的時間輪本體是一個20長度數組,不過內部持有上層數組的一個引用,數組中每個元素都是一個List,存放處于這個時間段的所有任務。
最后將這些有任務的List引用,放入DelayQueue來實現時間的流動,每次從DelayQueue中取出到期的List進行對應的操作。
翻譯一下:
就是原本把所有延時任務都一股腦全部放入DelayQueue中,實在是太多了,由于DelayQueue底層數據結構是小頂堆,插入和刪除的時間復雜度都是 O(nlog(n)),
n代表的具體任務的數量,當n值非常大時,對應的性能就很差,不能滿足一個高性能中間件的要求。于是就想了個辦法減小n的個數,
就是把原來的一個個延時任務,通過時間區間來封裝成一個List,把List作為一個基本單位存入到DelayQueue中,那么這一樣一來,就能把插入和刪除的時間復雜度
從O(nlog(n))降低到接近O(1)[這里為什么是近似O(1)呢?你可以理解為時間輪是一個類hash表的結構],除此之外,最重要的就是大大減小了DelayQueue中元素的個數n,
因為一層時間輪就20個List,10層也就才200個,所以對于這么小數量的元素個數,DelayQueue是完全能hold的住的。
總結一下:
其實時間輪的設計思想就是批處理的思想,把一批任務根據時間區間封裝成一個List,最后把List放到DelayQueue中去實現輪轉的效果。
優化點主要是兩個,一個是插入/刪除的時間復雜度由O(nlog(n))降低到了近似O(1),第二個是大大減小了DelayQueue元素的個數。
了解設計思想,我們再看看實現原理:
1、核心函數:加入Task到時間輪中
分為三步:
如果任務已經超期就返回false
如果任務在自己的時間跨度內,就計算應該放入哪個桶中(在哪個時間區間);如果桶沒在DelayQueue中則加入到DelayQueue中去。
如果任務的超時時間超過了自己的時間跨度,就往上層時間傳,直到找到一個滿足時間跨度的時間輪。
def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { // 被取消 // Cancelled false } else if (expiration < currentTime + tickMs) { // 已經過期 // Already expired false } else if (expiration < currentTime + interval) { // 在有效期內 // Put in its own bucket val virtualId = expiration / tickMsval bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry)// Set the bucket expiration time // 設置超時時間,如果該桶已經設置了超時時間則說明已經存在于DelayQueue中了 // 如果不存在超時時間,則需要將當前桶加入DelayQueue中 if (bucket.setExpiration(virtualId * tickMs)) { // The bucket needs to be enqueued because it was an expired bucket // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle // will pass in the same value and hence return false, thus the bucket with the same expiration will not // be enqueued multiple times. queue.offer(bucket) }true } else { // 超過了當前層時間輪的時間跨度 需要向上層時間輪傳遞,如果上層不存在則新建 // Out of the interval. Put it into the parent timer if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry) }}
2、時間輪如何推進?
每一個DelayedOperationPurgatory,都有一個線程expirationReaper,去負責推進時間輪,如果當前沒有task到期就掛起200ms等待。
如果有task到期,就取出對應的桶,然后將桶中的數據全都執行reinsert,也就是從最底層的時間輪重新執行一遍add操作。
/** * A background reaper to expire delayed operations that have timed out */private class ExpiredOperationReaper extends ShutdownableThread( "ExpirationReaper-%d-%s".format(brokerId, purgatoryName), false) { override def doWork() { advanceClock(200L) } }
def advanceClock(timeoutMs: Long): Boolean = { // 從延時隊列中取出到期的桶 var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) {writeLock.lock()try { // 一次性把到期的全部取出來 while (bucket != null) { // 時間輪的時間推進 timingWheel.advanceClock(bucket.getExpiration())// 把桶中的所有數據都拿去執行reinsert函數 // 本質就是去執行addTimerTaskEntry(timerTaskEntry) bucket.flush(reinsert)bucket = delayQueue.poll() } } finally { writeLock.unlock() }true } else {false } }
3、到期的任務如何執行?
其實就是接著上面的源碼,當任務到期之后,reinsert函數會返回false,代表已經超期/被取消了,每個DelayedOperationPurgatory又有一個單線程的taskExecutor,
超期的任務就提交到線程池中去執行即可。
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { // 如果時間輪添加返回false則說明超期/被取消了,直接提交到自己的單線程線程池中去執行該task if (!timingWheel.add(timerTaskEntry)) {// Already expired or cancelled if (!timerTaskEntry.cancelled) taskExecutor.submit(timerTaskEntry.timerTask) } }
4、整個流程的運行圖
整個流程概括下來,就是業務代碼想TimingWheel執行add,提交任務;
TimingWheel找到合適的時間輪后插入對應的桶中,并將桶放入DelayQueue中;
DelayedOperationPurgatory組件中存在收割線程,去不停從DelayQueue中poll對應到期的task;
最后task重新執行reinsert,如果超期了就提交到taskExecutor中去執行對應的業務handler邏輯。
這個答案在第二小節的時候其實已經給出的,在這里進行一個總結:
1、DelayQueue底層數據結構是小頂堆,插入和刪除的時間復雜度都是 O(nlog(n)),因此面對大量的延時操作時,該結構無法滿足kafka高性能的要求。
2、時間輪采用批處理的思想將任務按照區間進行封裝,形成一類類似hash表的結構,讓插入/刪除的時間復雜度降低為O(1),并且大大減小了DelayQueue元素的個數。
另外補充一點,kafka中每一種延時場景都會創建單獨的時間輪,一個時間輪里只存放一種類型的延時任務,因為不同Task在超期/完成的時候需要執行的邏輯是不一樣的,
需要一一對應去執行。舉個栗子,心跳延時場景有自己的heartbeatPurgatory,生產延時有自己的delayedProducePurgatory,以此類推。
1、HEARTBEAT請求的處理
從源碼中我們可以知道,心跳的維護和會話的超時,kafka的實現非常巧妙。
通常情況下,心跳3s發一次,session超時時間是10s;
kafka就在收到HEARTBEAT請求之后,就先創建一個DelayedHeartbeat延時任務,超時時間就是對應的session.timeout值即10s;
如果在10s內又收到了對應consumer的HEARTBEAT請求,就將上次提交的延時任務完成;
如果在10s內沒有收到對應consumer的HEARTBEAT請求,則任務consumer出問題了,就去執行對應的超期邏輯。
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) { // complete current heartbeat expectation member.latestHeartbeat = time.milliseconds() val memberKey = MemberKey(member.groupId, member.memberId) // 完成上次的延時任務 heartbeatPurgatory.checkAndComplete(memberKey) // reschedule the next heartbeat expiration deadline // 服務端能拿到這個session.timeout,然后根據這個時間生成一個延時任務, // 例如30s,如果這么長時間么有收到心跳請求,則認為消費者出了問題,就踢掉以后執行rebalance。 val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs) heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey)) }
private[group] class DelayedHeartbeat(coordinator: GroupCoordinator, group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, sessionTimeout: Long) extends DelayedOperation(sessionTimeout, Some(group.lock)) { override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _) override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) override def onComplete() = coordinator.onCompleteHeartbeat() }
2、DelayedHeartbeat任務超期后的邏輯
這一塊也很簡單,就是去執行coordinator.onExpireHeartbeat函數,
具體邏輯就是打印一個標識日志:Member xxx has failed,這個日志為什么要單獨講呢?因為這個是我們排查消費者問題的時候的核心日志;
我們在看server.log的時候,如果查到某個消費組的消費者出現這個日志,那么我們就能肯定這個消費組的這個消費者是因為會話超時的原因被剔除了;
從而我們就可以繼續往下分析這個消費者掉線的原因是因為消費者進程掛了?或者是客戶端機器負載太高而心跳線程是守護線程優先級比較低拿不到CPU資源?
等等一系列定位線索。這個日志主要是用于定位消費者出問題,以及消費組rebalance原因的,是非常重要的一個標識日志!
講完日志,我們就可以看到后續就是去開啟rebalance,因為消費者個數變了,需要重新去進行分區分配,已經故障轉移。
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) { group.inLock {if (!shouldKeepMemberAlive(member, heartbeatDeadline)) { // 標識日志 info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group") removeMemberAndUpdateGroup(group, member)} } }
private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata) { group.remove(member.memberId) group.currentState match {case Dead | Empty => case Stable | CompletingRebalance => maybePrepareRebalance(group) case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId)) } }
“KAFKA是如何處理延時任務的”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。