您好,登錄后才能下訂單哦!
這篇文章主要講解了“創建RDD的方式有哪些”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“創建RDD的方式有哪些”吧!
1.從集合中創建RDD
val conf =
new SparkConf().setAppName("Test").setMaster("local") |
2.從外部存儲創建RDD
//從外部存儲創建RDD |
RDD支持兩種操作:轉化操作和行動操作。RDD 的轉化操作是返回一個新的 RDD的操作,比如 map()和 filter(),而行動操作則是向驅動器程序返回結果或把結果寫入外部系統的操作。比如 count() 和 first()。
Spark采用惰性計算模式,RDD只有第一次在一個行動操作中用到時,才會真正計算。Spark可以優化整個計算過程。默認情況下,Spark 的 RDD 會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個 RDD,可以使用 RDD.persist() 讓 Spark 把這個 RDD 緩存下來。
Transformation算子
RDD中的所有轉換都是延遲加載的,也就是說,它們并不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。
轉換 | 含義 |
map(func) | 返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成 |
filter(func) | 返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成 |
flatMap(func) | 類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) |
mapPartitions(func) | 類似于map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 類似于mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Iterator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用于指定隨機數生成器種子 |
union(otherDataset) | 對源RDD和參數RDD求并集后返回一個新的RDD |
intersection(otherDataset) | 對源RDD和參數RDD求交集后返回一個新的RDD |
distinct([numTasks])) | 對源RDD進行去重后返回一個新的RDD |
groupByKey([numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 相同的Key值進行聚合操作,在聚合過程中同樣使用了一個中立的初始值zeroValue:中立值,定義返回value的類型,并參與運算seqOp:用來在同一個partition中合并值combOp:用來在不同partiton中合并值 |
sortByKey([ascending], [numTasks]) | 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 與sortByKey類似,但是更靈活 |
join(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD |
cartesian(otherDataset) | 笛卡爾積 |
pipe(command, [envVars]) | 將一些shell命令用于Spark中生成新的RDD |
coalesce(numPartitions) | 重新分區 |
repartition(numPartitions) | 重新分區 |
repartitionAndSortWithinPartitions(partitioner) | 重新分區和排序 |
Action算子
在RDD上運行計算,并返回結果給Driver或寫入文件系統
動作 | 含義 |
reduce(func) | 通過func函數聚集RDD中的所有元素,這個功能必須是可交換且可并聯的 |
collect() | 在驅動程序中,以數組的形式返回數據集的所有元素 |
count() | 返回RDD的元素個數 |
first() | 返回RDD的第一個元素(類似于take(1)) |
take(n) | 返回一個由數據集的前n個元素組成的數組 |
takeSample(withReplacement,num, [seed]) | 返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用于指定隨機數生成器種子 |
takeOrdered(n, [ordering]) | takeOrdered和top類似,只不過以和top相反的順序返回元素 |
saveAsTextFile(path) | 將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對于每個元素,Spark將會調用toString方法,將它裝換為文件中的文本 |
saveAsSequenceFile(path) | 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。 |
saveAsObjectFile(path) | |
countByKey() | 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。 |
foreach(func) | 在數據集的每一個元素上,運行函數func進行更新。 |
RDD支持兩種操作:轉化操作和行動操作。RDD 的轉化操作是返回一個新的 RDD的操作,比如 map()和 filter(),而行動操作則是向驅動器程序返回結果或把結果寫入外部系統的操作。比如 count() 和 first()。
Spark采用惰性計算模式,RDD只有第一次在一個行動操作中用到時,才會真正計算。Spark可以優化整個計算過程。默認情況下,Spark 的 RDD 會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個 RDD,可以使用 RDD.persist() 讓 Spark 把這個 RDD 緩存下來。
Transformation算子****
RDD中的所有轉換都是延遲加載的,也就是說,它們并不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。
轉換 | 含義 |
---|---|
map(func) | 返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成 |
filter(func) | 返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成 |
flatMap(func) | 類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) |
mapPartitions(func) | 類似于map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 類似于mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Iterator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用于指定隨機數生成器種子 |
union(otherDataset) | 對源RDD和參數RDD求并集后返回一個新的RDD |
intersection(otherDataset) | 對源RDD和參數RDD求交集后返回一個新的RDD |
distinct([numTasks])) | 對源RDD進行去重后返回一個新的RDD |
groupByKey([numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 相同的Key值進行聚合操作,在聚合過程中同樣使用了一個中立的初始值zeroValue:中立值,定義返回value的類型,并參與運算seqOp:用來在同一個partition中合并值combOp:用來在不同partiton中合并值 |
sortByKey([ascending], [numTasks]) | 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 與sortByKey類似,但是更靈活 |
join(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD |
cartesian(otherDataset) | 笛卡爾積 |
pipe(command, [envVars]) | 將一些shell命令用于Spark中生成新的RDD |
coalesce(numPartitions) | 重新分區 |
repartition(numPartitions) | 重新分區 |
repartitionAndSortWithinPartitions(partitioner) | 重新分區和排序 |
** Action算子**
在RDD上運行計算,并返回結果給Driver或寫入文件系統
動作 | 含義 |
---|---|
reduce(func) | 通過func函數聚集RDD中的所有元素,這個功能必須是可交換且可并聯的 |
collect() | 在驅動程序中,以數組的形式返回數據集的所有元素 |
count() | 返回RDD的元素個數 |
first() | 返回RDD的第一個元素(類似于take(1)) |
take(n) | 返回一個由數據集的前n個元素組成的數組 |
takeSample(withReplacement,num, [seed]) | 返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用于指定隨機數生成器種子 |
takeOrdered(n, [ordering]) | takeOrdered和top類似,只不過以和top相反的順序返回元素 |
saveAsTextFile(path) | 將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對于每個元素,Spark將會調用toString方法,將它裝換為文件中的文本 |
saveAsSequenceFile(path) | 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。 |
saveAsObjectFile(path) | |
countByKey() | 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。 |
foreach(func) | 在數據集的每一個元素上,運行函數func進行更新。 |
感謝各位的閱讀,以上就是“創建RDD的方式有哪些”的內容了,經過本文的學習后,相信大家對創建RDD的方式有哪些這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。