您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“從log4j2到Disruptor的示例分析”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“從log4j2到Disruptor的示例分析”這篇文章吧。
從日志工廠(Log4jLoggerFactory)中獲取日志Logger實例
從日志上下文工廠(Log4jContextFactory)獲取日志上下文
啟用日志上下文(AsyncLoggerContext)
啟動Disruptor(AsyncLoggerDisruptor)
序列號屏障(ProcessingSequenceBarrier)等待序列號發布
等待策略(WaitStrategy)等待序列號
返回Logger等待序列號(即等待日志寫入)
異步日志(AsyncLogger)寫入
日志內容與轉化者(RingBufferLogEventTranslator)綁定
Disruptor嘗試發布轉化者tryPublish
RingBuffer嘗試發布事件tryPublishEvent
獲取下一個可用序號
轉化并發布序號,日志與序號對應的事件綁定,并發布序號
RingBuffer發布序號,MultiProducerSequencer發布
等待策略(waitStrategy)喚醒阻塞
AsyncLoggerDisruptor
創建事件工廠EventFactory
計算ringBufferSize:AsyncLogger.RingBufferSize屬性
創建等待策略:AsyncLogger.WaitStrategy屬性
創建守護線程執行器executor
創建異步隊列滿時處理策略AsyncQueueFullPolicy(非Disruptor步驟)
創建Disruptor
創建RingBuffer與Disruptor綁定
RingBuffer根據生產者類型創建對應的實例,例如多生產者:MultiProducerSequencer
創建多生產者序號(bufferSize,waitStrategy)
綁定異常句柄(Disruptor.handleExceptionsWith)
綁定事件處理句柄(Disruptor.handleEventsWith)
根據handle列表創建事件處理器createEventProcessors
RingBuffer為Sequence(MultiProducerSequencer)序列創建序列屏障ProcessingSequenceBarrier
創建事件批處理器BatchEventProcessor
為事件批處理器綁定異常處理句柄
消費者倉庫(consumerRepository)添加消費者,創建事件處理信息EventProcessorInfo添加至消費者信息列表consumerInfos
RingBuffer添加處理序列號列表processorSequences為序列號閘
如果存在序列號屏障,從閘門中移除屏障序列號并標識endOfChain為false
啟動Disruptor
遍歷消費者倉庫放入執行器中執行消費者EventProcessorInfo
啟動事件批處理器BatchEventProcessor
事件批處理器序列號自增1
死循環
序列號屏障ProcessingSequenceBarrier等待下個有效序列號,默認為超時等待策略,超時會繼續下輪循環
事件批處理器序列號如果小于等于有效序列號
從RingBuffer中按照序列號獲取event事件
通知回調事件句柄eventHandler.onEvent如果當前消費下標等于有效序列號availableSequence說明是當前批次的最后一個消息,endOfBatch為true:eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
事件批處理器序列號設置為有效序列號
嘗試發布tryPublish事件轉化器EventTranslator:RingBufferLogEventTranslator
Disruptor獲取RingBuffer嘗試發布事件tryPublishEvent
序列號獲取下個有效序號,步進為1,例如:MultiProducerSequencer.tryNext
游標按照步進移動
判斷是否有足夠的空間,沒有則拋出InsufficientCapacityException異常
返回有效序列號
轉化器轉化消息為對應有效序列號的事件放入entries
發布序列號
設置有效序列號至緩存availableBuffer
等待策略喚醒阻塞waitStrategy.signalAllWhenBlocking
紅色數字標識流程為獲取logger時Disruptor創建消費者流程
黑色數字標識流程為logger寫入日志時Disruptor創建事件并通知消費者流程
RingBuffer對于所有消費者、生產者是同一個實例
環形隊列,dataProvide,數據的存儲與提供者
Sequencer:生產者
對于所有消費者、生產者(可能是多生產者序列類型對于Multi類型)是同一個實例,包含一個游標序列號Sequence
SequenceBarrier:序列號屏障
對于所有消費者、生產者也是同一個實例,序列號屏障包含一個等待策略、一個RingBuffer引用、一個游標序列號、一個依賴序列號(可能是組序列號類型)
BatchEventProcessor:消費者
消費者包含一個RingBuffer引用
一個序列號屏障,可以包含多個屏障序列號,默認為0個則使用RingBuffer的MultiProducerSequencer的游標序列號Sequence
一個EventHandler:RingBufferLogEventHandler
遍歷EventHandler列表將其封裝為BatchEventProcessor,將其與原始eventHandler、barrier屏障注冊至消費者資源庫consumerRepository。
獲取batchEventProcessor序列號默認為-1,將其緩存至processorSequences標識正在處理,并將processorSequences、disruptor、consumerRepository綁定至EventHandlerGroup。Disruptor啟動遍歷消費者資源庫啟動消費者:BatchEventProcessor
消費者入口
消費者消費前先自增本地序列號(即-1+1=0序號),向序列號屏障申請該序列號的消費,默認為Timeout策略申請。
屏障收到申請waitFor序列號,當前屏障游標序列號小于申請的消費序列號,等待生產者生產至當前序列號,如果超時則拋出異常(本地序列號不更新繼續重試);如果沒有超時,將屏障的dependentSequence序列號(如果不是非多序列號屏障類型,log4j2使用的是非多序列號屏障,則是屏障的本地游標)賦值為availableSequence返回。
如果availableSequence有效的序列號(即屏障的游標序列號)小于申請要消費的序列號直接返回availableSequence(即消費超出的生產的速度,消費者申請的序列號向后回移至有效序列號)。否則getHighestPublishedSequence判斷申請的序列號至availableSequence序列號之間的每個序列號對應的消息事件均是有效的則返回有效序列號(即生產者生產很快,消費者申請消費的序列號很小,向前移動至有效的,可能是本身也可能會跳躍多個下標),根據生產者的availableBuffer判斷是否有效,因為生產者先發布序列號再寫入數據,此處避免了讀取數據異常,如果數據沒有寫入,有效序列號緩存標識沒有寫入(即無效),消費者會進行剛剛所說的“重試”,如果之間存在無效序列號則返回申請序列號-1(即回滾一個值,進入邏輯時增加了一個值,也就是回滾至申請前的點,可以理解為與超時相同,即重試)
如果申請的序列號小于等于有效的序列號,則消費序列號對應的消息事件并更新本地BatchEventProcessor的序列號,按照下標去dataProvide(RingBuffer.entries)中提取對應位置的數據消費
如果申請的序列號大于有效的序列號,則將消費者本地序列號設置為有效序列號(即消費超出的生產的速度,消費的序列號向后回移)
如果期間出現任何未catch住的異常則會跳過當前下標,異常出現時的下標及對應的事件會交由exceptionHandler處理,默認為AsyncLoggerDefaultExceptionHandler異步處理,會將異常事件輸出至系統的標準錯誤管道,雖然是異步也是會占用消費者線程池資源
Disruptor:生產者入口
獲取RingBuffer嘗試發布消息,生產者(例如:MultiProducerSequencer)
生產者游標序列號嘗試自增,判斷當前是否有足夠的空間,當前游標+步進-bufferSize是否大于最小的閘門序列號(gatingSequences,即:所有消費者的本地游標序列號processorSequences列表),最小序列號會緩存至本地gatingSequenceCache用于下次判斷減少進行所有閘門序列號的遍歷次數,如果是說明已經沒有空間(因為生產者生產申請的序列號已經追上了消費者消費序列號的最小值。RingBuffer是一個環形隊列結構。上面已經講到消費者序列號會與生產者序列號同步,同步指消費者申請序列號小于有效序列號時會前進至有效序列號,即使有延遲也保證了有大于等于buffer值的緩沖空間供生產者生產),如果沒有空間返回false進入下一輪生產
自增成功后,將消息轉化為對應序列號下標位置的事件數據
Sequencer發布序列號,將當前序列號設置為有效(availableBuffer),并根據等待策略喚醒等待的消費者,被喚醒的消費者根據發布的序列號獲取相應下標處事件數據進行處理
Disruptor采用無鎖并發編程,框架中主要使用CAS與volatile關鍵字保證并發安全
使用環形數據結構(另一個典型的應用是時鐘算法也是使用的環形數據結構),為環形結構添加序列號屏障來控制對環形隊列讀寫操作,保證存儲數據的并發安全
另一個點便是神奇的緩沖行填充了
使用Disruptor并發編程框架
使用NIO寫入日志數據
當然log4j2中有很多細節,如果我們想要獲取線程棧信息,可以同樣學習一下這樣的寫法
// LOG4J2-1029 new Throwable().getStackTrace is faster than Thread.currentThread().getStackTrace(). final StackTraceElement[] stackTrace = new Throwable().getStackTrace(); StackTraceElement last = null; for (int i = stackTrace.length - 1; i > 0; i--) { final String className = stackTrace[i].getClassName(); if (fqcnOfLogger.equals(className)) { return last; } last = stackTrace[i]; }
以上是“從log4j2到Disruptor的示例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。