您好,登錄后才能下訂單哦!
引言
Join是SQL語句中的常用操作,良好的表結構能夠將數據分散在不同的表中,使其符合某種范式,減少表冗余、更新容錯等。而建立表和表之間關系的最佳方式就是Join操作。
對于Spark來說有3中Join的實現,每種Join對應著不同的應用場景:
Broadcast Hash Join :適合一張較小的表和一張大表進行join
Shuffle Hash Join : 適合一張小表和一張大表進行join,或者是兩張小表之間的join
Sort Merge Join :適合兩張較大的表之間進行join
前兩者都基于的是Hash Join,只不過在hash join之前需要先shuffle還是先broadcast。下面將詳細的解釋一下這三種不同的join的具體原理。
Hash Join
先來看看這樣一條SQL語句:
select * from order,item where item.id = order.i_id
確定Build Table以及Probe Table:這個概念比較重要,Build Table使用join key構建Hash Table,而Probe Table使用join key進行探測,探測成功就可以join在一起。通常情況下,小表會作為Build Table,大表作為Probe Table。此事例中item為Build Table,order為Probe Table;很簡單一個Join節點,參與join的兩張表是item和order,join key分別是item.id以及order.i_id。現在假設這個Join采用的是hash join算法,整個過程會經歷三步:
構建Hash Table:依次讀取Build Table(item)的數據,對于每一行數據根據join key(item.id)進行hash,hash到對應的Bucket,生成hash table中的一條記錄。數據緩存在內存中,如果內存放不下需要dump到外存;
基本流程可以參考上圖,這里有兩個小問題需要關注:
hash join性能如何?很顯然,hash join基本都只掃描兩表一次,可以認為o(a+b),較之最極端的笛卡爾集運算a*b,不知甩了多少條街;
上文說過,hash join是傳統數據庫中的單機join算法,在分布式環境下需要經過一定的分布式改造,說到底就是盡可能利用分布式計算資源進行并行化計算,提高總體效率。hash join分布式改造一般有兩種經典方案:
broadcast hash join:將其中一張小表廣播分發到另一張大表所在的分區節點上,分別并發地與其上的分區記錄進行hash join。broadcast適用于小表很小,可以直接廣播的場景;
Broadcast Hash Join
大家知道,在數據庫的常見模型中(比如星型模型或者雪花模型),表一般分為兩種:事實表和維度表。維度表一般指固定的、變動較少的表,例如聯系人、物品種類等,一般數據有限。而事實表一般記錄流水,比如銷售清單等,通常隨著時間的增長不斷膨脹。
因為Join操作是對兩個表中key值相同的記錄進行連接,在SparkSQL中,對兩個表做Join最直接的方式是先根據key分區,再在每個分區中把key值相同的記錄拿出來做連接操作。但這樣就不可避免地涉及到shuffle,而shuffle在Spark中是比較耗時的操作,我們應該盡可能的設計Spark應用使其避免大量的shuffle。
當維度表和事實表進行Join操作時,為了避免shuffle,我們可以將大小有限的維度表的全部數據分發到每個節點上,供事實表使用。executor存儲維度表的全部數據,一定程度上犧牲了空間,換取shuffle操作大量的耗時,這在SparkSQL中稱作Broadcast Join,如下圖所示:
Table B是較小的表,黑色表示將其廣播到每個executor節點上,Table A的每個partition會通過block manager取到Table A的數據。根據每條記錄的Join Key取到Table B中相對應的記錄,根據Join Type進行操作。這個過程比較簡單,不做贅述。
Broadcast Join的條件有以下幾個:
被廣播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M (或者加了broadcast join的hint)
看起來廣播是一個比較理想的方案,但它有沒有缺點呢?也很明顯。這個方案只能用于廣播較小的表,否則數據的冗余傳輸就遠大于shuffle的開銷;另外,廣播時需要將被廣播的表現collect到driver端,當頻繁有廣播出現時,對driver的內存也是一個考驗。
如下圖所示,broadcast hash join可以分為兩步:
broadcast階段:將小表廣播分發到大表所在的所有主機。廣播算法可以有很多,最簡單的是先發給driver,driver再統一分發給所有executor;要不就是基于bittorrete的p2p思路;
SparkSQL規定broadcast hash join執行的基本條件為被廣播小表必須小于參數spark.sql.autoBroadcastJoinThreshold,默認為10M。
Shuffle Hash Join
當一側的表比較小時,我們選擇將其廣播出去以避免shuffle,提高性能。但因為被廣播的表首先被collect到driver段,然后被冗余分發到每個executor上,所以當表比較大時,采用broadcast join會對driver端和executor端造成較大的壓力。
但由于Spark是一個分布式的計算引擎,可以通過分區的形式將大批量的數據劃分成n份較小的數據集進行并行計算。這種思想應用到Join上便是Shuffle Hash Join了。利用key相同必然分區相同的這個原理,兩個表中,key相同的行都會被shuffle到同一個分區中,SparkSQL將較大表的join分而治之,先將表劃分成n個分區,再對兩個表中相對應分區的數據分別進行Hash Join,這樣即在一定程度上減少了driver廣播一側表的壓力,也減少了executor端取整張被廣播表的內存消耗。其原理如下圖:
Shuffle Hash Join分為兩步:
對兩張表分別按照join keys進行重分區,即shuffle,目的是為了讓有相同join keys值的記錄分到對應的分區中
Shuffle Hash Join的條件有以下幾個:
分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M
基表不能被廣播,比如left outer join時,只能廣播右表
我們可以看到,在一定大小的表中,SparkSQL從時空結合的角度來看,將兩個表進行重新分區,并且對小表中的分區進行hash化,從而完成join。在保持一定復雜度的基礎上,盡量減少driver和executor的內存壓力,提升了計算時的穩定性。
在大數據條件下如果一張表很小,執行join操作最優的選擇無疑是broadcast hash join,效率最高。但是一旦小表數據量增大,廣播所需內存、帶寬等資源必然就會太大,broadcast hash join就不再是最優方案。此時可以按照join key進行分區,根據key相同必然分區相同的原理,就可以將大表join分而治之,劃分為很多小表的join,充分利用集群資源并行化。如下圖所示,shuffle hash join也可以分為兩步:
shuffle階段:分別將兩個表按照join key進行分區,將相同join key的記錄重分布到同一節點,兩張表的數據會被重分布到集群中所有節點。這個過程稱為shuffle
看到這里,可以初步總結出來如果兩張小表join可以直接使用單機版hash join;如果一張大表join一張極小表,可以選擇broadcast hash join算法;而如果是一張大表join一張小表,則可以選擇shuffle hash join算法;那如果是兩張大表進行join呢?
Sort Merge Join
上面介紹的兩種實現對于一定大小的表比較適用,但當兩個表都非常大時,顯然無論適用哪種都會對計算內存造成很大壓力。這是因為join時兩者采取的都是hash join,是將一側的數據完全加載到內存中,使用hash code取join keys值相等的記錄進行連接。
當兩個表都非常大時,SparkSQL采用了一種全新的方案來對表進行Join,即Sort Merge Join。這種實現方式不用將一側數據全部加載后再進星hash join,但需要在join前將數據排序,如下圖所示:
可以看到,首先將兩張表按照join keys進行了重新shuffle,保證join keys值相同的記錄會被分在相應的分區。分區后對每個分區內的數據進行排序,排序后再對相應的分區內的記錄進行連接,如下圖示:
看著很眼熟吧?也很簡單,因為兩個序列都是有序的,從頭遍歷,碰到key相同的就輸出;如果不同,左邊小就繼續取左邊,反之取右邊。
可以看出,無論分區有多大,Sort Merge Join都不用把某一側的數據全部加載到內存中,而是即用即取即丟,從而大大提升了大數據量下sql join的穩定性。
SparkSQL對兩張大表join采用了全新的算法-sort-merge join,如下圖所示,整個過程分為三個步驟:
shuffle階段:將兩張大表根據join key進行重新分區,兩張表數據會分布到整個集群,以便分布式并行處理;
sort階段:對單個分區節點的兩表數據,分別進行排序;
經過上文的分析,可以明確每種Join算法都有自己的適用場景,數據倉庫設計時最好避免大表與大表的join查詢,SparkSQL也可以根據內存資源、帶寬資源適量將參數spark.sql.autoBroadcastJoinThreshold調大,讓更多join實際執行為broadcast hash join。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。