91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark中任務的示例分析

發布時間:2021-12-16 11:23:23 來源:億速云 閱讀:124 作者:小新 欄目:云計算

這篇文章主要介紹了Spark中任務的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

任務(Task)是Spark的最小執行單元,Spark任務是通過Task來執行的。Spark的任務體系是最神秘也是最容易學習的核心模塊,任務執行機制點透了那么Spark也就了解的更深入了。Task是任務體系的一個抽象類,有兩個子類:ResultTask和ShuffleMapTask,這三個類構成了任務系統的核心。

ResultTask好理解,就是直接執行Task中RDD某個分區的數據操作,還記得之前的RDD的結構嗎,里面有一個compute函數,任務就是執行compute函數。

ShuffleMapTask也是執行Task中RDD某個分區的數據操作,所不同的是輸出結果的存儲方式不一樣。ShuffleMapTask會把數據操作的結果保存到類似BlockManager的全局存儲中,ShuffleMapTask的結果可供下一個Task作為輸入數據。為什么分兩種呢?換個說法就很清楚了,ResultTask對應窄依賴的RDD,ShuffleMapTask對應寬依賴的RDD操作(如全連接操作)。ShuffleMapTask需要對數據的讀寫進行特殊的處理,要用BlockManager來輸出數據集的;同樣,ShuffleMapTask的子RDD的讀取數據集也是從BlockManager來的。

ResultTask和ShuffleMapTask的類的代碼非常簡單,就是重寫runTask方法。

Task通過Task描述對象來反序列化,獲得RDD和分區等對象后,創建TaskContextImpl作為任務上下文,然后執行run方法運行任務,讀取RDD中的迭代器數據并處理數據。run方法實際是調用子類重寫的runTask方法具體執行的。而runTask方法在ResultTask和ShuffleMapTask中被重寫。

1、 ResultTask

直接結果任務,這類任務執行完也就完了,其數據不需要被下一個任務再次處理。可以任務是終結者任務。

重寫runTask方法。runTask方法的核心代碼如下:

override def runTask(context: TaskContext): U = { 
 val ser = SparkEnv.get.closureSerializer.newInstance()
 val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
 ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) 
 func(context, rdd.iterator(partition, context))
 }

反序列化得到RDD中定義的數據處理函數func,func符合格式:

(TaskContext, Iterator[T]) => U

然后執行:

func(context, rdd.iterator(partition, context))

這方法的意思就是對rdd分區的數據迭代器輪詢,每次取出一條數據執行func操作。ResultTask的重寫部分就是這么簡單。

2、ShuffleMapTask

ShuffleMap格式的任務,這類任務的執行結果是要被下一個RDD消費的,因此輸出數據需要寫出到Shuffle區域。Shuffle區域會在分區數據管理中詳細的介紹。

重寫runTask方法。runTask方法的核心代碼如下:

override def runTask(context: TaskContext): MapStatus = {
 val ser = SparkEnv.get.closureSerializer.newInstance()
 val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
 ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
 val rdd = rddAndDep._1
 val dep = rddAndDep._2
 dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
 }

前半段和Result類似,反序列化得到RDD和分區,以及依賴分區dep。然后迭代rdd中的數據并寫入到依賴dep的shuffle區域中。

感謝你能夠認真閱讀完這篇文章,希望小編分享的“Spark中任務的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

大石桥市| 宝清县| 鹤山市| 监利县| 蓬溪县| 会宁县| 马鞍山市| 兰考县| 上杭县| 景德镇市| 凤阳县| 奉贤区| 澳门| 宜春市| 嘉义县| 华池县| 漳平市| 安西县| 华容县| 沧州市| 沐川县| 东乌| 固始县| 攀枝花市| 彰武县| 安仁县| 雷山县| 承德市| 麦盖提县| 武义县| 叙永县| 宣威市| 隆德县| 黎平县| 广东省| 桐乡市| 黄浦区| 通州区| 临夏市| 泰安市| 台南市|