91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

6.spark core之鍵值對操作

發布時間:2020-07-12 23:33:29 來源:網絡 閱讀:368 作者:菲立思教育 欄目:大數據

??鍵值對RDD(pair RDD)是spark中許多操作所需要的常見數據類型,通常用來進行聚合計算。

創建Pair RDD

??spark有多種方式可以創建pair RDD。比如:很多存儲鍵值對的數據格式在讀取時直接返回pair RDD;通過map()算子將普通的RDD轉為pair RDD。

scala

# 使用第一個單詞作為鍵創建一個pair RDD
val pairs = lines.map(x => (x.split(" ")(0), x))

java

# 使用第一個單詞作為鍵創建一個pair RDD
# jdk1.8后也支持lambda表達式方式
PairFunction<String, String, String> keyData = new PairFunction<String, String, String>() {
  public Tuple2<String, String> call(String x) {
    return new Tuple2(x.split(" ")[0], x);
  }
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);

python

# 使用第一個單詞作為鍵創建一個pair RDD
pairs = lines.map(lambda x: (x.split(" ")[0], x))

??從一個內存中的數據集創建pair RDD時,scala和python只需要對這個二元組集合調用SparkContext的parallelize()方法即可;而java需要使用SparkContext.parallelizePairs()方法。

pair RDD轉化操作

轉化操作總覽

針對單個Pair RDD的轉化操作
函數名 作用 示例
reduceByKey(func) 合并具有相同鍵的值 rdd.reduceByKey((x, y) => x + y)
groupByKey() 對具有相同鍵的值進行分組 rdd.groupByKey()
combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) 使用不同的返回類型合并具有相同鍵的值 rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
mapValues(func) 對pair RDD中的每個值應用一個函數而不改變鍵 rdd.mapValues(x => x + 1)
flatMapValues(func) 對pair RDD中的每個值應用一個返回迭代器的函數,生成對應原鍵的鍵值對記錄 rdd.flatMapValues(x => (x to 5))
keys() 返回一個僅包含鍵的RDD rdd.keys
values() 返回一個僅包含值得RDD rdd.values
sortByKey() 返回一個根據鍵排序的RDD rdd.sortByKey()
針對兩個Pair RDD的轉化操作
函數名 作用 示例
subtractByKey 刪除RDD中鍵與other RDD中鍵相同的元素 rdd.subtractByKey(other)
join 對兩個RDD進行內連接 rdd.join(other)
leftOuterJoin 對兩個RDD進行連接操作,確保第二個RDD的鍵必須存在(左外連接) rdd.leftOuterJoin(other)
rightOuterJoin 對兩個RDD進行連接操作,確保第一個RDD的鍵必須存在(右外連接) rdd.rightOuterJoin(other)
cogroup 將兩個RDD中擁有相同鍵的數據分組在一起 rdd.cogroup(other)

聚合

  • 使用mapValues()和reduceByKey()計算每個鍵對應值的均值。
scala
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
python
rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
  • 使用flatMap()、map()和reduceByKey()計算單詞統計
scala
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
java
JavaRDD<String> input = sc.textFile("s3://...");
JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
   public Iterable<String> call(String x) {
        return Arrays.asList(x.split(" "));
   }
});
JavaPairRDD<String, Integer> result = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String x) {
    return new Tuple2(x, 1);
  }
}).reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
)
python
rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
  • 使用combineByKey()返回與輸入數據不同類型的返回值,求每個鍵對應的平均值
    執行原理
    1.combineByKey()作用于rdd的每個分區。
    2.如果訪問的元素在分區中第一次出現,就使用createCombiner()方法創建那個鍵對應累加器的初始值。
    3.如果訪問的元素在當前分區已經出現過,就使用mergeValue()方法將該鍵的累加器對應的當前值和新值合并。
    4.如果有兩個或多個分區都有對應同一個鍵的累加器時,就使用mergeCombiners()方法將各個分區的結果進行合并。
