您好,登錄后才能下訂單哦!
這篇文章主要介紹了Hive中SQL數據傾斜及優化的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
關鍵詞 | 情形 | 后果 |
Join | 其中一個表較小, 但是key集中 | 分發到某一個或幾個Reduce上的數據遠高于平均值 |
大表與大表,但是分桶的判斷字段0值或空值過多 | 這些空值都由一個reduce處理,灰常慢 | |
group by | group by 維度過小, 某值的數量過多 | 處理某值的reduce灰常耗時 |
Count Distinct | 某特殊值過多 | 處理此特殊值的reduce耗時 |
1)、key分布不均勻
2)、業務數據本身的特性
3)、建表時考慮不周
4)、某些SQL語句本身就有數據傾斜
任務進度長時間維持在99%(或100%),查看任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的數據量和其他reduce差異過大。
單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。 最長時長遠大于平均時長。
hive.map.aggr=true
Map 端部分聚合,相當于Combiner
hive.groupby.skewindata=true
有數據傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,并輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。
如何Join:
關于驅動表的選取,選用join key分布最均勻的表作為驅動表
做好列裁剪和filter操作,以達到兩表做join的時候,數據量相對變小的效果。
大小表Join:
使用map join讓小的維度表(1000條以下的記錄條數) 先進內存。在map端完成reduce.
大表Join大表:
把空值的key變成一個字符串加上隨機數,把傾斜的數據分到不同的reduce上,由于null值關聯不上,處理后并不影響最終結果。
count distinct大量相同特殊值
count distinct時,將值為空的情況單獨處理,如果是計算count distinct,可以不用處理,直接過濾,在最后結果中加1。如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
group by維度過小:
采用sum() group by的方式來替換count(distinct)完成計算。
特殊情況特殊處理:
在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的數據單獨拿出來處理。最后union回去。
場景:如日志中,常會有信息丟失的問題,比如日志中的 user_id,如果取其中的 user_id 和 用戶表中的user_id 關聯,會碰到數據傾斜的問題。
解決方法1: user_id為空的不參與關聯(紅色字體為修改后)
select * from log a join users b on a.user_id is not null and a.user_id = b.user_id union all select * from log a where a.user_id is null;
解決方法2 :賦與空值分新的key值
select * from log a left outer join users b on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。解決方法1中 log讀取兩次,jobs是2。解決方法2 job數是1 。這個優化適合無效 id (比如 -99 , ’’, null 等) 產生的傾斜問題。把空值的 key 變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。
場景:用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當按照user_id進行兩個表的Join操作時,默認的Hash操作會按int型的id來進行分配,這樣會導致所有string類型id的記錄都分配到一個Reducer中。
解決方法:把數字類型轉換成字符串類型
select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)
使用 map join 解決小表(記錄數少)關聯大表的數據傾斜問題,這個方法使用的頻率非常高,但如果小表很大,大到map join會出現bug或異常,這時就需要特別的處理。 以下例子:
select * from log a left outer join users b on a.user_id = b.user_id;
users 表有 600w+ 的記錄,把 users 分發到所有的 map 上也是個不小的開銷,而且 map join 不支持這么大的小表。如果用普通的 join,又會碰到數據傾斜的問題。
解決方法:
select /*+mapjoin(x)*/* from log a left outer join ( select /*+mapjoin(c)*/d.* from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;
假如,log里user_id有上百萬個,這就又回到原來map join問題。所幸,每日的會員uv不會太多,有交易的會員不會太多,有點擊的會員不會太多,有傭金的會員不會太多等等。所以這個方法能解決很多場景下的數據傾斜問題。
計算 uv 的時候,經常會用到 COUNT(DISTINCT),但在數據比較傾斜的時候 COUNT(DISTINCT) 會比較慢。這時可以嘗試用 GROUP BY 改寫代碼計算 uv。
INSERT OVERWRITE TABLE s_dw_tanx_adzone_uv PARTITION (ds=20120329)
SELECT 20120329 AS thedate,adzoneid,COUNT(DISTINCT acookie) AS uv FROM s_ods_log_tanx_pv t WHERE t.ds=20120329 GROUP BY adzoneid
關于COUNT(DISTINCT)的數據傾斜問題不能一概而論,要依情況而定,下面是我測試的一組數據:
測試數據:169857條
#統計每日IP CREATE TABLE ip_2014_12_29 AS SELECT COUNT(DISTINCT ip) AS IP FROM logdfs WHERE logdate='2014_12_29'; 耗時:24.805 seconds #統計每日IP(改造) CREATE TABLE ip_2014_12_29 AS SELECT COUNT(1) AS IP FROM (SELECT DISTINCT ip from logdfs WHERE logdate='2014_12_29') tmp; 耗時:46.833 seconds
測試結果表名:明顯改造后的語句比之前耗時,這是因為改造后的語句有2個SELECT,多了一個job,這樣在數據量小的時候,數據不會存在傾斜問題。
Hive 對 union all 的優化的特性:對 union all 優化只局限于非嵌套查詢。
消滅子查詢內的 group by
示例 1:子查詢內有 group by
SELECT * FROM (SELECT * FROM t1 GROUP BY c1,c2,c3 UNION ALL SELECT * FROM t2 GROUP BY c1,c2,c3)t3 GROUP BY c1,c2,c3
從業務邏輯上說,子查詢內的 GROUP BY 怎么都看顯得多余(功能上的多余,除非有 COUNT(DISTINCT)),如果不是因為 Hive Bug 或者性能上的考量(曾經出現如果不執行子查詢 GROUP BY,數據得不到正確的結果的 Hive Bug)。所以這個 Hive 按經驗轉換成如下所示:
SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2)t3 GROUP BY c1,c2,c3
調優結果:經過測試,并未出現 union all 的 Hive Bug,數據是一致的。MapReduce 的 作業數由 3 減少到 1。
t1 相當于一個目錄,t2 相當于一個目錄,對 Map/Reduce 程序來說,t1,t2 可以作為 Map/Reduce 作業的 mutli inputs。這可以通過一個 Map/Reduce 來解決這個問題。Hadoop 的 計算框架,不怕數據多,就怕作業數多。
但如果換成是其他計算平臺如 Oracle,那就不一定了,因為把大的輸入拆成兩個輸入, 分別排序匯總后 merge(假如兩個子排序是并行的話),是有可能性能更優的(比如希爾排 序比冒泡排序的性能更優)。
消滅子查詢內的 COUNT(DISTINCT),MAX,MIN。
SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT c1,c2,c3 COUNT(DISTINCT c4) FROM t2 GROUP BY c1,c2,c3) t3 GROUP BY c1,c2,c3;
由于子查詢里頭有 COUNT(DISTINCT)操作,直接去 GROUP BY 將達不到業務目標。這時采用 臨時表消滅 COUNT(DISTINCT)作業不但能解決傾斜問題,還能有效減少 jobs。
INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3; SELECT c1,c2,c3,SUM(income),SUM(uv) FROM (SELECT c1,c2,c3,income,0 AS uv FROM t1 UNION ALL SELECT c1,c2,c3,0 AS income,1 AS uv FROM t2) t3 GROUP BY c1,c2,c3;
job 數是 2,減少一半,而且兩次 Map/Reduce 比 COUNT(DISTINCT)效率更高。
調優結果:千萬級別的類目表,member 表,與 10 億級得商品表關聯。原先 1963s 的任務經過調整,1152s 即完成。
消滅子查詢內的 JOIN
SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t4 UNION ALL SELECT * FROM t2 JOIN t3 ON t2.id=t3.id) x GROUP BY c1,c2;
上面代碼運行會有 5 個 jobs。加入先 JOIN 生存臨時表的話 t5,然后 UNION ALL,會變成 2 個 jobs。
INSERT OVERWRITE TABLE t5 SELECT * FROM t2 JOIN t3 ON t2.id=t3.id; SELECT * FROM (t1 UNION ALL t4 UNION ALL t5);
調優結果顯示:針對千萬級別的廣告位表,由原先 5 個 Job 共 15 分鐘,分解為 2 個 job 一個 8-10 分鐘,一個3分鐘。
使map的輸出數據更均勻的分布到reduce中去,是我們的最終目標。由于Hash算法的局限性,按key Hash會或多或少的造成數據傾斜。大量經驗表明數據傾斜的原因是人為的建表疏忽或業務邏輯可以規避的。在此給出較為通用的步驟:
1、采樣log表,哪些user_id比較傾斜,得到一個結果表tmp1。由于對計算框架來說,所有的數據過來,他都是不知道數據分布情況的,所以采樣是并不可少的。
2、數據的分布符合社會學統計規則,貧富不均。傾斜的key不會太多,就像一個社會的富人不多,奇特的人不多一樣。所以tmp1記錄數會很少。把tmp1和users做map join生成tmp2,把tmp2讀到distribute file cache。這是一個map過程。
3、map讀入users和log,假如記錄來自log,則檢查user_id是否在tmp2里,如果是,輸出到本地文件a,否則生成<user_id,value>的key,value對,假如記錄來自member,生成<user_id,value>的key,value對,進入reduce階段。
4、最終把a文件,把Stage3 reduce階段輸出的文件合并起寫到hdfs。
如果確認業務需要這樣傾斜的邏輯,考慮以下的優化方案:
1、對于join,在判斷小表不大于1G的情況下,使用map join
2、對于group by或distinct,設定 hive.groupby.skewindata=true
3、盡量使用上述的SQL語句調節進行優化
hadoop處理數據的過程,有幾個顯著的特征:
不怕數據多,就怕數據傾斜。
對jobs數比較多的作業運行效率相對比較低,比如即使有幾百行的表,如果多次關聯多次匯總,產生十幾個jobs,沒半小時是跑不完的。map reduce作業初始化的時間是比較長的。
對sum,count來說,不存在數據傾斜問題。
對count(distinct ),效率較低,數據量一多,準出問題,如果是多count(distinct )效率更低。
優化可以從幾個方面著手:
好的模型設計事半功倍。
解決數據傾斜問題。
減少job數。
設置合理的map reduce的task數,能有效提升性能。(比如,10w+級別的計算,用160個reduce,那是相當的浪費,1個足夠)。
自己動手寫sql解決數據傾斜問題是個不錯的選擇。set hive.groupby.skewindata=true;這是通用的算法優化,但算法優化總是漠視業務,習慣性提供通用的解決方法。 Etl開發人員更了解業務,更了解數據,所以通過業務邏輯解決傾斜的方法往往更精確,更有效。
對count(distinct)采取漠視的方法,尤其數據大的時候很容易產生傾斜問題,不抱僥幸心理。自己動手,豐衣足食。
對小文件進行合并,是行至有效的提高調度效率的方法,假如我們的作業設置合理的文件數,對云梯的整體調度效率也會產生積極的影響。
優化時把握整體,單個作業最優不如整體最優。
細節上就是:
去除查詢中不需要的column
Where條件判斷等在TableScan階段就進行過濾
利用Partition信息,只讀取符合條件的Partition
Map端join,以大表作驅動,小表載入所有mapper內存中
調整Join順序,確保以大表作為驅動表
對于數據分布不均衡的表Group by時,為避免數據集中到少數的reducer上,分成兩個map-reduce階段。第一個階段先用Distinct列進行shuffle,然后在reduce端部分聚合,減小數據規模,第二個map-reduce階段再按group-by列聚合。
在map端用hash進行部分聚合,減小reduce端數據處理規模。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Hive中SQL數據傾斜及優化的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。