您好,登錄后才能下訂單哦!
API
應用可以通過使用Spark提供的庫獲得Spark集群的計算能力,這些庫都是Scala編寫的,但是Spark提供了面向各種語言的API,例如Scala、Python、Java等,所以可以使用以上語言進行Spark應用開發。?
Spark的API主要由兩個抽象部件組成:SparkContext和RDD,應用程序通過這兩個部件和Spark進行交互,連接到Spark-集群并使用相關資源。
1.SparkContext
是定義在Spark庫中的一個類,作為Spark庫的入口。包含應用程序main()方法的Driver program通過SparkContext對象訪問Spark,因為SparkContext對象表示與Spark集群的一個連接。每個Spark應用都有且只有一個激活的SparkContext類實例,如若需要新的實例,必須先讓當前實例失活。?
(在shell中SparkContext已經自動創建好,就是sc)
實例化SparkContext:
val sc = new SparkContext()
1
2.RDD
RDD基礎概念
彈性分布式數據集(Resilient Distributed Dataset)
并行分布在整個集群中?
把指定路徑下的文本文件加載到lines這個RDD中,這個lines就是一個RDD,代表是就是整個文本文件
val lines = sc.textFile("home/haha/helloSpark.txt")
1
即使這個文件超大,分片存儲在多臺機器上,操作時可以直接使用RDD對整體文件進行操作
RDD是Spark分發數據和計算的基礎抽象類?
例如:lines.count()?
在.count()的函數操作是在RDD數據集上的,而不是對某一具體分片
一個RDD是一個不可改變的分布式集合對象?
就lines來說,如果我們對其所代表的源文件進行了增刪改操作,則相當于生成了一個新的RDD,來存放修改后的數據集
Spark中所有的計算都是通過RDD的創建、轉換,操作完成的
一個RDD內部由許多partitions(分片)組成?
partitions:?
每個分片包括一部分數據,分片可在集群不同節點上計算?
分片是Spark并行處理的單元,Spark順序的,并行的處理分片
RDD創建方法
1.把一個存在的集合傳給SparkContext的parallize()方法(一般測試的時候使用這個方法)
val rdd = sc.parallelize(Array(1,2,2,1),4)?
//參數1:待并行化處理的集合;參數2:分區個數
rdd.count() //查看個數
rdd.foreach(print) //查看RDD的所有對象
//注意:每次foreach出來數值順序會不一樣,因為數據存儲于4個分區,從哪個分區讀取數值是隨機的
1
2
3
4
5
6
7
2.加載外部數據集
//用textFile方法加載
//該方法返回一個RDD,該RDD代表的數據集每個元素都是一個字符串,每個字符串代表輸入文件中的一行
val rddText = sc.textFile("helloSpark.txt")
//用wholeTextfiles方法加載
//這個方法讀取目錄下的所有文本文件,然后返回一個KeyValue對RDD(每一個鍵值對對應一個文件,key為文件路徑,value為文件內容)
val rddW = sc.wholeTextFile("path/to/my-data/*.txt")
//用sequenceFile方法加載
//此方法要求從SequenceFile文件中獲取鍵值對數據,返回一個KeyValue對RDD(使用此方法時,還需要提供類型)
val rdd = sc.sequenceFile[String,String]("some-file")
1
2
3
4
5
6
7
8
9
10
11
RDD的轉換操作
Transformation(轉換)—– 從之前的RDD中構建一個新的RDD
逐元素?
map():接收函數后,把函數應用到RDD的每一個元素,返回新的RDD
val lines1 = sc.parallelize(Array("hello","spark","hello","world","!"))
val lines2 = lines1.map(word=>(word,1))
lines2.foreach(println)
//打印出:
(hello,1)
(spark,1)
(hello,1)
(world,1)
(!,1)
1
2
3
4
5
6
7
8
9
10
11
filter():接收函數后,返回只包含滿足filter()函數的元素的新RDD
val lines3 = lines.filter(word=>word.contains("hello"))
lines3.foreach(println)
//打印出:
hello
hello
1
2
3
4
5
6
7
flatMap():對每個輸入元素,輸出多個輸出元素,也就是將RDD中元素flat(壓扁)后返回一個新的RDD
val input = sc.textFile('/home/haha/helloSpark.txt')
val lines4 = input.flatMap(line=>line.split(" ")) //用空格切分一行的單詞
1
2
3
集合運算?
RDD支持數學集合的計算,例如并集、交集計算
val rdd1 = sc.parallelize(Array("coffe","coffe","panda","monkey","tea"))
val rdd2 = sc.parallelize(Array("coffe","monkey","kitty"))
val rdd_distinct = rdd1.distinct(rdd2) //合并,去重
val rdd_union = rdd1.union(rdd2) // 并集
val rdd_inter = rdd1.intersection(rdd2) // 交集
val rdd_sub = rdd1.subtract(rdd2) // rdd1中有rdd2中沒有的
1
2
3
4
5
6
7
RDD的行動操作
Action(行動) —– 在RDD上計算出一個結果,并且把結果返回給driver program或保存在文件系統。例如:count(),save
RDD幾個常用的Action:
?
舉例說說幾個重要操作:
reduce():接收一個函數后,作用在RDD兩個類型相同的元素上,返回新元素;可以實現RDD中元素的累加、計數,和其它類型的聚集操作。
val rdd = sc.parallelize(Array(1,2,3,3))
rdd.collect() //數組形式輸出
//打印出:
Array[Int] = Array(1,2,3,3)
rdd.reduce((x,y)=>x+y) //以x,y代表同種類型數據
//打印出:
Int = 9 //因為該RDD表示的數組中int類型數據總和為9
1
2
3
4
5
6
7
8
9
collect():遍歷整個RDD,向driver program返回RDD的內容,需要單機內存能夠容納下(因為數據要拷貝給dirver,測試用該函數,方便看到數據所有內容)。在處理大量數據時,使用saveAsTextFile()等
take(n):返回RDD的n個元素(同時嘗試訪問最少的partitions),返回結果是無序的(測試時使用) —– 測試時,隨機取n個元素
top(n):返回排序后的前n個元素,排序(根據RDD中數據的比較器)
rdd.top(1)
//打印出:Array[Int] = Array(3)
rdd.top(2)
//打印出:Array[Int] = Array(3,3)
rdd.top(3)
//打印出:Array[Int] = Array(3,3,2)
1
2
3
4
5
6
foreach():?
計算RDD中每個元素,但不返回到本地(只過一遍,不保存),一般配合print/println打印出數據
rdd.foreach(println)
1
RDD的特性
RDDs的血統關系圖?
Spark維護著RDDs之間的依賴關系和創建關系,叫做血統關系圖?
Spark使用血統關系圖來計算每個RDD的需求和恢復丟失的數據
以上為RDDs的血統關系圖示例(記錄RDD從哪來的,往后又去干啥了)
惰性操作/延遲計算(Lazy Evaluation)?
RDD的創建和轉換方法都是惰性操作,并不會立即執行?
例如,當使用SparkContext的textFile方法從HDFS中讀取文件時,Spark并不會馬上從硬盤中讀取文件,數據只有在必要時才會被加載?
Spark僅僅記錄了這個RDD是怎么創建的,在它上面進行操作又會創建怎樣的RDD等信息,為每個RDD維護其血統信息,在需要時創建或重建RDD?
Spark對RDD的計算,在第一次使用action操作的時候才會執行?
Spark通過內部記錄metadata表,以表明transformations操作已經被響應了
緩存?
回顧RDD的創建有兩種方法,可以從存儲系統中讀取數據,也可以從現有RDD(集合)的轉換操作而來
默認每次在RDDs上面進行action操作時,Spark都遍歷這個調用者RDD的血統關系圖,執行所有的轉換來創建它重新計算RDDs?
如果想要重復利用一個RDD(直接利用之前計算出的某個RDD結果),可以使用cache()/persist()
cache?
把RDD存儲在集群中執行者的內存中,實際上是將RDD物化在內存中
persist?
是一個通用版的cache方法,通過傳參的方法告知,緩存級別、緩存在哪:
?
移除?
Spark只有在必要時才會從緩存占用的內存中移除老的RDD分區,可以調用RDD提供的unpersist方法手動移除RDD分區
KeyValue對RDDs
創建?
使用map()函數,返回key/value對?
例如,包含數行數據的RDD,把每行數據的第一個單詞作為keys,整行作為Value
val rdd = sc.textFile("/home/haha/helloSpark.txt")
val rdd2 = rdd.map(line=>(line.split("")(0),line)) // line=>(key,value) key值:用空格分割整行,第0個數據即為第一個單詞;value值:整行
//打印出:
(hello,hello spark)
(hello,hello spark !)
//rdd2則為KeyValue對RDD
1
2
3
4
5
6
7
8
//一般在測試中手動創建KeyValue對RDDs方法:
val rdd3 = sc.parallelize(Array((1,2),(3,4),(3,6)))
1
2
3
4
Transformation操作
?
?
combineByKey():?
參數有:createCombiner,mergeValue,mergeCombiners,partitioner
此函數是基于key的聚合函數中最常使用的,返回的類型可以與輸入類型不一樣?
許多基于key的聚合函數都用到了它,像groupByKey()
實現過程:?
遍歷partition中的元素,以元素的key作為判斷依據?
如果遍歷到的是第一次出現的key,則該元素視為新元素,使用提供的createCombiner()函數初始化?
如果遍歷到的是一個partition中已經存在的key,使用mergeValue函數merge(整合)?
合計每個partition結果時,使用mergeCombiners()函數,對所有分區處理結果進行整合
//已知Jack和Mike兩人的語文、數學、英語成績,求各自總分、平均分
val sorces = sc.parallelize(Array(("jack",80.0),("jack",90.0),("jack",85.0),("mike",85.0),("mike",92.0),("mike",94.0))
val score2 = score.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))?
//createCombiner():score->value值,1->計數作用;
//mergeValue():聲明key/value形式的變量c1,Int->科目數,Double->科目成績累加值,newScore->遍歷到的新的分數,c1._1->c1的key值,c1._2是c1的value值(再次掃到某人,科目加1,成績累加)
//createCombiners():c1->某個分區最終值,c2->另一個分區的最終值,分區的科目、成績分別對應相加
score2.foreach(println)
//打印出:
(jack,(3,255.0))
(mike,(3,267.0))
val average = score2.map{case(name,(num,score))=>(name,score/num)} //case:判斷傳過來的數據類型是否匹配
average.foreach(println)
//打印出:
(mike,89.0)
(jake,85.0)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Action操作?
(以鍵值對集合{(1,2),(3,4),(3,6)}為例)
函數 描述 示例 結果
countByKey() 對每個鍵對應的元素分別計數 rdd.countBykey() {(1,1),(3,2)}
CollectAsMap() 將結果以映射表的形式返回,以便查詢 rdd.collectAsMap() Map{(1,2),(3,6)}
lookup(key) 返回給定鍵對應的所有值 rdd.lookup(3) [4,6]
對于CollectAsMap()的說明:如果RDD中同一個Key中存在多個Value,那么后面的Value將會把前面的Value覆蓋,最終得到的結果就是Key唯一,而且對應一個Value
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。