您好,登錄后才能下訂單哦!
@[TOC]
?????首先,接到hdf文件輸入,在mapreduce中的map task開始之前,將文件按照指定的大小切割成若干個部分,每一部分稱為一個split,默認是split的大小與block的大小相等,均為128MB。split大小由minSize、maxSize、blocksize決定,以wordcount代碼為例,以下是main()方法
進入waitForCompletion(true)
方法,進入submit()
方法
找到 return submitter .submitJobInternal(Job.this, cluster);
進入,找到 int maps = writeSplits(job, submitJobDir);
進入writeNewSplits()
方法
?????進入writeNewSplits()方法,可以看出該方法首先獲取splits數組信息后,排序,將會優先處理大文件。最終返回mapper數量。這其中又分為兩部分:確定切片數量 和 寫入切片信息。確定切片數量的任務交由FileInputFormat的getSplits(job)完成,寫入切片信息的任務交由JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array)方法,該方法會將切片信息和SplitMetaInfo都寫入HDFS中,return array.length;
返回的是map任務數,默認map的數量是: default_num = total_size / block_size;
?????實際的mapper數量就是輸入切片的數量,而切片的數量又由使用的輸入格式決定,默認為TextInputFormat,該類為FileInputFormat的子類。確定切片數量的任務交由FileInputFormat的getSplits(job)完成。FileInputFormat繼承自抽象類InputFormat,該類定義了MapReduce作業的輸入規范,其中的抽象方法List<InputSplit> getSplits(JobContext context)定義了如何將輸入分割為InputSplit,不同的輸入有不同的分隔邏輯,而分隔得到的每個InputSplit交由不同的mapper處理,因此該方法的返回值確定了mapper的數量。
?????每個map task都有一個內存緩沖區, map的輸出結果先寫到內存中的環形緩沖區,緩沖區為100M,不斷的向緩沖區力寫數據,當達到80M時,需要將緩沖區中的數據以一個臨時文件的方式存到磁盤,當整個map task結束后再對磁盤中這個map task所產生的所有臨時文件做合并,生成最終的輸出文件。最后,等待reduce task來拉取數據。當然,如果map task的結果不大,能夠完全存儲到內存緩沖區,且未達到內存緩沖區的閥值,那么就不會有寫臨時文件到磁盤的操作,也不會有后面的合并。在寫入的過程中會進行分區、排序、combine操作。
?????環形緩沖區:是使用指針機制把內存中的地址首尾相接形成一個存儲中間數據的緩存區域,默認100MB;80M閾值,20M緩沖區,是為了解決寫入環形緩沖區數據的速度大于寫出到spill文件的速度是數據的不丟失;Spill文件:spill文件是環形緩沖區到達閾值后寫入到磁盤的單個文件.這些文件在map階段計算結束時,會合成分好區的一個merge文件供給給reduce任務抓取;spill文件過小的時候,就不會浪費io資源合并merge;默認情況下3個以下spill文件不合并;對于在環形緩沖區中的數據,最終達不到80m但是數據已經計算完畢的情況,map任務將會調用flush將緩沖區中的數據強行寫出spill文件。
?????經過map類處理后,輸出到內存緩沖區(默認大小100M),超過一定大小后,文件溢寫到磁盤上,按照key分類
按照key合并成大文件,減少網絡開銷
看一下MapReduce自帶的分區器HashPartitioner
假設有聽個reduce任務,則分區的計算如下:
在對map結果進行分區之后,對于落在相同的分區中的鍵值對,要進行排序。
?????Shuffle過程是MapReduce的核心,描述著數據從map task輸出到reduce task輸入的這段過程。reducetask根據自己的分區號,去各個maptask分區機器上取相應的結果分區數據,reducetask會將這些文件再進行合并(歸并排序)。
?????所有相同key的數據匯集到一個partition
?????將相同的key value匯聚到一起, 但不計算
reduce階段分三個步驟:
抓取,合并,排序
?????1 reduce 任務會創建并行的抓取線程(fetcher)負責從完成的map任務中獲取結果文件,是否完成是通過rpc心跳監聽,通過http協議抓取;默認是5個抓取線程,可調,為了是整體并行,在map任務量大,分區多的時候,抓取線程調大;
?????2 抓取過來的數據會先保存在內存中,如果內存過大也溢出,不可見,不可調,但是單位是每個merge文件,不會切分數據;每個merge文件都會被封裝成一個segment的對象,這個對象控制著這個merge文件的讀取記錄操作,有兩種情況出現:在內存中有merge數據 ?在溢寫之后存到磁盤上的數據 ?通過構造函數的區分,來分別創建對應的segment對象
?????3 這種segment對象會放到一個內存隊列中MergerQueue,對內存和磁盤上的數據分別進行合并,內存中的merge對應的segment直接合并,磁盤中的合并與一個叫做合并因子的factor有關(默認是10)
?????4 排序問題,MergerQueue繼承輪換排序的接口,每一個segment 是排好序的,而且按照key的值大小邏輯(和真的大小沒關系);每一個segment的第一個key都是邏輯最小,而所有的segment的排序是按照第一個key大小排序的,最小的在前面,這種邏輯總能保證第一個segment的第一個key值是所有key的邏輯最小文件合并之后,最終交給reduce函數計算的,是MergeQueue隊列,每次計算的提取數據邏輯都是提取第一個segment的第一個key和value數據,一旦segment被調用了提取key的方法,MergeQueue隊列將會整體重新按照最小key對segment排序,最終形成整體有序的計算結果;
partition 、Reduce、輸出文件數量相等
Reduce任務數量
在大數據量的情況下,如果只設置1個Reduce任務,其他節點將被閑置,效率底下 所以將Reduce設置成一個較大的值(max:72).調節Reduce任務數量的方法 一個節點的Reduce任務數并不像Map任務數那樣受多個因素制約
通過參數調節mapred.reduce.tasks(在配置文件中)
在代碼中調用job.setNumReduceTasks(int n)方法(在code中)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。