您好,登錄后才能下訂單哦!
本節主要內容
Spark重要概念
彈性分布式數據集(RDD)基礎
1. Spark重要概念
本節部分內容源自官方文檔:http://spark.apache.org/docs/latest/cluster-overview.html
(1)Spark運行模式
目前最為常用的Spark運行模式有:?
- local:本地線程方式運行,主要用于開發調試Spark應用程序?
- Standalone:利用Spark自帶的資源管理與調度器運行Spark集群,采用Master/Slave結構,為解決單點故障,可以采用ZooKeeper實現高可靠(High Availability,HA)?
- Apache Mesos :運行在著名的Mesos資源管理框架基礎之上,該集群運行模式將資源管理交給Mesos,Spark只負責進行任務調度和計算?
- Hadoop YARN : 集群運行在Yarn資源管理器上,資源管理交給Yarn,Spark只負責進行任務調度和計算?
Spark運行模式中Hadoop YARN的集群運行方式最為常用,本課程中的第一節便是采用Hadoop YARN的方式進行Spark集群搭建。如此Spark便與Hadoop生態圈完美搭配,組成強大的集群,可謂無所不能。
(2)Spark組件(Components)
一個完整的Spark應用程序,如前一節當中SparkWordCount程序,在提交集群運行時,它涉及到如下圖所示的組件:?
各Spark應用程序以相互獨立的進程集合運行于集群之上,由SparkContext對象進行協調,SparkContext對象可以視為Spark應用程序的入口,被稱為driver program,SparkContext可以與不同種類的集群資源管理器(Cluster Manager),例如Hadoop Yarn、Mesos等 進行通信,從而分配到程序運行所需的資源,獲取到集群運行所需的資源后,SparkContext將得到集群中其它工作節點(Worker Node) 上對應的Executors (不同的Spark應用程序有不同的Executor,它們之間也是獨立的進程,Executor為應用程序提供分布式計算及數據存儲功能),之后SparkContext將應用程序代碼分發到各Executors,最后將任務(Task)分配給executors執行。
Term(術語) Meaning(解釋)
Application(Spark應用程序) 運行于Spark上的用戶程序,由集群上的一個driver program(包含SparkContext對象)和多個executor線程組成
Application jar(Spark應用程序JAR包) Jar包中包含了用戶Spark應用程序,如果Jar包要提交到集群中運行,不需要將其它的Spark依賴包打包進行,在運行時
Driver program 包含main方法的程序,負責創建SparkContext對象
Cluster manager 集群資源管理器,例如Mesos,Hadoop Yarn
Deploy mode 部署模式,用于區別driver program的運行方式:集群模式(cluter mode),driver在集群內部啟動;客戶端模式(client mode),driver進程從集群外部啟動
Worker node 工作節點, 集群中可以運行Spark應用程序的節點
Executor Worker node上的進程,該進程用于執行具體的Spark應用程序任務,負責任務間的數據維護(數據在內存中或磁盤上)。不同的Spark應用程序有不同的Executor
Task 運行于Executor中的任務單元,Spark應用程序最終被劃分為經過優化后的多個任務的集合(在下一節中將詳細闡述)
Job 由多個任務構建的并行計算任務,具體為Spark中的action操作,如collect,save等)
Stage 每個job將被拆分為更小的task集合,這些任務集合被稱為stage,各stage相互獨立(類似于MapReduce中的map stage和reduce stage),由于它由多個task集合構成,因此也稱為TaskSet
2. 彈性分布式數據集(RDD)基礎
彈性分布式數據集(RDD,Resilient Distributed Datasets),由Berkeley實驗室于2011年提出,原始論文名字:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing 原始論文非常值得一讀,是研究RDD的一手資料,本節內容大部分將基于該論文。
(1)RDD設計目標
RDD用于支持在并行計算時能夠高效地利用中間結果,支持更簡單的編程模型,同時也具有像MapReduce等并行計算框架的高容錯性、能夠高效地進行調度及可擴展性。RDD的容錯通過記錄RDD轉換操作的lineage關系來進行,lineage記錄了RDD的家族關系,當出現錯誤的時候,直接通過lineage進行恢復。RDD最合數據挖掘, 機器學習及圖計算,因此這些應用涉及到大家的迭代計算,基于內存能夠極大地提升其在分布式環境下的執行效率;RDD不適用于諸如分布式爬蟲等需要頻繁更新共享狀態的任務。
下面給出的是在spark-shell中如何查看RDD的Lineage
//textFile讀取hdfs根目錄下的README.md文件,然后篩選出所有包括Spark的行
scala> val rdd2=sc.textFile("/README.md").filter(line => line.contains("Spark"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:21
//toDebugString方法會打印出RDD的家族關系
//可以看到textFile方法會生成兩個RDD,分別是HadoopRDD
//MapPartitionsRDD,而filter同時也會生成新的MapPartitionsRDD
scala> rdd2.toDebugString
15/09/20 01:35:27 INFO mapred.FileInputFormat: Total input paths to process : 1
res0: String =?
(2) MapPartitionsRDD[2] at filter at <console>:21 []
?|? MapPartitionsRDD[1] at textFile at <console>:21 []
?|? /README.md HadoopRDD[0] at textFile at <console>:21 []
1
2
3
4
5
6
7
8
9
10
11
12
(2)RDD抽象
RDD在Spark中是一個只讀的(val類型)、經過分區的記錄集合。RDD在Spark中只有兩種創建方式:(1)從存儲系統中創建;(2)從其它RDD中創建。從存儲中創建有多種方式,可以是本地文件系統,也可以是分布式文件系統,還可以是內存中的數據。?
下面的代碼演示的是從HDFS中創建RDD
scala> sc.textFile("/README.md")
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at textFile at <console>:22
1
2
下面的代碼演示的是從內存中創建RDD
//內存中定義了一個數組
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
//通過parallelize方法創建ParallelCollectionRDD
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:23
1
2
3
4
5
6
7
下面的代碼演示的是從其它RDD創建新的RDD
//filter函數將distData RDD轉換成新的RDD
scala> val distDataFiletered=distData.filter(e=>e>2)
distDataFiletered: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at filter at <console>:25
//觸發action操作(后面我們會講),查看過濾后的內容
//注意collect只適合數據量較少時使用
scala> distDataFiltered.collect
res3: Array[Int] = Array(3, 4, 5)
1
2
3
4
5
6
7
8
(3)RDD編程模型
在前面的例子中,我們已經接觸過到如何利用RDD進行編程,前面我們提到的
//filter函數將distData RDD轉換成新的RDD
scala> val distDataFiletered=distData.filter(e=>e>2)
//觸發action操作(后面我們會講),查看過濾后的內容
//注意collect只適合數據量較少時使用
scala> distDataFiltered.collect
1
2
3
4
5
這段代碼它已經給我們解釋了RDD編程模型的核心思想:“filter函數將distData RDD轉換成新的RDD”,“觸發action操作”。也就是說RDD的操作包括Transformations(轉換)、Actions兩種。
transformations操作會將一個RDD轉換成一個新的RDD,需要特別注意的是所有的transformation都是lazy的,如果對scala中的lazy了解的人都知道,transformation之后它不會立馬執行,而只是會記住對相應數據集的transformation,而到真正被使用的時候才會執行,例如distData.filter(e=>e>2) transformation后,它不會立即執行,而是等到distDataFiltered.collect方法執行時才被執行,如下圖所示?
?
從上圖可以看到,在distDataFiltered.collect方法執行之后,才會觸發最終的transformation執行。
從transformation的介紹中我們知道,action是解決程序最終執行的誘因,action操作會返回程序執行結果如collect操作或將運行結果保存,例如SparkWordCount中的saveAsTextFile方法。
Spark 1.5.0支持的transformation包括:
(1)map?
map函數方法參數:
/**
? ?* Return a new RDD by applying a function to all elements of this RDD.
? ?*/
? def map[U: ClassTag](f: T => U): RDD[U]
1
2
3
4
//使用示例
scala> val rdd1=sc.parallelize(Array(1,2,3,4)).map(x=>2*x).collect
rdd1: Array[Int] = Array(2, 4, 6, 8)
1
2
(2)filter?
方法參數:
/**
? ?* Return a new RDD containing only the elements that satisfy a predicate.
? ?*/
? def filter(f: T => Boolean): RDD[T]
1
2
3
4
使用示例
scala> val rdd1=sc.parallelize(Array(1,2,3,4)).filter(x=>x>1).collect
rdd1: Array[Int] = Array(2, 3, 4)
1
2
3
(3)flatMap?
方法參數:
/**
? ?*? Return a new RDD by first applying a function to all elements of this
? ?*? RDD, and then flattening the results.
? ?*/
? def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]?
1
2
3
4
5
使用示例:
scala>? val data =Array(Array(1, 2, 3, 4, 5),Array(4,5,6))
data: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(4, 5, 6))
scala> val rdd1=sc.parallelize(data)
rdd1: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[2] at parallelize at <console>:23
scala> val rdd2=rdd1.flatMap(x=>x.map(y=>y))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at flatMap at <console>:25
scala> rdd2.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
(4)mapPartitions(func)?
本mapPartitions例子來源于:https://www.zybuluo.com/jewes/note/35032?
mapPartitions是map的一個變種。map的輸入函數是應用于RDD中每個元素,而mapPartitions的輸入函數是應用于每個分區,也就是把每個分區中的內容作為整體來處理的。它的函數定義為:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
f即為輸入函數,它處理每個分區里面的內容。每個分區中的內容將以Iterator[T]傳遞給輸入函數f,f的輸出結果是Iterator[U]。最終的RDD由所有分區經過輸入函數處理后的結果合并起來的。
scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
? ? var res = List[(T, T)]()?
? ? var pre = iter.next?
? ? while (iter.hasNext) {
? ? ? ? val cur = iter.next;?
? ? ? ? res .::= (pre, cur)
? ? ? ? pre = cur;
? ? }?
? ? res.iterator
}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
1
2
3
4
5
6
7
8
9
10
11
12
13
上述例子中的函數myfunc是把分區中一個元素和它的下一個元素組成一個Tuple。因為分區中最后一個元素沒有下一個元素了,所以(3,4)和(6,7)不在結果中。?
mapPartitions還有些變種,比如mapPartitionsWithContext,它能把處理過程中的一些狀態信息傳遞給用戶指定的輸入函數。還有mapPartitionsWithIndex,它能把分區的index傳遞給用戶指定的輸入函數。
(5)mapPartitionsWithIndex
mapPartitionsWithIndex函數是mapPartitions函數的一個變種,它的函數參數如下:
def mapPartitionsWithIndex[U: ClassTag](?
f: (Int, Iterator[T]) => Iterator[U],?
preservesPartitioning: Boolean = false): RDD[U]
```
scala> val a = sc.parallelize(1 to 9, 3)
//函數帶分區索引,返回的集合第一個元素為分區索引
scala> def myfunc[T](index:T,iter: Iterator[T]) : Iterator[(T,T,T)] = {
? ? var res = List[(T,T, T)]()?
? ? var pre = iter.next?
? ? while (iter.hasNext) {
? ? ? ? val cur = iter.next
? ? ? ? res .::= (index,pre, cur)?
? ? ? ? pre = cur
? ? }?
? ? res.iterator
}
scala> a.mapPartitionsWithIndex(myfunc).collect
res11: Array[(Int, Int, Int)] = Array((0,2,3), (0,1,2), (1,5,6), (1,4,5), (2,8,9), (2,7,8))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
(6)sample?
方法參數:
?/**
? ?* Return a sampled subset of this RDD.
? ?*
? ?* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
? ?* @param fraction expected size of the sample as a fraction of this RDD's size
? ?*? without replacement: probability that each element is chosen; fraction must be [0, 1]
? ?*? with replacement: expected number of times each element is chosen; fraction must be >= 0
? ?* @param seed seed for the random number generator
? ?*/
? def sample(
? ? ? withReplacement: Boolean,
? ? ? fraction: Double,
? ? ? seed: Long = Utils.random.nextLong): RDD[T]?
1
2
3
4
5
6
7
8
9
10
11
12
13
使用示例:
scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:21
scala> val smapledA=a.sample(true,0.5)
smapledA: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[13] at sample at <console>:23
scala> smapledA.collect
res12: Array[Int] = Array(3, 3, 3, 5, 6, 8, 8)
scala> val smapledA2=a.sample(false,0.5).collect
smapledA2: Array[Int] = Array(1, 4)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。