91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

第11課:Spark Streaming源碼解讀之Driver中的ReceiverTracker架構設計以及具體實現徹底研究

發布時間:2020-09-11 16:24:08 來源:網絡 閱讀:695 作者:lqding1980 欄目:大數據

上節課將到了Receiver是如何不斷的接收數據的,并且接收到的數據的元數據會匯報給ReceiverTracker,下面我們看看ReceiverTracker具體的功能及實現。

一、 ReceiverTracker主要的功能:

  1. 在Executor上啟動Receivers。

  2. 停止Receivers 。

  3. 更新Receiver接收數據的速率(也就是限流)

  4. 不斷的等待Receivers的運行狀態,只要Receivers停止運行,就重新啟動Receiver。也就是Receiver的容錯功能。

  5. 接受Receiver的注冊。

  6. 借助ReceivedBlockTracker來管理Receiver接收數據的元數據。

  7. 匯報Receiver發送過來的錯誤信息


ReceiverTracker 管理了一個消息通訊體ReceiverTrackerEndpoint,用來與Receiver或者ReceiverTracker 進行消息通信。

在ReceiverTracker的start方法中,實例化了ReceiverTrackerEndpoint,并且在Executor上啟動Receivers:

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

啟動Receivr,其實是ReceiverTracker給ReceiverTrackerEndpoint發送了一個本地消息,ReceiverTrackerEndpoint將Receiver封裝成RDD以job的方式提交給集群運行。

endpoint.send(StartAllReceivers(receivers))

這里的endpoint就是ReceiverTrackerEndpoint的引用。


Receiver啟動后,會向ReceiverTracker注冊,注冊成功才算正式啟動了。

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

當Receiver端接收到數據,達到一定的條件需要將數據寫入BlockManager,并且將數據的元數據匯報給ReceiverTracker:

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}


當ReceiverTracker收到元數據后,會在線程池中啟動一個線程來寫數據:

case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }

數據的元數據是交由ReceivedBlockTracker管理的。

數據最終被寫入到streamIdToUnallocatedBlockQueues中:一個流對應一個數據塊信息的隊列。

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]


每當Streaming 觸發job時,會將隊列中的數據分配成一個batch,并將數據寫入timeToAllocatedBlocks數據結構。

private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
....
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}

可見一個batch會包含多個流的數據。


每當Streaming 的一個job運行完畢后:

private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  ...

JobScheduler會調用handleJobCompletion方法,最終會觸發

jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)


這里的maxRememberDuration是DStream中每個時刻生成的RDD保留的最長時間。

def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
  require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
  val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo("Deleting batches " + timesToCleanup)
  if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
    timeToAllocatedBlocks --= timesToCleanup
    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}

而最后

listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))

這個代碼會調用

case batchCompleted: StreamingListenerBatchCompleted =>
  listener.onBatchCompleted(batchCompleted)
  
  ... 一路跟著下去...
  
  /**
 * A RateController that sends the new rate to receivers, via the receiver tracker.
 */
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
    extends RateController(id, estimator) {
  override def publish(rate: Long): Unit =
    ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}
/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}
case UpdateReceiverRateLimit(streamUID, newRate) =>
  for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
    eP.send(UpdateRateLimit(newRate))
  }

發送調整速率的消息給Receiver,Receiver接到消息后,最終通過BlockGenerator來調整數據的寫入的時間,而控制數據流的速率。

case UpdateRateLimit(eps) =>
  logInfo(s"Received a new rate limit: $eps.")
  registeredBlockGenerators.foreach { bg =>
    bg.updateRate(eps)
  }


備注:

1、DT大數據夢工廠微信公眾號DT_Spark 
2、IMF晚8點大數據實戰YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

北票市| 陈巴尔虎旗| 浦城县| 万荣县| 丹寨县| 科技| 淳安县| 甘洛县| 浪卡子县| 四子王旗| 霍州市| 巴马| 西安市| 六枝特区| 保亭| 永仁县| 安丘市| 板桥市| 新津县| 河曲县| 屯昌县| 武义县| 房山区| 镇宁| 芜湖县| 门源| 台江县| 泸溪县| 乌兰县| 房山区| 襄城县| 策勒县| 普陀区| 仙桃市| 大荔县| 安平县| 新平| 陇川县| 曲松县| 甘泉县| 灌阳县|