91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

(版本定制)第8課:Spark Streaming源碼解讀之

發布時間:2020-03-01 17:59:42 來源:網絡 閱讀:531 作者:Spark_2016 欄目:大數據

本篇博客將詳細探討DStream模板下的RDD是如何被創建,然后被執行的。在開始敘述之前,先來思考幾個問題,本篇文章也就是基于此問題構建的。 
1. RDD是誰產生的? 
2. 如何產生RDD? 
帶著這兩個問題開啟我們的探索之旅。

DStream是RDD的模板,每隔一個Batch Interval會根據DStream模板生成一個對應的RDD,然后將RDD存儲到DStream中的generatedRDDs數據結構中,下面是存儲結構格式。

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

1、簡單的WordCount程序

object WordCount {  def main(args:Array[String]): Unit ={
    val sparkConf = new SparkConf().setMaster("Master:7077").setAppName("WordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(10)) // Timer觸發頻率

    val lines = ssc.socketTextStream("Master",9999) //接收數據
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x,1)).reduceByKey(_+_)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
首先我們先看看print方法,具體的代碼如下:
/**
* Print the first num elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(num: Int): Unit = ssc.withScope {
 def foreachFunc: (RDD[T], Time) => Unit = {
   (rdd: RDD[T], time: Time) => {
     val firstNum = rdd.take(num + 1)
     // scalastyle:off println
     println("-------------------------------------------")
     println("Time: " + time)
     println("-------------------------------------------")
     firstNum.take(num).foreach(println)
     if (firstNum.length > num) println("...")
     println()
     // scalastyle:on println
   }
 }
 foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

首先定義了一個函數,該函數用來從RDD中取出前幾條數據,并打印出結果與時間等,后面會調用foreachRDD函數。

private def foreachRDD(
   foreachFunc: (RDD[T], Time) => Unit,
   displayInnerRDDOps: Boolean): Unit = {
   new ForEachDStream(this,context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

/**
* Register this streaming as an output stream. This would ensure that RDDs of this
* DStream will be generated.
*/
private[streaming] def register(): DStream[T] = {
 ssc.graph.addOutputStream(this)
 this
}

def addOutputStream(outputStream: DStream[_]) {
 this.synchronized {
   outputStream.setGraph(this)
   outputStreams += outputStream
 }

在foreachRDD中new出了一個ForEachDStream對象,并將這個注冊給DStreamGraph,ForEachDStream對象也就是DStreamGraph中的outputStreams。

當每到達一個BatchInterval時候,就會調用DStreamingGraph中的generateJobs.

def generateJobs(time: Time): Seq[Job] = {
 logDebug("Generating jobs for time " + time)
 val jobs = this.synchronized {
   outputStreams.flatMap { outputStream =>
     val jobOption = outputStream.generateJob(time)
     jobOption.foreach(_.setCallSite(outputStream.creationSite))
     jobOption
   }
 }
 logDebug("Generated " + jobs.length + " jobs for time " + time)
 jobs
}

這里就會調用outputStream的generateJob方法


private[streaming] def generateJob(time: Time): Option[Job] = {
 getOrCompute(time) match {
   case Some(rdd) => {
     val jobFunc = () => {
       val emptyFunc = { (iterator: Iterator[T]) => {} }
       context.sparkContext.runJob(rdd, emptyFunc)
     }
     Some(new Job(time, jobFunc))
   }
   case None => None
 }
}

這里會調用getOrCompute(time)來產生新RDD,并將其存入到generatedRDDs中,整理的過程如下圖:
(版本定制)第8課:Spark Streaming源碼解讀之


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

泰安市| 龙岩市| 阜宁县| 巍山| 西城区| 安化县| 桐柏县| 元氏县| 特克斯县| 平武县| 台中市| 徐水县| 怀来县| 新安县| 耒阳市| 久治县| 峡江县| 景德镇市| 蓝山县| 原阳县| 临西县| 志丹县| 六枝特区| 阿拉善右旗| 怀仁县| 潼关县| 栾城县| 凭祥市| 工布江达县| 河南省| 阜宁县| 柳州市| 社旗县| 屏边| 长子县| 会昌县| 吴江市| 华阴市| 兰坪| 平果县| 合山市|