
Getting Started with Flink: WordCount

Published: 2020-06-04 11:25:14 Source: Internet Views: 362 Author: qq513283439 Category: Big Data

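Flink's programming model
1. Obtain the Flink execution environment:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2. Load or create the input data as a DataSet (for example with env.fromElements(...) or env.readTextFile(...));
3. Transform the data by applying transformations such as flatMap, groupBy and sum;
4. Specify where the results go (a data sink, for example print() or writeAsText(...));
5. Trigger execution:
env.execute();
Note that print(), collect() and count() trigger execution themselves, so a job whose only sink is print() does not call env.execute().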
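To make the lazy-execution idea behind steps 3 to 5 concrete without a Flink cluster, here is a plain-Java analogy using java.util.stream. It is not Flink code, and the class name LazyPipelineAnalogy is made up for illustration. Like a Flink DataSet program, the stream pipeline is only declared until a terminal operation runs it; the counter shows that no word has been processed before collect() is called.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy of Flink's five steps (NOT Flink code): a java.util.stream
// pipeline is declared lazily and only runs when a terminal operation is called.
class LazyPipelineAnalogy {

    // Counts how many words the transformation has actually processed.
    static final AtomicInteger transformCalls = new AtomicInteger();

    static List<String> run() {
        // Steps 1-2: source; Stream.of plays the role of env.fromElements(...)
        Stream<String> source = Stream.of("to be", "or no to be", "is question");

        // Step 3: transformation; declaring it does not process any data yet
        Stream<String> words = source
                .flatMap(line -> Stream.of(line.split(" ")))
                .peek(w -> transformCalls.incrementAndGet());

        // Nothing has executed so far
        if (transformCalls.get() != 0) {
            throw new IllegalStateException("pipeline ran before the terminal operation");
        }

        // Steps 4-5: sink + trigger; the terminal collect() runs the whole pipeline,
        // much as env.execute() (or print()/collect()) does for a Flink job
        return words.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> result = run();
        System.out.println(result);               // [to, be, or, no, to, be, is, question]
        System.out.println(transformCalls.get()); // 8
    }
}
```

The analogy is limited: a Flink job is planned and run by the framework, possibly in parallel on a cluster, while this stream runs sequentially in one JVM.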

The following program uses Flink to count words and print the results:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkMain {

    @SuppressWarnings("serial")
    public static class LineSplit implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * Splits a line into words and emits a (word, 1) tuple for each non-empty word.
         *
         * @param value the input line
         * @param out   collector that receives the emitted (word, 1) tuples
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.split(" ");
            for (String token : tokens) {
                // split(" ") yields empty strings for repeated spaces, so skip them
                if (token != null && token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create the input data set
        DataSet<String> text = env.fromElements("to be", "or no to be", "is question");
        // Transform: split each line into (word, 1) tuples
        DataSet<Tuple2<String, Integer>> count = text.flatMap(new LineSplit());
        // Print the transformed data set (in the DataSet API, print() calls env.execute() itself)
        count.print();
        System.out.println("-----------------------");
        // Group by field 0 (the word) and sum field 1 (the count); 0 and 1 are the positions of Tuple2's fields
        count = count.groupBy(0).sum(1);
        // Print the aggregated data set to the console
        count.print();
        System.out.println("-----------------------");
    }

}
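As a sanity check for the job above, the following plain-Java sketch (no Flink dependency; the class WordCountReference is hypothetical) computes the same result as text.flatMap(new LineSplit()).groupBy(0).sum(1). It also shows why LineSplit tests the token length: String.split(" ") produces empty strings when words are separated by more than one space.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java reference (no Flink) for the FlinkMain job: flatMap into
// (word, 1) pairs, then group by the word and sum the ones.
class WordCountReference {

    static Map<String, Integer> countWords(String... lines) {
        // TreeMap keeps keys sorted so the printed result is deterministic;
        // Flink makes no such ordering guarantee for groupBy(0).sum(1).
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Same tokenizing rule as LineSplit: split on a single space, skip empty tokens
            for (String token : line.split(" ")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum); // plays the role of sum(1)
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("to be", "or no to be", "is question"));
        // prints {be=2, is=1, no=1, or=1, question=1, to=2}
    }
}
```

The Flink job should print the same pairs as tuples such as (to,2), though not necessarily in this order, since groupBy does not sort its output.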

Transforming data with Flink SQL
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class FlinkMain2 {

    public static void main(String[] args) throws Exception {

        // Create the Flink execution environment and a batch Table environment on top of it
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Split the sentence into words and build one (word, 1) record per word
        List<WordCount> list = new ArrayList<>();
        String wordStr = "to be or no to be is question";
        String[] tokens = wordStr.split(" ");
        for (String token : tokens) {
            if (token != null && token.length() > 0) {
                list.add(new WordCount(token, 1));
            }
        }
        // Create the input data set
        DataSet<WordCount> input = env.fromCollection(list);
        // Register the data set as a table named wordCount with the columns word and frequency
        tEnv.registerDataSet("wordCount", input, "word, frequency");

        // Sum the frequency of each word with SQL
        Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) AS frequency FROM wordCount GROUP BY word");

        // Convert the result table back into a DataSet of WordCount POJOs (columns map to fields by name)
        DataSet<WordCount> res = tEnv.toDataSet(table, WordCount.class);
        // Print to the console (print() triggers execution)
        res.print();

    }

    // POJO holding one word and its frequency: public fields plus a public no-argument constructor
    public static class WordCount {
        public String word;
        public long frequency;

        public WordCount() {}

        public WordCount(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "word: " + word + ", frequency: " + frequency;
        }
    }

}
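The SQL query in FlinkMain2 is a plain group-and-sum. The sketch below expresses the same SELECT word, SUM(frequency) ... GROUP BY word logic with java.util.stream collectors, so you can see what the result table should contain. It does not use the Flink Table API; SqlGroupBySketch and its nested copy of WordCount are hypothetical names for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java sketch (no Flink) of what
//   SELECT word, SUM(frequency) AS frequency FROM wordCount GROUP BY word
// computes over POJO rows shaped like FlinkMain2.WordCount.
class SqlGroupBySketch {

    // Hypothetical stand-in for FlinkMain2.WordCount
    static class WordCount {
        public String word;
        public long frequency;

        public WordCount() {}

        public WordCount(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    static Map<String, Long> groupBySum(List<WordCount> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                (WordCount r) -> r.word,                                  // GROUP BY word
                TreeMap::new,                                             // sorted keys for stable output
                Collectors.summingLong((WordCount r) -> r.frequency)));   // SUM(frequency)
    }

    public static void main(String[] args) {
        List<WordCount> rows = Arrays.stream("to be or no to be is question".split(" "))
                .filter(t -> !t.isEmpty())
                .map(t -> new WordCount(t, 1))
                .collect(Collectors.toList());
        System.out.println(groupBySum(rows));
        // prints {be=2, is=1, no=1, or=1, question=1, to=2}
    }
}
```

In the Flink program, each result row is printed through WordCount.toString(), and, as with the DataSet version, the row order is not guaranteed.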
