您好,登錄后才能下訂單哦!
本篇內容介紹了“Hadoop的特點有哪些”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
1 Hadoop 簡介
1.1 Hadoop 由來
數據容量
大數據時代數據量超級大,數據具有如下特性:
Volume(大量)
Velocity(高速)
Variety(多樣)
Value(低價值密度)
以前的存儲手段跟分析方法現在行不通了!Hadoop 就是用來解決海量數據的 存儲 跟海量數據的 分析計算 問題的,創始人 Doug Cutting 在創建 Hadoop 時主要思想源頭是 Google 三輛馬車
第一輛 GFS 產生了 HDFS。
第二輛 MapReduce 產生了MR。
第三輛 BigTable 產生了HBase。
現在說的 Hadoop 通常指的是 Hadoop 生態圈 這樣一個廣義概念,如下:
大數據知識體系
1.2 Hadoop 特點
1.2.1 Hadoop 特點
高可用
Hadoop 底層對同一個數據維護這多個復本,即使Hadoop某個計算元素或者存儲出現問題,也不會導致數據的丟失。
高擴展
在集群之間分配任務數據,可以方便的擴展跟刪除多個節點,比如美團節點就在3K~5k 個節點
高效性
在MapReduce的思想下 Hadoop是并行工作的,以加快任務的處理速度
高容錯性
如果一個子任務速度過慢或者任務失敗 Hadoop會有響應策略會自動重試跟任務分配。
1.2.2 Hadoop 架構設計
Hadoop 的 1.x 跟 2.x 區別挺大,2.x 主要是將1.x MapReduce中資源調度的任務解耦合出來交 Yarn 來管理了(接下來本文以2.7開展探索)。
1.x跟2.x變化
HDFS
Hadoop Distributed File System 簡稱 HDFS,是一個分布式文件系統。HDFS 有著高容錯性,被設計用來部署在低廉的硬件上來提供高吞吐量的訪問應用程序的數據,適合超大數據集的應用程序。
MapReduce
MapReduce是一種編程模型,包含Map(映射) 跟 Reduce(歸約)。你可以認為是歸并排序的深入化思想。
Yarn
Apache Hadoop YARN (Yet Another Resource Negotiator,另一種資源協調者)是一種新的 Hadoop 資源管理器,它是一個通用資源管理系統,可為上層應用提供統一的資源管理和調度,它的引入為集群在利用率、資源統一管理和數據共享等方面帶來了巨大好處。
Common 組件
log組件。
獨有RPC體系ipc、I/O系統、序列化、壓縮。
配置文件conf。
公共方法類,比如checkSum校驗。
2 HDFS
產生背景:
隨著數據量變大,數據在一個OS的磁盤無法存儲了,需要將數據分配到多個OS管理的磁盤中,為了方面管理多個OS下的磁盤文件,迫切需要一種系統來管理多臺機器上的文件,這就是分布式文件管理系統,HDFS 是通過目錄樹定位文件。需注意 HDFS 只是分布式文件系統中的其中一種。
2.1 HDFS 優缺點
2.1.1 優點
高容錯性
數據會自動保存多個副本,默認為3個,通過增加副本來提高容錯性。
某個副本丟失后系統會自動恢復。
高擴展性
HDFS集群規模是可以動態伸縮的。
適合大數據處理
數據規模達到GB/TB/PB級別。
文件規模達到百萬規模以上。
流式訪問,它能保證數據的一致性。
低成本,部署廉價機器 提高了商業化能了。
統一對外接口,Hadoop本身用Java編寫,但基于此的應用程序可以用其他語言編寫調用。
2.1.1 缺點
做不到低延時
Hadoop對高吞吐做了優化,犧牲了獲取數據的延遲,比如毫秒級獲取數據在Hadoop上做不到。
不適合存儲大量小文件
存儲大量小文件的話,它會占用 NameNode 大量的內存來存儲文件、目錄和塊信息。因此該文件系統所能存儲的文件總數受限于 NameNode 的內存容量,根據經驗,每個文件、目錄和數據塊的存儲信息大約占150字節。
小文件存儲的尋道時間會超過讀取時間,它違反了HDFS的設計目標。
無法修改文件
對于上傳到HDFS上的文件,不支持修改文件,僅支持追加。HDFS適合一次寫入,多次讀取的場景。
無法并發寫入
HDFS不支持多用戶同時執行寫操作,即同一時間,只能有一個用戶執行寫操作。
2.2 HDFS 組成架構
2.2.1 Client
客戶端主要有如下功能:
文件切分,文件上傳 HDFS 的時候,Client 將文件切分成一個一個的Block,然后進行存儲。
與 NameNode 交互,獲取文件的位置信息。
與 DataNode 交互,讀取或者寫入數據。
Client 提供一些命令來管理 HDFS,比如啟動或者關閉 HDFS。
Client 可以通過一些命令來訪問 HDFS。
2.2.2 NameNode
NameNode 簡稱NN,就是HDFS中的 Master,是個管理者,主要有如下功能:
管理HDFS的名稱空間。
配置副本策略
處理客戶端讀寫請求。
管理數據塊(Block)映射信息。
映射信息:NameNode(文件路徑,副本數,{Block1,Block2},[Block1:[三個副本路徑],Block2:[三個副本路徑]])
2.2.3 DataNode
DataNode 簡稱 DN 就是HDFS集群中的Slave,NameNode 負責下達命令,DataNode執行實際的操作。
存儲實際的數據塊。
執行數據塊的讀/寫操作。
上面說過數據目錄信息存儲在NN中,而具體信息存儲在DN中,很形象的比喻如下
NN跟DN對比
DataNode 的工作機制
數據塊存儲在磁盤信息 包括 數據 + 數據長度 + 校驗和 + 時間戳。
DataNode 啟動后向 NameNode注冊,周期性(1小時)的向 NameNode 上報所有的塊信息。
NN 跟 DN 之間 心跳 3秒一次,心跳返回結果帶有 NameNode 給該 DataNode 的命令如復制塊數據到另一臺機器,或刪除某個數據塊。如果超過10分鐘沒有收到某個 DataNode 的心跳,則認為該節點不可用。
集群運行中可以安全加入和退出一些機器。
DataNode 確保數據完整性
當 DataNode 讀取 Block 的時候,它會計算 CheckSum。
如果計算后的 CheckSum,與 Block 創建時值不一樣,說明 Block 已經損壞。
Client 讀取其他 DataNode 上的 Block。
DataNode 在其文件創建后周期驗證 CheckSum
DN 進程死亡或無法跟 NN 通信后 NN 不會立即將 DN 判死,一般經過十分鐘 + 30秒再判刑。
2.2.4 Secondary NameNode
當 NameNode 掛掉的時候,它并不能馬上替換 NameNode 并提供服務。需要通過 HA等手段實現自動切換。SNN 主要提供如下功能:
輔助 NameNode,分擔其工作量。
定期合并 Fsimage 和 Edits,并推送給 NameNode。
在緊急情況下,可輔助恢復 NameNode。
2.2.5 Block
HDFS中的文件在物理上是分塊 Block 存儲的,在 1.x 版本中塊 = 64M,2.x中塊 = 128M。塊不是越大越好,也不是越小越好。因為用戶獲取數據信息時間 = 尋址塊時間 + 磁盤傳輸時間。
塊太小會增加尋址時間,程序大部分耗時在尋址上了。
快太大則會導致磁盤傳輸時間明顯大于尋址時間,程序處理塊數據時較慢。
2.3 HDFS 寫流程
2.3.1 具體寫流程
寫流程
客戶端通過 Distributed FileSystem 模塊向 NameNode 請求上傳文件,NameNode檢查目標文件是否已存在,父目錄是否存在。
NameNode 返回是否可以上傳。
客戶端請求第一個 Block上傳到哪幾個 DataNode 服務器上。
NameNode 返回3個 DataNode 節點,分別為dn1、dn2、dn3。
客戶端通過 FSDataOutputStream 模塊請求dn1上傳數據,dn1收到請求會繼續調用dn2,然后dn2調用dn3,將這個通信管道建立完成。
dn1、dn2、dn3逐級應答客戶端。
客戶端開始往dn1上傳第一個Block(先從磁盤讀取數據放到一個本地內存緩存),以Packet為單位,dn1收到一個Packet就會傳給dn2,dn2傳給dn3;dn1每傳一個packet會放入一個應答隊列等待應答。
當一個 Block 傳輸完成之后,客戶端再次請求NameNode上傳第二個Block的服務器。(重復執行3-7步)。
2.3.2 節點距離計算
在 HDFS 寫數據的過程中,NameNode 會選擇距離待上傳數據最近距離的DataNode接收數據。
最近距離 = 兩個節點到達最近的共同祖先的距離總和。
節點距離計算
Distance(/d1/r1/n0,/d1/r1/n0) = 0 同一節點上的進程
Distance(/d1/r1/n1,/d1/r1/n2) = 2 同一機架上不同節點
Distance(/d1/r2/n0,/d1/r3/n2) = 4 同一數據中心不同機架節點
Distance(/d1/r2/n1,/d2/r4/n1) = 6 不同數據中心
2.3.3 副本節點選擇
第一個副本在Client所在節點上,如果在集群外則隨機選個。
第二個副本跟第一個副本位于同機架不同節點
第三個部分位于不同機架,隨機節點。
機架感知
2.4 HDFS 讀流程
讀流程
客戶端通過 Distributed FileSystem 向 NameNode 請求下載文件,NameNode 通過查詢元數據,找到文件塊所在的 DataNode 地址。
挑選一臺 DataNode(就近原則,然后隨機)服務器,請求讀取數據。
DataNode 開始傳輸數據給客戶端(從磁盤里面讀取數據輸入流,以Packet為單位來做校驗)。
客戶端以 Packet 為單位接收,先在本地緩存,然后寫入目標文件。
2.5 NameNode 和 Secondary NameNode
2.5.1 NN 和 2NN 工作機制
NameNode 中元數據單獨存到磁盤不方便讀寫。單獨存到內存時,斷電會丟失。Hadoop 采用的是如下方式。
FsImage :
元數據序列化后在磁盤存儲的地方。包含HDFS文件系統的所有目錄跟文件inode序列化信息。
Memory:
元數據在內存中存儲的地方。
Edit 文件:
Edit 記錄客戶端更新元數據信息的每一步操作(可通過Edits運算出元數據)。
一旦元數據有更新跟添加,元數據修改追加到Edits中然后修改內存中的元數據,這樣一旦NameNode 節點斷電,通過 FsImage 跟 Edits 的合并生成元數據。
Edits文件不要過大,系統會定期的由 Secondary Namenode 完成 FsImage 和 Edits 的合并。
NN跟2NN工作機制
第一階段:NameNode 啟動
第一次啟動 NameNode 格式化后,創建 Fsimage 和 Edits 文件。如果不是第一次啟動,直接加載編輯日志和鏡像文件到內存。
客戶端對元數據進行增刪改的請求。
NameNode 記錄操作日志,更新滾動日志。
NameNode 在內存中對數據進行增刪改。
第二階段:Secondary NameNode 工作
Secondary NameNode 詢問 NameNode 是否需要 CheckPoint。直接帶回 NameNode 是否檢查結果。一般下面條件任意滿足即可:
CheckPoint 默認1小時執行一次。
一分鐘檢查一次Edits文件操作次數,達閾值 CheckPoint 。
Secondary NameNode 請求執行 CheckPoint。
NameNode 滾動正在寫的 Edits 日志。
將滾動前的編輯日志Edit_001 和 鏡像文件FsImage 拷貝到 Secondary NameNode。
Secondary NameNode 加載編輯日志和鏡像文件到內存并合并。
生成新的鏡像文件 FsImage.chkpoint。
拷貝 FsImage.chkpoint 到 NameNode。
NameNode 將 FsImage.chkpoint 重新命名成 FsImage。
2.6 安全模式
NameNode 剛啟動時候系統進入安全模式(只讀),如果整個文件系統中99.9%塊滿足最小副本,NameNode 會30秒后退出安全模式。
2.6.1 NameNode 啟動
將 FsImage 文件載入內存再執行Edits文件各種操作,最終內存生成完整的元數據鏡像。
創建個新的 FsImage 跟空 Edits 文件。
NameNode 開始監聽 DataNode。
整個過程 NameNode 一直運行在安全模式,NameNode 對于 Client 是只讀的。
2.6.2 DataNode 啟動
系統數據塊位置不是由 NameNode 維護的,而是以塊列表形式存儲在 DataNode 中。
安全模式下 DataNode 向 NameNode 發送最新塊列表信息,促使 NameNode 高效運行。
正常運行期 NameNode 內存中保留所有塊位置映射信息。
2.7 HDFS-HA
HDFS 集群中 NameNode存在單點故障(SPOF),為了實現 High Available,其實包括 HDFS-HA 和YARN-HA。HDFS 可以 通過配置Active/Standby 兩個 NameNodes 實現在集群中對 NameNode 的熱備來解決上述問題。如果出現故障,如機器崩潰或機器需要升級維護,可將NameNode很快的切換到另外一臺機器。實現 HA 功能主要依賴ZooKeeper 跟 ZKFC 進程。
HA故障轉移
2.7.1 HDFS-HA工作要點
元數據管理方式需要改變
內存中各自保存一份元數據。
Edits 日志只有 Active 狀態的 NameNode 節點可以做寫操作。
兩個 NameNode 都可以讀取 Edits。
共享的 Edits 放在一個共享存儲中管理(qjournal 或 NFS)。
需要一個狀態管理功能模塊
實現了一個ZKFC,常駐在每一個namenode所在的節點,每一個ZKFC負責監控自己所在NameNode節點,利用zk進行狀態標識,當需要進行狀態切換時,由ZKFC來負責切換,切換時需要防止brain split現象的發生。
必須保證兩個 NameNode 之間能夠ssh無密碼登錄
防腦裂,同一時刻僅僅有一個 NameNode 對外提供服務。
2.7.2 ZooKeeper
ZooKeeper 提供如下功能:
故障檢測:集群中每個 NameNode 在 ZooKeeper 中維護一個持久會話,如果機器崩潰,ZooKeeper中的會話將終止,ZooKeeper通知另一個NameNode需要觸發故障轉移。
現役NameNode選擇:ZooKeeper提供了一個簡單的機制用于唯一的選擇一個節點為active狀態。如果目前現役NameNode崩潰,另一個節點可能從ZooKeeper獲得特殊的排外鎖以表明它應該成為現役NameNode。
2.7.3 ZKFC進程
在 NameNode 主機上有個 ZKFC(ZKFailoverController) 這樣的ZK客戶端,負責監視管理 NameNode 狀態。ZKFC負責:
健康監測:ZKFC周期性檢測同主機下NameNode監控撞庫。
ZooKeeper會話管理:NameNode健康時候ZKFC保持跟ZK集群會話打開狀態,ZKFC還持有個znode鎖,如果會話終止,鎖節點將自動刪除。
基于ZooKeeper的選擇:ZKFC發現本地NameNode健康前提下會嘗試獲取znode鎖,獲得成功則Active狀態。
3 MapReduce
MapReduce是個分布式運算程序的編程框架,是基于 Hadoop 的 數據分析計算核心框架。處理過程分為兩個階段:Map 階段跟 Reduce 階段。
Map 負責把一個任務分解成多個任務。該階段的 MapTask 并發實例,完全并行運行,互不相干。
Reduce 負責把多個任務處理結果匯總。該階段的 ReduceTask 并發實例互不相干,但是他們的數據依賴于上一個階段的所有 MapTask 并發實例的輸出。
MapReduce 編程模型只能包含一個 Map 階段和一個 Reduce 階段,如果用戶的業務邏輯非常復雜,那就只能多個MapReduce程序串行運行。
用戶編寫MR任務時候 程序實現部分分成三個部分:Mapper、Reducer、Driver(提交運行mr程序的客戶端)。
3.1 優缺點
3.1.1 優點
易于編程
簡單實現了一些接口就可以完成個分布式程序,你寫個分布式程序跟寫個串行化程序一樣,類似八股文編程。
良好的擴展
計算資源不足時可以簡單的增加機器來擴展計算能力。
高容錯性
MapReduce任務部署在多臺機器上后如果其中一臺掛了,系統會進行自動化的任務轉移來保證任務正確執行。
適合PB級數據離線處理
比如 美團3K個節點的集群并發,提供超大數據處理能力。
3.1.2 缺點
不擅長實時計算
MapReduce 不會想 MySQL 一樣毫秒級返回結果。
不擅長流式計算
流式計算的 輸入數據是動態的,而 MapReduce 的輸入數據集是靜態的。
不擅長DAG計算
多個應用程序存在依賴關系,MapReduce的作業結果會落盤導致大量磁盤IO,性能賊低,此時上Spark吧!
3.2 序列化
序列化
把內存中的對象,轉換成字節序列(或其他數據傳輸協議)以便于存儲(持久化)和網絡傳輸。
反序列化
將收到字節序列(或其他數據傳輸協議)或者是硬盤的持久化數據,轉換成內存中的對象。
因為 Hadoop 在集群之間進行通訊或者 RPC 調用時是需要序列化的,而且要求序列化要快、且體積要小、占用帶寬要小。而Java自帶的序列化是重量級框架,對象序列化后會附帶額外信息,比如各種校驗信息,header,繼承體系等。所以 Hadoop 自研了序列化框架。
Java類型 | Hadoop Writable類型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
map | MapWritable |
array | ArrayWritable |
3.3 MapTask 并行度
數據塊:Block 是 HDFS 物理上把數據分成一塊一塊。
數據切片:數據切片只是在邏輯上對輸入進行分片,并不會在磁盤上將其切分成片進行存儲。
切片核心注意點:
一個 Job 的 Map 階段并行度又客戶端提交Job時的切片數決定
每個 Split 切片分配個 MapTask 并行實例處理
模型情況下 切片大小 = BlockSize
切片時不會考慮數據集整體大小,而是逐個針對每個文件單獨切片的。
3.3.1 FileInputFormat 切片源碼追蹤
FileInputFormat切片源碼追蹤
程序先找到目標數據存儲目錄
開始遍歷目錄下每個文件。每個文件都會做如下操作
獲取切片大小,默認情況下切片大小 = blocksize
開始切片,每次切片都要判斷剩余部分是否大于塊的1.1倍,不大于則就劃分到一個切片。
切片信息寫到切片規劃文件中。
切片核心過程在getSplit方法完成。
InputSplit只是記錄了切片元數據信息,如起始位置、長度跟所在節點列表等。
3.3.2 切片大小計算
SplitSize= Math.max(minSize,Math.min(maxSize,blockSize))
mapreduce.input.fileinputformat.split.minsize 默認 1
mapreduce.input.fileinputformat.split.maxsize 默認 Long.MAXValue
blockSize 默認128M
maxsize :該參數如果比blockSize小灰導致切片變小,且就等于配置的整個參數。
minsize :該參數如果調的比blockSize大,則切片大小會比blockSize還大。
3.3.3 切片舉例
切片舉例
3.4 FileInputFormat
3.4.1 實現類簡介
MR任務輸入文件個數各有不同,針對不同類型MR定義了一個接口跟若干實現類來讀取不同的數據。
input繼承關系
TextInputFormat
默認使用類,按行讀取每條數據,Key是該行數據的 offset,Value = 行內容。
KeyValueTExtInputFormat
每行都是一條記錄,被指定分隔符分割為Key跟Value,默認是 \t 。
NLineInputFormat
該模型下每個 map 處理 InputSplit 時不再按照 Block 塊去劃分,而是按照指定的行數N來劃分文件。
自定義InputFormat
基礎接口,改寫 RecordReader,實現一次讀取一個完整文件封裝為 KV,使用 SequenceFileOutPutFormat 輸出合并文件。
CombineTextInputFormat
用于小文件過多場景,邏輯上合并多個小文件個一個切片任務。較重要 中
3.4.2 CombineTextInputFormat
默認框架 TextInputFormat 切片機制是對任務按文件規劃切片,不管文件多小,都會是一個單獨的切片,都會交給一個MapTask,這樣如果有大量小文件,就會產生大量的MapTask,處理效率極其低下。CombineTextInputFormat 可以將多個小文件從邏輯上規劃到一個切片中,這樣多個小文件就可以交給一個MapTask處理。主要包含 虛擬存儲過程 跟 切片過程。
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m
虛擬存儲過程:
文件 <= SplitSize 則單獨一塊。
1 * SplitSize < 文件 < 2 * SplitSize 時對半分。
文件 >= 2*SplitSize時,以 SplitSize 切割一塊,剩余部分若 < 2 * SplitSize 則對半分。
切片過程:
判斷虛擬存儲的文件大小是否大于setMaxInputSplitSize值,大于等于則單獨形成一個切片。
如果不大于則跟下一個虛擬存儲文件進行合并,共同形成一個切片。
切片過程
3.6 OutputFormat
OutputFormat 是 MapReduce 輸出的基類,常見的實現類如下:
3.5.1 TextOutputFormat
系統默認輸出格式,把每條記錄寫為文本行,他的K跟V是任意類型,系統在寫入時候會統一轉化為字符串。
3.5.2 SequenceFileOutputFormat
此模式下的輸出結果作為后續MapReduce任務的輸入,該模式下數據格式緊湊,很容易被壓縮。
3.5.3 自定義OutputFormat
如果需求不滿足可按需求進行自定義。
自定義類繼承自FileOutputFormat。
重寫RecordWriter,改寫具體輸出數據的方法write。
3.6 MapReduce 流程
3.6.1 整體流程圖
MapReduce流程
MapTask 工作機制
Read階段:MapTask 通過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
Map階段:將解析出的key/value交給用戶編寫map()函數處理,并產生一系列新的key/value。
Collect收集階段:它會將生成的key/value分區(調用Partitioner),并寫入一個環形內存緩沖區中。
Spill階段:先按照分區進行排序,然后區內按照字典對key進行快排,并在必要時對數據進行合并、壓縮等操作。
Combine階段:選擇性可進行MapTask內的優化提速。
ReduceTask 工作機制
Copy階段:從所有的MapTask中收集結果然后決定將數據放入緩存還是磁盤。
Merge階段:copy數據時后天會對磁盤還有內存數據進行Merge。
Sort階段:ReduceTask需對所有數據進行一次歸并排序,方便執行reduce 函數。
Reduce階段:調用用戶 reduce() 函數將計算結果寫到HDFS上。
3.6.2 Shuffle
Shuffle機制
MapReduce 的核心就是 Shuffle 過程,Shuffle 過程是貫穿于 map 和 reduce 兩個過程的!在Map端包括Spill過程,在Reduce端包括copy和sort過程。 具體Shuffle過程如下:
MapTask 收集我們的map()方法輸出的kv對,放到內存緩沖區中。
從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件,溢出前會按照分區針對key進行區內快排。
多個溢出文件會被合并成大的溢出文件。
在溢出過程及合并的過程中,都要調用 Partitioner 進行分區和針對key進行排序。
ReduceTask 根據自己的分區號,去各個 MapTask 機器上取相應的結果分區數據。
ReduceTask 對收集后的數據進行合并跟歸并排序。
進入 ReduceTask 的邏輯運算過程,調用用戶自定義的reduce()方法。
Shuffle 中的緩沖區大小會影響到 MapReduce 程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快。
3.6.3 Partition
MapReduce 默認的分區方式是hashPartition,在這種分區方式下,KV 對根據 key 的 hashcode 值與reduceTask個數進行取模,決定該鍵值對該要訪問哪個ReduceTask。
public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; // numReduceTasks 默認 = 1 所以導致默認的reduce結果 = 1 }
自定義的時候一般就是類繼承Partitioner然后重寫getPartition 方法。用戶也可以設置ReduceTask數量,不過會遵循如下規則。
如果 ReduceTask 數 > getPartition 數, 會多產生幾個空的輸出part-r-000xx。
如果 1 < ReduceTask < getPartition 數,會有部分數據無法安放導致報錯。
如果ReduceTask = 1,不管MapTask端輸出多少分區文件結果都是一個文件。
分區必須從0開始,逐步累加。
比如 假設自定義分區數為5。
job.setNumReduceTasks(1):會正常運行,只不過會產生一個輸出文件。
job.setNumReduceTasks(2):會報錯。
job.setNumReduceTasks(6):大于5,程序會正常運行,會產生空文件。
3.6.4 環形緩沖區
Map 的輸出結果由 Collector 處理,每個 Map 任務不斷地將鍵值對輸出到在內存中構造的一個環形數據結構中。使用環形數據結構是為了更有效地使用內存空間,在內存中放置盡可能多的數據。
環形數據結構其實就是個字節數組byte[],叫kvbuffer,默認值100M。里面主要存儲 數據 跟 元數據。中間有個分界點,并且分界點是變化的。當環形緩沖區寫入的buffer的大小達到 80% 滿足溢寫條件的時候,開始溢寫spill。系統有兩個線程一個負責寫入數據,一個負責spill數據。
數據:
存儲 Key + Value + bufindex。其中 bufindex(即數據的存儲方向)是一直悶著頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之后,bufindex增長為4,一個Int型的value寫完之后,bufindex增長為8。
元數據:
元數據是為了排序而生,是關于數據描述的數據。
Kvmeta = Partition + keystart + valstart + valLength , 共占用4個Int長度,其中K的長度 = V的起點 - K的起點。
Kvmeta 的存放指針 Kvindex 每次都是向下跳四個 格子,然后再向上一個格子一個格子地填充四元組的數據。比如Kvindex初始位置是-4,當第一個鍵值對寫完之后,(Kvindex+0)的位置存放partition的起始位置、(Kvindex+1)的位置存放keystart、(Kvindex+2)的位置存放valstart、(Kvindex+3)的位置存放value length,然后Kvindex跳到 -8位置,等第二個鍵值對和索引寫完之后,Kvindex跳到-12位置。
kvmeta.put(kvindex + PARTITION, partition); 2kvmeta.put(kvindex + KEYSTART, keystart); 3kvmeta.put(kvindex + VALSTART, valstart); 4kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); 5// advance kvindex 改變每次index的值 每次4個位置! 6kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
環形緩沖區
3.6.5 Combiner 合并
Combiner 是 MR 程序中 Mapper 跟 Reducer 之外的組件。
Combiner 是在每一個MapTask 所在節點運行,Reducer 是接受全部 Mapper 輸出結果。
Combiner 屬于局部匯總的意思,來減少網絡傳輸。
Combiner 用的時候要注意不能影響最終業務邏輯!比如求平均值就不能用。求和就OK。
3.6.6 關于 MapReduce 排序
MapReduce框架最重要的操作就是排序,MapTask 跟 ReduceTask 都會根據key進行按照字典順序進行快排。
MapTask 將緩沖區數據快排后寫入到磁盤,然后磁盤文件會進行歸并排序。
ReduceTask統一對內存跟磁盤所有數據進行歸并排序。
3.6.7 ReduceJoin 跟 MapJoin
Reducejoin
思路:通過將關聯條件作為Map 輸出的 Key,將兩表滿足 Join條件的數據并攜帶數據源文件發送同一個ReduceTask,在Reduce端進行數據串聯信息合并。
缺點:合并操作在Reduce端完成,Reduce 端處理壓力太大,并且Reduce端易產生數據傾斜。
MapJoin
適用:適用于一張表十分小、一張表很大的場景。
思路:在 Map 端緩存多張表,提前處理業務邏輯,這樣增加 Map 端業務,減少 Reduce 端數據的壓力,盡可能的減少數據傾斜。
3.6.8 注意點
ReduceTask = 0 說明沒有Reduce節點,輸出文件個數和 Map 個數一樣。
ReduceTask 默認= 1,所以結果是一個文件。
ReduceTask 的個數不是任意設置的,需跟集群性能還有結果需求而定。
邏輯處理 Mapper 時候可根據業務需求實現其中三個方法,map、setup、cleanup。
3.7 壓縮
壓縮是提高Hadoop運行效率的一種優化策略,通過在Mapper、Reducer運行過程的數據進行壓縮來減少磁盤空間跟網絡傳輸,最終實現提高MR運行速度。但需注意壓縮也給CPU運算帶來了負擔。
壓縮的基本原則:
運算密集型任務 ,少壓縮。
IO密集型任務,多壓縮。
壓縮格式 | 自帶 | 算法 | 擴展名 | 可切分嗎 | 壓縮后,代碼修改 |
---|---|---|---|---|---|
DEFLATE | 是 | DEFLATE | .deflate | 否 | 不需要修改 |
Gzip | 是 | DEFLATE | .gz | 否 | 不需要修改 |
bzip2 | 是 | bzip2 | .bz2 | 是 | 不需要修改 |
Snappy | 否 | Snappy | .snappy | 否 | 不需要修改 |
LZO | 否 | LZO | .lzo | 是 | 需要建索引 還需要指定輸入格式 |
4 YARN
Yarn 是一個資源調度平臺,負責為運算程序提供服務器運算資源,相當于一個分布式的操作系統平臺,而 MapReduce 等運算程序則相當于運行于操作系統之上的應用程序。
4.1 基本組成
Yarn架構
YARN主要由 ResourceManager、NodeManager、ApplicationMaster 和 Container 等組件構成。
ResourceManager
處理客戶端請求
監控NodeMananger
啟動或監控ApplicationMaster
計算資源的分配跟調度
NodeManager
管理單個節點上資源
處理來著ResourceManager的命令
處理來自ApplicationMaster的命令
ApplicationMaster
負責數據切分。
為應用程序申請資源并分配給內部任務。
任務的監控跟容錯。
Container
Container 是 YARN 中資源的抽象,封裝了某個節點上的多維度資源,比如內存、CPU、磁盤、網絡等。
YarnChild 其實它就是一個運行程序的進程。MrAppMaster 運行程序時向 Resouce Manager 請求的 Maptask / ReduceTask。
4.2 Yarn 調度 MapReduce 任務
Yarn調度流程
當 MR 程序提交到客戶端所在的節點時后 大致運行流程如下:
作業提交
Client 調用 job.waitForCompletion 方法 YarnRunner ,向整個集群提交MapReduce作業。Client 向 RM 申請一個作業id。
RM 給 Client 返回該 job 資源的提交路徑和作業 id。
Client 提交jar包、切片信息和配置文件到指定的資源提交路徑。
Client 提交完資源后,向 RM 申請運行 MrAppMaster。
作業初始化
當 RM 收到 Client 的請求后,將該 Task 添加到容量調度器中。
某一個空閑的 NodeManager 領取到該 Task 。
該 NodeManager 創建 Container,并產生 MRAppMaster。
下載 Client 提交的資源 到本地。
任務分配
MRAppMaster 向 RM 申請運行多個 MapTask 任務資源。
RM 將運行 MapTask 任務分配給倆 NodeManager。其中分配原則 是優先 jar 跟 數據在一臺機器上,其次就盡可能在一個機房。最后 隨便來個空閑機器。
任務運行
MR 向兩個接收到任務的 NodeManager 發送程序啟動腳本,這兩個 NodeManager 分別啟動MapTask,MapTask 對數據分區排序。
MrAppMaster 等待所有 MapTask 運行完畢后,向RM申請容器 運行ReduceTask。
ReduceTask 向 MapTask 獲取相應分區的數據。
程序運行完畢后,MR會向RM申請注銷自己。
進度和狀態更新
YARN 中的任務將其進度和狀態(包括counter)返回給應用管理器, 客戶端每秒向應用管理器請求進度更新來展示給用戶。
作業完成
除了向應用管理器請求作業進度外, 客戶端每5秒都會通過調用 waitForCompletion() 來檢查作業是否完成。作業完成之后, 應用管理器和Container會清理工作狀態。作業的信息會被作業歷史服務器存儲以備之后用戶核查。
4.3 資源調度器
目前,Hadoop作業調度器主要有三種:FIFO、Capacity Scheduler 和 Fair Scheduler。Hadoop2.7.2默認的資源調度器是Capacity Scheduler。
4.3.1 FIFO
FIFO調度
4.3.2 容量調度器 Capacity Scheduler
容量調度器
支持多個隊列,每個隊列配置一定資源,每個隊列采用FIFO策略。
為防止同一個童虎作業獨占隊列資源,會對同一用戶提交作業所占資源量限制。
計算每個隊列中在跑任務數與其應該分得的計算只有比值,選擇個比值最小的隊列(最閑的)。
按照作業優先級跟提交時間,同時還考慮用戶資源限制跟內存限制對隊列任務排序。
比如job1、job2、job3分配排在最前面也是并行運行。
4.3.3 公平調度器 Fair Scheduler
支持多隊列多用戶,每個隊列中資源可以配置,同一隊列中作業公平共享隊列中所有資源。
公平調度器
比如有queue1、queue2、queue3三個任務隊列,每個隊列中的job按照優先級分配資源,優先級高獲得資源多,但會確保每個任務被分配到資源。
每個任務理想所需資源跟實際獲得資源的差距叫缺額,同一個隊列中是按照缺額高低來先后執行的,缺額越大越優先獲得資源。
4.4 任務推測執行
作業完成時間取決于最慢的任務完成時間。系統中有99%的Map任務都完成了,只有少數幾個Map老是進度很慢,此時系統會發現拖后腿的任務,比如某個任務運行速度遠慢于任務平均速度。為拖后腿任務啟動一個備份任務,同時運行。誰先運行完,則采用誰的結果。
5 MapReduce 優化方法
MapReduce優化方法主要從六個方面考慮:數據輸入、Map階段、Reduce階段、IO傳輸、數據傾斜問題和常用的調優參數。
5.1 數據輸入
數據采集時,用 Hadoop Archive 將多個小文件打包成一個Har文件。
業務處理前,SequenceFile 由一系列KV組成,key=文件名,value=文件內容,將大批小文件合并成大文件。
在 MapReduce 處理時,采用CombineTextInputFormat來作為輸入,解決輸入端大量小文件場景。
對于大量小文件任務開啟JVM 重用可提速,JVM 重用可以使得 JVM 實例在同一個 job 中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中進行配置,通常在10-20之間。
5.2 Map 階段
減少溢寫 Spill 次數,調整循環緩存區大小,減少磁盤IO。
減少合并 Merge 次數,增大Merge文件大小減少次數。
在不影響業務的情況下在Map端進行Combine處理。
5.3 Reduce 階段
設置合理的Map跟REduce數,太少會導致Task等待。太多會導致競爭資源激烈。
設置Map跟Reduce階段共存,map運行一定程度后Reduce 也可以運行。
規避使用Reduce,Reduce 端的Buffer也要合理設置,盡量防止溢寫到磁盤。
5.4 IO 傳輸
采用數據壓縮方式來減少網絡IO時間。
使用SequenceFile二進制文件。
5.5 數據傾斜
通過對數據抽樣得到結果集來設置分區邊界值。
自定義分區。
使用Combine來減少數據傾斜。
采用MapJoin,盡量避免ReduceJoin。
“Hadoop的特點有哪些”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。