您好,登錄后才能下訂單哦!
本篇內容主要講解“ReceiverTracker怎么實現”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“ReceiverTracker怎么實現”吧!
ReceiverTacker類如下,從源碼注釋可以看出該類的作用。
管理ReceiverInputDStreams的執行,記錄Receiver發來的元數據信息。ReceiverTacker類構造時必須傳入StreamingContext對象。
ReceiverTacker類內部有ReceiverTackerEndpoint這個消息通信體,用于和運行在Executor端的ReceiverSupervisorImpl進行通信,包括Receiver的注冊,重啟Receiver,清除之前的Block數據,更新限流值,添加Block元數據信息等消息。
接下來以接收到來自Executor端的ReceiverSupervisorImpl發來添加元數據信息的AddBlock消息,進行講解具體的處理流程。
ReceivedBlockInfo類包含了StreamID,Block中記錄條數,元數據Metadata,接收Block的存儲結果(BlockID和記錄數量)
ReceiverBlockTracker類是addBlock方法的具體實現。
1.調用ReceiverBlockTracker的writeToLog方法
2.調用ReceiverBlockTracker的getReceivedBlockQueue方法,其中streamIdToUnallocatedBlockQueues為HashMap,Key為StreamID,Value為ReceivedBlockQueue。而ReceivedBlockQueue 的定義為private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
ReceiverBlockTracker類,可以從源碼中看出,他會記錄所有接收到的Block信息,根據需要把Block分配給Batch。如果設置了checkpoint,開啟WAL,則會把所有的操作保存到預寫日志中,因此當Driver失敗后就可以從checkpoint和WAL中恢復ReceiverTracker的狀態。
ReceiverBlockTracker類中重要的方法,allocateBlocksToBatch。private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]存儲批處理時刻,分配到的Blocks數據。
該方法是被ReceiverTracker調用的。
而ReceiverTracker的allocateBlocksToBatch方法是被JobGenerator的generateJobs方法調用的。
ReceiverBlockTracker類中重要的方法,getBlocksOfBatch。
該方法是被ReceiverTracker的getBlocksOfBatch調用。
ReceiverTracker的getBlocksOfBatch方法是被ReceiverInputDStream的compute方法調用的。
到此,相信大家對“ReceiverTracker怎么實現”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。