您好,登錄后才能下訂單哦!
這篇文章主要介紹“RDD行動操作方法是什么”,在日常操作中,相信很多人在RDD行動操作方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RDD行動操作方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
行動操作是真正觸發計算的地方。Spark程序執行到行動操作時,才會執行真正的計算,從文件中加載數據,完成一次又一次轉換操作,最終,完成行動操作得到結果。
操作 | 說明 |
count() | 返回數據集中的元素個數 |
collect() | 以數組的形式返回數據集中的所有元素 |
first() | 返回數據集中的第一個元素 |
take(n) | 以數組的形式返回數據集中的前n個元素 |
reduce(func) | 通過函數func(輸入兩個參數并返回一個值)聚合數據集中的元素 |
foreach(func) | 將數據集中的每個元素傳遞到函數func中運行 |
####惰性機制
在當前的spark目錄下面創建input目錄
cd $SPARK_HOMEmkdir inputvim word.txthello worldhello sparkhello hadoophello scala
由于textFile()方法只是一個轉換操作,因此,這行代碼執行后,不會立即把data.txt文件加載到內存中,這時的lines只是一個指向這個文件的指針。
scala> val lines = sc.textFile("word.txt")lines: org.apache.spark.rdd.RDD[String] = word.txt MapPartitionsRDD[13] at textFile at <console>:24
下面代碼用來計算每行的長度(即每行包含多少個單詞),同樣,由于map()方法只是一個轉換操作,這行代碼執行后,不會立即計算每行的長度。
scala> val lineLengths = lines.map(s=>s.length)lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at map at <console>:25
reduce()方法是一個“動作”類型的操作,這時,就會觸發真正的計算。這時,Spark會把計算分解成多個任務在不同的機器上執行,每臺機器運行位于屬于它自己的map和reduce,最后把結果返回給Driver Program。
scala> val totalLength = lineLengths.reduce((a,b)=> a+b)totalLength: Int = 45
####count
lines就是一個RDD。lines.filter()會遍歷lines中的每行文本,并對每行文本執行括號中的匿名函數,也就是執行Lamda表達式:line => line.contains(“spark”),在執行Lamda表達式時,會把當前遍歷到的這行文本內容賦值給參數line,然后,執行處理邏輯line.contains(“spark”),也就是只有當改行文本包含“spark”才滿足條件,才會被放入到結果集中。最后,等到lines集合遍歷結束后,就會得到一個結果集,這個結果集中包含了所有包含“Spark”的行。最后,對這個結果集調用count(),這是一個行動操作,會計算出結果集中的元素個數。
##持久化
在Spark中,RDD采用惰性求值的機制,每次遇到行動操作,都會從頭開始執行計算。如果整個Spark程序中只有一次行動操作,這當然不會有什么問題。但是,在一些情形下,我們需要多次調用不同的行動操作,這就意味著,每次調用行動操作,都會觸發一次從頭開始的計算。這對于迭代計算而言,代價是很大的,迭代計算經常需要多次重復使用同一組數據。
前后共觸發了兩次從頭到尾的計算。
實際上,可以通過持久化(緩存)機制避免這種重復計算的開銷。可以使用persist()方法對一個RDD標記為持久化,之所以說“標記為持久化”,是因為出現persist()語句的地方,并不會馬上計算生成RDD并把它持久化,而是要等到遇到第一個行動操作觸發真正計算以后,才會把計算結果進行持久化,持久化后的RDD將會被保留在計算節點的內存中被后面的行動操作重復使用。
persist()的圓括號中包含的是持久化級別參數,
persist(MEMORY_ONLY)表示將RDD作為反序列化的對象存儲于JVM中,如果內存不足,就要按照LRU原則替換緩存中的內容。
persist(MEMORY_AND_DISK)表示將RDD作為反序列化的對象存儲在JVM中,如果內存不足,超出的分區將會被存放在硬盤上。
一般而言,使用cache()方法時,會調用persist(MEMORY_ONLY)。
可以使用unpersist()方法手動地把持久化的RDD從緩存中移除。
RDD是彈性分布式數據集,通常RDD很大,會被分成很多個分區,分別保存在不同的節點上。RDD分區的一個分區原則是使得分區的個數盡量等于集群中的CPU核心(core)數目。
對于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通過設置spark.default.parallelism這個參數的值,來配置默認的分區數目,一般而言:
*本地模式:默認為本地機器的CPU數目,若設置了local[N],則默認為N;
*Apache Mesos:默認的分區數為8;
*Standalone或YARN:在“集群中所有CPU核心數目總和”和“2”二者中取較大值作為默認值;
因此,對于parallelize而言,如果沒有在方法中指定分區數,則默認為spark.default.parallelism,比如:
對于textFile而言,如果沒有在方法中指定分區數,則默認為min(defaultParallelism,2),其中,defaultParallelism對應的就是spark.default.parallelism。
如果是從HDFS中讀取文件,則分區數為文件分片數(比如,128MB/片)。
到此,關于“RDD行動操作方法是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。