91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何利用Scala語言開發Spark應用程序

發布時間:2021-09-15 18:54:42 來源:億速云 閱讀:186 作者:小新 欄目:web開發

這篇文章主要介紹如何利用Scala語言開發Spark應用程序,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

Spark內核是由Scala語言開發的,因此使用Scala語言開發Spark應用程序是自然而然的事情。如果你對Scala語言還不太熟悉,可以閱讀網絡教程A Scala Tutorial for Java Programmers或者相關Scala書籍進行學習。

本文將介紹3個Scala Spark編程實例,分別是WordCount、TopK和SparkJoin,分別代表了Spark的三種典型應用。

1. WordCount編程實例

WordCount是一個最簡單的分布式應用實例,主要功能是統計輸入目錄中所有單詞出現的總次數,編寫步驟如下:

步驟1:創建一個SparkContext對象,該對象有四個參數:Spark  master位置、應用程序名稱,Spark安裝目錄和jar存放位置,對于Spark On  YARN而言,最重要的是前兩個參數,***個參數指定為“yarn-standalone”,第二個參數是自定義的字符串,舉例如下:

val sc = new SparkContext(args(0), "WordCount",     System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

步驟2:讀取輸入數據。我們要從HDFS上讀取文本數據,可以使用SparkCon

val textFile = sc.textFile(args(1))

當然,Spark允許你采用任何Hadoop InputFormat,比如二進制輸入格式SequenceFileInputFormat,此時你可以使用SparkContext中的hadoopRDD函數,舉例如下:

val inputFormatClass = classOf[SequenceFileInputFormat[Text,Text]] var hadoopRdd = sc.hadoopRDD(conf, inputFormatClass, classOf[Text], classOf[Text])

或者直接創建一個HadoopRDD對象:

var hadoopRdd = new HadoopRDD(sc, conf,      classOf[SequenceFileInputFormat[Text,Text, classOf[Text], classOf[Text])

步驟3:通過RDD轉換算子操作和轉換RDD,對于WordCount而言,首先需要從輸入數據中每行字符串中解析出單詞,然后將相同單詞放到一個桶中,***統計每個桶中每個單詞出現的頻率,舉例如下:

val result = hadoopRdd.flatMap{         case(key, value)  => value.toString().split("\\s+"); }.map(word => (word, 1)). reduceByKey (_ + _)

其中,flatMap函數可以將一條記錄轉換成多條記錄(一對多關系),map函數將一條記錄轉換為另一條記錄(一對一關系),reduceByKey函數將key相同的數據劃分到一個桶中,并以key為單位分組進行計算,這些函數的具體含義可參考:Spark Transformation。

步驟4:將產生的RDD數據集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈數將數據集保存到HDFS目 錄下,默認采用Hadoop提供的TextOutputFormat,每條記錄以“(key,value)”的形式打印輸出,你也可以采用 saveAsSequenceFile函數將數據保存為SequenceFile格式等,舉例如下:

result.saveAsSequenceFile(args(2))

當然,一般我們寫Spark程序時,需要包含以下兩個頭文件:

import org.apache.spark._ import SparkContext._

WordCount完整程序已在“Apache Spark學習:利用Eclipse構建Spark集成開發環境”一文中進行了介紹,在次不贅述。

需要注意的是,指定輸入輸出文件時,需要指定hdfs的URI,比如輸入目錄是hdfs://hadoop-test/tmp/input,輸出目 錄是hdfs://hadoop-test/tmp/output,其中,“hdfs://hadoop-test”是由Hadoop配置文件core- site.xml中參數fs.default.name指定的,具體替換成你的配置即可。

2. TopK編程實例

TopK程序的任務是對一堆文本進行詞頻統計,并返回出現頻率***的K個詞。如果采用MapReduce實現,則需要編寫兩個作 業:WordCount和TopK,而使用Spark則只需一個作業,其中WordCount部分已由前面實現了,接下來順著前面的實現,找到Top  K個詞。注意,本文的實現并不是***的,有很大改進空間。

步驟1:首先需要對所有詞按照詞頻排序,如下:

val sorted = result.map {   case(key, value) => (value, key); //exchange key and value }.sortByKey(true, 1)

步驟2:返回前K個:

val topK = sorted.top(args(3).toInt)

步驟3:將K各詞打印出來:

topK.foreach(println)

注意,對于應用程序標準輸出的內容,YARN將保存到Container的stdout日志中。在YARN中,每個Container存在三個日志 文件,分別是stdout、stderr和syslog,前兩個保存的是標準輸出產生的內容,第三個保存的是log4j打印的日志,通常只有第三個日志中 有內容。

本程序完整代碼、編譯好的jar包和運行腳本可以從這里下載。下載之后,按照“Apache Spark學習:利用Eclipse構建Spark集成開發環境”一文操作流程運行即可。

3. SparkJoin編程實例

在推薦領域有一個著名的開放測試集是movielens給的,下載鏈接是:http://grouplens.org/datasets/movielens/,該測試集包含三個文件,分別是ratings.dat、sers.dat、movies.dat,具體介紹可閱讀:README.txt,本節給出的SparkJoin實例則通過連接ratings.dat和movies.dat兩個文件得到平均得分超過4.0的電影列表,采用的數據集是:ml-1m。程序代碼如下:

import org.apache.spark._ import SparkContext._ object SparkJoin {   def main(args: Array[String]) {     if (args.length != 4 ){       println("usage is org.test.WordCount <master> <rating> <movie> <output>")       return     }     val sc = new SparkContext(args(0), "WordCount",     System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))       // Read rating from HDFS file     val textFile = sc.textFile(args(1))       //extract (movieid, rating)     val rating = textFile.map(line => {         val fileds = line.split("::")         (fileds(1).toInt, fileds(2).toDouble)        })       val movieScores = rating        .groupByKey()        .map(data => {          val avg = data._2.sum / data._2.size          (data._1, avg)        })        // Read movie from HDFS file      val movies = sc.textFile(args(2))      val movieskey = movies.map(line => {        val fileds = line.split("::")         (fileds(0).toInt, fileds(1))      }).keyBy(tup => tup._1)        // by join, we get <movie, averageRating, movieName>      val result = movieScores        .keyBy(tup => tup._1)        .join(movieskey)        .filter(f => f._2._1._2 > 4.0)        .map(f => (f._1, f._2._1._2, f._2._2._2))       result.saveAsTextFile(args(3))   } }

以上是“如何利用Scala語言開發Spark應用程序”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

沛县| 杭州市| 高密市| 安泽县| 崇信县| 舟山市| 巴林右旗| 静海县| 无极县| 桃园县| 漯河市| 大同市| 南江县| 张掖市| 武陟县| 四会市| 崇明县| 陵川县| 嵊泗县| 德保县| 贵阳市| 唐山市| 调兵山市| 勃利县| 新绛县| 塔城市| 延长县| 沾益县| 会昌县| 锡林郭勒盟| 枣强县| 三都| 广昌县| 高雄市| 鹰潭市| 富平县| 婺源县| 冕宁县| 滦平县| 华安县| 疏附县|