您好,登錄后才能下訂單哦!
本篇內容主要講解“flink1.2版本時間、水位線的介紹和用法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“flink1.2版本時間、水位線的介紹和用法”吧!
水位線是flink的一種處理延時數據的機制,主要對設定時間內延時數據的自動容錯,水位線的本質是時間戳,計算公式為:當前事件最大時間值 - 數據延時時間。(看了幾遍有點懵)
個人理解:
水位線是收到數據邏輯時間便簽,是處理延時數據的基礎,通過與數據自帶的生成時間Timestamps,實現延遲數據矯正。
理想狀態下的水位線,即數據元素的事件事件是有序的,Watermark時間戳會隨著數據元素的事件時間安裝順序生成,此時,水位線時間和時間時間保持一致。
現實情況數據元素往往并不按照其生產順序接入Flink,而頻繁處理亂序或遲到情況,這時候需要watermark來處理,當事件8和事件11同時進入系統,flink系統將根據設定延時值分別計算它們的watermark,兩個事件到達一個operator中后,匹配事件時間的虛擬時間與watermark匹配,觸發響應的計算。
Watermark在Source Operator中生成,且在每個Operator的子Task中獨立生成。
如果一個watermark同時更新一個算子Task的當前事件時間,Flink會選擇最小的水位線進行更新。當一個Window算子Task中水位線大于Window結束時間,立即觸發窗口計算。
流式處理中最大的特點是數據上具有時間的屬性特征,Flink根據時間產生的位置不同,將時間分為三種概念:事件生成時間(Event Time)、事件接入時間(Ingestion TIme)、事件處理時間(Processing Time)。
事件生成時間:數據從終端或系統中產生的過程消耗的時間。
數據接入時間:數據接入DataSource時的時間。
事件處理時間:處理過程中獲取的主機時間。
Timestamps和Watermark成對對存在,使用時,都要指定
watermark設定Flink中Watermark默認200ms生成一次,也可以手動指定,代碼如下:
// 1、創建flink運行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); // 設置并行度 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //處理模式設定:流或批 // 生成 watermark 的時間間隔(每 n 毫秒),設置周期性的產生水位線的時間間隔。當數據流很大的時候,如果每個事件都產生水位線,會影響性能。 //env.getConfig().setAutoWatermarkInterval(1000); // 自動水印時間間隔 12版本不用設置,有默認
此處以滾動窗口為例,窗口知識下次分享,首先對數據進行機構化,數據結構:"yyyy-MM-dd HH:mm:ss|type|num",處理代碼如下:
SingleOutputStreamOperator<Tuple3<String,String, Integer>> formatData =text.map(new MapFunction<String, Tuple3<String, String, Integer>>() { // 數據格式轉換 private static final long serialVersionUID = 1L; @Override public Tuple3<String, String, Integer> map(String value) throws Exception { Tuple3<String, String, Integer> data = new Tuple3<String, String, Integer>(); String[] dataTmp = value.split("\\|"); data.f0 = dataTmp[0]; data.f1 = dataTmp[1]; data.f2 = Integer.parseInt(dataTmp[2]); return data; } });
設置Timestamps和最大時延
SingleOutputStreamOperator<Tuple3<String,String, Integer>> orderDSWithWatemark=formatData .assignTimestampsAndWatermarks( // 設置watermark watemark = 最大事件時間 - 最大延遲或亂序時間 WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //指定maxOutOfOrderness最大無序度時間即最大延遲時間/亂序時間 .withTimestampAssigner((data,timestamp) -> Long.parseLong(DateUtil.dateToUTC(data.f0))*1000) //時間為毫秒級 );
設定窗口大小和處理邏輯
SingleOutputStreamOperator<Tuple3<String,String, Integer>> result=orderDSWithWatemark.keyBy(one -> one.f1) .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 設定窗口大小 // .allowedLateness(Time.seconds(1)) //延時處理時間 // .sideOutputLateData(lateOutputTag) //側輸出 .reduce(new ReduceFunction<Tuple3<String, String, Integer>>() { // 處理邏輯 private static final long serialVersionUID = -6695049408336015245L; @Override public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception { Tuple3<String, String, Integer> data = new Tuple3<String, String, Integer>(); data.f0 = value2.f0; data.f1 = value1.f1; data.f2 = value1.f2 + value2.f2; System.out.println(data); return data; } }); result.print("滾動事件時間"); env.execute();
時間和水位線是flink中比較難理解且重要的概念,我也是一知半解,在使用的過程中再慢慢深化,基本邏輯是針對數據建立自己的時間標簽,并通過時間范圍(窗口)和數據延遲完成事件內數據的匯集、計算和輸出,以此,完成更精確的實時事件數據計算。
技術是需求的一種呈現,基礎本質相互交疊,編程語言、技術框架都是,最重要的細微處的優化和整體的使用的簡便,功能的穩定和強大。
到此,相信大家對“flink1.2版本時間、水位線的介紹和用法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。