91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka Connect及FileConnector的示例分析

發布時間:2021-12-15 09:19:52 來源:億速云 閱讀:211 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關Kafka Connect及FileConnector的示例分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

一. Kafka Connect簡介

Kafka是一個使用越來越廣的消息系統,尤其是在大數據開發中(實時數據處理和分析)。為何集成其他系統和解耦應用,經常使用Producer來發送消息到Broker,并使用Consumer來消費Broker中的消息。Kafka Connect是到0.9版本才提供的并極大的簡化了其他系統與Kafka的集成。Kafka Connect運用用戶快速定義并實現各種Connector(File,Jdbc,Hdfs等),這些功能讓大批量數據導入/導出Kafka很方便。

Kafka Connect及FileConnector的示例分析

如圖中所示,左側的Sources負責從其他異構系統中讀取數據并導入到Kafka中;右側的Sinks是把Kafka中的數據寫入到其他的系統中。

二. 各種Kafka Connector

Kafka Connector很多,包括開源和商業版本的。如下列表中是常用的開源的Connector

ConnectorsReferences
JdbcSource, Sink
Elastic SearchSink1, Sink2, Sink3
CassandraSource1, Source 2, Sink1, Sink2
MongoDBSource
HBaseSink
SyslogSource
MQTT (Source)Source
Twitter (Source)Source, Sink
S3Sink1, Sink2

商業版的可以通過Confluent.io獲得

三. 示例

3.1 FileConnector Demo

 本例演示如何使用Kafka Connect把Source(test.txt)轉為流數據再寫入到Destination(test.sink.txt)中。如下圖所示:

Kafka Connect及FileConnector的示例分析

本例使用到了兩個Connector:

  • FileStreamSource:從test.txt中讀取并發布到Broker中

  • FileStreamSink:從Broker中讀取數據并寫入到test.sink.txt文件中 其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties

bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
3.2 運行Demo

需要熟悉Kafka的一些命令行,參考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)

3.2.1 啟動Kafka Broker
[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/
[root@localhost kafka_2.11-0.11.0.0]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &
3.2.2 啟動Source Connector和Sink Connector
[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

3.3.3 打開console-consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
3.3.4 寫入到test.txt文件中,并觀察3.3.3中的變化
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
3.3.3中打開的窗口輸出如下
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
3.3.5 查看test.sink.txt
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt
firest line
second line

關于Kafka Connect及FileConnector的示例分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

全州县| 红桥区| 涟水县| 泰和县| 新巴尔虎左旗| 浦城县| 万载县| 黄骅市| 阿坝| 丹阳市| 开阳县| 嘉荫县| 揭东县| 白水县| 金湖县| 吴川市| 汉源县| 昭苏县| 苗栗县| 东光县| 延寿县| 高平市| 中西区| 台湾省| 云龙县| 贵州省| 舒城县| 通榆县| 兴国县| 凌源市| 丰宁| 怀集县| 广平县| 长白| 即墨市| 高平市| 潞西市| 龙游县| 资中县| 通州区| 奉贤区|