您好,登錄后才能下訂單哦!
小編給大家分享一下Spark RDD常用算子是什么類型的,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
Spark之所以比Hadoop靈活和強大,其中一個原因是Spark內置了許多有用的算子,也就是方法。通過對這些方法的組合,編程人員就可以寫出自己想要的功能。說白了spark編程就是對spark算子的使用,下面為大家詳細講解一下SparkValue類型的常用算子
map() 接收一個函數,該函數將RDD中的元素逐條進行映射轉換,可以是類型的轉換,也可以是值的轉換,將函數的返回結果作為結果RDD編程。
def map[U: ClassTag](f: T => U): RDD[U]
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) //算子 -map val rdd = sc.makeRDD(List(1, 2, 3, 4),2) val mapRdd1 = rdd.map( _*2 ) mapRdd1.collect().foreach(println) sc.stop()
運行結果
2 4 6 8
將待處理的數據以分區
為單位發送到待計算節點上進行處理,mapPartition是對RDD的每一個分區的迭代器
進行操作,返回的是迭代器。這里的處理可以進行任意的處理。
def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) //算子 -mapPartitons 計算每個分區的最大數 val rdd = sc.makeRDD(List(1, 34, 36,345,2435,2342,62,35, 4),4) val mapParRdd = rdd.mapPartitions( iter => { List(iter.max).iterator } ) mapParRdd.foreach(println) sc.stop() }
運行結果:
62 2435 34 345
將待處理的數據以分區為單位發送到計算節點上,這里的處理可以進行任意的處理,哪怕是過濾數據,在處理的同時可以獲取當前分區的索引值。
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("rdd") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List("Hello Spark", "Hello Scala", "Word Count"),2) val mapRDD = rdd.flatMap(_.split(" ")) val mpwiRdd = mapRDD.mapPartitionsWithIndex( (index, datas) => { datas.map( num => { (index, num) } ) } ) mpwiRdd.collect().foreach(println) }
運行結果:
(0,Hello) (0,Spark) (1,Hello) (1,Scala) (1,Word) (1,Count)
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("rdd") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List("Hello Spark", "Hello Scala", "Word Count"),2) val mapRDD = rdd.flatMap(_.split(" ")) val mpwiRdd = mapRDD.mapPartitionsWithIndex( (index, datas) => { if (index==0){ datas.map( num => { (index, num) } ) }else{ Nil.iterator } } ) mpwiRdd.collect().foreach(println)
運行結果:
(0,Hello) (0,Spark)
將數據進行扁平化之后在做映射處理,所以算子也稱為扁平化映射
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
將每個單詞進行扁平化映射
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) //算子 -map val rdd = sc.makeRDD(List("Hello Scala","Hello Spark"), 2) val FltRdd = rdd.flatMap( _.split(" ") ) FltRdd.foreach(println) sc.stop() }
運行結果:
Hello Scala Hello Spark
glom的作用就是將一個分區的數據合并到一個array中。
def glom(): RDD[Array[T]]
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("rdd") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9),2) val glomRdd = rdd.glom() glomRdd.collect().foreach(data=>println(data.mkString(","))) sc.stop() }
運行結果:
1,2,3,4 5,6,7,8,9
將數據根據指定的規則進行分組,分區默認不變,單數數據會被打亂,我們成這樣的操作為shuffer,
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("rdd") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8,10),2) val groupByRDD = rdd.groupBy(_ % 2 == 0) groupByRDD.collect().foreach(println) sc.stop() }
運行結果:
(false,CompactBuffer(1, 3, 5, 7)) (true,CompactBuffer(2, 4, 6, 8, 10))
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("rdd") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List("Hello","Tom","Timi","Scala","Spark")) val groupByRDD = rdd.groupBy(_.charAt(0)) groupByRDD.collect().foreach(println) sc.stop() }
運行結果:
(T,CompactBuffer(Tom, Timi)) (H,CompactBuffer(Hello)) (S,CompactBuffer(Scala, Spark))
filter即過濾器的意思,所以filter算子的作用就是過濾的作用。filter將根據指定的規則進行篩選過濾,符合條件的數據保留,不符合的數據丟棄,當數據進行篩選過濾之后,分區不變,但分區內的數據可能不均衡,生產環境下,可能會出現數據傾斜。
def filter(f: T => Boolean): RDD[T]
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("rdd") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(46,235,246,2346,3276,235,234,6234,6245,246,24,6246,235,26,265)) val filterRDD = rdd.filter(_ % 2 == 0) filterRDD.collect().foreach(println) sc.stop() }
運行結果:
46 246 2346 3276 234 6234 246 24 6246 26
2.篩選單詞中包含H的
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("rdd") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List("Hello","Horber","Hbeer","ersfgH","Scala","Hadoop","Zookeeper")) val filterRDD = rdd.filter(_.contains("H")) filterRDD.collect().foreach(println) sc.stop() }
運行結果:
Hello Horber Hbeer ersfgH Hadoop
以上是“Spark RDD常用算子是什么類型的”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。