91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Maxwell讀取MySQL binlog日志到Kafka

發布時間:2020-06-30 20:50:36 來源:網絡 閱讀:4950 作者:Stitch_x 欄目:大數據

啟動MySQL

創建maxwell的數據庫和用戶

在MySQL中創建一個測試數據庫和表

前面三個步驟詳見 Maxwell讀取MySQL binlog日志通過stdout展示

啟動Zookeeper

[hadoop@hadoop001 ~]$ cd $ZK_HOME/bin
[hadoop@hadoop001 bin]$ ./zkServer.sh start

啟動kafka,并創建名為maxwell的topic

[hadoop@hadoop001 bin]$ cd $KAFKA_HOME
//查看kafka版本,防止maxwell不支持
[hadoop@hadoop001 kafka]$ find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
kafka_2.11-0.10.0.1-sources.jar
//啟動kafka-server服務
[hadoop@hadoop001 kafka]$ nohup bin/kafka-server-start.sh config/server.properties &
[hadoop@hadoop001 kafka]$ jps
13460 QuorumPeerMain
14952 Jps
13518 Kafka
[hadoop@hadoop001 kafka]$ bin/kafka-topics.sh --create --zookeeper 192.168.137.2:2181/kafka --replication-factor 1 --partitions 3 --topic maxwell
Created topic "maxwell".
[hadoop@hadoop001 kafka]$ bin/kafka-topics.sh --list --zookeeper 192.168.137.2:2181/kafka
__consumer_offsets
maxwell

啟動kafka的消費者,檢查數據是否到位

[hadoop@hadoop001 kafka]$ bin/kafka-console-consumer.sh --zookeeper 192.168.137.2:2181/kafka --topic maxwell --from-beginning

啟動maxwell進程

//先檢查maxwell是否支持kafka-0.10.0.1
[root@hadoop001 ~]# cd /root/app/maxwell-1.17.1/lib/kafka-clients
[root@hadoop001 kafka-clients]# ll
total 5556
-rw-r--r-- 1 yarn games  746207 Jul  3  2018 kafka-clients-0.10.0.1.jar
-rw-r--r-- 1 yarn games  951041 Jul  3  2018 kafka-clients-0.10.2.1.jar
-rw-r--r-- 1 yarn games 1419544 Jul  3  2018 kafka-clients-0.11.0.1.jar
-rw-r--r-- 1 yarn games  324016 Jul  3  2018 kafka-clients-0.8.2.2.jar
-rw-r--r-- 1 yarn games  641408 Jul  3  2018 kafka-clients-0.9.0.1.jar
-rw-r--r-- 1 yarn games 1591338 Jul  3  2018 kafka-clients-1.0.0.jar
//發現支持kafka-0.10.0.1版本,假如沒有生產上正在用的kafka版本的jar包,可以直接把這個版本的client jar包copy進來
//啟動maxwell
[root@hadoop001 maxwell-1.17.1]# bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka_version=0.10.0.1 --kafka.bootstrap.servers=192.168.137.2:9092 --kafka_topic=maxwell
Using kafka version: 0.10.0.1
10:16:52,979 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
10:16:53,451 INFO  ProducerConfig - ProducerConfig values: 
        metric.reporters = []
        metadata.max.age.ms = 300000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [192.168.137.2:9092]
        ssl.keystore.type = JKS
        sasl.mechanism = GSSAPI
        max.block.ms = 60000
        interceptor.classes = null
        ssl.truststore.password = null
        client.id = 
        ssl.endpoint.identification.algorithm = null
        request.timeout.ms = 30000
        acks = 1
        receive.buffer.bytes = 32768
        ssl.truststore.type = JKS
        retries = 0
        ssl.truststore.location = null
        ssl.keystore.password = null
        send.buffer.bytes = 131072
        compression.type = none
        metadata.fetch.timeout.ms = 60000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        max.in.flight.requests.per.connection = 5
        metrics.num.samples = 2
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        batch.size = 16384
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        max.request.size = 1048576
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        linger.ms = 0

