您好,登錄后才能下訂單哦!
為了實現 實時同步數據,在mac環境搭建了canal,mysql,kafka的一套流程
使用canal加mysql加kafka的方式傳遞數據
mysql 數據源頭
canal模仿slave沖mysql取數據。。是一個管道
kafka 將canal獲取的數據放入kafka 。然后消費(程序獲取kafka的隊列。消費數據)
mysql安裝
這個就不詳述了。。不要安裝最新版的mysql。本人親測8.0版本和canal不一定能很好的兼容
所以。。安裝了mysql5.7
安裝命令(這樣安裝 安裝的是8.0版本)
brew install mysql
2.java安裝
不管是canal還是kafka都是需要java環境的。。
也不能用最新版的java,推薦使用java8也就是jdk8
最好是從官網oracle下載
安裝方法:
https://blog.csdn.net/oumuv/article/details/84064169
3.canal安裝
在裝calal之前 確保mysql 及 java8 環境安裝好了
下載地址: https://github.com/alibaba/canal/releases
下載 canal.deployer-1.1.4.tar.gz
在家目錄下面新建一個canal目錄.直接解壓就行
官方文件已經編譯好了,不需再編譯
需要添加賬戶(canal@%及canal@local)
在canal/bin目錄下有幾個腳本文件,startup.sh 啟動服務用的,stop.sh 停止服務用的
在canal/logs目錄下放的是日志文件。
在canal/conf目錄下放的是配置文件。
實例配置文件
canal/conf/example/instance.properties
實例的配置文件。。決定了 你連接哪個實例的數據庫 可以精確到表
instance.properties中比較重要的參數
#實例
canal.instance.master.address = 127.0.0.1:3306
#db
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =test
canal.instance.connectionCharset = UTF-8
#table
# table regex
canal.instance.filter.regex = test.ttt
#mq topic
canal.mq.topic=test12345
上面這樣配置 就代表了。。通過canal連接127.0.0.1中的test庫的ttt表,放到一個叫做test12345的topic里面
驗證canal是否和mysql連接 只需要在mysql的進程里面查看是否有一個復制連接(因為canal就是模仿了一個slave)
還有一個全局的文件
表示了需要連接的kafka zookeeper等
canal/conf/canal.properties
列舉canal連接kafka的重要參數
canal.id= 1#如果有多個canal 這個值和集群中的canal不能沖突
canal.ip=172.17.61.113#canal的ip
canal.port= 11111
canal.zkServers=172.17.61.113:2181#zookeeper的ip:port
canal.serverMode = kafka
canal.destinations = example
#mq
canal.mq.servers = 172.17.61.113:9092#kafka的ip:port
上面有些關于kafka的參數 是 需要kafka安裝好之后才能陪得(因為已經裝好了 所以先列舉出來)
4.kafka安裝
kafka安裝時需要zookeeper的。。
但是一般zookeeper會集成在kafka里面 所以不需要特別單獨安裝(也可以單獨安裝)
mac環境安裝命令如下
brew install kafka
一直等安裝完成就行
運行kafka是需要依賴于zookeeper的,所以安裝kafka的時候也會包含zookeeper。
kafka的安裝目錄:/usr/local/Cellar/kafka/2.0.0/bin
kafka的配置文件目錄:/usr/local/Cellar/kafka/2.0.0/bin
kafka服務的配置文件:/usr/local/etc/kafka/server.properties
zookeeper配置文件: /usr/local/etc/kafka/zookeeper.properties
listeners=PLAINTEXT://172.17.61.113:9092
advertised.listeners=PLAINTEXT://172.17.61.113:9092
zookeeper 配置文件我在安裝的時候 沒有做任何修改
然后 通過上面配置canal.properties 就連接上kafka
kafka的基本命令(可以通過find / -name zookeeper-server-start 查找具體的位置)
首先,啟動zookeeper:
zookeeper-server-start /usr/ local /etc/kafka/zookeeper.properties
然后,啟動kafka
kafka- server -start /usr/local/etc/kafka/ server .properties
創建一個“使用單個分區”、“只有一個副本”、名為“test”的主題的topic
kafka-topics --create --zookeeper localhost :2181 --replication-factor 1 --partitions 1 --topic test
(創建topic,這個topic。我們在canal中已經有了配置)
使用如下命令創建topic
kafka-topics --create --zookeeper localhost:2181/kafka --replication-factor 1
--partitions 1 --topic topic1 查看創建的topic,運行list topic命令:
kafka-topics --list --zookeeper 172.17.61.113:2181 生產消息
kafka-console-producer --broker-list 172.17.61.113:9092 --topic test
(消息數據由canal自動發送。所以這里的命令我們只有明白就行)
消費消息(這個地方就可以最后來查看mysql+canal+kafka是否已經聯動起來了)
kafka-console-consumer --bootstrap-server 172.17.61.113:9092 --topic test12345 --from-beginning
我在mysql里面的test庫的ttt表里加了一條記錄
然后反應在kafka就是如下結果
{"data":[{"id":"13","var":"ded"}],"database":"test","es":1575448571000,"id":8,"isDdl":false,"mysqlType":{"id":"int(11)","var":"varchar(5)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"var":12},"table":"ttt","ts":1575448571758,"type":"INSERT"}
這就表示整個mysql+canal+kafka已經成功了
接下來只要等著程序那邊去消費這個隊列信息就好了
##############
有可能的報錯
服務端:com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example
是由于你改了配置文件,導致meta.dat 中保存的位點信息和數據庫的位點信息不一致;導致canal抓取不到數據庫的動作;
解決方法:刪除meta.dat刪除,再重啟canal,問題解決;
詳見
https://www.cnblogs.com/shaozhiqi/p/11534658.html
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。