您好,登錄后才能下訂單哦!
本期內容:
1、數據接收架構設計模式
2、數據接收源碼徹底研究
1、Receiver接受數據的過程類似于MVC模式:
Receiver,ReceiverSupervisor和Driver的關系相當于Model,Control,View,也就是MVC。
Model就是Receiver,存儲數據Control,就是ReceiverSupervisor,Driver是獲得元數據,也就是View。
2、數據的位置信息會被封裝到RDD里面。
3、Receiver接受數據,交給ReceiverSupervisor去存儲數據。
4、ReceiverTracker是通過發送一個又一個的Job,每個Job只有一個Task,每個Task里面就只有一個ReceiverSupervisor,用這個函數啟動每一個Receiver。
下面我們簡單的看下Receiver啟動流程,應用程序首先通過JobScheduler的start方法來啟動receiverTracker的start方法:
def start(): Unit = synchronized { if (eventLoop != null) return // scheduler has already been started logDebug("Starting JobScheduler") eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") { override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) } eventLoop.start() // attach rate controllers of input streams to receive batch completion updates for { inputDStream <- ssc.graph.getInputStreams rateController <- inputDStream.rateController } ssc.addStreamingListener(rateController) listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) receiverTracker.start() //receiver啟動 jobGenerator.start() logInfo("Started JobScheduler") }
通過調用receiverTracker.start()方法來進行一系列的操作:
/** Start the endpoint and receiver execution thread. */ def start(): Unit = synchronized { if (isTrackerStarted) { throw new SparkException("ReceiverTracker already started") } if (!receiverInputStreams.isEmpty) { endpoint = ssc.env.rpcEnv.setupEndpoint( "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) //Rpc消息通信,獲取receiver的狀態 if (!skipReceiverLaunch) launchReceivers() //啟動receiver logInfo("ReceiverTracker started") trackerState = Started } }
下面通過畫圖簡單的描述下Receiver啟動的內部機制:
參考博客:http://blog.csdn.net/hanburgud/article/details/51471047
http://lqding.blog.51cto.com/9123978/1774426
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。