10:16:53,512 INFO  ProducerConfig - ProducerConfig values: 
        metric.reporters = []
        metadata.max.age.ms = 300000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [192.168.137.2:9092]
        ssl.keystore.type = JKS
        sasl.mechanism = GSSAPI
        max.block.ms = 60000
        interceptor.classes = null
        ssl.truststore.password = null
        client.id = producer-1
        ssl.endpoint.identification.algorithm = null
        request.timeout.ms = 30000
        acks = 1
        receive.buffer.bytes = 32768
        ssl.truststore.type = JKS
        retries = 0
        ssl.truststore.location = null
        ssl.keystore.password = null
        send.buffer.bytes = 131072
        compression.type = none
        metadata.fetch.timeout.ms = 60000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        max.in.flight.requests.per.connection = 5
        metrics.num.samples = 2
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        batch.size = 16384
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        max.request.size = 1048576
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        linger.ms = 0

10:16:53,516 INFO  AppInfoParser - Kafka version : 0.10.0.1
10:16:53,516 INFO  AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
10:16:53,550 INFO  Maxwell - Maxwell v1.17.1 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[mysql-bin.000016:116360], lastHeartbeat=1552092988288]
10:16:53,730 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000014:5999], lastHeartbeat=0])
10:16:53,846 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000016:116360
10:16:53,951 INFO  BinaryLogClient - Connected to 127.0.0.1:3306 at mysql-bin.000016/116360 (sid:6379, cid:4)
10:16:53,951 INFO  BinlogConnectorLifecycleListener - Binlog connected.

在MySQL中更新一條數據

mysql> update emp set sal=502 where empno=6001;
mysql> update emp set sal=603 where empno=6001;
mysql> create table emp1 select * from emp;

查看kafka的消費者

//對應第一條update語句
{"database":"hlwtest","table":"emp","type":"update","ts":1552097863,"xid":89,"commit":true,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":502.00,"comm":6000.00,"deptno":40},"old":{"sal":501.00}}
//對應第二條update語句
{"database":"hlwtest","table":"emp","type":"update","ts":1552097951,"xid":123,"commit":true,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":603.00,"comm":6000.00,"deptno":40},"old":{"sal":502.00}}
//對應建表,相當于在新表emp1里插入了emp表里的所有數據
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":0,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":603.00,"comm":6000.00,"deptno":40}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":1,"data":{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":"1980-12-17 00:00:00","sal":800.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":2,"data":{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-20 00:00:00","sal":1600.00,"comm":300.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":3,"data":{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-22 00:00:00","sal":1250.00,"comm":500.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":4,"data":{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":"1981-04-02 00:00:00","sal":2975.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":5,"data":{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-28 00:00:00","sal":1250.00,"comm":1400.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":6,"data":{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":"1981-05-01 00:00:00","sal":2850.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":7,"data":{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":"1981-06-09 00:00:00","sal":2450.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":8,"data":{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":"1987-04-19 00:00:00","sal":3000.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":9,"data":{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":0,"hiredate":"1981-11-17 00:00:00","sal":5000.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":10,"data":{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-08 00:00:00","sal":1500.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":11,"data":{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":"1987-05-23 00:00:00","sal":1100.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":12,"data":{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":"1981-12-03 00:00:00","sal":950.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":13,"data":{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":"1981-12-03 00:00:00","sal":3000.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":14,"data":{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":"1982-01-23 00:00:00","sal":1300.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"commit":true,"data":{"empno":8888,"ename":"HIVE","job":"PROGRAM","mgr":7839,"hiredate":"1988-01-23 00:00:00","sal":10300.00,"comm":0.00,"deptno":null}}
數據已經正常同步到kafka中

Maxwell的過濾功能

參考過濾配置: <http://maxwells-daemon.io/filtering/>

[root@hadoop001 maxwell-1.17.1]# bin/maxwell --user='maxwell' --password='maxwell' \
--host='127.0.0.1' --filter 'exclude: *.*, include:hlwtest.emp1' \
--producer=kafka --kafka_version=0.10.0.1 --kafka.bootstrap.servers=192.168.137.2:9092 --kafka_topic=maxwell
--filter 'exclude: *.*, include:hlwtest.emp1'的意思是只監控hlwtest.emp1表的變化,其他的都不監控
//MySQL中update數據
mysql> update emp set sal=730 where empno=6001;
mysql> update emp1 set sal=330 where empno=6001;
mysql> update emp set sal=730 where empno=6001;
mysql> update emp1 set sal=331 where empno=6001;

