91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Hadoop中數據傾斜的示例分析

發布時間:2021-12-08 10:08:33 來源:億速云 閱讀:147 作者:小新 欄目:云計算

這篇文章給大家分享的是有關Hadoop中數據傾斜的示例分析的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

數據分布:

    正常的數據分布理論上都是傾斜的,就是我們所說的20-80原理:80%的財富集中在20%的人手中, 80%的用戶只使用20%的功能 , 20%的用戶貢獻了80%的訪問量 , 不同的數據字段可能的數據傾斜一般有兩種情況:

一種是唯一值非常少,極少數值有非常多的記錄值(唯一值少于幾千)

一種是唯一值比較多,這個字段的某些值有遠遠多于其他值的記錄數,但是它的占比也小于百分之一或千分之一

分區:

常見的mapreduce分區方式為hash 和range ,

hash partition 的好處是比較彈性,跟數據類型無關,實現簡單(設定reduce個數就好,一般不需要自己實現)

range partition 需要實現者自己了解數據分布, 有時候需要手工做sample取樣. 同時也不夠彈性, 表現在幾個方面,1. 對同一個表的不同字段都需要實現不同的range partition,  對于時間這種字段根據查詢類型的不同或者過濾條件的不同切分range 的大小都不一定.

2 .有時候可能設計使用多個字段組合的情況, 這時候又不能使用之前單個字段的partition 類, 并且多個字段組合之間有可能有隱含的聯系,比如出生日期和星座,商品和季節.

3. 手工做sample 非常耗時間,需要使用者對查詢使用的數據集的分布有領域知識.

4. 分配方式是死的,reduce 個數是確定的,一旦某種情況下發生傾斜,調整參數

其他的分區類型還有hbase 的hregionpartitioner  或者totalorder partitioner  等.

能夠想到的關于數據傾斜的一些解決方式(歡迎補充,尤其是有沒有做搜索或者數據挖掘的朋友有碰到類似問題):

1. 增加reduce 的jvm內存

2. 增加reduce 個數

3. customer partition

4. 其他優化的討論.

5. reduce sort merge排序算法的討論

6. 正在實現中的hive skewed join.

7. pipeline

8. distinct

9. index 尤其是bitmap index

方式1:既然reduce 本身的計算需要以合適的內存作為支持,在硬件環境容許的情況下,增加reduce 的內存大小顯然有改善數據傾斜的可能,這種方式尤其適合數據分布第一種情況,單個值有大量記錄, 這種值的所有紀錄已經超過了分配給reduce 的內存,無論你怎么樣分區這種情況都不會改變. 當然這種情況的限制也非常明顯, 1.內存的限制存在,2.可能會對集群其他任務的運行產生不穩定的影響.

方式2:  這個對于數據分布第二種情況有效,唯一值較多,單個唯一值的記錄數不會超過分配給reduce 的內存. 如果發生了偶爾的數據傾斜情況,增加reduce 個數可以緩解偶然情況下的某些reduce 不小心分配了多個較多記錄數的情況. 但是對于第一種數據分布無效.

方式3: 一種情況是某個領域知識告訴你數據分布的顯著類型,比如hadoop definitive guide 里面的溫度問題,一個固定的組合(觀測站點的位置和溫度) 的分布是固定的, 對于特定的查詢如果前面兩種方式都沒用,實現自己的partitioner 也許是一個好的方式.

方式4: 目前有的一些針對數據傾斜的優化比如pig 的skewed join

http://pig.apache.org/docs/r0.7.0/piglatin_ref1.html#Skewed+Joins

pig 文檔上面說是根據數據輸入的統計信息來確定分區(也就是range partition?),另外不清楚這個行為是否是動態運行時候才決定的,也就是運行之前有一步pig 自動做sample 的工作,因為pig 是沒有統計信息這一說的.

hive 中的group by

<property> 
  <name>hive.groupby.skewindata</name> 
  <value>false</value> 
  <description>Whether there is skew in data to optimize group by queries</description> 
</property> 
<property> 
  <name>hive.optimize.groupby</name> 
  <value>true</value> 
  <description>Whether to enable the bucketed group by from bucketed partitions / tables.</description> 
</property>

