91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

(版本定制)第11課:Spark Streaming源碼解讀

發布時間:2020-06-05 21:41:05 來源:網絡 閱讀:290 作者:Spark_2016 欄目:大數據

本期內容:

    1、ReceiverTracker的架構設計

    2、消息循環系統

    3、ReceiverTracker具體實現


上節課講到了Receiver是如何不斷的接收數據的,并且接收到的數據的元數據會匯報給ReceiverTracker,下面我們看看ReceiverTracker具體的功能及實現。

ReceiverTracker主要的功能:

  1. 在Executor上啟動Receivers。

  2. 停止Receivers 。

  3. 更新Receiver接收數據的速度(也就是限流)

  4. 不斷的等待Receivers的運行狀態,只要Receivers停止運行,就重新啟動Receiver,也就是Receiver的容錯功能。

  5. 接受Receiver的注冊。

  6. 借助ReceivedBlockTracker來管理Receiver接收數據的元數據。

  7. 匯報Receiver發送過來的錯誤信息


ReceiverTracker 管理了一個消息通訊體ReceiverTrackerEndpoint,用來與Receiver或者ReceiverTracker 進行消息通信。

在ReceiverTracker的start方法中,實例化了ReceiverTrackerEndpoint,并且在Executor上啟動Receivers。

啟動Receivr,其實是ReceiverTracker給ReceiverTrackerEndpoint發送了一個本地消息,ReceiverTrackerEndpoint將Receiver封裝成RDD以job的方式提交給集群運行。

Receiver啟動后,會向ReceiverTracker注冊,注冊成功才算正式啟動了。

當Receiver端接收到數據,達到一定的條件需要將數據寫入BlockManager,并且將數據的元數據匯報給ReceiverTracker。

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
  ) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

當ReceiverTracker收到元數據后,會在線程池中啟動一個線程來寫數據

case AddBlock(receivedBlockInfo) =>
if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
walBatchingThreadPool.execute(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
if (active) {
          context.reply(addBlock(receivedBlockInfo)) 
        } else {
throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }

數據的元數據是交由ReceivedBlockTracker管理的

數據最終被寫入到streamIdToUnallocatedBlockQueues中,一個流對應一個數據塊信息的隊列。

每當Streaming 觸發job時,會將隊列中的數據分配成一個batch,并將數據寫入timeToAllocatedBlocks數據結構。

下面是簡單的流程圖:

(版本定制)第11課:Spark Streaming源碼解讀

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

塘沽区| 农安县| 商南县| 西藏| 闽清县| 牙克石市| 卫辉市| 岑溪市| 丰原市| 东兰县| 大荔县| 米林县| 班戈县| 莒南县| 太原市| 秦皇岛市| 武隆县| 哈尔滨市| 龙陵县| 巴东县| 古交市| 萨嘎县| 安国市| 鄂托克前旗| 信丰县| 平武县| 和硕县| 巴林右旗| 铁岭市| 鹰潭市| 老河口市| 昌吉市| 田东县| 北海市| 武宣县| 东安县| 翁源县| 建阳市| 满洲里市| 盐池县| 绵阳市|