您好,登錄后才能下訂單哦!
這篇文章主要講解了“Flink水印延遲與窗口允許延遲的概念是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink水印延遲與窗口允許延遲的概念是什么”吧!
link 在開窗處理事件時間(Event Time) 數據時,可設置水印延遲以及設置窗口允許延遲(allowedLateness)以保證數據的完整性。這兩者因都是設置延遲時間所以剛接觸時容易混淆。本文接下將展開討論分析“水印延遲”與“窗口允許延遲”概念及區別。
水印延遲(WaterMark)
(1) 水印
由于采用了事件時間,脫離了物理掛鐘。窗口不知道什么時候需要關閉并進行計算,這個時候需要借助水印來解決該問題。當窗口遇到水位標識時就默認是窗口時間段內的數據都到齊了,可以觸發窗口計算。
(2) 水印延遲
設置水印延遲時間的目的是讓水印延遲到達,從而可以解決亂序問題。通過水印延遲到達讓在延遲時間范圍內到達的遲到數據可以加入到窗口計算中,保證了數據的完整性。當水印到達后就會觸發窗口計算,在水印之后到達的遲到數據則會被丟棄。
窗口允許延遲(allowedLateness)
使用 StreamAPI 時,在進行開窗后可設置 allowedLateness 窗口延遲。官網中對其解釋如下:
默認情況下,當水印到達窗口末端時,遲到元素將會被刪除。但Flink允許為window operators指定允許的最大延遲。允許延遲指定元素在被刪除之前延遲的時間,默認值為0。當元素在水印經過窗口末端后到達,且它的到達時間在窗口末端加上運行延遲的時間之內,其仍會被添加到窗口中。根據所使用的觸發器,延遲但未被丟棄的元素可能會再次觸發窗口計算。EventTimeTrigger就是這種情況。為了做到這一點,Flink保持窗口的狀態,直到它們允許的延遲到期。一旦發生這種情況,Flink將刪除窗口并刪除其狀態,正如窗口生命周期部分中所描述的那樣。 |
簡單理解:通常在水印到達之后遲到數據將會被刪除,而窗口的延遲則是指數據在被刪除之前的允許保留時間。也就是說,在水印達到之后遲到數據本該被刪除,但是如果設置了窗口延遲,那么在水印之后到窗口延遲時間段內到達的遲到數據還是會被加入到窗口計算中,并再次觸發窗口計算。
一個Demo 兩個猜想
下面我用一個 Demo 和兩個猜想來幫助大家加深理解這兩個概念。
例子:接收 Kafka 數據,數據為 JSON 格式如:{"word":"a","count":1,"time":1604286564}。我們開一個 5 秒的 tumbling windows 滾動窗口,以 word 作為 key 在窗口內對 count 值進行累加。同時設置水印延遲 2 秒,窗口延遲 2 秒。代碼如下:
public class MyExample { public static void main(String[] args) throws Exception { // 創建環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 設置時間特性為 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 水印策略,其需要注入Timestamp Assigner(描述了如何訪問事件時間戳)和 Watermark Generator (事件流顯示的超出正常范圍的程度) WatermarkStrategy<WC> watermarkStrategy = WatermarkStrategy // forBoundedOutOfOrderness 屬于(periodic周期性),周期生成器通常通過onEvent()觀察傳入的事件,然后在框架調用onPeriodicEmit()時發出水印。 .<WC>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner<WC>() { @Override public long extractTimestamp(WC wc, long l) { return wc.getEventTime() * 1000; } }); // Kafka 配置 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "Kafka地址:9092"); properties.setProperty("group.id", "test"); // Flink 需要知道如何轉換Kafka消息為Java對象(反序列化),默認提供了 KafkaDeserializationSchema(序列化需要自己編寫)、JsonDeserializationSchema、AvroDeserializationSchema、TypeInformationSerializationSchema env.addSource(new FlinkKafkaConsumer<>("flinktest1", new JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest()) // map 構建 WC 對象 .map(new MapFunction<ObjectNode, WC>() { @Override public WC map(ObjectNode jsonNode) throws Exception { JsonNode valueNode = jsonNode.get("value"); WC wc = new WC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong()); return wc; } }) // 設定水印策略 .assignTimestampsAndWatermarks(watermarkStrategy) .keyBy(WC::getWord) // 窗口設置,這里設置為滾動窗口 .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 設置窗口延遲 .allowedLateness(Time.seconds(2)) .reduce(new ReduceFunction<WC>() { @Override public WC reduce(WC wc, WC t1) throws Exception { return new WC(wc.getWord(), wc.getCount() + t1.getCount()); } }) .print(); env.execute(); } static class WC { public String word; public int count; public long eventTime; public long getEventTime() { return eventTime; } public void setEventTime(long eventTime) { this.eventTime = eventTime; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } public WC(String word, int count) { this.word = word; this.count = count; } public WC(String word, int count,long eventTime) { this.word = word; this.count = count; this.eventTime = eventTime; } @Override public String toString() { return "WC{" + "word='" + word + '\'' + ", count=" + count + '}'; } } }
猜想1:
水印延遲 2s 達到,所以會在第 5 + 2 = 7s 時認為 [ 0 ,5 ) 窗口的數據全部到齊,并觸發窗口計算。
// 往 Kafka 中寫入數據 {"word":"a","count":1,"time":1604286560} //2020-11-02 11:09:20 {"word":"a","count":1,"time":1604286561} //2020-11-02 11:09:21 {"word":"a","count":1,"time":1604286562} //2020-11-02 11:09:22 {"word":"a","count":1,"time":1604286566} //2020-11-02 11:09:26 {"word":"a","count":1,"time":1604286567} //2020-11-02 11:09:27 (觸發了窗口計算)
控制臺輸出
分析:通過測試發現最后在第 7s 也就是 11:09:27 時觸發了窗口計算,這符合了我們的猜想一。水印延遲 2s 達到,所以會在第 5 + 2 = 7s 時認為 [ 0 ,5 ) 窗口的數據全部到齊,并觸發窗口計算。計算結果為3,這是因為只有最前面的3條數據屬于 [0,5) 窗口計算范圍之內。
猜想2:
設置了窗口延遲2秒,那么只要在水印之后到窗口允許延遲的時間范圍內達到且屬于 [ 0,5) 窗口的遲到數據會被加入到窗口中,且再次觸發窗口運算:
// 繼續往 Kafka 中寫入數據 {"word":"a","count":1,"time":1604286568} //2020-11-02 11:09:28 時間到達了第 8 秒 {"word":"a","count":1,"time":1604286563} //2020-11-02 11:09:23 模擬一個在水印之后、在窗口允許延遲范圍內、且屬于[0,5) 窗口的遲到數據,該數據還是會觸發并參與到[0,5) 窗口的計算
控制臺輸出新增了一行
// 我們再繼續往 Kafka 中寫入數據 {"word":"a","count":1,"time":1604286569} //2020-11-02 11:09:29 時間到達第9秒 {"word":"a","count":1,"time":1604286563} //2020-11-02 11:09:23 模擬一個在水印之后且超出窗口允許延遲范圍、且屬于[0,5) 窗口的遲到數據,該數據不會參與和觸發[0,5)窗口計算
查看控制臺并沒有發現新的輸出打印。
解析:水印因延遲在第 7s 到達之后會觸發[0,5) 窗口計算,如果沒有設置窗口延遲的情況下,水印之后遲到且屬于 [0,5) 窗口的數據會被丟棄。上面我們實驗設置窗口延遲 2s,實現的效果就是在水印之后,窗口允許延遲時間之內(7 + 2 = 9s 之間),遲到且屬于 [0,5) 窗口的數據還是會觸發一次窗口計算,并參與到窗口計算中。而在 9s 之后,也就是超過窗口允許延時時間,那么遲到且屬于[0,5)的數據就會被丟棄。
感謝各位的閱讀,以上就是“Flink水印延遲與窗口允許延遲的概念是什么”的內容了,經過本文的學習后,相信大家對Flink水印延遲與窗口允許延遲的概念是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。