<property> 
  <name>hive.mapjoin.followby.map.aggr.hash.percentmemory</name> 
  <value>0.3</value> 
  <description>Portion of total memory to be used by map-side grup aggregation hash table, when this group by is followed by map join</description> 
</property> 
<property> 
  <name>hive.groupby.mapaggr.checkinterval</name> 
  <value>100000</value> 
  <description>Number of rows after which size of the grouping keys/aggregation classes is performed</description> 
</property>

其中最后一個參數hive.groupby.mapaggr.checkinterval 的思路跟in-memory combiner 相似, in-memeory combiner  是發生在mapper 端sort 之前,而不是現在的combiner發生在mapper sort 之后甚至在寫入磁盤之后重新讀磁盤然后排序合并. in-memeory combiner 最早好像是《Data-Intensive Text Processing with MapReduce》,mapr 去年的介紹ppt 里面好像提到它們也有這個優化. mapper 端減少數據的機會比reduce 端的要大,所以一般不會看到reduce 端的combiner 的討論,但是這種思路也有,比如google tenzing 的join 討論里面有一個prev-next 的小優化就是基于reduce 端的combiner, 但那個前提是基于block shuffle 實現的基礎上,數據已經排過序了,所以join 時候前一條數據跟后一條數據相同的概率很大.

hive 中的skewed join :  之前的文章已經介紹過兩表join 中hive 的幾個優化,其中的skewed join 的類似思路就是上面介紹的skewed 的第二種:增加reduce 的個數,hive 中是通過判斷閾值如果大于一個reduce 需要處理的數據量,重新起額外的task 來處理這些超額的reduce 本身需要處理的數據, 這是一種較晚的補救措施,本身hive 開始分區的時候已經傾斜(partition 的方式不合理), 當運行的時候通過運行時監控reduce 發現傾斜的特殊key 然后額外的起task 去處理,效果比較一般,感興趣的同學可以參考HIVE-3086 里面我和facebook 團隊對這種優化思路的討論. 第六節我會討論一下我所認為的思路和facebook 正在做的思路之間的差別.

方式5 :  reduce 分配的內存遠小于處理的數據量時,會產生multi-pass sort 的情況是瓶頸,那么就要問

1. 這種排序是有必要的嘛?

2. 是否有其他排序算法或優化可以根據特定情況降低他瓶頸的閾值?

3. map reduce 適合處理這種情況嘛?

關于問題1. 如果是group by , 那么對于數據分布情況1 ,hash 比sort 好非常多,即使某一個reduce 比其他reduce 處理多的多的數據,hash 的計算方式也不會差距太大.

問題2. 一個是如果實現block shuffle 肯定會極大的減少排序本身的成本, 另外,如果分區之后的reduce 不是使用copy –> sort-merge –> reduce 的計算方式, 在copy 之后將每個block 的頭部信息保存在內存中,不用sort – merge 也可以直接計算reduce, 只不過這時候變成了隨機訪問,而不是現在的sort-merge 之后的順序訪問. block shuffle 的實現有兩種類型,一種是當hadoop 中真正有了列數據格式的時候,數據有更大的機會已經排過序并且按照block 來切分,一般block 為1M ( 可以關注avro-806 )  , 這時候的mapper 什么都不做,甚至連計算分區的開銷都小了很多倍,直接進入reduce 最后一步,第二種類型為沒有列數據格式的支持,需要mapper 排序得到之后的block 的最大最小值,reduce 端在內存中保存最大最小值,copy  完成后直接用這個值來做隨機讀然后進行reduce. ( block shuffle  的實現可以關注 MAPREDUCE-4039 , hash 計算可以關注 MAPREDUCE-1639)

問題3 . map reduce 只有兩個函數,一個map 一個 reduce, 一旦發生數據傾斜就是partition 失效了,對于join 的例子,某一個key 分配了過多的記錄數,對于只有一次partittion的機會,分配錯了數據傾斜的傷害就已經造成了,這種情況很難調試,但是如果你是基于map-reduce-reduce 的方式計算,那么對于同一個key 不需要分配到同一個reduce 中,在第一個reduce 中得到的結果可以在第二個reduce 才匯總去重,第二個reduce 不需要sort – merge 的步驟,因為前一個reduce 已經排過序了,中間的reduce 處理的數據不用關心partition 怎么分,處理的數據量都是一樣大,而第二個reduce 又不使用sort-merge 來排序,不會遇到現在的內存大小的問題,對于skewed join 這種情況瓶頸自然小很多.

