您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關如何編寫最簡單的helloWorld,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
JDK 1.8
IDE Intellij idea
Flink 1.8.1
創建一個Flink簡單Demo,可以從流數據中統計單詞個數。
首先創建一個maven項目,其中pom.xml文件內容如下:
<properties> <flink.version>1.8.1</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-wikiedits_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.1.4.RELEASE</version> <configuration> <mainClass>wikiedits.StreamingJob</mainClass> </configuration> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> <skip>true</skip> </configuration> </plugin> </plugins> </build>
創建一個包com.vincent,并且創建一個類StreamingJob.java
public class WikipediaAnalysis { public static void main(String[] args) throws Exception { } }
Flink 程序的第一步是創建一個StreamExecutionEnvironment。StreamExecutionEnvironment可以設置參數并且導入一些外部系統的數據源。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
接下來創建一個外部數據源,外部數據源使用nc -l 9000 表示服務器端開啟監聽9000端口,并可以發送數據。
DataStream<String> text = env.socketTextStream("192.168.152.45", 9000);
這樣就添加了一個流文本數據源,有了DataStream就可以獲取數據了,然后對數據進行分析:
DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] tokens = s.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { collector.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1);
flatMap表示將嵌套集合轉換并平鋪成非嵌套集合,字符串是s,返回值是Collector<Tuple2<String, Integer>>。并且根據keyBy(0)即第0個字段進行統計加一操作。.timeWindow()指定窗口大小是5秒。
所以整體代碼如下:
public class StreamingJob { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.socketTextStream("192.168.152.45", 9000); DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] tokens = s.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { collector.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1); dataStream.print(); // execute program env.execute("Java WordCount from SocketTextStream Example"); } }
運行main方法,然后在服務器端執行nc -l 9000 并且輸入文本:
iie4bu@swarm-manager:~$ nc -l 9000 a b d d e f
然后在intellij控制臺將輸出:
1> (b,1) 3> (a,1) 1> (f,1) 3> (d,2) 1> (e,1)
可以統計出每個單詞的次數
上述就是小編為大家分享的如何編寫最簡單的helloWorld了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。