91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink中怎么使用split

發布時間:2021-12-31 13:35:03 來源:億速云 閱讀:294 作者:iii 欄目:大數據

這篇文章主要講解了“Flink中怎么使用split”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink中怎么使用split”吧!

flink的神奇分流器-sideoutput

這個可以用來分流,很方便的一次就可以對數據進行篩選返回。

還有針對算法處理的迭代操作,我們已經講過兩篇文章了:

Flink特異的迭代操作-bulkIteration

不得不會的Flink Dataset的DeltaI迭代操作

一個是全量迭代,一個是增量迭代。

還有優秀又雞肋的watermark機制

不懂watermark?來吧~

對于迭代操作,其實還有一講,那就是流處理的迭代操作。那么本文就針對這個進行分析~

Flink的迭代流程序實際就是實現了一個步進函數,然后將其嵌入到IterativeStream內部。要知道Flink的Datastream正常情況下是不會結束的,所以也沒有所謂的最大迭代次數。這種情況下,你需要自己指定哪個類型的數據需要回流去繼續迭代,哪個類型的數據繼續向下傳輸,這個分流的方式有兩種:split和filter,官方網站在介紹迭代流的時候使用的是filter。我們這里就先按照官網的介紹走,然后案例展示的時候使用split給大家做個demo。

首先,要創建一個IterativeStream

IterativeStream<Integer> iteration =input.iterate();

接著就可以定義對該留要進行的邏輯操作,官網這里就很簡單的舉了一個map的例子。

DataStream<Integer> iterationBody =iteration.map(/* this is executed many times */);

調用IterativeStream的closeWith(feedbackStream)方法可以對迭代流進行閉環操作。傳遞給closeWith函數的DataStream會返回值迭代的頭部。常用的做法是用filter來分離流的向后迭代的部分和向前傳遞的部分。。

iteration.closeWith(iterationBody.filter(/*one part of the stream */));

DataStream<Integer> output =iterationBody.filter(/* some other part of the stream */);

官方給了一個連續不斷減1直到數據為零的例子:

DataStream<Long> someIntegers =env.generateSequence(0, 1000);

// 創建迭代流

IterativeStream<Long> iteration =someIntegers.iterate();

// 增加處理邏輯,對元素執行減一操作。

DataStream<Long> minusOne =iteration.map(new MapFunction<Long, Long>() {

@Override

public Long map(Long value) throws Exception {

  return value - 1 ;

 }

});

// 獲取要進行迭代的流,

DataStream<Long> stillGreaterThanZero= minusOne.filter(new FilterFunction<Long>() {

@Override

public boolean filter(Long value) throws Exception {

  return (value > 0);

 }

});

// 對需要迭代的流形成一個閉環

iteration.closeWith(stillGreaterThanZero);

// 小于等于0的數據繼續向前傳輸

DataStream<Long> lessThanZero =minusOne.filter(new FilterFunction<Long>() {

@Override

public boolean filter(Long value) throws Exception {

  return (value <= 0);

 }

});

感謝各位的閱讀,以上就是“Flink中怎么使用split”的內容了,經過本文的學習后,相信大家對Flink中怎么使用split這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

综艺| 福贡县| 栖霞市| 年辖:市辖区| 扎赉特旗| 封开县| 富平县| 红原县| 台北县| 晋州市| 麻阳| 武清区| 曲麻莱县| 日喀则市| 双鸭山市| 阿合奇县| 长宁县| 昭苏县| 黔江区| 辽阳市| 砚山县| 芜湖县| 保德县| 濮阳市| 民勤县| 信阳市| 晴隆县| 任丘市| 临城县| 夹江县| 丹巴县| 陵水| 都兰县| 恭城| 平武县| 遵义市| 江源县| 麻江县| 叙永县| 吉隆县| 保康县|