方式6:  目前hive 有幾個正在開發中的處理skewed join 情況的jira case,  HIVE-3086 , HIVE-3286 ,HIVE-3026 . 簡單介紹一下就是facebook 希望通過手工處理提前枚舉的方式列出單個傾斜的值,在join 的時候將這些值特殊列出當作map join 來處理,對于其他值使用原來的方式. 我個人覺得這太不伸縮了,值本身沒有考慮應用過濾條件和優化方式之后的數據量大小問題,他們提前列出的值都是基于整個分區的. join key 如果為組合key 的情況也應該沒有考慮,對metastore 的儲存問題有限制,對輸入的大表和小表都會scan 兩次( 一次處理非skew key , 一次處理skew key 做map join), 對輸出表也會scan 兩次(將兩個結果進行merge) , skew key 必須提前手工列出這又存在額外維護的成本,目前因為還沒有完整的開發完到能夠投入生產的情況,所以等所有特性處理完了有了文檔在看看這個處理方式是否有效,我個人認為的思路應該是接著bucked map join 的思路往下走,只不過不用提前處理cluster key 的問題, 這時候cluster key 的選擇應該是join key + 某個能分散join key 的列, 這等于將大表的同一個key 的值分散到了多個不同的reduce 中,而小表的join key 也必須cluster 到跟大表對應的同一個key , join 中對于數據分布第二種情況不用太難,增加reduce 個數就好,主要是第一種,需要大表的join key 能夠分散,對于同樣join key 的小表又能夠匹配到所有大表中的記錄. 這種思路就是不用掃描大表兩遍或者結果輸出表,不需要提前手工處理,數據是動態sample 的應用了過濾條件之后的數據,而不是提前基于統計數據的不準確結果. 這個基本思路跟tenzing 里面描述的distributed hash join 是一樣的,想辦法切成合適的大小然后用hash 和 map join .

方式7: 當同時出現join 和group 的時候, 那么這兩個操作應該是以pipeline (管道) 的方式執行. 在join 的時候就可以直接使用group 的操作符減少大量的數據,而不是等待join 完成,然后寫入磁盤,group 又讀取磁盤做group操作. HIVE-2206 正在做這個優化. hive 里面是沒有pipeline 這個概念的. 像是cloudera 的crunch 或者twitter 的Scalding 都是有這種概念的.

方式8: distinct 本身就是group by 的一種簡寫,我原先以為count(distinct x)這種跟group by 是一樣的,但是發現hive 里面distinct 明顯比group by 要慢,可能跟group by 會有map 端的combiner有關, 另外觀察到hive 在預估count(distinct x) 的reduce 個數比group by 的個數要少 , 所以hive 中使用count(distinct x) , 要么盡量把reduce 個數設置大,直接設置reduce 個數或者hive.exec.reducers.bytes.per.reducer 調小,我個人比較喜歡調后面一個,hive 目前的reduce 個數沒有統計信息的情況下就是用map端輸入之前的數值, 如果你是join 之后還用count(distinct x) 的話,這個默認值一般都會悲劇,如果有where 條件并能過濾一定數量的數據,那么默認reduce 個數可能就還好一點. 不管怎樣,多浪費一點reduce slot 總比等十幾甚至幾十分鐘要好, 或者轉換成group by 的寫法也不錯,寫成group by 的時候distributed by 也很有幫助.

方式9: hive 中的index 就是物化視圖,對于group by 和distinct 的情況等于變成了map 端在做計算,自然不存在傾斜. 尤其是bitmap index , 對于唯一值比較少的列優勢更大,不過index 麻煩的地方在于需要判斷你的sql 是不是常用sql , 另外如果create index 的時候沒有選你查詢的時候用的字段,這個index 是不能用的( hive 中是永遠不可能有DBMS中的用index 去lookup 或者join 原始表這種概念的)

其他建議:

網上能找到的另外一份很好的描述數據傾斜的資料是

http://nuage.cs.washington.edu/pubs/opencirrus2011.pdf

