您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關大數據開發中如何進行Spark閉包的理解分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
閉包是一個函數,返回值依賴于聲明在函數外部的一個或多個變量。閉包通常來講可以簡單的認為是可以訪問一個函數里面局部變量的另外一個函數。
如下面這段匿名的函數:
val multiplier = (i:Int) => i * 10
函數體內有一個變量 i,它作為函數的一個參數。如下面的另一段代碼:
val multiplier = (i:Int) => i * factor
在 multiplier
中有兩個變量:i 和 factor。其中的一個 i 是函數的形式參數,在 multiplier
函數被調用時,i 被賦予一個新的值。然而,factor不是形式參數,而是自由變量,考慮下面代碼:
var factor = 3 val multiplier = (i:Int) => i * factor
這里我們引入一個自由變量 factor
,這個變量定義在函數外面。
這樣定義的函數變量 multiplier
成為一個"閉包",因為它引用到函數外面定義的變量,定義這個函數的過程是將這個自由變量捕獲而構成一個封閉的函數
完整的例子:
object Test { def main(args: Array[String]) { println( "muliplier(1) value = " + multiplier(1) ) println( "muliplier(2) value = " + multiplier(2) ) } var factor = 3 val multiplier = (i:Int) => i * factor }
先來看下面一段代碼:
val data=Array(1, 2, 3, 4, 5) var counter = 0 var rdd = sc.parallelize(data) // ???? 這樣做會怎么樣 rdd.foreach(x => counter += x) println("Counter value: " + counter)
首先肯定的是上面輸出的結果是0,park將RDD操作的處理分解為tasks,每個task由Executor
執行。在執行之前,Spark會計算task的閉包。閉包是Executor
在RDD上進行計算的時候必須可見的那些變量和方法(在這種情況下是foreach())。閉包會被序列化并發送給每個Executor,但是發送給Executor的是副本,所以在Driver上輸出的依然是counter
本身,如果想對全局的進行更新,用累加器,在spark-streaming
里面使用updateStateByKey
來更新公共的狀態。
另外在Spark中的閉包還有別的作用,
1.清除Driver發送到Executor上的無用的全局變量等,只復制有用的變量信息給Executor
2.保證發送到Executor上的是序列化以后的數據
比如在使用DataSet時候 case class的定義必須在類下,而不能是方法內,即使語法上沒問題,如果使用過json4s來序列化,implicit val formats = DefaultFormats
的引入最好放在類下,否則要單獨將這個format序列化,即使你沒有使用到它別的東西。
關于大數據開發中如何進行Spark閉包的理解分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。