This post takes a detailed look at how the RDDs behind a DStream template are created and then executed. Before we start, consider the two questions this article is built around:
1. Who produces the RDDs?
2. How are the RDDs produced?
With these two questions in mind, let's begin our exploration.
A DStream is a template for RDDs: every batch interval, a corresponding RDD is generated from the DStream template and stored in the DStream's generatedRDDs data structure, keyed by batch time. Its declaration in the Spark source is shown below.
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
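To make this bookkeeping concrete, here is a minimal toy model in plain Scala (not Spark code; all names are illustrative stand-ins): one value per batch time is computed on first request and then cached, which is exactly the pattern generatedRDDs implements for real RDDs.

import scala.collection.mutable

// Toy stand-in for Spark's Time (a batch timestamp in milliseconds).
case class Time(millis: Long)

object GeneratedRddsToy {
  // One cached "RDD" (here just a Seq[String]) per batch time.
  private val generatedRDDs = new mutable.HashMap[Time, Seq[String]]()

  // Compute the value for a batch on first request, then reuse it.
  def getOrCompute(time: Time): Seq[String] =
    generatedRDDs.getOrElseUpdate(time, compute(time))

  private def compute(time: Time): Seq[String] =
    Seq(s"records for batch at ${time.millis}")

  def main(args: Array[String]): Unit = {
    val batchIntervalMs = 10000L
    for (i <- 0 until 3)
      getOrCompute(Time(1465135800000L + i * batchIntervalMs))
    println(generatedRDDs.keySet) // one entry per batch time
  }
}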
1. A simple WordCount program
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("spark://Master:7077").setAppName("WordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10)) // batch interval: the timer's trigger frequency
    val lines = ssc.socketTextStream("Master", 9999)       // receive data from a socket
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
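To feed data to this example, you would typically start a plain TCP server on Master before submitting the job, for example with netcat: nc -lk 9999. Every line typed into that terminal is received by socketTextStream and counted in the next 10-second batch.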
Let's begin with the print method; its code is as follows:
/**
* Print the first num elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("-------------------------------------------")
println("Time: " + time)
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println
}
}
foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
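For reference, with the default num = 10 each batch prints something like the following (the word counts here are hypothetical):

-------------------------------------------
Time: 1465135800000 ms
-------------------------------------------
(hello,3)
(spark,2)
(world,1)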
print first defines a function that takes the first few elements of each generated RDD and prints them along with the batch time, then passes that function to foreachRDD:
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
  context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
/**
* Register this streaming as an output stream. This would ensure that RDDs of this
* DStream will be generated.
*/
private[streaming] def register(): DStream[T] = {
ssc.graph.addOutputStream(this)
this
}
def addOutputStream(outputStream: DStream[_]) {
  this.synchronized {
    outputStream.setGraph(this)
    outputStreams += outputStream
  }
}
Inside foreachRDD, a new ForEachDStream object is created and registered with the DStreamGraph; these ForEachDStream objects are precisely the outputStreams held by the DStreamGraph.
Whenever a batch interval arrives, DStreamGraph's generateJobs is invoked:
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
}
logDebug("Generated " + jobs.length + " jobs for time " + time)
jobs
}
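Who calls DStreamGraph.generateJobs? It is the JobGenerator, whose recurring timer fires once per batch interval. Condensed from the Spark 1.6-era source (details vary across versions), the relevant pieces look roughly like this:

// Inside JobGenerator: the timer posts a GenerateJobs event every batch interval.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

// The event handler asks the DStreamGraph for this batch's jobs and submits them.
private def generateJobs(time: Time) {
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // bind received blocks to this batch
    graph.generateJobs(time)                                 // the method shown above
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}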
generateJobs in turn calls generateJob on each output stream:
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
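One subtlety: the snippet above is the default DStream.generateJob, which runs an empty job merely to materialize the RDD. ForEachDStream, the class that print registers, overrides generateJob so that the job actually runs foreachFunc on the batch's RDD. Condensed from the Spark source (details vary by version):

override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {            // ask the parent DStream for this batch's RDD
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)                 // e.g. the printing closure defined in print
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}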
In both the default and the overridden version, getOrCompute(time) is what produces the new RDD for the batch and stores it in generatedRDDs, completing the chain: batch timer → DStreamGraph.generateJobs → outputStream.generateJob → getOrCompute → compute.
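getOrCompute is where our two opening questions are answered: the DStream (the template) itself produces the RDD by calling its own compute(time), and caches the result in generatedRDDs. Condensed from the Spark 1.6-era source (the exact code differs between versions):

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // Reuse the RDD if this batch was already computed; otherwise build it.
  generatedRDDs.get(time).orElse {
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        compute(time)   // each DStream subclass defines how its RDD is built
      }
      rddOption.foreach { newRDD =>
        if (storageLevel != StorageLevel.NONE) newRDD.persist(storageLevel)
        if (checkpointDuration != null &&
            (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
        }
        generatedRDDs.put(time, newRDD)   // store the new RDD in the template's cache
      }
      rddOption
    } else {
      None
    }
  }
}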