里面的map side skew 和expensive record 都不是關系型計算中的問題,所以不是這篇文章關注點. 對于關系型計算,其中數據傾斜影響最大的地方在reduce 的sort. 這篇文章里面最后總結的5點好的建議值得參考,

其中第三條需要你知道應用combiner 和特殊優化方式是否帶來了性能的提升,hive 的map aggr 在數據分布情況1效果會比較好,數據分布情況2效果就不大,還有combiner 應用的時候是消耗了系統資源的,確認這種消耗是否值得而不是任何情況下都使用combiner. 

對于第四點關系型計算中map 傾斜情況不太常見. 一種可以舉出來的例子是分區不合理,或者hive 中的cluster by 的key 選擇不合理(都是使用目錄的方式分區, 目錄是最小處理單元了).

  • Use domain knowledge when choosing the 
    map output partitioning scheme if the reduce operation is 
    expensive: Range partition or some other form of explicit 
    partition may be better than the default hash-partition

  • Try different partitioning schemes on sample 
    workloads or collect the data distribution at the reduce input 
    if a MapReduce job is expected to run several times

  • Implement a combiner to reduce the amount 
    of data going into the reduce-phase and, as such, significantly 
    dampen the effects of any type of reduce-skew

  • Use a pre-processing MapReduce job that 
    extracts properties of the input data in the case of a longruning, 
    skew-prone map phase. Appropriately partitioning the 
    data before the real application runs can significantly reduce 
    skew problems in the map phase.

  • Best Practice 5. Design algorithms whose runtime depends 
    only on the amount of input data and not the data distribution.

    另外一份是淘寶的數據傾斜總結:

    http://www.alidata.org/archives/2109

    不過我個人覺得幫助不是太大,里面第一個解決方式空值產生的影響第一個Union All 的方式個人是極力反對的,同一個表尤其是大表掃描兩遍這額外的成本跟收益太不匹配,不推薦,第二個將特殊值變成random 的方式, 這個產生的結果是正確的嘛? 尤其是在各種情況下輸出結果是正確的嘛?里面背景好像是那個小表users 的主鍵為userid, 然后userid 又是join key , 而且還不為空? 不太推薦,背景條件和輸出的正確性與否存疑.

    第二個數據類型不同的問題我覺得跟HIVE-3445 都算是數據建模的問題,提前修改好是一樣的.

    第三個是因為淘寶的hadoop 版本中沒有map side hash aggr 的參數吧. 而且寫成distinct 還多了一個MR 步驟,不太推薦.

     

    數據傾斜在MPP 中也是一個課題,這也設計到一個數據重分配的問題,但是相對于MPP 中有比較成熟的機制,一個是mpp 在處理數據初始分布的時候總是會指定segmented by 或者distributed by 這種顯示分配到不同物理機器上的建表語句. 還有就是統計信息會幫助執行引擎選擇合適的重新分布.但是統計信息也不是萬能的,比如

    1:統計信息的粒度和更新問題.

    2: 應用了過濾條件之后的數據也許不符合原始期望的數據分布.

    3: 統計信息是基于采樣的,總于真實所有數據存在誤差.

    4: 統計信息是基于partittion 的, 對于查詢沒有涉及到partition 字段的切分就不能使用各partition 只和來表示總體的統計信息.

    5. 臨時表或者多步驟查詢的中間過程數據沒有統計信息的情況.

    6. 各種其他的算法優化比如in-mapper combiner 或者google Tenzing 的prev – next combine 都會影響統計信息對于算法選擇的不同.

感謝各位的閱讀!關于“Hadoop中數據傾斜的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

右玉县| 平邑县| 积石山| 伊宁县| 都昌县| 信丰县| 克什克腾旗| 东乡族自治县| 永宁县| 鲁甸县| 岗巴县| 来安县| 武川县| 梁河县| 中阳县| 阳谷县| 仲巴县| 元江| 延寿县| 娄底市| 河西区| 苍梧县| 墨竹工卡县| 青神县| 白河县| 江孜县| 文山县| 岳阳市| 黄冈市| 利津县| 卢湾区| 延吉市| 黄梅县| 辽宁省| 塔城市| 桑植县| 秦皇岛市| 无极县| 义马市| 海淀区| 日土县|