要使用Flink SQL讀取Kafka數據,需要按照以下步驟進行操作:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
確保${flink.version}
是Flink的版本號。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableSql = "CREATE TABLE kafka_table (\n" +
" key STRING,\n" +
" value STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'your_topic',\n" +
" 'properties.bootstrap.servers' = 'your_bootstrap_servers',\n" +
" 'properties.group.id' = 'your_group_id',\n" +
" 'format' = 'json',\n" +
" 'scan.startup.mode' = 'earliest-offset'\n" +
")";
tEnv.executeSql(createTableSql);
在上述代碼中,'topic'
和'properties.bootstrap.servers'
需要替換為你的Kafka主題和啟動服務器的地址。'properties.group.id'
是Flink消費者組的唯一標識符。
另外,'format'
參數指定了數據格式,可以根據實際情況將其設置為適當的值。
String querySql = "SELECT * FROM kafka_table";
Table result = tEnv.sqlQuery(querySql);
DataStream<Row> resultStream = tEnv.toAppendStream(result, Row.class);
現在,你可以對resultStream
進行進一步處理,如打印或寫入到其他系統中。
最后,記得調用env.execute()
啟動Flink作業。