Apache Flink 是一個分布式流處理框架,用于實時處理無界和有界數據流
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SimpleStreamProcessing {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.fromElements("Hello", "Flink", "on", "Ubuntu");
DataStream<String> processed = source.map(s -> s.toUpperCase());
processed.print();
env.execute("Simple Stream Processing Example");
}
}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.fromElements("Hello", "Flink", "on", "Ubuntu", "is", "awesome");
DataStream<Tuple2<String, Integer>> wordCounts = source
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.split("\\s+");
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
}
})
.keyBy(0)
.sum(1);
wordCounts.print();
env.execute("Word Count Example");
}
}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class RollingAverageExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Double> source = env.fromElements(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0);
DataStream<Double> rollingAverage = source
.timeWindowAll(Time.seconds(3))
.reduce((value1, value2) -> value1 + value2)
.map(sum -> sum / 3);
rollingAverage.print();
env.execute("Rolling Average Example");
}
}
要運行這些示例,請確保已安裝 Java 開發工具包(JDK)并正確配置了 Flink。然后,將示例代碼保存為 Java 文件(例如 SimpleStreamProcessing.java
),并使用以下命令編譯和運行:
javac -cp /path/to/flink/lib/*: SimpleStreamProcessing.java
java -cp /path/to/flink/lib/*: SimpleStreamProcessing.class SimpleStreamProcessing
請注意,您需要根據實際情況替換 /path/to/flink/lib/
為 Flink 安裝目錄中 lib
文件夾的路徑。