您好,登錄后才能下訂單哦!
這篇文章主要講解了“Hadoop生態之分析MapReduce及Hive”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Hadoop生態之分析MapReduce及Hive”吧!
1.計算框架
Hadoop 是一個計算框架,目前大型數據計算框架常用的大致有五種:
僅批處理框架:Apache hadoop.
僅流處理框架:Apache Storm、Apache Samza.
混合框架:Apache Spark、Apache Flink.
這其中名氣最大、使用最廣的當屬 Hadoop 和 Spark。
雖然兩者都被稱為大數據框架,但實際層級不同。Hadoop 是一個分布式數據基礎設施,包括計算框架 MapReduce、分布式文件系統 HDFS、YARN 等。而Spark 是專門用來對分布式存儲的大數據的處理工具,并不會進行數據存儲,更像是 MapReduce 的替代。
在使用場景上,Hadoop 主要用于離線數據計算,Spark更適用于需要精準實時的場景。
2. MapReduce
2.1 MapReduce 是什么
一個基于 Java 的并行分布式計算框架。
前文有提到 HDFS 提供了基于主從結構的分布式文件系統,基于此存儲服務支持,MapReduce 可以實現任務的分發、跟蹤、執行等工作,并收集結果。
2.2 MapReduce 組成
MapReduce 主要思想講的通俗一點就是將一個大的計算拆分成 Map(映射)和 Reduce(化簡)。說到這里,其實 JAVA8 在引入 Lambda 后,也有 map 和 reduce 方法。下面是一段 Java 中的用法:
List<Integer> nums = Arrays.asList(1, 2, 3); List<Integer> doubleNums = nums.stream().map(number -> number * 2).collect(Collectors.toList()); 結果:[2,4,6] Optional<Integer> sum = nums.stream().reduce(Integer::sum); 結果:[6]
代碼很簡單,map 負責歸類,reduce 負責計算。而 Hadoop 中的 MapReduce 也有異曲同工之處。
下面結合官方案例 WordCount 進行分析:
public class WordCount { // Mapper泛型類,4個參數分別代表輸入鍵、值,輸出鍵、值類型 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 字符解析 StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { // nextToken():返回從當前位置到下一個分隔符的字符串 word.set(itr.nextToken()); context.write(word, one); } } } // Reducer同樣也是四個參數 public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException,InterruptedException { int sum = 0; // 循環values,并記錄“單詞”個數 for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
在這段代碼中,不難看出程序核心是 map 函數和 reduce 函數。是否 MapReduce 就是由這兩者組成的?接著往下看。
2.3 Map 和 Reduce
2.3.1 Map
在 WordCount 案例中,明顯看到 map 函數的輸入主要是一個
Context 在這里暫時性忽略,其是 Mapper 類的內部抽象類,一般計算中不會用到,可以先當做“上下文”理解。
map 函數計算過程是: 將這行文本中的單詞提取出來,針對每個單詞輸出一個
2.3.2 Reduce
接著就來看看 reduce ,這里輸入參數 Values 就是上面提到的由很多個 1 組成的集合,而 Key 就是具體“單詞” word。
它的計算過程是: 將集合里的1求和,再將單詞(word)與這個和(sum)組成一個
假設有兩個數據塊的文本數據需要進行詞頻統計,MapReduce 計算過程如下圖所示:
到這都很容易理解,畢竟只是個 HelloWorld 的例子~,但整個MapReduce過程中最關鍵的部分其實是在 map 到 reduce 之間。
還拿上面例子來說:統計相同單詞在所有輸入數據中出現的次數,一個 Map 只能處理一部分數據,而熱點單詞就很可能會出現在所有 Map 中了,意味著同一單詞必須要合并到一起統計才能得到正確結果。這種數據關聯幾乎在所有的大數據計算場景都需要處理,如果是例子這種的當然只對 Key 合并就OK了,但類似數據庫 join 操作這種較復雜的,就需對兩種類型(或更多)的數據依據 Key 關聯。
這個數據關聯操作在 MapReduce中的叫做:shuffle。
2.4 shuffle
shuffle 從字面意思來看,洗牌。下面是一個完整的MR過程,看一看如何洗牌。
先看左半邊
1. 從 HDFS 中讀取數據,輸入數據塊到一個個的 map,其中 map 完成計算時,計算結果會存儲到本地文件系統。而當 map 快要進行完時,就會啟動 shuffle 過程。
2. 如圖,shuffle 也可分為兩種,在Map端的是 Map shuffle。大致過程為:Map 任務進程會調用一個 Partitioner 接口,對 Map 產生的每個
這里就實現了對 Map 結果的分區、排序、分割,以及將同一分區的輸出合并寫入磁盤,得到一個分區有序的文件。這樣不管 Map 在哪個服務器節點,相同的 Key 一定會被發送給相同 Reduce 進程。Reduce 進程對收到的
再看右半邊
1. Reduce shuffle,又可分為復制 Map 輸出、排序合并兩階段。
Copy:Reduce 任務從各個 Map 任務拖取數據后,通知父 TaskTracker 狀態已更新,TaskTracker 通知 JobTracker。Reduce 會定期向JobTracker 獲取 Map 的輸出位置,一旦拿到位置,Reduce 任務會從此輸出對應的 TaskTracker 上復制輸出到本地,不會等到所有的Map任務結束。
Merge sort:
Copy 的數據先放入內存緩沖區,若緩沖區放得下就把數據寫入內存,即內存到內存 merge。
Reduce 向每個 Map 去拖取數據,內存中每個 Map 對應一塊數據,當內存緩存區中存儲的數據達到一定程度,開啟內存中 merge,把內存中數據merge 輸出到磁盤文件中,即內存到磁盤 merge。
當屬于該 reduce 的 map 輸出全部拷貝完成,會在 reduce 上生成多個文件,執行合并操作,即磁盤到磁盤 merge。此刻 Map 的輸出數據已經是有序的,Merge 進行一次合并排序,所謂 Reduce 端的 sort 過程就是這個合并的過程。
2. 經過上一步Reduce shuffle后,reduce進行最后的計算,將輸出寫入HDFS中。
以上便是 shuffle 大致四個步驟,關鍵是 map 輸出的 shuffle 到哪個 Reduce 進程,它由 Partitioner 來實現,MapReduce 框架默認的 Partitioner 用 Key 哈希值對 Reduce 任務數量取模,相同 Key 會落在相同的 Reduce 任務 ID 上。
public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }
如果對 Shuffle 總結一句話: 分布式計算將不同服務器中的數據合并到一起進行后續計算的過程。
shuffle 是大數據計算過程中神奇的地方,不管是 MapReduce 還是 Spark,只要是大數據批處理計算,一定會有 shuffle 過程,只有讓數據關聯起來,它的內在關系和價值才會呈現。
3. Hive
上一部分介紹了 MapReduce,接下來簡單談談 Hive .
我覺得任何一項技術的出現都是為了解決某類問題, MapReduce 毫無疑問簡化了大數據開發的編程難度。但實際上進行數據計算更常用的手段可能是 SQL,那么有沒有辦法直接運行 SQL ?
3.1 Hive是什么
基于Hadoop的一個數據倉庫系統,定義了一種類SQL查詢語言:Hive SQL。
這里有一個名詞 數據倉庫,數據倉庫是指:面向主題(Subject Oriented)、集成(Integrated)、相對穩定(Non-Volatile)、反應歷史變化(Time Variant)的數據集合,用于支持管理決策。
這么說可能有點抽象,分解一下:
主題:數據倉庫針對某個主題來進行組織,指使用數據倉庫決策時所關心的重點方面。比如訂閱分析就可以當做一個主題。
集成:數據倉庫要將多個數據源數據存到一起,但數據以前的存儲方式不同,要經過抽取、清洗、轉換。(也就是 ETL)
穩定:保存的數據是一系列歷史快照,不允許修改,只能分析。
時變:會定期接收到新的數據,反應出新的數據變化。
現在再看下定義:數據倉庫是將多個數據源的數據按照一定的主題集成,進行抽取、清洗、轉換。且處理整合后的數據不允許隨意修改,只能分析,還需定期更新。
3.2 為什么是 Hive
了解了 Hive 的基礎定義,想一下:一個依賴于 HDFS 的數據倉庫在 Hadoop 環境中可以扮演什么角色?
前面說到,可不可以讓 SQL 直接運行在 Hadoop 平臺,這里的答案便是 Hive。它可以將 Hive SQL 轉換為 MapReduce 程序運行。
Hive 初期版本默認 Hive on Mapreduce
啟動 hive 前通常要先啟動 hdfs 和 yarn, 同時一般需要配置 MySQL,Hive 依賴于 HDFS 的數據存儲,但為了能操作 HDFS 上的數據集,要知道數據切分格式、存儲類型、地址等。這些信息通過一張表存儲,稱為元數據,可以存儲到 MySQL 中。
現在來看下 Hive 的部分命令
新建數據庫:create database xxx;
刪除數據庫:drop database xxx;
建表:
create table table_name(col_name data_type);
Hive 的表有兩個概念:**內部表和外部表**。默認內部表,簡單來說,內部表數據存儲在每個表相應的HDFS目錄下。外部表的數據存在別處,要刪除這個外部表,該外部表所指向的數據是不會被刪除的,只會刪除外部表對應的元數據。
查詢:
select * from t_table **where** a<100 **and** b>1000;
連接查詢:
select a.*,b.* from t_a a join t_b b on a.name=b.name;
看到這里,可能會覺得我在寫 SQL, 沒錯,對于熟悉 SQL 的人來說,Hive 是非常易于上手的。
3.3 HIVE SQL To MapReduce
前面說到 HQL 可以‘轉換’為 MapReduce, 下面就來看看:一個 HQL 是如何轉化為 MapReduce 的Hive的基礎架構:
通過 Client 向 Hive 提交 SQL 命令。如果是 DDL,Hive 就會通過執行引擎 Driver 將數據表的信息記錄在 Metastore 元數據組件中,這個組件通常用一個關系數據庫實現,記錄表名、字段名、字段類型、關聯 HDFS 文件路徑等 Meta 信息(元信息)。
如果是DQL,Driver 就會將該語句提交給自己的編譯器 進行語法分析、解析、優化等一系列操作,最后生成一個 MapReduce 執行計劃。再根據執行計劃生成一個 MapReduce 的作業,提交給 Hadoop 的 MapReduce 計算框架處理。
比如輸入一條 select xxx from a ; 其執行順序為:首先在 metastore 查詢--> sql 解析--> 查詢優化---> 物理計劃--> 執行 MapReduce。
感謝各位的閱讀,以上就是“Hadoop生態之分析MapReduce及Hive”的內容了,經過本文的學習后,相信大家對Hadoop生態之分析MapReduce及Hive這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。