This article explains how to use Flink's Aggregate operator, walking through a short runnable example and its output.
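Aggregate operator: applies an incremental aggregation function to a windowed stream. Each element that enters a window is folded into an accumulator as soon as it arrives, so the window's state lives in the accumulator rather than in a buffer of the raw elements.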
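To make the incremental idea concrete, here is a minimal plain-Java sketch that does not use Flink. SimpleAggregateFunction is a hypothetical stand-in with the same four methods as Flink's AggregateFunction (createAccumulator, add, getResult, merge); AvgAcc, AverageAge, fold, average and mergedAverage are illustrative names, not part of any library. The accumulator stores only a count and a sum, and merge() shows that two partial accumulators combine into the same result.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java sketch of the createAccumulator/add/getResult/merge contract.
 * SimpleAggregateFunction is a hypothetical stand-in for Flink's AggregateFunction.
 */
public class IncrementalAverageSketch {

    // Hypothetical interface with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>
    interface SimpleAggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();

        ACC add(IN value, ACC accumulator);

        OUT getResult(ACC accumulator);

        ACC merge(ACC a, ACC b);
    }

    // The accumulator keeps only a count and a sum, never the elements themselves
    static final class AvgAcc {
        long count;
        long sum;
    }

    static final class AverageAge implements SimpleAggregateFunction<Integer, AvgAcc, Double> {
        @Override
        public AvgAcc createAccumulator() {
            return new AvgAcc();
        }

        @Override
        public AvgAcc add(Integer age, AvgAcc acc) {
            acc.count++;
            acc.sum += age;
            return acc;
        }

        @Override
        public Double getResult(AvgAcc acc) {
            return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
        }

        @Override
        public AvgAcc merge(AvgAcc a, AvgAcc b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }
    }

    // Folds elements one at a time, the way a window feeds them to add()
    static AvgAcc fold(AverageAge fn, List<Integer> ages) {
        AvgAcc acc = fn.createAccumulator();
        for (Integer age : ages) {
            acc = fn.add(age, acc);
        }
        return acc;
    }

    static double average(List<Integer> ages) {
        AverageAge fn = new AverageAge();
        return fn.getResult(fold(fn, ages));
    }

    // Two partial accumulators combined with merge() give the same average
    static double mergedAverage(List<Integer> left, List<Integer> right) {
        AverageAge fn = new AverageAge();
        return fn.getResult(fn.merge(fold(fn, left), fold(fn, right)));
    }

    public static void main(String[] args) {
        System.out.println(average(Arrays.asList(24, 32, 18)));
        System.out.println(mergedAverage(Arrays.asList(24, 32), Arrays.asList(18)));
    }
}
```

Keeping only (count, sum) means memory per window stays constant no matter how many elements the window receives, which is the main reason to prefer an incremental function over one that sees the whole window at once.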
Example environment
java.version: 1.8.x
flink.version: 1.11.1
Aggregate.java
import com.flink.examples.DataSource;
import org.apache.flink.api.common.accumulators.AverageAccumulator;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;

/**
 * @Description Aggregate operator: incremental aggregation over a windowed stream.
 * Each element entering a window is folded into an accumulator, which holds the window's state.
 */
public class Aggregate {

    /**
     * Iterates over the collection and prints the element count and average age for each gender.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Tuple3<name, gender (man = male, girl = female), age>
        // DataSource is the author's own helper class (not part of Flink, not shown here)
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
        DataStream<MyAverageAccumulator> dataStream = env.fromCollection(tuple3List)
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                // Tumbling count window: fires once for every 3 elements of the same key
                .countWindow(3)
                // aggregate() can only be called on a windowed stream
                .aggregate(new AggregateFunction<Tuple3<String, String, Integer>, MyAverageAccumulator, MyAverageAccumulator>() {

                    /**
                     * Creates a new accumulator, starting a new aggregation.
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator createAccumulator() {
                        return new MyAverageAccumulator();
                    }

                    /**
                     * Adds an input element to the window's accumulator and returns the updated accumulator.
                     * @param tuple3
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator add(Tuple3<String, String, Integer> tuple3, MyAverageAccumulator accumulator) {
                        System.out.println("tuple3:" + tuple3.toString());
                        accumulator.setGender(tuple3.f1);
                        // The accumulator tracks an element count and a running sum; add() updates both
                        accumulator.add(tuple3.f2);
                        return accumulator;
                    }

                    /**
                     * Returns the aggregation result held by the accumulator.
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator getResult(MyAverageAccumulator accumulator) {
                        return accumulator;
                    }

                    /**
                     * Merges two accumulators and returns the merged state.
                     * Only invoked when windows are merged (e.g. session windows), not for count windows.
                     * @param a
                     * @param b
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator merge(MyAverageAccumulator a, MyAverageAccumulator b) {
                        a.merge(b);
                        return a;
                    }
                });
        dataStream.print();
        env.execute("flink Aggregate job");
    }

    /**
     * Adds a gender field (used to display the average for each gender).
     */
    public static class MyAverageAccumulator extends AverageAccumulator {
        private String gender;

        public String getGender() {
            return gender;
        }

        public void setGender(String gender) {
            this.gender = gender;
        }

        @Override
        public String toString() {
            // The inherited toString() calls getLocalValue(), which computes and returns the average
            return super.toString() + ", gender to " + gender;
        }
    }
}
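The keyBy(...).countWindow(3).aggregate(...) chain can also be modelled without Flink, to see why each gender produces exactly one result. The following is a hypothetical plain-Java model, not Flink's implementation: each gender keeps its own count-and-sum accumulator, a window fires once its key has seen 3 elements, and that key's state is then cleared so the next element opens a new window. The class, field, and method names are illustrative assumptions, and the input reuses the six records shown in the printed output, with the names romanized.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model of keyBy(gender).countWindow(3).aggregate(...).
 * It is not Flink code; it only mimics the per-key, fire-and-purge behaviour.
 */
public class CountWindowSketch {

    // One input record: Tuple3<name, gender, age> in the original job
    static final class Person {
        final String name;
        final String gender;
        final int age;

        Person(String name, String gender, int age) {
            this.name = name;
            this.gender = gender;
            this.age = age;
        }
    }

    // Per-key accumulator: running count and sum only, no buffered elements
    static final class Acc {
        long count;
        double sum;
    }

    // One fired window: the key, its element count, and the average
    static final class Result {
        final String gender;
        final long count;
        final double average;

        Result(String gender, long count, double average) {
            this.gender = gender;
            this.count = count;
            this.average = average;
        }
    }

    static List<Result> run(List<Person> input, int windowSize) {
        Map<String, Acc> state = new HashMap<>();
        List<Result> fired = new ArrayList<>();
        for (Person p : input) {
            // createAccumulator(): the first element of a window for this key
            Acc acc = state.computeIfAbsent(p.gender, k -> new Acc());
            // add(): fold the element in immediately
            acc.count++;
            acc.sum += p.age;
            if (acc.count == windowSize) {
                // getResult(): emit, then purge so the next element opens a new window
                fired.add(new Result(p.gender, acc.count, acc.sum / acc.count));
                state.remove(p.gender);
            }
        }
        return fired;
    }

    // The six records from the original printed output, names romanized
    static List<Person> sample() {
        return Arrays.asList(
                new Person("Zhang San", "man", 20),
                new Person("Li Si", "girl", 24),
                new Person("Liu Liu", "girl", 32),
                new Person("Wang Wu", "man", 29),
                new Person("Wu Qi", "girl", 18),
                new Person("Wu Ba", "man", 30));
    }

    public static void main(String[] args) {
        for (Result r : run(sample(), 3)) {
            System.out.println(r.gender + ": " + r.average + " for " + r.count + " elements");
        }
    }
}
```

In this model the girl window fires on the fifth record and the man window on the sixth. A key that never reaches 3 elements never emits anything, which matches countWindow on a bounded source: its count trigger ignores time, so an incomplete window is not flushed at the end of input.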
Printed output
tuple3:(張三,man,20)
tuple3:(李四,girl,24)
tuple3:(劉六,girl,32)
tuple3:(王五,man,29)
tuple3:(伍七,girl,18)
tuple3:(吳八,man,30)
4> AverageAccumulator 24.666666666666668 for 3 elements, gender to girl
2> AverageAccumulator 26.333333333333332 for 3 elements, gender to man
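As a quick check on the two result lines: the girl window receives the ages 24, 32 and 18, and the man window receives 20, 29 and 30, so the averages work out as follows.

```latex
\bar{x}_{\text{girl}} = \frac{24 + 32 + 18}{3} = \frac{74}{3} \approx 24.667
\qquad
\bar{x}_{\text{man}} = \frac{20 + 29 + 30}{3} = \frac{79}{3} \approx 26.333
```

The trailing digits in 24.666666666666668 and 26.333333333333332 are ordinary double-precision rounding. The 4> and 2> prefixes are added by print() and identify which parallel sink subtask emitted each line.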