91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark閉包中driver及executor程序代碼是怎樣執行的

發布時間:2021-12-17 09:22:19 來源:億速云 閱讀:309 作者:柒染 欄目:大數據

Spark閉包中driver及executor程序代碼是怎樣執行的,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

Spark中的閉包

閉包的作用可以理解為:函數可以訪問函數外部定義的變量,但是函數內部對該變量進行的修改,在函數外是不可見的,即對函數外源變量不會產生影響。

Spark閉包中driver及executor程序代碼是怎樣執行的

其實,在學習Spark時,一個比較難理解的點就是,在集群模式下,定義的變量和方法作用域的范圍和生命周期。這在你操作RDD時,比如調用一些函數map、foreach時,訪問其外部變量進行操作時,很容易產生疑惑。為什么我本地程序運行良好且結果正確,放到集群上卻得不到想要的結果呢?

首先通過下邊對RDD中的元素進行求和的示例,來看相同的代碼本地模式和集群模式運行結果的區別:

Spark閉包中driver及executor程序代碼是怎樣執行的

Spark為了執行  任務,會將RDD的操作分解為多個task,并且這些task是由executor執行的。  在執行之前,Spark會計算task的閉包即定義的一些變量和方法,比如例子中的counter變量和foreach方法,并且閉包必須對executor而言是可見的,這些閉包會被序列化發送到每個executor。
在集群  模式下,driver和executor運行在不同的JVM進程中,發送給每個executor的閉包中的變量是driver端變量的副本。  因此,當foreach函數內引用counter時,其實處理的只是driver端變量的副本,與driver端本身的counter無關。  driver節點的內存中仍有一個計數器,但該變量對executor是不可見的!  executor只能看到序列化閉包的副本。  因此,上述例子輸出的counter最終值仍然為零,因為counter上的所有操作都只是引用了序列化閉包內的值。
在本地模式下,往往driver和executor運行在同一JVM進程中。那么這些閉包將會被共享,executor操作的counter和driver持有的counter是同一個,那么counter在處理后最終值為6。

但是在生產中,我們的任務都是在集群模式下運行,如何能滿足這種業務場景呢?

這就必須引出一個后續要重點講解的概念:Accumulator即累加器。Spark中的累加器專門用于提供一種機制,用于在集群中的各個worker節點之間執行時安全地更新變量。

Spark閉包中driver及executor程序代碼是怎樣執行的

一般來  說,closures - constructs比如循環或本地定義的方法,就不應該被用來改變一些全局狀態,Spark并沒有定義或保證對從閉包外引用的對象進行更新的行為。  如果你這樣操作只會導致一些代碼在本地模式下能夠達到預期的效果,但是在分布式環境下卻事與愿違。  如果需要某些全局聚合,請改用累加器。  對于其他的業務場景,我們適時考慮引入外部存儲系統、廣播變量等。  
 
閉包函數從產生到在executor執行經歷了什么?

首先,對RDD相關的操作需要傳入閉包函數,如果這個函數需要訪問外部定義的變量,就需要滿足一定條件(比如必須可被序列化),否則會拋出運行時異常。閉包函數在最終傳入到executor執行,需要經歷以下步驟:

1.driver通過反射,運行時找到閉包訪問的變量,并封裝成一個對象,然后序列化該對象

2.將序列化后的對象通過網絡傳輸到worker節點

3.worker節點反序列化閉包對象

4.worker節點的executor執行閉包函數

簡而言之,就是要通過網絡傳遞函數、然后執行,期間會經歷序列化和反序列化,所以要求被傳遞的變量必須可以被序列化和反序列化,否則會拋類似Error:Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects這樣的異常。即使是本地執行時,也會按照上述的步驟執行,這也是為什么不允許在RDD內部直接操作RDD的原因(SparkContext不支持序列化)。同時,在這些算子閉包內修改外部定義的變量不會被反饋到driver端。
driver & executor

driver是運行用戶編寫Application 的main()函數的地方,具體負責DAG的構建、任務的劃分、task的生成與調度等。job,stage,task生成都離不開rdd自身,rdd的相關的操作不能缺少driver端的sparksession/sparkcontext。

executor是真正執行task地方,而task執行離不開具體的數據,這些task運行的結果可以是shuffle中間結果,也可以持久化到外部存儲系統。一般都是將結果、狀態等匯集到driver。但是,目前executor之間不能互相通信,只能借助第三方來實現數據的共享或者通信。
編寫的Spark程序代碼,運行在driver端還是executor端呢?
先看個簡單例子:通常我們在本地測試程序的時候,要打印RDD中的數據。
在本地模式下,直接使用rdd.foreach(println)或rdd.map(println)在單臺機器上,能夠按照預期打印并輸出所有RDD的元素。
但是,在集群模式下,由executor執行輸出寫入的是executor的stdout,而不是driver上的stdout,所以driver的stdout不會顯示這些!
要想在driver端打印所有元素,可以使用collect()方法先將RDD數據帶到driver節點,然后在調用foreach(println)(但需要注意一點,由于會把RDD中所有元素都加載到driver端,可能引起driver端內存不足導致OOM。如果你只是想獲取RDD中的部分元素,可以考慮使用take或者top方法)
總之,在這里RDD中的元素即為具體的數據,對這些數據的操作都是由負責task執行的executor處理的,所以想在driver端輸出這些數據就必須先將數據加載到driver端進行處理。

最后做個總結:所有對RDD具體數據的操作都是在executor上執行的,所有對rdd自身的操作都是在driver上執行的。比如foreach、foreachPartition都是針對rdd內部數據進行處理的,所以我們傳遞給這些算子的函數都是執行于executor端的。但是像foreachRDD、transform則是對RDD本身進行一列操作,所以它的參數函數是執行在driver端的,那么它內部是可以使用外部變量,比如在SparkStreaming程序中操作offset、動態更新廣播變量等。

關于Spark閉包中driver及executor程序代碼是怎樣執行的問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

诸暨市| 白山市| 津南区| 梁平县| 牙克石市| 磐石市| 互助| 彰化市| 察雅县| 衡阳县| 仙居县| 韩城市| 郴州市| 太谷县| 长顺县| 盐山县| 彰武县| 冕宁县| 永清县| 铁岭市| 淮北市| 南汇区| 盐城市| 新野县| 睢宁县| 南城县| 浏阳市| 广河县| 八宿县| 宾阳县| 江油市| 项城市| 石台县| 昌平区| 峨眉山市| 乐昌市| 邯郸市| 乳源| 甘孜县| 通城县| 哈巴河县|