您好,登錄后才能下訂單哦!
參考IntAccumulatorParam的實現思路(上述文章中有講):
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
// addInPlace有很多具體的實現類
// 如果想要實現自定義的話,就得實現這個方法
addInPlace(t1, t2)
}
}
自定義也可以通過這個方法去實現,從而兼容我們自定義的累加器
**
* 自定義的AccumulatorParam
*
* Created by lemon on 2018/7/28.
*/
object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] {
override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = {
// ++用于兩個集合相加
r1++r2
}
override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = {
var data: Map[Int, Int] = Map()
data
}
}
/**
* 使用自定義的累加器,實現隨機數
*
* Created by lemon on 2018/7/28.
*/
object CustomAccumulator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator)
val distData = sc.parallelize(1 to 10)
val mapCount = distData.map(x => {
val randomNum = new Random().nextInt(20)
// 構造一個k-v對
val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum)
uniqueKeyAccumulator += map
})
println(mapCount.count())
// 獲取到累加器的值 中的key值,并進行打印
uniqueKeyAccumulator.value.keys.foreach(println)
sc.stop()
}
}
運行結果如下圖:## 思路 & 需求
參考IntAccumulatorParam的實現思路(上述文章中有講):
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
// addInPlace有很多具體的實現類
// 如果想要實現自定義的話,就得實現這個方法
addInPlace(t1, t2)
}
}
自定義也可以通過這個方法去實現,從而兼容我們自定義的累加器
**
* 自定義的AccumulatorParam
*
* Created by lemon on 2018/7/28.
*/
object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] {
override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = {
// ++用于兩個集合相加
r1++r2
}
override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = {
var data: Map[Int, Int] = Map()
data
}
}
/**
* 使用自定義的累加器,實現隨機數
*
* Created by lemon on 2018/7/28.
*/
object CustomAccumulator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator)
val distData = sc.parallelize(1 to 10)
val mapCount = distData.map(x => {
val randomNum = new Random().nextInt(20)
// 構造一個k-v對
val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum)
uniqueKeyAccumulator += map
})
println(mapCount.count())
// 獲取到累加器的值 中的key值,并進行打印
uniqueKeyAccumulator.value.keys.foreach(println)
sc.stop()
}
}
運行結果如下圖:
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。