91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink Aggregate怎么用

發布時間:2021-12-31 10:21:24 來源:億速云 閱讀:484 作者:iii 欄目:大數據

本篇內容主要講解“Flink  Aggregate怎么用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Flink  Aggregate怎么用”吧!

Aggregate算子:提供基于事件窗口進行增量計算的函數。(對輸入窗口每個數據流元素遞增聚合計算,并將窗口狀態與窗口內元素保持在累加器中)

示例環境

java.version: 1.8.x
flink.version: 1.11.1

Aggregate.java

import com.flink.examples.DataSource;
import org.apache.flink.api.common.accumulators.AverageAccumulator;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;

/**
 * @Description Aggregate算子:提供基于事件窗口進行增量計算的函數。(對輸入窗口每個數據流元素遞增聚合計算,并將窗口狀態與窗口內元素保持在累加器中)
 */
public class Aggregate {

    /**
     * 遍歷集合,分別打印不同性別的總人數與平均值
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Tuple3<姓名,性別(man男,girl女),年齡>
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
        DataStream<MyAverageAccumulator> dataStream = env.fromCollection(tuple3List)
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                //按數量窗口滾動,每3個輸入窗口數據流,計算一次
                .countWindow(3)
                //只能基于Windowed窗口Stream進行調用
                .aggregate(new AggregateFunction<Tuple3<String, String, Integer>, MyAverageAccumulator, MyAverageAccumulator>() {
                    /**
                     * 創建新累加器,開始聚合計算
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator createAccumulator() {
                        return new MyAverageAccumulator();
                    }

                    /**
                     * 將窗口輸入的數據流值添加到窗口累加器,并返回新的累加器值
                     * @param tuple3
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator add(Tuple3<String, String, Integer> tuple3, MyAverageAccumulator accumulator) {
                        System.out.println("tuple3:" + tuple3.toString());
                        accumulator.setGender(tuple3.f1);
                        //此accumulator保含個數統計和值累計兩個屬性,add方法內會計算窗口內總數與求和
                        accumulator.add(tuple3.f2);
                        return accumulator;
                    }

                    /**
                     * 獲取累加器聚合結果
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator getResult(MyAverageAccumulator accumulator) {
                        return accumulator;
                    }

                    /**
                     * 合并兩個累加器,返回合并后的累加器的狀態
                     * @param a
                     * @param b
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator merge(MyAverageAccumulator a, MyAverageAccumulator b) {
                        a.merge(b);
                        return a;
                    }
                });
        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * 添加性別屬性(此類用于顯示不同性別的平均值)
     */
    public static class MyAverageAccumulator extends AverageAccumulator{
        private String gender;
        public String getGender() {
            return gender;
        }
        public void setGender(String gender) {
            this.gender = gender;
        }
        @Override
        public String toString() {
            //繼承父類的this.getLocalValue()方法用于計算并返回平均值
            return super.toString() + ", gender to " + gender;
        }
    }

}

打印結果

tuple3:(張三,man,20)
tuple3:(李四,girl,24)
tuple3:(劉六,girl,32)
tuple3:(王五,man,29)
tuple3:(伍七,girl,18)
tuple3:(吳八,man,30)
4> AverageAccumulator 24.666666666666668 for 3 elements, gender to girl
2> AverageAccumulator 26.333333333333332 for 3 elements, gender to man

到此,相信大家對“Flink  Aggregate怎么用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

板桥市| 车险| 鲁山县| 濉溪县| 陆良县| 镇巴县| 嵩明县| 玛沁县| 毕节市| 利津县| 沽源县| 曲靖市| 兴城市| 长岭县| 门头沟区| 白朗县| 卢湾区| 玉林市| 绥芬河市| 上林县| 高台县| 手游| 大厂| 托克逊县| 右玉县| 祁门县| 朝阳县| 丹阳市| 长宁区| 陈巴尔虎旗| 溧水县| 苏尼特右旗| 繁昌县| 隆尧县| 万山特区| 兴海县| 崇阳县| 沂源县| 大港区| 荣昌县| 睢宁县|