您好,登錄后才能下訂單哦!
spark的RDD以及代碼實操是怎樣進行的,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
??在開始學習Spark工作原理之前, 先來介紹一下Spark中兩個最為重要的概念-- 彈性分布式數據集(Resilient Distributed Datasets, RDD) 和算子(Operation).
RDD背景
??Spark的核心是建立在RDD之上, 使Spark中的各個組件可以無縫進行集成, 從而在一個應用程序中完成大數據計算. 這也是為什么說在SparkCore中一切得計算都是基于RDD來完成的. RDD的設計理念源自AMP實驗室發表的論文–Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing.
??MapReduce計算框架在實際應用中, 許多迭代式算法和交互式數據挖掘過程中的計算結果會寫到磁盤, 然后再重復使用, 這就帶來了大量的磁盤IO和序列化開銷. 為解決中間過程數據落地花費大量時間的需求, 出現了一種抽象的數據結構, 讓我們不必再考慮數據的分布式特性, 只需保存具體的邏輯轉換表達式即可, 這種數據結構就是RDD.
??RDD之間的轉換操作使父子RDD之間具有依賴關系, 滿足條件的RDD之間形成管道(Pipeline), 從而避免中間結果落地, 極大的降低了磁盤IO和序列化消耗的時間.
RDD介紹
??RDD(彈性分布式數據集), 雖然叫做數據集, 但RDD并不像集合一樣存儲真實的數據, 而是存儲這些數據轉換的邏輯, 可以將RDD理解為一個大的數據集合以分布式的形式保存在集群服務器的內存中. 每個RDD可以分成多個分區, 每個分區就是一個數據集片段, 并且一個RDD的不同分區可以被保存到集群中不同的節點上(但是同一個分區不能被拆分保存), 從而可以在集群中的不同節點上進行并行計算.
??RDD提供了一種高度受限的共享內存模型, 即RDD是只讀的記錄分區的集合, 不能直接修改, 只能基于穩定的物理存儲中的數據集來創建RDD, 或者通過在其他RDD上執行轉換操作(如map、join和groupBy) 創建得到新的RDD.
Operation介紹
??算子(Operation)是Spark中定義的函數, 用于對RDD中的數據結構進行操作和轉換等. Spark中的算子可以分為4類:
創建類(creation)算子, 用于將內存中的集合或外部文件創建為RDD.
轉換(transformation)算子, 用于將一種格式的RDD轉換為其他自定義格式.
緩存(cache)算子, 用于將RDD緩存在內存(memory)或磁盤(disk)中, 一般后續計算會用到重復數據時才會使用.
行動(action)算子, 用于觸發執行Spark作業, 并將計算結果保存為集合, 標量或保存到外部文件, 數據庫中.
??典型的RDD執行過程如下:
讀入外部數據源(或者內存中的集合) ,然后Create RDD;
RDD經過一系列Transformation, 每一次都會產生不同的RDD, 供給下一個Transformation 使用;
最后一個RDD經Action進行處理, 得到最后想要的值, 并進行后續輸出操作.
??需注意: RDD采用惰性調用, 即在RDD的執行過程中, 如圖所示, 真正的計算發生在RDD的Action操作, 對于Action之前的所有Transformation操作, Spark只是記錄下Transformation操作應用的一些基礎數據集以及RDD生成的軌跡, 即相互之間的依賴關系, 而不會觸發真正的計算.
??RDD提供的轉換接口都非常簡單, 都是類似map, filter, groupBy, join等粗粒度的數據轉換操作, 而不是針對某個數據項的細粒度修改. 因此, RDD比較適合對于數據集中元素執行相同操作的批處理式應用, 而不適合用于需要異步/細粒度狀態的應用, 比如Web應用系統, 增量式的網頁爬蟲等.
??轉換和行動兩種類型的算子, 前者指定RDD之間的相互依賴關系, 后者用于執行計算并指定輸出的形式. 兩類操作的主要區別是, 轉換操作接受RDD并返回RDD, 而行動操作(如count、collect等) 接受RDD但是返回非RDD(即輸出一個值或結果).
RDD五大特性
RDD是由一系列的Partition(分區)組成;
每一個函數作用在每一個分區上;
RDD之間存在依賴關系;
[可選項]分區器作用在KV格式的RDD上;
[可選項]RDD會提供最佳計算位置.
??接下來, 結合Spark實現的WC案例, 來理解這五個特性以及其他注意點(圖中綠色為block塊, 藍色為Partition分區):
HDFS存儲文件是以block塊的形式, Spark應用在讀取HDFS上的數據后, 會將同一個block塊中的數據轉換邏輯保存在同一個Partition中, 一個文件對應的所有Partition構成一個RDD. 即一個RDD中的Partition個數等于這個文件存儲在HDFS中的block個數. 但有一個例外, 如果一個block塊的最后存儲了某個數據的大部分字節后達到block規定的大小, 僅有少量字節存儲在另外一個block塊中, 這時這多余的小部分數據會放在與大部分數據相同的Partition中, 即Partition數小于block塊數.
Spark中沒有讀文件的方法, 但Spark依然能夠讀取文件內容依賴的是MapReduce中讀文件的方法. MR讀文件前, 會先將文件劃分為一個個的split(切片), 一個split的大小 = 一個block的大小; 但這個文件的split個數 ≈ 存儲這個文件的block個數(同上一個例外情況); 一個RDD中Partition的個數 = 這個文件切分的split個數.
每一個函數作用在每一個分區上, 即每個函數會在每一個Partition中各執行一次.
RDD之間存在依賴關系, 通過一個算子關聯的兩個RDD稱為父子RDD, 父子RDD之間存在寬窄依賴(后續講解), 子RDD知道它的父RDD是誰, 但父RDD不知道它的子RDD有誰. 這種依賴關系的優勢在于當數據因某種情形丟失時, 可以通過算子和父RDD重寫計算出子RDD, 從而提高了計算的容錯性. (RDD的依賴關系也被稱為RDD的血統–Lineage)
KV格式的RDD指RDD中的數據是二元組類型, 對于這類RDD可以使用分區器按照Key或者Value進行分組, 進而完成聚合計算. 在WC中, pairRDD和restRDD均為KV格式的RDD. 分區器用于決定數據被放到哪一個reduce task中處理.
每一個算子作用在每一個Partition上, Partition會分布式的存儲在集群各個節點的內存中, 對一個Partition的連續處理可以看作是一個task任務, 每一個task計算任務都在數據所在節點上執行, 從而實現數據本地化, 減少網絡IO. 簡單來說, RDD會提供一個方法接口, 調用這個接口就能直接拿到這個RDD所有Partition的位置, 拿到位置之后就可以分發task了. 至于這個接口是什么不需要我們關心, Spark應用在執行時會自動尋找.
實際操作:
案例說明
??大數據分析處理萬變不離其宗, 核心思想就是一個WorldCount–單詞統計. 單詞統計, 顧名思義就是將一個文件中出現的所有單詞讀一遍, 并對相同單詞的個數進行統計. 如何處理這個文件? 如何得到每一個單詞? 如何對相同的單詞進行統計? 這三個問題是需要解決的核心問題, 接下來就一起來看看是如何對一個文件進行WordCount的.
??首先, 來看一下我們測試的數據, 在這匹數據中, 同一行中每個單詞之間使用制表符’\t’ 來分隔, 接下來我們先對這批數據的計算思想進行解析, 然后再分別使用MapReduce和Spark技術的API編碼實現.
??通過對這兩種技術編碼的比較, 可以幫助大家更好的理解之前所說的Spark在表達能力上相較于Hadoop(MR)的優勢 Spark優勢鏈接. 除此之外, 更重要的一點是引入SparkCore中彈性分布式數據集(RDD) 的概念, 對RDD有一定認識之后, 將有利于學習RDD的具體原理以及如何使用等知識.
??在Spark中, 一切計算都是基于RDD實現的, RDD可以看作是一個集合, 類似于Scala中的List, Map, 它有著與這些普通集合相同的方法(map, flatmap, foreach…), 但是RDD是重新寫的這些方法, 初次之外還有許多其他的方法, 這些方法在Spark中稱為算子, 之后的博客中會對它們進行詳細介紹.
計算分析
無論是MapReduce還是Spark, 在讀取數據時都是一行一行讀取的而且讀取的數據都是字符串類型, 因此在處理時要把一行數據看成一條記錄;
既然一行是一條記錄, 那么我們在處理時只需要關注這一條記錄即可, 其余記錄格式與之相同, 不相同格式的數據一般為臟數據, 需要過濾掉. 相同格式的按照規律進行切分(split).
數據切分完成后, 就可以得到每一個單詞, 然后將每一個單詞當作key, 把它的value置為1, 得到一些列KV格式的數據, 這些數據中有的key相同, 有的key不同, 但value都是1.
對這一系列KV格式的數據進行統計, 先按照Key進行分組, 相同Key, 即同一個單詞為一組, 這個Key對應多個Value, 構成一個有一個或多個元素1組成的集合. 然后再將同一個Key中所有的Value進行累加, 累加完成之后將累加值最為新的Value, Key還是原來的Key.
最新的KV格式的數據中, Key代表的是出現的每一個單詞, Value則對應該單詞出現的次數.
??圖解:
————————————————
代碼實現
在SparkCore中一切得計算都是基于RDD(彈性分布式數據集), R(Resilient) D(Distributed ) D(Dataset). RDD 調用的方法稱為算子,一般情況下RDD的算子返回的還是RDD. 先對RDD有個大概的了解, 之后再對其進行詳細地介紹.
??準備環境:
Scala運行環境
導入jar包, 開發Spark應用程序時, 只需要導入一個整合包即可.
??用Spark寫WC:
package com.hpe.spark.core
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WCSpark {
def main(args: Array[String]): Unit = {
//創建配置對象
val conf = new SparkConf()
//設置App的名稱-->方便在監控頁面找到
conf.setAppName("WCSpark")
//設置Spark的運行模式-->local本地運行-->用于測試環境
conf.setMaster("local")
//創建Spark上下文 他是通往集群的唯一通道
val sc = new SparkContext(conf)
// textFile()讀取上述數據,讀取時是一行行讀取,可以是本地也可是HDFS的數據,返回RDD類型的數據
val lineRDD = sc.textFile("d:/wc.txt")
// 基于lineRDD中的數據按照\t進行分詞
val wordRDD = lineRDD.flatMap { _.split("\t") }
// 將wordRDD中的每一條數據封裝成一個二元組,每一個單詞計數為1 pairRDD[(K:word V:1)]
val pairRDD = wordRDD.map { (_,1) }
// 將pairRDD中相同的單詞分為一組,對組內的數據進行累加
val restRDD = pairRDD.reduceByKey((v1,v2)=>v1+v2)
//可簡寫為:val restRDD = pairRDD.reduceByKey(_+_)
// 根據單詞出現的次數來排序,sortBy():根據指定字段來排序,false:指定為降序;
// foreach對RDD中排好序的數據進行遍歷
restRDD
.sortBy(x=>x._2, false)
.foreach(println)
//一直啟動,為查看而寫
while(true){}
//釋放資源
sc.stop()
}
}
??但從代碼的編寫上來看, 不難發現, Spark的表達能力著實比MR強, 上述代碼中間處理部分其實還可以更加簡潔:
val lineRDD = sc.textFile("d:/wc.txt") .flatMap { _.split("\t") } .map { (_,1) } .reduceByKey(_+_) .sortBy(_._2, false) .foreach(println)
??MR中復雜的程序, 在Spark中了了幾行就可以輕松解決, 既可以看出Scala語言的靈活性, 又表現了Spark超強的表達能力, 因此Spark在計算上逐漸取代MR.
??這里最后一句while(true){} ,讓程序一直執行, 可以在WebUI的監控頁面http://localhost:4040進行查看。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。