您好,登錄后才能下訂單哦!
這篇文章主要介紹“flinksql如何鏈接kafka”,在日常操作中,相信很多人在flinksql如何鏈接kafka問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”flinksql如何鏈接kafka”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>flinksqldemo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <!-- Encoding --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <scala.binary.version>2.11</scala.binary.version> <scala.version>2.11.8</scala.version> <kafka.version>0.10.2.1</kafka.version> <flink.version>1.12.0</flink.version> <hadoop.version>2.7.3</hadoop.version> <!-- scope 本地調試時注銷 設定為默認的 compile 打包時設定為 provided --> <setting.scope>compile</setting.scope> </properties> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--flink start--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>${setting.scope}</scope> </dependency> <!--<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>--> <!-- flink end--> <!-- kafka start --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> <scope>${setting.scope}</scope> </dependency> <!-- kafka end--> <!-- hadoop start --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <scope>${setting.scope}</scope> </dependency> <!-- hadoop end --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.72</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>29.0-jre</version> </dependency> </dependencies> </project>
代碼:
package com.jd.data; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.Row; public class TableApiConnectKafka04 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1、創建表執行環節 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.connect(new Kafka() .version("0.11") // 定義版本 .topic("xxx") // 定義主題 .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ).withFormat(new Csv()).withSchema(new Schema().field("a", DataTypes.STRING()) // 定義表的結構 .field("b", DataTypes.STRING()) .field("c", DataTypes.STRING()) ) .inAppendMode() .createTemporaryTable("xxx"); Table xxx = tableEnv.from("xxx"); xxx.printSchema(); tableEnv.toAppendStream(xxx, Row.class ).print(); env.execute("job"); } }
到此,關于“flinksql如何鏈接kafka”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。