//Kafka消費者接收到的數據
{"database":"hlwtest","table":"emp1","type":"update","ts":1552099858,"xid":916,"commit":true,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":330.00,"comm":6000.00,"deptno":40},"old":{"sal":321.00}}
{"database":"hlwtest","table":"emp1","type":"update","ts":1552099858,"xid":922,"commit":true,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":331.00,"comm":6000.00,"deptno":40},"old":{"sal":330.00}}
確實只消費到了emp1表的update語句,而沒有接收到emp表的更新

Maxwell 的bootstrap

mysql> insert into maxwell.bootstrap (database_name, table_name) values ("hlwtest", "emp");
mysql> select * from maxwell.bootstrap;
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+------------------+-----------------+-----------+
| id | database_name | table_name | where_clause | is_complete | inserted_rows | total_rows | created_at | started_at          | completed_at        | binlog_file      | binlog_position | client_id |
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+------------------+-----------------+-----------+
|  1 | hlwtest       | emp        | NULL         |           1 |            16 |          0 | NULL       | 2019-03-09 11:33:11 | 2019-03-09 11:33:11 | mysql-bin.000018 |          225498 | maxwell   |
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+------------------+-----------------+-----------+

kafka的消費窗口

{"database":"maxwell","table":"bootstrap","type":"insert","ts":1552102248,"xid":1555,"commit":true,"data":{"id":1,"database_name":"hlwtest","table_name":"emp","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell"}}
{"database":"hlwtest","table":"emp","type":"bootstrap-start","ts":1552102391,"data":{}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":730.00,"comm":6000.00,"deptno":40}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":"1980-12-17 00:00:00","sal":800.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-20 00:00:00","sal":1600.00,"comm":300.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-22 00:00:00","sal":1250.00,"comm":500.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":"1981-04-02 00:00:00","sal":2975.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-28 00:00:00","sal":1250.00,"comm":1400.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":"1981-05-01 00:00:00","sal":2850.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":"1981-06-09 00:00:00","sal":2450.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":"1987-04-19 00:00:00","sal":3000.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":0,"hiredate":"1981-11-17 00:00:00","sal":5000.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-08 00:00:00","sal":1500.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":"1987-05-23 00:00:00","sal":1100.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":"1981-12-03 00:00:00","sal":950.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":"1981-12-03 00:00:00","sal":3000.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":"1982-01-23 00:00:00","sal":1300.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":8888,"ename":"HIVE","job":"PROGRAM","mgr":7839,"hiredate":"1988-01-23 00:00:00","sal":10300.00,"comm":0.00,"deptno":null}}
{"database":"maxwell","table":"bootstrap","type":"update","ts":1552102391,"xid":1617,"commit":true,"data":{"id":1,"database_name":"hlwtest","table_name":"emp","where_clause":null,"is_complete":1,"inserted_rows":16,"total_rows":0,"created_at":null,"started_at":"2019-03-09 11:33:11","completed_at":"2019-03-09 11:33:11","binlog_file":"mysql-bin.000018","binlog_position":225498,"client_id":"maxwell"},"old":{"is_complete":0,"inserted_rows":1,"completed_at":null}}
{"database":"hlwtest","table":"emp","type":"bootstrap-complete","ts":1552102391,"data":{}}
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

新绛县| 唐海县| 伊宁县| 长海县| 玉门市| 繁昌县| 双城市| 临泽县| 吕梁市| 连平县| 吉隆县| 新民市| 上林县| 通江县| 延长县| 五台县| 大兴区| 宁城县| 平乡县| 东乌珠穆沁旗| 鄂伦春自治旗| 罗山县| 黄浦区| 鸡泽县| 白水县| 禹城市| 晋城| 南宁市| 徐州市| 共和县| 丘北县| 漯河市| 栾城县| 宁明县| 台北市| 邵东县| 九龙城区| 康平县| 东平县| 中方县| 田林县|