91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

第12課:Spark Streaming源碼解讀之Execu

發布時間:2020-05-28 02:59:41 來源:網絡 閱讀:705 作者:lqding1980 欄目:大數據

Receiver接收到的數據交由ReceiverSupervisorImpl來管理。

ReceiverSupervisorImpl接收到數據后,會數據存儲并且將數據的元數據報告給ReceiverTracker 。

Executor的數據容錯可以有三種方式:

  1. WAL日志

  2. 數據副本

  3. 接收receiver的數據流回放

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

數據的存儲,是借助receiverBlockHandler,它的實現有兩種方式:

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}


WriteAheadLogBaseBlockHandler 一方面將數據交由BlockManager管理,另一方面會寫WAL日志。

一旦節點崩潰,可以由WAL日志恢復內存中的數據。在WAL開始時,就不在建議數據存儲多個副本。

private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }

  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}


而BlockManagerBaseBlockHandler直接將數據交由BlockManager管理。

如果不寫WAL,當節點崩潰了一定會數據丟失嗎? 這個也不一定。因為在構建WriteAheadLogBaseBlockHandler,和BlockManagerBaseBlockHandler的時候會將receiver的storageLevel傳入。storageLevel用來描述數據保存的地方(內存、磁盤)以及副本個數。

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable

公有如下種類的StorageLevel:

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)


默認情況,數據采用MEMORY_AND_DISK_2,也就是說數據會產生兩個副本,并且內存不足時會寫入磁盤。


數據的最終存儲是由BlockManager完成并管理的:

def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

  var numRecords = None: Option[Long]

  val putResult: Seq[(BlockId, BlockStatus)] = block match {
    case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
        tellMaster = true)
    case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
        tellMaster = true)
      numRecords = countIterator.count
      putResult
    case ByteBufferBlock(byteBuffer) =>
      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
    case o =>
      throw new SparkException(
        s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
  }
  if (!putResult.map { _._1 }.contains(blockId)) {
    throw new SparkException(
      s"Could not store $blockId to block manager with storage level $storageLevel")
  }
  BlockManagerBasedStoreResult(blockId, numRecords)
}


對于從kafka中直接讀取數據,可以通過記錄數據offset的方法來進行容錯。如果程序崩潰,下次啟動時,從上次未處理數據的offset再次讀取數據即可。



備注:

1、DT大數據夢工廠微信公眾號DT_Spark 
2、IMF晚8點大數據實戰YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

准格尔旗| 武义县| 桃源县| 四子王旗| 焉耆| 吴旗县| 仙游县| 遵化市| 浪卡子县| 渝北区| 蕲春县| 蓬莱市| 灌南县| 贵德县| 涡阳县| 大竹县| 石河子市| 尤溪县| 犍为县| 门源| 宝坻区| 吉安市| 安龙县| 安岳县| 裕民县| 客服| 台湾省| 昔阳县| 阿拉善左旗| 黑河市| 克什克腾旗| 郴州市| 诏安县| 彩票| 黔江区| 四子王旗| 昭通市| 二连浩特市| 潞西市| 黎城县| 东安县|