91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

(版本定制)第12課:Spark Streaming源碼解讀之Executor容錯安全性

發布時間:2020-08-06 12:20:05 來源:網絡 閱讀:386 作者:Spark_2016 欄目:大數據

本期內容:

    1、Executor的WAL容錯機制

    2、消息重放


Executor的安全容錯主要是數據的安全容錯,那為什么不考慮數據計算的安全容錯呢?

原因是計算的時候Spark Streaming是借助于Spark Core上RDD的安全容錯的,所以天然的安全可靠的。

Executor的安全容錯主要有:

    1、數據副本:

         有兩種方式:a.借助底層的BlockManager,BlockManager做備份,通過傳入的StorageLevel進行備份。

                              b. WAL方式進行容錯。

    2、接受到數據之后,不做副本,但是數據源支持存放,所謂存放就是可以反復的讀取源數據。

容錯的弊端:耗時間、耗空間。

    

簡單的看下源代碼:

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
  ) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}


private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
    }
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) //通過WAL容錯
  } else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) //通過BlockManager進行容錯
  }
}
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
var numRecords = None: Option[Long]
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
tellMaster = true)
case IteratorBlock(iterator) =>
val countIterator = new CountingIterator(iterator)
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
tellMaster = true)
      numRecords = countIterator.count
      putResult
case ByteBufferBlock(byteBuffer) =>
      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case o =>
throw new SparkException(
s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
  }
if (!putResult.map { _._1 }.contains(blockId)) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
  }
BlockManagerBasedStoreResult(blockId, numRecords)
}

簡單流程圖:


(版本定制)第12課:Spark Streaming源碼解讀之Executor容錯安全性(版本定制)第12課:Spark Streaming源碼解讀之Executor容錯安全性

參考博客:http://blog.csdn.net/hanburgud/article/details/51471089

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

常宁市| 湖北省| 延庆县| 高唐县| 津市市| 湘潭县| 外汇| 志丹县| 邹城市| 蓝山县| 元朗区| 晴隆县| 汝城县| 斗六市| 成都市| 丹凤县| 景宁| 灵宝市| 武定县| 东乌珠穆沁旗| 凌源市| 嘉善县| 台山市| 横峰县| 宣汉县| 韶山市| 德州市| 宁德市| 濮阳市| 汝南县| 吴江市| 本溪| 南岸区| 蓬莱市| 嘉义县| 洱源县| 万年县| 阜宁县| 昭通市| 屏南县| 赤水市|