scala
val result = rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).map{case (key, value) => (key, value._1 / value._2.toFloat)}
java
public static class AvgCount implements Serializable {
    public int total_;
    public int num_;
    public AvgCount(int total, int num) {
        total_ = total;
        num_ = num;
    }
    public float avg() {
        return total_/(float)num_;
    }
}

Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
    public AvgCount call(Integer x) {
        return new AvgCount(x, 1);
    }
};

Function2<AvgCount, Integer, AvgCount> addAndCount = new Function2<AvgCount, Integer, AvgCount>() {
    public AvgCount call(AvgCount a, Integer x) {
        a.total_ += x;
        a.num_ += 1;
        return a;
    }
};

Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
    public AvgCount call(AvgCount a, AvgCount b) {
        a.total_ += b.total_;
        a.num_ += b.num_;
        return a;
    }
};

AvgCount initial = new AvgCount(0, 0);
JavaPairRDD<String, AvgCount> avgCounts = input.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
    System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
python
sumCount = input.combineByKey((lambda x: (x, 1)), (lambda x, y: (x[0] + y, x[1] + 1)), (lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()

分組

??對于單個RDD數據進行分組時,使用groupByKey()。如果先使用groupByKey(),再使用reduce()或fold()時,可能使用一種根據鍵進行聚合的函數更高效。比如,rdd.reduceByKey(func)與rdd.groupByKey().mapValues(value => value.reduce(func))等價,但前者更高效,因為避免了為每個鍵存放值列表的步驟。

??對多個共享同一個鍵的RDD進行分組時,使用cogroup()。cogroup方法會得到結果RDD類型為[(K, (Iterable[V], Iterable[W]))]。

連接

??將一組有鍵的數據與另一組有鍵的數據連接使用是對鍵值對數據執行的常用操作。連接方式主要有:內連接、左外連接、右外連接。

val storeAddress = sc.parallelize(Seq((Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"), (Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")))
val storeRating = sc.parallelize(Seq(Store("Ritual"), 4.9), (Store("Philz"), 4.8)))
# 內連接
storeAddress.join(storeRating)
#左外連接
storeAddress.leftOuterJoin(storeRating)
#右外連接
storeAddress.rightOuterJoin(storeRating)

排序

??將數據排序輸出是很常見的場景。sortByKey()函數接收一個叫做ascending的參數,表示是否讓結果升序排序(默認true)。有時,也可以提供自定義比較函數。比如,以字符串順序對整數進行自定義排序。

scala
implicit val sortIntegersByString = new Ordering[Int] {
    override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()
java
class IntegerComparator implements Comparator<Integer> {
    public int compare(Integer a, Integer b) {
        return String.valueOf(a).compareTo(String.valueOf(b))
    }
}
rdd.sortByKey(new IntegerComparator());
python
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))

Pair RDD行動操作

??和轉化操作一樣,所有基礎RDD支持的行動操作也都在pair RDD上可用。另外,Pair RDD提供了一些額外的行動操作。

函數 作用 示例
countByKey 對每個鍵對應的元素分別計數 rdd.countByKey()
collectAsMap 將結果以映射表的形式返回 rdd.collectAsMap()
lookup(key) 返回指定鍵對應的所有值 rdd.lookup(3)

忠于技術,熱愛分享。歡迎關注公眾號:java大數據編程,了解更多技術內容。

6.spark core之鍵值對操作

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

内黄县| 南阳市| 哈尔滨市| 漯河市| 鹤峰县| 新宁县| 衢州市| 深水埗区| 上高县| 河间市| 库尔勒市| 东乌珠穆沁旗| 汉源县| 河北省| 河南省| 赞皇县| 福州市| 裕民县| 基隆市| 徐水县| 昭觉县| 北流市| 永善县| 荣昌县| 班戈县| 兰州市| 青龙| 封开县| 夏邑县| 崇明县| 石楼县| 定边县| 和静县| 平果县| 文成县| SHOW| 郸城县| 红桥区| 句容市| 宾川县| 延庆县|