您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關spark RDD有什么特點的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
概念
RDD具有以下一些特點:
創建:只能通過轉換( transformation,如map/filter/groupBy/join等,區別于動作action)從兩種數據源中創建RDD:1)穩定存儲中的數據;2)其他RDD。
只讀:狀態不可變,不能修改
分區:支持使RDD中的元素根據那個key來分區( partitioning),保存到多個結點上。還原時只會重新計算丟失分區的數據,而不會影響整個系統。
路徑:在RDD中叫世族或血統( lineage),即RDD有充足的信息關于它是如何從其他RDD產生而來的。
持久化:支持將會·被重用的RDD緩存(如in-memory或溢出到磁盤)
延遲計算:像DryadLINQ一樣,Spark也會延遲計算RDD,使其能夠將轉換管道化(pipeline transformation)
操作:豐富的動作( action),count/reduce/collect/save等。
關于轉換(transformation)與動作(action)的區別,前者會生成新的RDD,而后者只是將RDD上某項操作的結果返回給程序,而不會生成新的RDD
RDD底層實現原理
RDD是一個分布式數據集,顧名思義,其數據應該分部存儲于多臺機器上。事實上,每個RDD的數據都以Block的形式存儲于多臺機器上,下圖是Spark的RDD存儲架構圖,其中每個Executor會啟動一個BlockManagerSlave,并管理一部分Block;而Block的元數據由Driver節點的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注冊該Block,BlockManagerMaster管理RDD與Block的關系,當RDD不再需要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。
RDD cache的原理
RDD的轉換過程中,并不是每個RDD都會存儲,如果某個RDD會被重復使用,或者計算其代價很高,那么可以通過顯示調用RDD提供的cache()方法,把該RDD存儲下來。那RDD的cache是如何實現的呢?
RDD中提供的cache()方法只是簡單的把該RDD放到cache列表中。當RDD的iterator被調用時,通過CacheManager把RDD計算出來,并存儲到BlockManager中,下次獲取該RDD的數據時便可直接通過CacheManager從BlockManager讀出。
RDD的容錯機制實現分布式數據集容錯方法
數據檢查點和記錄更新RDD采用記錄更新的方式:記錄所有更新點的成本很高。所以,RDD只支持粗顆粒變換,即只記錄單個塊上執行的單個操作,然后創建某個RDD的變換序列(血統)存儲下來;變換序列指,每個RDD都包含了他是如何由其他RDD變換過來的以及如何重建某一塊數據的信息。因此RDD的容錯機制又稱“血統”容錯。 要實現這種“血統”容錯機制,最大的難題就是如何表達父RDD和子RDD之間的依賴關系。實際上依賴關系可以分兩種,窄依賴和寬依賴:窄依賴:子RDD中的每個數據塊只依賴于父RDD中對應的有限個固定的數據塊;寬依賴:子RDD中的一個數據塊可以依賴于父RDD中的所有數據塊。例如:map變換,子RDD中的數據塊只依賴于父RDD中對應的一個數據塊;groupByKey變換,子RDD中的數據塊會依賴于多有父RDD中的數據塊,因為一個key可能錯在于父RDD的任何一個數據塊中 將依賴關系分類的兩個特性:第一,窄依賴可以在某個計算節點上直接通過計算父RDD的某塊數據計算得到子RDD對應的某塊數據;寬依賴則要等到父RDD所有數據都計算完成之后,并且父RDD的計算結果進行hash并傳到對應節點上之后才能計算子RDD。第二,數據丟失時,對于窄依賴只需要重新計算丟失的那一塊數據來恢復;對于寬依賴則要將祖先RDD中的所有數據塊全部重新計算來恢復。所以在長“血統”鏈特別是有寬依賴的時候,需要在適當的時機設置數據檢查點。也是這兩個特性要求對于不同依賴關系要采取不同的任務調度機制和容錯恢復機制。
RDD內部的設計
每個RDD有5個主要的屬性:
1)一組分片(Partition),即數據集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數目。圖3-1描述了分區存儲的計算模型,每個分配的存儲是由BlockManager實現的。每個分區都會被邏輯映射成BlockManager的一個Block,而這個Block會被一個Task負責計算。
2)一個計算每個分區的函數。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不需要保存每次計算的結果。
3)RDD之間的依賴關系。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。
4)一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。
Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
5)一個列表,存儲存取每個Partition的優先位置(preferred location)。對于一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。
以Spark中內建的幾個RDD舉例來說:
信息/RDD | HadoopRDD | FilteredRDD | JoinedRDD |
Partitions | 每個HDFS塊一個分區,組成集合 | 與父RDD相同 | 每個Reduce任務一個分區 |
PreferredLoc | HDFS塊位置 | 無(或詢問父RDD) | 無 |
Dependencies | 無(父RDD) | 與父RDD一對一 | 對每個RDD進行混排 |
Iterator | 讀取對應的塊數據 | 過濾 | 聯接混排的數據 |
Partitioner | 無 | 無 | HashPartitioner |
工作原理
主要分為三步:創建RDD對象,DAG調度器創建執行計劃,Task調度器分配任務并調度Worker開始運行。
以下面一個按A-Z首字母分類,查找相同首字母下不同姓名總個數的例子來看一下RDD是如何運行起來的。
步驟1:創建RDD。上面的例子除去最后一個collect是個動作,不會創建RDD之外,前面四個轉換都會創建出新的RDD。因此第一步就是創建好所有RDD(內部的五項信息)。
步驟2:創建執行計劃。Spark會盡可能地管道化,并基于是否要重新組織數據來劃分 階段(stage),例如本例中的groupBy()轉換就會將整個執行計劃劃分成兩階段執行。最終會產生一個 DAG(directed acyclic graph,有向無環圖)作為邏輯執行計劃。
步驟3:調度任務。將各階段劃分成不同的 任務(task),每個任務都是數據和計算的合體。在進行下一階段前,當前階段的所有任務都要執行完成。因為下一階段的第一個轉換一定是重新組織數據的,所以必須等當前階段所有結果數據都計算出來了才能繼續。
感謝各位的閱讀!關于“spark RDD有什么特點”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。