您好,登錄后才能下訂單哦!
Flink的編程模型
1、獲取Flink上下文;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2、加載、創建數據;
DataSet
3、數據轉換;
Transformation
4、數據結果存放;
5、觸發執行。
env.execute()
下面為flink輸出wordcount數據:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Minimal Flink batch (DataSet API) word-count example.
 *
 * <p>Builds a small in-memory data set of lines, splits each line into
 * {@code (word, 1)} pairs, prints those pairs, then groups by word, sums the
 * counts and prints the totals.
 */
public class FlinkMain {

    /**
     * Splits a line on single spaces and emits one {@code (token, 1)} pair
     * for every non-empty token.
     */
    public static class LineSplit implements FlatMapFunction<String, Tuple2<String, Integer>> {

        private static final long serialVersionUID = 1L;

        /**
         * Tokenizes one input line.
         *
         * @param value the raw input line
         * @param out   collector that receives the emitted {@code (token, 1)} pairs
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // String.split never produces null elements, so only empty tokens
            // (caused by consecutive spaces) need to be filtered out.
            for (String token : value.split(" ")) {
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    /**
     * Runs the word-count example and prints both the intermediate pairs and
     * the per-word totals to standard output.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if Flink job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution context.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Create the input data set.
        DataSet<String> text = env.fromElements("to be", "or no to be", "is question");

        // Transform lines into (word, 1) pairs.
        DataSet<Tuple2<String, Integer>> pairs = text.flatMap(new LineSplit());

        // print() triggers job execution itself, so no explicit env.execute() call is made.
        pairs.print();
        System.out.println("-----------------------");

        // Group by tuple field 0 (the word) and sum tuple field 1 (the count).
        DataSet<Tuple2<String, Integer>> totals = pairs.groupBy(0).sum(1);

        // Print the aggregated result to the console.
        totals.print();
        System.out.println("-----------------------");
    }
}
Flink使用sql方式轉換數據
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
/**
 * Word-count example that performs the aggregation with Flink's batch Table/SQL API.
 *
 * <p>Tokenizes a fixed sentence into {@link WordCount} records, registers them as the
 * table {@code wordCount}, runs a {@code GROUP BY} query over it and prints the result.
 */
public class FlinkMain2 {

    /**
     * Runs the SQL-based word count and prints one line per distinct word.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if Flink job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution context and the batch table environment bound to it.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Build one WordCount(token, 1) record per non-empty token.
        // String.split never produces null elements, so only empty tokens are skipped.
        List<WordCount> list = new ArrayList<>();
        String workStr = "to be or no to be is question";
        for (String token : workStr.split(" ")) {
            if (!token.isEmpty()) {
                list.add(new WordCount(token, 1L));
            }
        }

        // Create the data set from the in-memory collection.
        DataSet<WordCount> input = env.fromCollection(list);

        // Register the data set as table "wordCount" with columns "word" and "frequency".
        tEnv.registerDataSet("wordCount", input, "word, frequency");
        Table table = tEnv.sqlQuery(" SELECT word, SUM(frequency) as frequency FROM wordCount GROUP BY word" );

        // Convert the query result back into WordCount records.
        DataSet<WordCount> res = tEnv.toDataSet(table, WordCount.class);

        // Print to the console.
        res.print();
    }

    /**
     * A word together with its frequency.
     *
     * <p>NOTE(review): the public fields and the public no-argument constructor look
     * intentional so that Flink can treat this class as a POJO when mapping table
     * columns to fields - confirm against the Flink POJO rules before tightening
     * visibility or making the fields final.
     */
    public static class WordCount {
        public String word;
        public long frequency;

        /** Creates an empty instance; fields are populated afterwards. */
        public WordCount() {
        }

        /**
         * @param word      the word
         * @param frequency how many times the word occurred
         */
        public WordCount(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "詞語:" + word + ",詞頻:" + frequency;
        }
    }
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。