您好,登錄后才能下訂單哦!
保存Key/Value對的RDD叫做Pair RDD。
1.創建Pair RDD:
1.1 創建Pair RDD的方式:
很多數據格式在導入RDD時,會直接生成Pair RDD。我們也可以使用map()來將之前講到的普通RDD轉化為Pair RDD。
1.2 Pair RDD轉化實例:
下面例子中,把原始RDD,修改成首單詞做Key,整行做Value的Pair RDD。
Java中沒有tuple類型,所以使用scala的scala.Tuple2類來創建tuple。創建tuple: new Tuple2(elem1,elem2) ; 訪問tuple的元素: 使用._1()和._2()方法來訪問。
而且,在Python和Scala實現中使用基本的map()函數即可,java需要使用函數mapToPair():
/** * 將普通的基本RDD轉化成一個Pair RDD,業務邏輯: 將每一行的首單詞作為Key,整個句子作為Value 返回Key/Value PairRDD。 * @param JavaRDD<String> * @return JavaPairRDD<String,String> */ public JavaPairRDD<String,String> firstWordKeyRdd(JavaRDD<String> input){ JavaPairRDD<String,String> pair_rdd = input.mapToPair( new PairFunction<String,String,String>(){ @Override public Tuple2<String, String> call(String arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2<String,String>(arg0.split(" ")[0],arg0); } } ); return pair_rdd; }
當從內存中的集合創建PairRDD時,Python和Scala需要使用函數SparkContext.parallelize();而Java使用函數SparkContext.parallelizePairs()。
2.Pair RDD的轉化操作:
2.1 Pair RDD常見的轉化操作列表:
基礎RDD使用的轉化操作也可以在Pair RDD中使用。因為Pair RDD中使用tuple,所以需要傳遞操作tuple的函數給Pair RDD.
下表列出Pair RDD常用的轉化操作(事例RDD內容:{(1, 2), (3, 4), (3, 6)})
函數名 | 作用 | 調用例子 | 返回結果 |
reduceByKey(func) | Combine values with the same key. | rdd.reduceByKey((x, y) => x + y) | {(1,2),(3,10)} |
groupByKey() | Group values with the same key. | rdd.groupByKey() | {(1,[2]),(3,[4,6])} |
combineByKey(createCombiner,mergeValue, mergeCombiners,partitioner) | Combine values with the same key using a different result type. | ||
mapValues(func) | Apply a function to each value of a pair RDD without changing the key. | rdd.mapValues(x =>x+1) | {(1,3),(3,5),(3,7)} |
flatMapValues(func) | Apply a function that returns an iterator to each value of a pair RDD, and for each element returned, produce a key/value entry with the old key. Often used for tokenization. | rdd.flatMapValues(x=> (x to 5) | {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} |
keys() | Return an RDD of just the keys. | rdd.keys() | {1, 3, 3} |
values() | Return an RDD of just the values. | rdd.values() | {2, 4, 6} |
sortByKey() | Return an RDD sorted by the key. | rdd.sortByKey() | {(1,2),(3,4),(3,6)} |
下表列舉2個RDD之間的轉化操作(rdd = {(1, 2), (3, 4), (3, 6)} other = {(3,9)}):
函數名 | 作用 | 調用例子 | 返回結果 |
subtractByKey | Remove elements with a key present in the other RDD. | rdd.subtractByKey(other) | {(1, 2)} |
join | Perform an inner join between two RDDs. | rdd.join(other) | {(3, (4, 9)),(3, (6, 9))} |
rightOuterJoin | Perform a join between two RDDs where the key must be present in the first RDD. | rdd.rightOuterJoin(other) | {(3,(Some(4),9)), (3,(Some(6),9))} |
leftOuterJoin | Perform a join between two RDDs where the key must be present in the other RDD. | rdd.leftOuterJoin(other) | {(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))} |
cogroup | Group data from both RDDs sharing the same key. | rdd.cogroup(other) | {(1,([2],[])),(3,([4, 6],[9]))} |
2.2 Pair RDD篩選操作:
Pair RDD也還是RDD,所以之前介紹的操作(例如filter)也同樣適用于PairRDD。下面程序,篩選長度大于20的行:
/** * PairRDD篩選長度大于20的行。 * @param JavaPairRDD<String,String> * @return JavaPairRDD<String,String> */ public JavaPairRDD<String,String> filterMoreThanTwentyLines (JavaPairRDD<String,String> input){ JavaPairRDD<String,String> filter_rdd = input.filter( new Function<Tuple2<String, String>,Boolean>(){ @Override public Boolean call(Tuple2<String, String> arg0) throws Exception { // TODO Auto-generated method stub return (arg0._2.length()>20); } } ); return filter_rdd; }
2.3 聚合操作:
./spark-shell --jars /spark/alluxio-1.2.0/core/client/target/alluxio-core-client-1.2.0-jar-with-dependencies.jar
Pair RDD提供下面方法:
1. reduceByKey()方法:可以分別歸約每個鍵對應的數據;
2. join()方法:可以把兩個RDD中鍵相同的元素組合在一起,合并為一個RDD。
一、創建Pair RDD:
當需要將一個普通RDD轉化成一個Pair RDD時,可以使用map()函數來實現。
程序4-1: 使用第一個單詞作為鍵建出一個Pair RDD:
val text1 = sc.textFile("file:///spark/spark/README.md")
val pair1 = text1.map(x=>( x.split(" ")(0),x))
println(pair1.collect().mkString(" "))
程序4-2: 對Pair RDD的第2個元素篩選:
val text2 = sc.textFile("file:///spark/spark/README.md")
val pair_base2 = text2.map(x=>( x.split(" ")(0),x))
val pair2 = pair_base2 .filter{case(key,value)=>value.length<20}
println(pair2.collect().mkString(" "))
對于二元組數據,有時我們只想訪問Pair RDD的值的部分,這時操作二元組很麻煩。可以使用mapValues(func)函數,單操作value,不操作key,功能類似于map{case(x,y):(x,func(y))}
reduceByKey()與reduce()類似,reduceByKey()會為數據集中的每個鍵進行并行的歸約操作,每個歸約操作會將鍵相同的值合并起來。返回一個由各鍵和對應鍵歸約出來的結果值組成的新的RDD。
foldByKey()使用一個與RDD和合并函數中的數據類型相同的零值作為初始值。
用scala從一個內存中的數據集創建PairRDD時,只需要對這個由二元組組成的集合調用SparkContext.parallelize()方法。
程序4-3: 計算每個鍵對應的平均值:
val text3 = sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))
val text3_final = text3.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
println(text3_final.collect().mkString(" "))
調用reduceByKey()和foldByKey()會在每個鍵計算全局的總結果之前先自動在每臺機器上進行本地合并。
其中,mapValues(x=>(x,1))得到的輸出結果是:"panda",(0,1) ;"pink",(3,1) ;"pirate",(3,1) ;"panda",(1,1) ;"pink",(4,1) ;
下一步,reduceByKey((x,y)=>(x._1+y._1,x._2+y._2));自動合并同一個key的數據。例如對于panda,(0,1) ,(1,1) => (0+1 , 1+1) {解釋:(0,1)的第一個數據加上(1,1)的第一個數據作為第一個數據,(0,1)的第二個數據加上(1,1)的第二個數據作為第二個數據} 也就是 (1,2)。
程序4-3的輸出結果類似于: (pink,(7,2)) (pirate,(3,1)) (panda,(1,2)) ;將每個key的總和得到、并將每個key出現的次數得到。
程序4-4: 實現單詞計數:
val text4 = sc.textFile("file:///spark/spark/README.md")
val words = text4.flatMap(x=>x.split(" "))
words.map(x=>(x,1)).reduceByKey((x,y)=>x+y)
combineByKey()是最為常用的基于鍵進行聚合的函數。combineByKey()可以讓用戶返回與輸入數據的類型不同的返回值。
如果第一次出現一個新的元素(鍵)(在每一個分區中第一次出現,而不是整個RDD中第一次出現),會使用createCombiner()函數來創建那個鍵對應的累加器的初始值。
如果是一個在處理當前分區之前已經遇到的鍵,則使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合并。
combineByKey()有多個參數分別對應聚合操作的各個階段,因而非常適合用來解釋聚合操作各個階段的功能劃分。
程序4-5: 使用combineByKey()來實現計算每個key的平均值:
val input = sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))
val result = input.combineByKey( (v) => (v,1),
(acc:(Int,Int),v) => (acc._1+v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1+acc2._1,acc1._2+acc2._2)
).map{case(key,value)=>(key,value._1/value._2.toFloat)}
result.collectAsMap().map(println(_))
其中combineByKey()接收3個函數:
第一個函數是createCombiner(),也就是在某一個分區第一次碰到一個新的key,比如panda是第一次出現,則調用createCombiner函數。
第二個函數是mergeValue(),在同一個分區中,如果出現了之前出現過的一個key,例如是panda,這時候調用這個函數,
第三個函數是mergeCombiners(),
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。