您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關如何理解Receiver啟動以及啟動源碼分析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
為什么要Receiver?
Receiver不斷持續接收外部數據源的數據,并把數據匯報給Driver端,這樣我們每隔BatchDuration會把匯報數據生成不同的Job,來執行RDD的操作。
Receiver是隨著應用程序的啟動而啟動的。
Receiver和InputDStream是一一對應的。
RDD[Receiver]只有一個Partition,一個Receiver實例。
Spark Core并不知道RDD[Receiver]的特殊性,依然按照普通RDD對應的Job進行調度,就有可能在同樣一個Executor上啟動多個Receiver,會導致負載不均衡,會導致Receiver啟動失敗。
Receiver在Executor啟動的方案:
1,啟動不同Receiver采用RDD中不同Partiton的方式,不同的Partiton代表不同的Receiver,在執行層面就是不同的Task,在每個Task啟動時就啟動Receiver。
這種方式實現簡單巧妙,但是存在弊端啟動可能失敗,運行過程中Receiver失敗,會導致TaskRetry,如果3次失敗就會導致Job失敗,會導致整個Spark應用程序失敗。因為Receiver的故障,導致Job失敗,不能容錯。
2.第二種方式就是Spark Streaming采用的方式。
在ReceiverTacker的start方法中,先實例化Rpc消息通信體ReceiverTrackerEndpoint,再調用
launchReceivers方法。
/** Start the endpoint and receiver execution thread. */ |
在launchReceivers方法中,先對每一個ReceiverInputStream獲取到對應的一個Receiver,然后發送StartAllReceivers消息。Receiver對應一個數據來源。
/** |
ReceiverTrackerEndpoint接收到StartAllReceivers消息后,先找到Receiver運行在哪些Executor上,然后調用startReceiver方法。
override def receive: PartialFunction[Any, Unit] = { |
startReceiver方法在Driver層面自己指定了TaskLocation,而不用Spark Core來幫我們選擇TaskLocation。其有以下特點:終止Receiver不需要重啟Spark Job;第一次啟動Receiver,不會執行第二次;為了啟動Receiver而啟動了一個Spark作業,一個Spark作業啟動一個Receiver。每個Receiver啟動觸發一個Spark作業,而不是每個Receiver是在一個Spark作業的一個Task來啟動。當提交啟動Receiver的作業失敗時發送RestartReceiver消息,來重啟Receiver。
/** |
看完上述內容,你們對如何理解Receiver啟動以及啟動源碼分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。