您好,登錄后才能下訂單哦!
本篇內容主要講解“Spark里的閉包是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Spark里的閉包是什么”吧!
閉包的概念如下圖:
在spark應用里,變量及函數的作用范圍和聲明周期在spark的集群運行模式下是比較難理解的,尤其是對初學者來說。RDD的操作,要修改其作用范圍的變量,經常會出點叉子。下面,可以舉個用foreach,修改一個計數器的例子。
例子
求和RDD元素的例子,該例子會根據該段代碼是否執行在同一個jvm里面有不同的輸出結果,比如local模式,運行于同一個jvm,輸出是15;cluster模式運行于不同jvm輸出是0。
val data = Array(1, 2, 3, 4, 5)
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
本地或集群模式
上述代碼的行為是未定義的,并且不同模式下運行情況不同。為了執行作業,Spark將RDD操作的處理分解為tasks,每個task由Executor執行。在執行之前,Spark會計算task的閉包。閉包是Executor在RDD上進行計算的時候必須可見的那些變量和方法(在這種情況下是foreach())。閉包會被序列化并發送給每個Executor。
發送給每個Executor的閉包中的變量是副本,因此,當foreach函數內引用計數器時,它不再是driver節點上的計數器。driver節點的內存中仍有一個計數器,但該變量是Executor不可見的!執行者只能看到序列化閉包的副本。因此,計數器的最終值仍然為零,因為計數器上的所有操作都引用了序列化閉包內的值。
在本地模式下,在某些情況下,該foreach函數實際上將在與driver相同的JVM內執行,并且會引用相同的原始計數器,并可能實際更新它。
為了確保在這些場景中明確定義的行為,應該使用一個Accumulator。Spark中的累加器專門用于提供一種機制,用于在集群中的工作節點之間執行拆分時安全地更新變量。
一般來說,closures - constructs像循環或本地定義的方法,不應該被用來改變一些全局狀態。Spark并沒有定義或保證從閉包外引用的對象的改變行為。這樣做的一些代碼可以在本地模式下工作,但這只是偶然,并且這種代碼在分布式模式下的行為不會像你想的那樣。如果需要某些全局聚合,請改用累加器。
打印RDD的元素
另一個常見的習慣用法是嘗試使用rdd.foreach(println)或rdd.map(println)打印出RDD的元素。在單臺機器上,這將產生預期的輸出并打印所有RDD的元素。但是,在cluster模式下,由Executor執行輸出寫入的是Executor的stdout,而不是driver上的那個stdout,所以driver的stdout不會顯示這些!要在driver中打印所有元素,可以使用該collect()方法首先將RDD數據帶到driver節點:rdd.collect().foreach(println)。但這可能會導致driver程序內存不足,因為collect()會將整個RDD數據提取到driver端; 如果您只需要打印RDD的一些元素,則更安全的方法是使用take():rdd.take(100).foreach(println)。
到此,相信大家對“Spark里的閉包是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。