您好,登錄后才能下訂單哦!
本篇文章為大家展示了Flink開發怎樣進行實時處理應用程序,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
使用上一節中的springboot-flink-train項目
第一步:創建流處理上下文環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
第二步:讀取數據,使用socket流方式讀取數據
DataStreamSource<String> text = env.socketTextStream("192.168.152.45", 9999);
第三步:transform
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();
這里我們使用逗號分隔,然后跟批處理不同的是,這里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久執行一次。
第四步:執行
env.execute("StreamingWCJavaApp");
整體代碼如下:
/** * 使用Java API來開發Flink的實時處理應用程序 * wc統計的數據源自socket */ public class StreamingWCJava02App { public static void main(String[] args) throws Exception { // 獲取參數 int port; try{ ParameterTool tool = ParameterTool.fromArgs(args); port = tool.getInt("port"); } catch (Exception e) { System.out.println("端口未設置, 使用默認端口9999"); port = 9999; } // step1: 獲取流處理上下文環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // step2: 讀取數據 DataStreamSource<String> text = env.socketTextStream("192.168.152.45", port); // step3: transform text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print(); env.execute("StreamingWCJavaApp"); } }
首先在192.168.152.45上運行命令
nc -l 9999
然后在運行main方法。在192.168.152.45的nc上輸入
abc,def,abc,ddd
在idea控制臺輸出如下:
4> (abc,2) 1> (def,1) 4> (ddd,1)
這個前面的"4>"表示并行度。我們可以設置setParallelism(1)來忽略這個問題。如下所示:
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
這樣控制臺的打印結果如下:
(abc,2) (ddd,1) (def,1)
這樣一個簡單的demo就成功了!
上面的代碼中localhost與port需要用參數傳遞進來。
代碼如下:
// 獲取參數 int port; try{ ParameterTool tool = ParameterTool.fromArgs(args); port = tool.getInt("port"); } catch (Exception e) { System.out.println("端口未設置, 使用默認端口9999"); port = 9999; }
使用Flink提供的ParameterTool來接收參數。
我們在運行時就可以指定參數列表了,其中的key必須以“-”或者“--”開頭。
在運行時,配置參數:
這樣運行就可以從外界傳遞參數了
接下來使用Scala方式實現,在項目springboot-flink-train-scala中新建StreamingWCScalaApp,內容如下:
/** * 使用Scala開發Flink的實時處理應用程序 */ object StreamingWCScalaApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 引入隱式轉換 import org.apache.flink.api.scala._ val text = env.socketTextStream("192.168.152.45", 9999) text.flatMap(_.split(",")) .map((_,1)) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) .print() .setParallelism(1) env.execute("StreamingWCScalaApp"); } }
這種方式比java實現更加簡潔。
上述內容就是Flink開發怎樣進行實時處理應用程序,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。