在 Spring Boot 中使用 Apache Flink,你需要先添加 Flink 的依賴項到你的項目中。以下是一個簡單的例子,展示了如何在 Spring Boot 應用程序中配置 Flink 數據源(Source)和數據接收器(Sink)。
pom.xml
文件中添加 Flink 的依賴項:
<dependencies>
    <!-- Spring Boot dependencies -->
...
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
 * Spring configuration that wires the Flink building blocks as beans:
 * the execution environment, a Kafka source, a Kafka sink, and the
 * source {@link DataStream}. Kafka coordinates come from
 * {@code application.properties}.
 *
 * <p>Requires {@code org.apache.flink.api.common.serialization.SimpleStringSchema}
 * to be imported; the original snippet used it without an import.
 */
@Configuration
public class FlinkConfiguration {

    /** Kafka broker list, e.g. {@code localhost:9092}. */
    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    /** Topic the Flink job consumes from. */
    @Value("${kafka.input.topic}")
    private String inputTopic;

    /** Topic the Flink job writes results to. */
    @Value("${kafka.output.topic}")
    private String outputTopic;

    /**
     * Local or cluster execution environment, chosen automatically by Flink
     * depending on where the application runs.
     *
     * @return the streaming execution environment for this application
     */
    @Bean
    public StreamExecutionEnvironment streamExecutionEnvironment() {
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }

    /**
     * Kafka source reading UTF-8 strings from the configured input topic.
     *
     * @return a consumer joined to consumer group {@code flink-spring-boot}
     */
    @Bean
    public FlinkKafkaConsumer<String> kafkaConsumer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
        properties.setProperty("group.id", "flink-spring-boot");
        return new FlinkKafkaConsumer<>(inputTopic, new SimpleStringSchema(), properties);
    }

    /**
     * Kafka sink writing UTF-8 strings to the configured output topic.
     *
     * <p>NOTE(review): this constructor overload provides at-least-once
     * semantics by default; newer Flink versions deprecate it in favour of
     * {@code KafkaSink} — confirm against the Flink version in use.
     *
     * @return a producer targeting {@code kafka.output.topic}
     */
    @Bean
    public FlinkKafkaProducer<String> kafkaProducer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
        return new FlinkKafkaProducer<>(outputTopic, new SimpleStringSchema(), properties);
    }

    /**
     * The source stream of the job: every record read by {@code consumer}.
     *
     * @param env      the execution environment bean
     * @param consumer the Kafka source bean
     * @return a stream of raw string records from the input topic
     */
    @Bean
    public DataStream<String> dataStream(StreamExecutionEnvironment env, FlinkKafkaConsumer<String> consumer) {
        return env.addSource(consumer);
    }
}
application.properties
文件中配置 Kafka 相關參數:
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input-topic
kafka.output.topic=output-topic
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point that builds and launches the Flink pipeline
 * once the application context is ready: Kafka source -> upper-case
 * transform -> Kafka sink.
 */
@SpringBootApplication
public class FlinkSpringBootApplication implements CommandLineRunner {

    /** Flink execution environment, provided by {@code FlinkConfiguration}. */
    @Autowired
    private StreamExecutionEnvironment environment;

    /** Source stream of raw records read from the input topic. */
    @Autowired
    private DataStream<String> sourceStream;

    /** Kafka sink the transformed records are written to. */
    @Autowired
    private FlinkKafkaProducer<String> outputSink;

    public static void main(String[] args) {
        SpringApplication.run(FlinkSpringBootApplication.class, args);
    }

    /**
     * Assembles the topology and submits the job.
     *
     * @param args unused command-line arguments
     * @throws Exception if the Flink job fails to submit or execute
     */
    @Override
    public void run(String... args) throws Exception {
        // Upper-case every incoming record before handing it to the sink.
        sourceStream.map(String::toUpperCase).addSink(outputSink);
        // NOTE(review): execute() blocks this thread until the streaming job
        // terminates, which stalls Spring Boot startup — confirm intended.
        environment.execute("Flink Spring Boot Example");
    }
}
這個例子展示了如何在 Spring Boot 應用程序中配置 Flink 數據源(從 Kafka 讀取數據)和數據接收器(將處理後的數據寫入 Kafka)。你可以根據自己的需求修改數據處理邏輯。