您好,登錄后才能下訂單哦!
[TOC]
RDD:彈性分布式數據集,是spark的核心重點
算子:操作RDD的一些函數
application:用戶的寫的spark程序(DriverProgram + ExecutorProgram)
job:一個action類算子觸發的操作
stage:一組任務,會根據依賴關系將job劃分成若干個stage
task:同一個stage內部有多個同樣操作的task(但處理的數據不同),是集群中最小的執行單元
可能說完這些概念,其實還是不太懂,沒關系,這只是先有點印象
? RDD,全稱:Resilient Distributed Dataset,也就是彈性分布式數據集。是spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可并行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。可能這還不清晰,我舉個例子:
假設我使用sc.textFile(xxxx)從hdfs中讀取一個文件的數據,那么文件的數據就相當于一個RDD,但是事實上這個文件的數據在處理時是處于多個不同的worker節點上的,但是在邏輯上,在這個spark集群,這些數據都是屬于一個RDD中的。這也就是為什么說RDD是個邏輯概念,它是整個集群的一個抽象對象,分布在集群中。由此看出,RDD是spark實現數據分布式計算處理的關鍵。例如:
? 圖 2.1 RDD原理
關于RDD的屬性,源碼中有一段注釋,如下:
* Internally, each RDD is characterized by five main properties:
* - A list of partitions
1、是一組分區
理解:RDD是由分區組成的,每個分區運行在不同的Worker上,通過這種方式,實現分布式計算,即數據集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數目。
* - A function for computing each split
2、split理解為分區
在RDD中,有一系列函數,用于處理計算每個分區中的數據。這里把函數叫做算子。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不需要保存每次計算的結果。
算子類型:
transformation action
* - A list of dependencies on other RDDs
3、RDD之間存在依賴關系。窄依賴,寬依賴。
需要用依賴關系來劃分Stage,任務是按照Stage來執行的。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
4、可以自動以分區規則來創建RDD
創建RDD時,可以指定分區,也可以自定義分區規則。
當前Spark中實現了兩種類型的分片函數,一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
5、優先選擇離文件位置近的節點來執行任務。
移動計算,不移動數據
這點需要解釋下:一般來說spark是構建在hdfs之上,從hdfs中讀取數據進行處理的。而hdfs是一個分布式存儲,比如有A、B、C三個datanode,假設spark要處理的數據剛好存儲在C節點上。如果spark此時將任務放在B節點或者A節點上執行,那么就得先從C節點讀取數據,然后經過網絡傳輸到A或B節點,然后才能處理,這其實是很耗費性能。而這里spark的意思是優先在離處理數據比較近的節點上執行任務,也就是優先在C節點上執行任務。這就可以節省數據傳輸所耗費的時間和性能。也就是移動計算而不移動數據。
創建RDD首先需要創建 SparkContext對象:
//創建spark配置文件對象.設置app名稱,master地址,local表示為本地模式。
//如果是提交到集群中,通常不指定。因為可能在多個集群匯上跑,寫死不方便
val conf = new SparkConf().setAppName("wordCount").setMaster("local")
//創建spark context對象
val sc = new SparkContext(conf)
通過sc.parallelize() 創建RDD:
sc.parallelize(seq,numPartitions)
seq:為序列對象,如list,array等
numPartitions:分區個數,可以不指定,默認是2
例子:
val rdd1 = sc.parallelize(Array(1,2,3,4,5),3)
rdd1.partitions.length
通過外部數據源創建
val rdd1 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
算子有分為transformation和action類型,
transformation:
延遲計算,lazy懶值,不會觸發計算。只有遇到action算子才會觸發計算。它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行
action:
和transformation類似,但是是直接觸發計算的,不會等待
這里為了方便講解,實現創建一個rdd,都使用spark-shell中進行演示:
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,8,34,100,79))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
map[U](f:T=>U)()
參數是一個函數,并且要求函數參數是單個,返回值也是單個。用函數處理傳入的數據,然后返回處理后的數據
例子:
//這里傳入一個匿名函數,將rdd1中的每個值*2,并返回處理的新數組
scala> val rdd2 = rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
//這里collect是一個action算子,觸發計算并打印結果
scala> rdd2.collect
res0: Array[Int] = Array(2, 4, 6, 8, 10, 16, 68, 200, 158)
filter(f:T=>boolean)
參數是一個判斷函數,判斷傳入的參數,然后返回true還是false,常用來過濾數據。最后將true的數據返回
例子:
//過濾出大于20的數據
scala> rdd2.filter(_>20).collect
res4: Array[Int] = Array(68, 200, 158)
flatMap(f:T=>U)
先map后flat,flat就是將多個列表等對象展開合并成一個大列表。并返回處理后的數據。這個函數一般用來處理多個一個列表中還包含多個列表的情況
例子:
scala> val rdd4 = sc.parallelize(Array("a b c","d e f","x y z"))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:24
//處理邏輯是:將array中每個字符串按空格切割,然后生成多個array,接著將多個array展開合并一個新的array
scala> val rdd5 = rdd4.flatMap(_.split(" "))
rdd5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at flatMap at <console>:26
scala> rdd5.collect
res5: Array[String] = Array(a, b, c, d, e, f, x, y, z)
union(otherDataset) 并集
intersection(otherDataset) 交集
distinct([numTasks]))去重
例子:
scala> val rdd6 = sc.parallelize(List(5,6,7,8,9,10))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> val rdd7 = sc.parallelize(List(1,2,3,4,5,6))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
//并集
scala> val rdd8 = rdd6.union(rdd7)
rdd8: org.apache.spark.rdd.RDD[Int] = UnionRDD[22] at union at <console>:28
scala> rdd8.collect
res6: Array[Int] = Array(5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6)
//去重
scala> rdd8.distinct.collect
res7: Array[Int] = Array(4, 8, 1, 9, 5, 6, 10, 2, 7, 3)
//交集
scala> val rdd9 = rdd6.intersection(rdd7)
rdd9: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at intersection at <console>:28
scala> rdd9.collect
res8: Array[Int] = Array(6, 5)
groupByKey([numTasks]):只是將同一key的進行分組聚合
reduceByKey(f:(V,V)=>V, [numTasks]) 首先是將同一key的KV進行聚合,然后對value進行操作。
scala> val rdd1 = sc.parallelize(List(("Tom",1000),("Jerry",3000),("Mary",2000)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("Jerry",1000),("Tom",3000),("Mike",2000)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[34] at union at <console>:28
scala> rdd3.collect
res9: Array[(String, Int)] = Array((Tom,1000), (Jerry,3000), (Mary,2000), (Jerry,1000), (Tom,3000), (Mike,2000))
scala> val rdd4 = rdd3.groupByKey
rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[35] at groupByKey at <console>:30
//分組
scala> rdd4.collect
res10: Array[(String, Iterable[Int])] =
Array(
(Tom,CompactBuffer(1000, 3000)),
(Jerry,CompactBuffer(3000, 1000)),
(Mike,CompactBuffer(2000)),
(Mary,CompactBuffer(2000)))
注意:使用分組函數時,不推薦使用groupByKey,因為性能不好,官方推薦reduceByKey
//分組并聚合
scala> rdd3.reduceByKey(_+_).collect
res11: Array[(String, Int)] = Array((Tom,4000), (Jerry,4000), (Mike,2000), (Mary,2000))
這個函數的功能不太好總結,直接看例子吧
scala> val rdd1 = sc.parallelize(List(("Tom",1),("Tom",2),("jerry",1),("Mike",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("jerry",2),("Tom",1),("Bob",2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24
scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[40] at cogroup at <console>:28
scala> rdd3.collect
res12: Array[(String, (Iterable[Int], Iterable[Int]))] =
Array(
(Tom,(CompactBuffer(1, 2),CompactBuffer(1))),
(Mike,(CompactBuffer(2),CompactBuffer())),
(jerry,(CompactBuffer(1),CompactBuffer(2))),
(Bob,(CompactBuffer(),CompactBuffer(2))))
sortByKey(acsending:true/false) 根據KV中的key排序
sortBy(f:T=>U,acsending:true/false) 一般排序,且是對處理后的數據進行排序,可以用來給KV的,按照value進行排序
例子:
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,8,34,100,79))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2 = rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
scala> rdd2.collect
res0: Array[Int] = Array(2, 4, 6, 8, 10, 16, 68, 200, 158)
scala> rdd2.sortBy(x=>x,true)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:29
scala> rdd2.sortBy(x=>x,true).collect
res2: Array[Int] = Array(2, 4, 6, 8, 10, 16, 68, 158, 200)
scala> rdd2.sortBy(x=>x,false).collect
res3: Array[Int] = Array(200, 158, 68, 16, 10, 8, 6, 4, 2)
另外一個例子:
需求:
我們想按照KV中的value進行排序,但是SortByKey按照key排序的。
做法一:
1、第一步交換,把key value交換,然后調用sortByKey
2、KV再次調換位置
scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",1),("kitty",2),("bob",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("jerry",2),("tom",3),("kitty",5),("bob",2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[43] at parallelize at <console>:24
scala> val rdd3 = rdd1 union(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[44] at union at <console>:28
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[45] at reduceByKey at <console>:30
scala> rdd4.collect
res13: Array[(String, Int)] = Array((bob,3), (tom,4), (jerry,3), (kitty,7))
//調換位置再排序,然后再調回來
scala> val rdd5 = rdd4.map(t => (t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[50] at map at <console>:32
scala> rdd5.collect
res14: Array[(String, Int)] = Array((kitty,7), (tom,4), (bob,3), (jerry,3))
做法二:
直接使用sortBy 這個算子,可以直接按照value排序
reduce
類似前面的reduceByKey,但是用于非KV的數據合并,且是action算子
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at parallelize at <console>:24
scala> val rdd2 = rdd1.reduce(_+_)
rdd2: Int = 15
還有一些action算子:
reduce(func) 通過func函數聚集RDD中的所有元素,這個功能必須是可交換且可并聯的
collect() 在驅動程序中,以數組的形式返回數據集的所有元素。通常只是用于觸發計算
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(類似于take(1))
take(n) 返回一個由數據集的前n個元素組成的數組
takeSample(withReplacement,num, [seed]) 返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用于指定隨機數生成器種子
takeOrdered(n, [ordering]) ,返回一個由數據集的前n個元素組成的數組,并排序
saveAsTextFile(path) 將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對于每個元素,Spark將會調用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。
saveAsObjectFile(path)
countByKey() 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func) 在數據集的每一個元素上,運行函數func進行更新。
RDD也存在緩存機制,也就是將RDD緩存到內存或者磁盤中,不用重復計算。
這里涉及到幾個算子:
cache() 標識該rdd可以被緩存,默認緩存在內存中,底層其實是調用persist()
persist() 標識該rdd可以被緩存,默認緩存在內存中
persist(newLevel : org.apache.spark.storage.StorageLevel) 和上面類似,但是可以指定緩存的位置
可以緩存的位置有:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
基本就分為兩類:
純內存緩存
純磁盤緩存
磁盤+內存緩存
一般來說,直接是默認的位置,也就是緩存在內存中性能較好,但是會耗費很多內存,這點要注意,如無需要,就不要緩存了。
舉例:
讀取一個大文件,統計行數
scala> val rdd1 = sc.textFile("hdfs://192.168.109.132:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.132:8020/tmp_files/test_Cache.txt MapPartitionsRDD[52] at textFile at <console>:24
scala> rdd1.count
res15: Long = 923452
觸發計算,統計行數
scala> rdd1.cache
res16: rdd1.type = hdfs://192.168.109.132:8020/tmp_files/test_Cache.txt MapPartitionsRDD[52] at textFile at <console>:24
標識這個RDD可以被緩存,不會觸發計算
scala> rdd1.count
res17: Long = 923452
觸發計算,并把結果進行緩存
scala> rdd1.count
res18: Long = 923452
直接從緩存中讀取數據。
要注意一個點:調用cache方法的時候,只是說標識該rdd在后續觸發計算的時候,結果可以被緩存,而不是說當前rdd就被緩存了,這點要分清楚
? spark在計算時,中間涉及到RDD的多個轉換過程,如果這時候RDD的某個分區計算故障,導致結果丟失了。最簡單的辦法自然是從頭開始重新計算,但是這樣很浪費時間。而checkpoint就是在觸發計算的時候,對RDD進行檢查點狀態保存,如果后面的計算出錯了,還可以從檢查點開始重新計算。
? checkpoint一般是在具有容錯能力,高可靠的文件系統上(比如HDFS, S3等)設置一個檢查點路徑,用于保存檢查點數據。出錯的時候直接從檢查點目錄讀取。有本地目錄和遠程目錄兩種模式。
這種模式下,要求運行在本地模式下,不能用在集群模式下,一般用于測試開發
sc.setCheckpointDir(本地路徑) 設置本地checkpoint路徑
rdd1.checkpoint 設置檢查點
rdd1.count 遇到action類算子,觸發計算,就會在checkpoint目錄生成檢查點
這種模式要求運行在集群模式下,用于生產環境
scala> sc.setCheckpointDir("hdfs://192.168.109.132:8020/sparkckpt0619")
scala> rdd1.checkpoint
scala> rdd1.count
res22: Long = 923452
用法都是類似,只是目錄不一樣
要注意,使用checkpoint的時候,源碼中有一段話如下:
this function must be called before any job has been executed on this RDD. It is strongly recommend that this rdd is
persisted in memory,otherwise saving it on a file will require recomputation.
大致意思就是:
要在開始計算前就調用這個方法,也就是action算子之前。而且最好設置這個rdd緩存在內存中,否則保存檢查點的時候,需要重新計算一次。
? 這個是RDD運行原理中的一個重點概念。
? 首先要說說依賴,依賴的意思就是RDD之間是有依賴關系的,因為spark計算過程中涉及到多個RDD的轉換。RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。看圖
? 圖2.2 RDD的寬窄依賴
寬依賴:
一個父RDD的partition被多個子RDD的partition依賴。其實就是將父RDD的數據進行shuffle的過程,因為父RDD一個partition被多個RDD的partition依賴,意味著需要將父RDD的數據打亂分配給多個RDD,打亂的過程其實就是shuffle。一般實際情況是多個父RDD的partition和多個子RDD的partition互相交錯的依賴。
窄依賴:
一個父RDD的partition最多被一個子RDD的partition依賴
? 圖2.3 RDD依賴
? DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關系的不同將DAG劃分成不同的Stage。寬窄依賴的作用就是用劃分stage,stage之間是寬依賴,stage內部是窄依賴。
? 對于窄依賴,由于父和子RDD的partition都是一對一的依賴關系,所以可以父和子的轉換可以放在一個task中執行,例如上面的task0,CDF都是窄依賴,所以CDF直接的轉換是可以放在一個task中執行的。一個stage內部都是窄依賴
? 對于寬依賴,由于有shuffle的存在,那么就要求所有父RDD都處理完成后,才能執行執行shuffle,接著子RDD才能進行處理。由于shuffle的存在,導致task任務鏈必定不是連續的了,需要重新開始規劃task任務鏈,所以寬依賴是劃分stage的依據。
? 再往深的說,為什么要劃分stage?
? 根據寬依賴劃分了stage之后,因為寬依賴的shuffle,所以導致task鏈是無法連續。而劃分stage就是讓一個stage內部只有窄依賴,窄依賴是一對一的關系,那么task鏈就是連續的,沒有shuffle,就比如上面task0中,C->D->F,中的一個分區,轉換過程都是一對一的,所以是一個連續的task鏈,放在一個task中,而另外一個分區類似,就放在task1中。因為F->G是寬依賴,需要shuffle,所以task鏈無法連續。像這種一條線把RDD轉換邏輯串起來,直到遇到寬依賴,就是一個task,而一個task處理的實際上是一個分區的數據轉換過程。而在spark中,task是最小的調度單位,spark會將task分配到離分區數據近的worker節點上執行。所以其實spark調度是task。
? 那么回到最開始的問題上,為什么要劃分stage,因為根據寬窄依賴劃分出stage之后,stage內部就可以很方便劃分出一個個task,每個task處理一個分區的數據,然后spark就將task調度到對應的worker節點上執行。所以從劃分stage到劃分task,核心就在于實現并行計算。
? 所以,最后就是一句話,劃分stage的目的是為了更方便的劃分task
? 說到這里,其實我們想到一個問題,RDD里面存儲的是數據嗎?實際上并不是,它存儲的實際上對數據的轉換鏈,說的具體點是對分區的轉換鏈,也就是task中包含的算子。而當劃分stage,接著劃分task之后,一個task內部有什么算子就已經很清楚了,接著就是把計算任務發送到worker節點執行。這種計算我們稱為 pipeline計算模式,算子就是在管道中的。
? 由此,其實RDD叫做彈性分布式數據集,并不是說它存儲數據,而是存儲了操作數據的方法函數,也就是算子。
2.10.1 mapPartitionsWithIndex
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ? Iterator[U])
參數說明:
f是一個函數參數,需要自定義。
f 接收兩個參數,第一個參數是Int,代表分區號。第二個Iterator[T]代表該分區中的所有元素。
通過這兩個參數,可以定義處理分區的函數。
Iterator[U] : 操作完成后,返回的結果。
舉例:
將每個分區中的元素,包括分區號,直接打印出來。
//先創建一個rdd,指定分區數為3
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> def fun1(index:Int,iter:Iterator[Int]):Iterator[String]={
| iter.toList.map( x => "[PartId: " + index + " , value = " + x + " ]").iterator
| }
fun1: (index: Int, iter: Iterator[Int])Iterator[String]
scala> rdd1.mapPartitionsWithIndex(fun1).collect
res0: Array[String] = Array(
[PartId: 0 , value = 1 ], [PartId: 0 , value = 2 ],
[PartId: 1 , value = 3 ], [PartId: 1 , value = 4 ], [PartId: 1 , value = 5 ],
[PartId: 2 , value = 6 ], [PartId: 2 , value = 7 ], [PartId: 2 , value = 8 ]
)
2.10.2 aggregate
聚合操作,類似于分組(group by).
但是aggregate是先局部聚合(類似于mr中的combine),然后再全局聚合。性能比直接使用reduce算子要好,因為reduce是直接全局聚合的。
def aggregate[U](zeroValue: U)(seqOp: (U, T) ? U, combOp: (U, U) ? U)
參數說明:
zeroValue:初始值,這個初始值會加入到每一個分區中,最后也會加入到全局操作中
seqOp: (U, T) ? U:局部聚合操作函數
combOp: (U, U) ? U:全局聚合操作函數
=================================================
例子1:
初始值是10
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27
//打印看看分區情況
scala> rdd2.mapPartitionsWithIndex(fun1).collect
res7: Array[String] = Array([PartId: 0 , value = 1 ], [PartId: 0 , value = 2 ], [PartId: 1 , value = 3 ], [PartId: 1 , value = 4 ], [PartId: 1 , value = 5 ])
//求出每個分區的最大值,最后得出每個分區最大值,然后全局之后將每個最大值相加
scala> rdd2.aggregate(10)(max(_,_),_+_)
res8: Int = 30
為什么這里是10呢?
初始值是10 代表每個分區都多了一個10
局部操作,每個分區最大值都是10
全局操作,也多一個10 即 10(局部最大) + 10(局部最大) + 10(全局操作默認值) = 30
=================================================
例子2:
使用aggregate將所有分區全局數據求和,有兩種方式:
1、reduce(_+_)
2、aggregate(0)(_+_,_+_)
2.10.3 aggregateByKey
類似于aggregate操作,區別:操作的 <key value> 的數據,只操作同一key的中的value。是將同一key的KV先局部分組,然后對value聚合。然后再全局分組,再對value聚合。
aggregateByKey和reduceByKey實現的類似的功能,但是效率比reduceByKey高
例子:
val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
def fun1(index:Int,iter:Iterator[(String,Int)]):Iterator[String]={
iter.toList.map( x => "[PartId: " + index + " , value = " + x + " ]").iterator
}
pairRDD.mapPartitionsWithIndex(fun1).collect
scala> val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[16] at parallelize at <console>:27
scala> def fun1(index:Int,iter:Iterator[(String,Int)]):Iterator[String]={
| iter.toList.map( x => "[PartId: " + index + " , value = " + x + " ]").iterator
| }
fun1: (index: Int, iter: Iterator[(String, Int)])Iterator[String]
scala> pairRDD.mapPartitionsWithIndex(fun1).collect
res31: Array[String] = Array(
[PartId: 0 , value = (cat,2) ], [PartId: 0 , value = (cat,5) ], [PartId: 0 , value = (mouse,4) ],
[PartId: 1 , value = (cat,12) ], [PartId: 1 , value = (dog,12) ], [PartId: 1 , value = (mouse,2)
])
需求:
找到每個分區中,動物最多的動物,進行就和
pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
0:(cat,2)和(cat,5) --> (cat,5) (mouse,4)
1:(cat,12) (dog,12) (mouse,2)
求和:(cat,17) (mouse,6) (dog,12)
2.10.4 coalesce 和 repartition
這兩者都用于重分區
repartition(numPartition) 指定重分區個數,一定會發生shuffle
coalesce(numPartition,shuffleOrNot) 指定重分區個數,默認不會發生shuffle,可以指定shuffle
要看更多算子的用法,<http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html>
寫的很詳細
spark自帶了兩個分區類:
HashPartitioner:這個是默認的partitioner,在一些涉及到shuffle的算子會用到。在一些可以指定最小分區數量的算子中,就會涉及到分區。這些分區只能用于KV對
RangePartitioner:按照key的范圍進行分區,比如1~100,101~200分別是不同分區的
用戶也可以自己自定義分區,步驟如下:
1、先繼承Partitioner類,里面寫分區邏輯,形成一個新的分區類
2、rdd.partitionBy(new partiotionerClassxxx())
例子:
數據格式如下:
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
需求:
將同一個頁面的訪問日志各自寫到一個單獨的文件中
代碼:
package SparkExer
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
/**
* 自定義分區:
* 1、先繼承Partitioner類,里面寫分區邏輯,形成一個新的分區類
* 2、rdd.partitionBy(new partiotionerClassxxx())
*/
object CustomPart {
def main(args: Array[String]): Unit = {
//指定hadoop的家目錄,寫入文件到本地時需要用到hadoop的一些包
System.setProperty("hadoop.home.dir","F:\\hadoop-2.7.2")
val conf = new SparkConf().setAppName("Tomcat Log Partitioner").setMaster("local")
val sc = new SparkContext(conf)
//切割文件
val rdd1 = sc.textFile("G:\\test\\tomcat_localhost_access_log.2017-07-30.txt").map(
line => {
val jspName = line.split(" ")(6)
(jspName,line)
}
)
//提取出所有key,也就是網頁名
val rdd2 = rdd1.map(_._1).distinct().collect()
//分區
val rdd3 = rdd1.partitionBy(new TomcatWebPartitioner(rdd2))
//將分區數據寫到文件中
rdd3.saveAsTextFile("G:\\test\\tomcat_localhost")
}
}
class TomcatWebPartitioner(jspList:Array[String]) extends Partitioner{
private val listMap = new mutable.HashMap[String,Int]()
var partitionNum = 0
//根據網頁名稱,規劃整個分區個數
for (s<-jspList) {
listMap.put(s, partitionNum)
partitionNum += 1
}
//返回分區總個數
override def numPartitions: Int = listMap.size
//按照key返回某個分區號
override def getPartition(key: Any): Int = listMap.getOrElse(key.toString, 0)
}
? 首先我們知道一點,一個spark程序其實是分為兩部分的,一部分driver,它也是在executor中運行的,另一部分則是普通的executor,用于運行操作RDD的task的。所以其實也可以看出,只有是對RDD操作的代碼才會進行分布式運行,分配到多個executor中運行,但是不屬于RDD的代碼是不會的,它僅僅是在driver中執行。這就是關鍵了。
例子:
object test {
val sc = new SparkContext()
print("xxxx1")
val rdd1 = sc.textFile(xxxx)
rdd1.map(print(xxx2))
}
例如上面的例子,rdd1中的print(xxx2)會在多個executor中執行,因為它是在rdd內部執行,而外面的print("xxxx1")只會在driver中執行,也沒有實現序列化,所以實際上也不可能通過網絡傳遞。所以這種區別一定要了解。由此我們可以知道,如果變量什么的不是在rdd內部的話,是不可能被多個executor上的程序獲得的。但是如果我們想這樣呢?而且是不需要定義在rdd內部。那么就得用到下面的共享變量了
廣播變量就是可以實現將driver中的變量給在不同的executor中運行的rdd算子調用,而且無需再rdd算子內部定義。常見的比如連接mysql等數據庫的連接對象,可以設置為廣播變量,這樣就可以只創建一個連接了。
用法例子:
//定義共享變量,用于共享從mongodb讀取的數據,需要將數據封裝成 map(mid1,[map(mid2,score),map(mid3,score)....])的形式
val moviesRecsMap = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MOVIES_RECS)
.format("com.mongodb.spark.sql")
.load().as[MoviesRecs].rdd.map(item=> {
(item.mid, item.recs.map(itemRecs=>(itemRecs.mid,itemRecs.socre)).toMap)
}).collectAsMap()
這是關鍵的一步,就是廣播變量出去
//將此變量廣播,后面就可以在任意一個executor中調用了
val moviesRecsMapBroadcast = spark.sparkContext.broadcast(moviesRecsMap)
//因為是懶值加載,所以需要手動調用一次才會真正廣播出去
moviesRecsMapBroadcast.id
需求:根據網站訪問日志統計出訪問量前N位的網頁名稱
數據格式如下:
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
代碼:
package SparkExer
import org.apache.spark.{SparkConf, SparkContext}
/**
* 分析tomcat日志
* 日志例子:
* 192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
*
* 統計每個頁面的訪問量
*/
object TomcatLog {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Tomcat Log analysis").setMaster("local")
val sc = new SparkContext(conf)
val rdd1 = sc.textFile("G:\\test\\tomcat_localhost_access_log.2017-07-30.txt")
.map(_.split(" ")(6))
.map((_,1))
.reduceByKey(_+_)
.map(t=>(t._2,t._1))
.sortByKey(false)
.map(t=>(t._2,t._1))
.collect()
//也可以使用 sortBy(_._2,false)直接根據value進行排序
//取出rdd中的前N個數據
rdd1.take(2).foreach(x=>println(x._1 + ":" + x._2))
println("=========================================")
//取出rdd中的后N個數據
rdd1.takeRight(2).foreach(x=>println(x._1 + ":" + x._2))
sc.stop()
}
}
請看前面2.11中分區的例子
package SparkExer
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}
object SparkConMysql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Tomcat Log To Mysql").setMaster("local")
val sc = new SparkContext(conf)
val rdd1 = sc.textFile("G:\\test\\tomcat_localhost_access_log.2017-07-30.txt")
.map(_.split(" ")(6))
rdd1.foreach(l=>{
//jdbc操作需要包含在rdd中才能被所有worker上的executor調用,也就是借用rdd實現序列化
val jdbcUrl = "jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8"
var conn:Connection = null
//sql語句編輯對象
var ps:PreparedStatement = null
conn = DriverManager.getConnection(jdbcUrl, "root", "wjt86912572")
//?是占位符,后面需要ps1.setxxx(rowkey,value)的形式填充值進去的,按先后順序
ps = conn.prepareStatement("insert into customer values (?,?)")
ps.setString(1,l)
ps.setInt(2,1)
})
}
}
注意:
spark操作jdbc時,如果直接使用jdbc操作數據庫,會有序列化的問題。
因為在spark分布式框架中,所有操作RDD的對象應該是屬于RDD內部的,
才有可能在整個分布式集群中使用。也就是需要序列化。
通俗來說:5個worker共享一個jdbc連接對象,和5個worker每個單獨創建一個連接對象的區別
所以在定義jdbc連接對象時,需要在RDD內部定義
上面的方式過于繁瑣,而且每個數據都會新建一個jdbc連接對象
優化:使用rdd1.foreachPartition()來對每個分區操作,而不是對每條數據操作
這樣可以通過只為每個分區創建一個jdbc連接對象來節省數據庫資源
package SparkExer
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}
object SparkConMysql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Tomcat Log To Mysql").setMaster("local")
val sc = new SparkContext(conf)
val rdd1 = sc.textFile("G:\\test\\tomcat_localhost_access_log.2017-07-30.txt")
.map(_.split(" ")(6))
rdd1.foreachPartition(updateMysql)
/**
* 上面的方式過于繁瑣,而且每個數據都會新建一個jdbc連接對象
* 優化:使用rdd1.foreachPartition()來對每個分區操作,而不是對每條數據操作
* 這樣可以通過只為每個分區創建一個jdbc連接對象來節省數據庫資源
*/
}
def updateMysql(it:Iterator[String]) = {
val jdbcUrl = "jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8"
var conn:Connection = null
//sql語句編輯對象
var ps:PreparedStatement = null
conn = DriverManager.getConnection(jdbcUrl, "root", "wjt86912572")
//conn.createStatement()
//ps = conn.prepareStatement("select * from customer")
//?是占位符,后面需要ps1.setxxx(rowkey,value)的形式填充值進去的,按先后順序
ps = conn.prepareStatement("insert into customer values (?,?)")
it.foreach(data=>{
ps.setString(1,data)
ps.setInt(2,1)
ps.executeUpdate()
})
ps.close()
conn.close()
}
}
另外一種連接mysql的方式就是通過JdbcRDD對象去連接
package SparkExer
import java.sql.DriverManager
import org.apache
import org.apache.spark
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
object MysqlJDBCRdd {
def main(args: Array[String]): Unit = {
val conn = () => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8",
"root",
"wjt86912572")
}
val conf = new SparkConf().setAppName("Tomcat Log To Mysql").setMaster("local")
val sc = new SparkContext(conf)
//創建jdbcrdd對象
val mysqlRdd = new JdbcRDD(sc,conn,"select * from customer where id>? and id<?", 2,7,2,r=> {
r.getString(2)
})
}
}
這個對象的使用局限性很大,只能用于select,而且必須傳入where中的兩個限制值,還要指定分區
https://www.cnblogs.com/diaozhaojian/p/9635829.html
1、數據傾斜原理
(1)在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,此時如果某個key對應的數據量特別大的話,就會發生數據傾斜。
(2)由于shuffle之后的分區規則,導致某個分區數據量過多,導致數據傾斜
2、數據傾斜問題發現與定位
通過Spark Web UI來查看當前運行的stage各個task分配的數據量,從而進一步確定是不是task分配的數據不均勻導致了數據傾斜。
知道數據傾斜發生在哪一個stage之后,接著我們就需要根據stage劃分原理,推算出來發生傾斜的那個stage對應代碼中的哪一部分,這部分代碼中肯定會有一個shuffle類算子。通過countByKey查看各個key的分布。
3、數據傾斜解決方案
過濾少數導致傾斜的key
提高shuffle操作的并行度
局部聚合和全局聚合
1、去重:
def distinct()
def distinct(numPartitions: Int)
2、聚合
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
3、排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
4、重分區
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)
5、集合或者表操作
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。