您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關如何理解Spark 3.0 的動態分區裁剪優化,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
Spark 3.0 為我們帶來了許多令人期待的特性。動態分區裁剪(dynamic partition pruning)就是其中之一。本文將通過圖文的形式來帶大家理解什么是動態分區裁剪。
Spark 中的靜態分區裁剪
在介紹動態分區裁剪之前,有必要對 Spark 中的靜態分區裁剪進行介紹。在標準數據庫術語中,裁剪意味著優化器將避免讀取不包含我們正在查找的數據的文件。例如我們有以下的查詢 SQL:
Select * from iteblog.Students where subject = 'English';
在這個簡單的查詢中,我們試圖匹配和識別 Students 表中 subject = English 的記錄。比較愚蠢的做法是先把數據全部 scan 出來,然后再使用 subject = 'English' 去過濾。如下圖所示:
比較好的實現是查詢優化器將過濾器下推到數據源,以便能夠避免掃描整個數據集,Spark 就是這么來做的,如下圖所示:
在靜態分區裁剪技術中,我們的表首先是分區的,分區過濾下推的思想和上面的 filter push down 一致。因為在這種情況下,如果我們的查詢有一個針對分區列的過濾,那么在實際的查詢中可以跳過很多不必要的分區,從而大大減少數據的掃描,減少磁盤I/O,從而提升計算的性能。
然而,在現實中,我們的查詢語句不會是這么簡單的。通常情況下,我們會有多張維表,小表需要與大的事實表進行 join。因此,在這種情況下,我們不能再應用靜態分區裁剪,因為 filter 條件在 join 表的一側,而對裁剪有用的表在 Join 的另一側。比如我們有以下的查詢語句:
Select * from iteblog.Students join iteblog.DailyRoutine where iteblog.DailyRoutine.subject = 'English';
對于上面的查詢,比較垃圾的查詢引擎最后的執行計劃如下:
它把兩張表的數據進行關聯,然后再過濾。在數據量比較大的情況下效率可想而知。一些比較好的計算引擎可以進行一些優化,比如:
其能夠在一張表里面先過濾一些無用的數據,再進行 Join,效率自然比前面一種好。但是如果是我們人來弄,其實我們可以把 subject = 'English' 過濾條件下推到 iteblog.Students 表里面,這個正是 Spark 3.0 給我們帶來的動態分區裁剪優化。
動態分區裁剪
在 Spark SQL 中,用戶通常用他們喜歡的編程語言并選擇他們喜歡的 API 來提交查詢,這也就是為什么有 DataFrames 和 DataSet。Spark 將這個查詢轉化為一種易于理解的形式,我們稱它為查詢的邏輯計劃(logical plan)。在此階段,Spark 通過應用一組基于規則(rule based)的轉換(如列修剪、常量折疊、算子下推)來優化邏輯計劃。然后,它才會進入查詢的實際物理計劃(physical planning)。在物理規劃階段 Spark 生成一個可執行的計劃(executable plan),該計劃將計算分布在集群中。本文我將解釋如何在邏輯計劃階段實現動態分區修剪。然后,我們將研究如何在物理計劃階段中進一步優化它。
邏輯計劃階段優化
假設我們有一個具有多個分區的事實表(fact table),為了方便說明,我們用不同顏色代表不同的分區。另外,我們還有一個比較小的維度表(dimension table),我們的維度表不是分區表。然后我們在這些數據集上進行典型的掃描操作。在我們的例子里面,假設我們只讀取維度表里面的兩行數據,而這兩行數據其實對于另外一張表的兩個分區。所以最后執行 Join 操作時,帶有分區的事實表只需要讀取兩個分區的數據就可以。
因此,我們不需要實際掃描整個事實表。為了做到這種優化,一種簡單的方法是通過維度表構造出一個過濾子查詢(比如上面例子為 select subject from iteblog.DailyRoutine where subject = 'English'),然后在掃描事實表之前加上這個過濾子查詢。
通過這種方式,我們在邏輯計劃階段就知道事實表需要掃描哪些分區。
但是,上面的物理計劃執行起來還是比較低效。因為里面有重復的子查詢,我們需要找出一種方法來消除這個重復的子查詢。為了做到這一點,Spark 在物理計劃階段做了一些優化。
物理計劃階段優化
如果維度表很小,那么 Spark 很可能會以 broadcast hash join 的形式執行這個 Join。Broadcast Hash Join 的實現是將小表的數據廣播(broadcast)到 Spark 所有的 Executor 端,這個廣播過程和我們自己去廣播數據沒什么區別,先利用 collect 算子將小表的數據從 Executor 端拉到 Driver 端,然后在 Driver 端調用 sparkContext.broadcast 廣播到所有 Executor 端;另一方面,大表也會構建 hash table(稱為 build relation),之后在 Executor 端這個廣播出去的數據會和大表的對應的分區進行 Join 操作,這種 Join 策略避免了 Shuffle 操作。具體如下:
我們已經知道了 broadcast hash join 實現原理。其實動態分區裁剪優化就是在 broadcast hash join 中大表進行 build relation 的時候拿到維度表的廣播結果(broadcast results),然后在 build relation 的時候(Scan 前)進行動態過濾,從而達到避免掃描無用的數據效果。具體如下:
好了,以上就是動態分區裁剪在邏輯計劃和物理計劃的優化。
動態分區裁剪適用條件
并不是什么查詢都會啟用動態裁剪優化的,必須滿足以下幾個條件:
spark.sql.optimizer.dynamicPartitionPruning.enabled 參數必須設置為 true,不過這個值默認就是啟用的;
需要裁減的表必須是分區表,而且分區字段必須在 join 的 on 條件里面;
Join 類型必須是 INNER, LEFT SEMI (左表是分區表), LEFT OUTER (右表是分區表), or RIGHT OUTER (左表是分區表)。
滿足上面的條件也不一定會觸發動態分區裁減,還必須滿足 spark.sql.optimizer.dynamicPartitionPruning.useStats 和 spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio 兩個參數綜合評估出一個進行動態分區裁減是否有益的值,滿足了才會進行動態分區裁減。
關于如何理解Spark 3.0 的動態分區裁剪優化就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。