91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Driver容錯安全性怎么實現

發布時間:2021-12-16 16:32:55 來源:億速云 閱讀:121 作者:iii 欄目:云計算

這篇文章主要介紹“Driver容錯安全性怎么實現”,在日常操作中,相信很多人在Driver容錯安全性怎么實現問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Driver容錯安全性怎么實現”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

  • ·  第一、看ReceiverTracker的容錯,主要是ReceiverTracker接收元數據的進入WAL,看ReceiverTracker的addBlock方法,代碼如下

    def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {

     try {

       val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))

       if (writeResult) {

         synchronized {

           getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo

         }

         logDebug(s"Stream ${receivedBlockInfo.streamId} received " +

           s"block ${receivedBlockInfo.blockStoreResult.blockId}")

       } else {

         logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +

           s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")

       }

       writeResult

     } catch {

       case NonFatal(e) =>

         logError(s"Error adding block $receivedBlockInfo", e)

         false

     }

    }

    writeToLog方法就是進行WAL的操作,看writeToLog的代碼

    private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {

     if (isWriteAheadLogEnabled) {

       logTrace(s"Writing record: $record")

       try {

         writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),

           clock.getTimeMillis())

         true

       } catch {

         case NonFatal(e) =>

           logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)

           false

       }

     } else {

       true

     }

    }

    首先判斷是否開啟了WAL,根據一下isWriteAheadLogEnabled值

    private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty

    接著看writeAheadLogOption

    private val writeAheadLogOption = createWriteAheadLog()

    再看createWriteAheadLog()方法

    private def createWriteAheadLog(): Option[WriteAheadLog] = {

     checkpointDirOption.map { checkpointDir =>

       val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)

       WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)

     }

    }

    根據checkpoint的配置,獲取checkpoint的目錄,這里可以看出,checkpoint可以有多個目錄。
    寫完WAL才將receivedBlockInfo放到內存隊列getReceivedBlockQueue中

    ·  第二、看ReceivedBlockTracker的allocateBlocksToBatch方法,代碼如下

    def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {

     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {

       val streamIdToBlocks = streamIds.map { streamId =>

           (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))

       }.toMap

       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {

         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)

         lastAllocatedBatchTime = batchTime

       } else {

         logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")

       }

     } else {

       // This situation occurs when:

       // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,

       // possibly processed batch job or half-processed batch job need to be processed again,

       // so the batchTime will be equal to lastAllocatedBatchTime.

       // 2. Slow checkpointing makes recovered batch time older than WAL recovered

       // lastAllocatedBatchTime.

       // This situation will only occurs in recovery time.

       logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")

     }

    }

    首先從getReceivedBlockQueue中獲取每一個receiver的ReceivedBlockQueue隊列賦值給streamIdToBlocks,然后包裝一下

    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

    allocatedBlocks就是根據時間獲取的一批元數據,交給對應batchDuration的job,job在執行的時候就可以使用,在使用前先進行WAL,如果job出錯恢復后,可以知道數據計算到什么位置

    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {

         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)

         lastAllocatedBatchTime = batchTime

       } else {

         logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")

    }

    ·  第三、看cleanupOldBatches方法,cleanupOldBatches的功能是從內存中清楚不用的batches元數據,再刪除WAL的數據,再刪除之前把要刪除的batches信息也進行WAL

    def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {

     require(cleanupThreshTime.milliseconds < clock.getTimeMillis())

     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq

     logInfo("Deleting batches " + timesToCleanup)

     if (writeToLog(BatchCleanupEvent(timesToCleanup))) {

       timeToAllocatedBlocks --= timesToCleanup

       writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))

     } else {

       logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")

     }

    }

    ·  總結一下上面的三種WAL,對應下面的三種事件,這就是ReceiverTracker的容錯

    /** Trait representing any event in the ReceivedBlockTracker that updates its state. */

    private[streaming] sealed trait ReceivedBlockTrackerLogEvent

    private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)

    extends ReceivedBlockTrackerLogEvent

    private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)

    extends ReceivedBlockTrackerLogEvent

    private[streaming] case class BatchCleanupEvent(times: Seq[Time])  extends ReceivedBlockTrackerLogEvent

    ·  看一下Dstream.graph和JobGenerator的容錯,從開始

    private def generateJobs(time: Time) {

    SparkEnv has been removed.

     SparkEnv.set(ssc.env)

     Try {

     

       // allocate received blocks to batch

       // 分配接收到的數據給batch

       jobScheduler.receiverTracker.allocateBlocksToBatch(time)

       // 使用分配的塊生成jobs

       graph.generateJobs(time) // generate jobs using allocated block

     } match {

       case Success(jobs) =>

         // 獲取元數據信息

         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

         // 提交jobSet

         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

       case Failure(e) =>

         jobScheduler.reportError("Error generating jobs for time " + time, e)

     }

     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

    }

    jobs生成完成后發送DoCheckpoint消息,最終調用doCheckpoint方法,代碼如下

    private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {

     if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {

       logInfo("Checkpointing graph for time " + time)

       ssc.graph.updateCheckpointData(time)

       checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)

     }

    }

     

到此,關于“Driver容錯安全性怎么實現”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

淅川县| 北辰区| 色达县| 松江区| 皋兰县| 额济纳旗| 喀喇沁旗| 前郭尔| 余庆县| 顺平县| 双流县| 方山县| 三门峡市| 黔南| 衡东县| 波密县| 正定县| 文安县| 绥滨县| 铁岭市| 五莲县| 惠安县| 新昌县| 宜章县| 鹤壁市| 西林县| 安陆市| 临澧县| 奉贤区| 贡觉县| 长泰县| 图片| 罗江县| 海伦市| 昌平区| 屏东市| 大邑县| 塔河县| 凭祥市| 开原市| 德庆县|