91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink開發怎樣進行實時處理應用程序

發布時間:2021-10-20 16:29:21 來源:億速云 閱讀:137 作者:柒染 欄目:大數據

本篇文章為大家展示了Flink開發怎樣進行實時處理應用程序,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

使用Flink + java實現需求

環境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

使用上一節中的springboot-flink-train項目

開發步驟

第一步:創建流處理上下文環境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

第二步:讀取數據,使用socket流方式讀取數據

DataStreamSource<String> text = env.socketTextStream("192.168.152.45", 9999);

第三步:transform

        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();

這里我們使用逗號分隔,然后跟批處理不同的是,這里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久執行一次。

第四步:執行

env.execute("StreamingWCJavaApp");

整體代碼如下:

/**
 * 使用Java API來開發Flink的實時處理應用程序
 * wc統計的數據源自socket
 */
public class StreamingWCJava02App {

    public static void main(String[] args) throws Exception {

        // 獲取參數
        int port;
        try{
            ParameterTool tool = ParameterTool.fromArgs(args);
            port = tool.getInt("port");
        } catch (Exception e) {
            System.out.println("端口未設置, 使用默認端口9999");
            port = 9999;
        }


        // step1: 獲取流處理上下文環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // step2: 讀取數據
        DataStreamSource<String> text = env.socketTextStream("192.168.152.45", port);
        // step3: transform
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();
        env.execute("StreamingWCJavaApp");
    }

}

運行

首先在192.168.152.45上運行命令

nc -l 9999

然后在運行main方法。在192.168.152.45的nc上輸入

abc,def,abc,ddd

在idea控制臺輸出如下:

4> (abc,2)
1> (def,1)
4> (ddd,1)

這個前面的"4>"表示并行度。我們可以設置setParallelism(1)來忽略這個問題。如下所示:

        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

這樣控制臺的打印結果如下:

(abc,2)
(ddd,1)
(def,1)

這樣一個簡單的demo就成功了!

重構代碼

上面的代碼中localhost與port需要用參數傳遞進來。

代碼如下:

        // 獲取參數
        int port;
        try{
            ParameterTool tool = ParameterTool.fromArgs(args);
            port = tool.getInt("port");
        } catch (Exception e) {
            System.out.println("端口未設置, 使用默認端口9999");
            port = 9999;
        }

使用Flink提供的ParameterTool來接收參數。

我們在運行時就可以指定參數列表了,其中的key必須以“-”或者“--”開頭。

在運行時,配置參數:

Flink開發怎樣進行實時處理應用程序

這樣運行就可以從外界傳遞參數了

使用Flink + Scala實現需求

接下來使用Scala方式實現,在項目springboot-flink-train-scala中新建StreamingWCScalaApp,內容如下:

/**
  * 使用Scala開發Flink的實時處理應用程序
  */
object StreamingWCScalaApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 引入隱式轉換
    import org.apache.flink.api.scala._

    val text = env.socketTextStream("192.168.152.45", 9999)
    text.flatMap(_.split(","))
        .map((_,1))
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1)
        .print()
        .setParallelism(1)

    env.execute("StreamingWCScalaApp");
  }
}

這種方式比java實現更加簡潔。

上述內容就是Flink開發怎樣進行實時處理應用程序,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

瑞金市| 都江堰市| 乐至县| 太康县| 乌恰县| 彰化市| 当雄县| 盐津县| 周宁县| 阜新| 长岭县| 承德市| 公安县| 苗栗县| 乌兰浩特市| 蚌埠市| 临湘市| 江津市| 华池县| 商南县| 科技| 本溪| 湟中县| 盐源县| 乾安县| 铁岭县| 吴桥县| 武邑县| 潼关县| 五华县| 桦川县| 晋州市| 张掖市| 舞钢市| 体育| 伊川县| 上犹县| 鹿泉市| 永川市| 秦皇岛市| 凤山县|