您好,登錄后才能下訂單哦!
本篇內容介紹了“Flink的bulkIteration迭代操作怎么實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
迭代算法在很多數據分析領域會用到,比如機器學習或者圖計算。為了從大數據中抽取有用信息,這個時候往往會需要在處理的過程中用到迭代計算。大數據處理框架很多,比如spark,mr。實際上這些實現迭代計算都是很困難的。
Flink神奇之處就是它直接支持迭代計算。Flink實現迭代的思路也是很簡單,就是實現一個step函數,然后將其嵌入到迭代算子中去。有兩種迭代操作算子:Iterate和Delta Iterate。兩個操作算子都是在未收到終止迭代信號之前一直調用step函數。
本小節是主要是講解理論。
迭代操作算子包括了簡單的迭代形式:每次迭代,step函數會消費全量數據(本次輸入和上次迭代的結果),然后計算得到下輪迭代的輸出(例如,map,reduce,join等)
1.迭代輸入(Iteration Input)
第一次迭代的初始輸入,可能來源于數據源或者先前的操作算子。
2. Step函數
每次迭代都會執行step函數。其是由map,reduce,join等算子組成的數據流,根據業務定制的。
3. 下次迭代的部分結果(Next Partial Solution):
每次迭代,step函數的輸出結果會有部分返回參與繼續迭代。
4. 最大迭代次數
如果沒有其他終止條件,就會在聚合次數達到該值的情況下終止。
5. 自定義聚合器收斂:
迭代允許指定自定義聚合器和收斂標準,如sum會聚合要發出的記錄數(聚合器),如果此數字為零則終止(收斂標準)。
案例:累加計數
這個例子主要是給定數據輸入,每次增加一,輸出結果。
迭代輸入:輸入是1-5的數字。
step函數:給數字加一操作。
部分結果:實際上就是一個map函數。
迭代結果:最大迭代次數是十次,所以最終輸出是11-15.
代碼操作
編程的時候,本文說的這種迭代方式叫做bulk Iteration,需要調用iterate(int),該函數返回的是一個IterativeDataSet,當然我們可以對他進行一些操作,比如map等。Iterate函數唯一的參數是代表最大迭代次數。
迭代是一個環有前面的圖可以看到,我們需要進行閉環操作,那么這時候就要用到closeWith(Dataset)操作了,參數就是需要循環迭代的dataset。也可以可選的指定一個終止標準,操作closeWith(DataSet, DataSet),可以通過判斷第二個dataset是否為空,來終止迭代。如果不指定終止迭代條件,迭代就會在迭代了最大迭代次數后終止。
下面就是通過迭代計算pi的例子。
package Streaming.iteration;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
public class IteratePi {
public static voidmain(String[] args) throws Exception{
final ExecutionEnvironmentenv = ExecutionEnvironment.getExecutionEnvironment();
// Create initialIterativeDataSet
IterativeDataSet<Integer> initial= env.fromElements(0).iterate(100);
DataSet<Integer> iteration= initial.map(new MapFunction<Integer, Integer>(){
@Override
public Integermap(Integer i) throws Exception{
double x = Math.random();
double y = Math.random();
return i + ((x * x + y * y < 1) ? 1 : 0);
}
});
// Iterativelytransform the IterativeDataSet
DataSet<Integer> count = initial.closeWith(iteration);
count.map(new MapFunction<Integer, Double>(){
@Override
public Double map(Integercount) throws Exception {
return count /(double) 10000 * 4;
}
}).print();
// execute theprogram
env.execute("IterativePi Example");
}
}
“Flink的bulkIteration迭代操作怎么實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。