在Ubuntu系統下提交Flink任務,需要遵循以下步驟:
安裝Java環境:
Flink依賴于Java運行環境,因此首先需要確保你的系統中已經安裝了Java。可以使用以下命令檢查Java是否已安裝:
java -version
如果沒有安裝Java,可以使用以下命令安裝OpenJDK:
sudo apt update
sudo apt install openjdk-11-jdk
下載并解壓Flink:
從Flink官方網站(https://flink.apache.org/downloads.html)下載所需版本的Flink,然后在Ubuntu系統上解壓:
wget https://downloads.apache.org/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.11.tgz
tar xzf flink-1.14.0-bin-scala_2.11.tgz
cd flink-1.14.0
配置Flink:
根據實際需求,修改Flink的配置文件(位于conf
目錄下),例如flink-conf.yaml
、masters
和workers
等。
啟動Flink集群:
在完成配置后,可以使用以下命令啟動Flink集群:
bin/start-cluster.sh
你可以通過訪問Web UI(默認地址為http://localhost:8081)來查看集群狀態。
編寫Flink任務:
使用Java、Scala或Python編寫Flink任務。這里以Java為例,創建一個簡單的WordCount任務:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements("Hello Flink", "Hello World");
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
counts.print();
env.execute("WordCount Example");
}
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
編譯和打包Flink任務:
將編寫好的Flink任務編譯并打包成JAR文件。例如,如果你使用Maven或Gradle構建項目,可以使用以下命令生成JAR文件:
mvn clean package
提交Flink任務:
使用Flink的命令行工具提交任務到集群。假設你的任務JAR文件名為wordcount.jar
,可以使用以下命令提交任務:
bin/flink run -c com.example.WordCount wordcount.jar
其中-c
選項指定了任務的主類名。
完成以上步驟后,Flink任務將在Ubuntu系統下的集群上運行。