您好,登錄后才能下訂單哦!
本期內容:
1、ReceiverTracker的架構設計
2、消息循環系統
3、ReceiverTracker具體實現
上節課講到了Receiver是如何不斷的接收數據的,并且接收到的數據的元數據會匯報給ReceiverTracker,下面我們看看ReceiverTracker具體的功能及實現。
ReceiverTracker主要的功能:
在Executor上啟動Receivers。
停止Receivers 。
更新Receiver接收數據的速度(也就是限流)
不斷的等待Receivers的運行狀態,只要Receivers停止運行,就重新啟動Receiver,也就是Receiver的容錯功能。
接受Receiver的注冊。
借助ReceivedBlockTracker來管理Receiver接收數據的元數據。
匯報Receiver發送過來的錯誤信息
ReceiverTracker 管理了一個消息通訊體ReceiverTrackerEndpoint,用來與Receiver或者ReceiverTracker 進行消息通信。
在ReceiverTracker的start方法中,實例化了ReceiverTrackerEndpoint,并且在Executor上啟動Receivers。
啟動Receivr,其實是ReceiverTracker給ReceiverTrackerEndpoint發送了一個本地消息,ReceiverTrackerEndpoint將Receiver封裝成RDD以job的方式提交給集群運行。
Receiver啟動后,會向ReceiverTracker注冊,注冊成功才算正式啟動了。
當Receiver端接收到數據,達到一定的條件需要將數據寫入BlockManager,并且將數據的元數據匯報給ReceiverTracker。
/** Store block and report it to driver */ def pushAndReportBlock( receivedBlock: ReceivedBlock, metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { val blockId = blockIdOption.getOrElse(nextBlockId) val time = System.currentTimeMillis val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val numRecords = blockStoreResult.numRecords val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") }
當ReceiverTracker收到元數據后,會在線程池中啟動一個線程來寫數據
case AddBlock(receivedBlockInfo) => if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) { walBatchingThreadPool.execute(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { if (active) { context.reply(addBlock(receivedBlockInfo)) } else { throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.") } } }) } else { context.reply(addBlock(receivedBlockInfo)) }
數據的元數據是交由ReceivedBlockTracker管理的
數據最終被寫入到streamIdToUnallocatedBlockQueues中,一個流對應一個數據塊信息的隊列。
每當Streaming 觸發job時,會將隊列中的數據分配成一個batch,并將數據寫入timeToAllocatedBlocks數據結構。
下面是簡單的流程圖:
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。