您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關怎樣分析Debezium MySQL模塊設計,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
注:本文不會著重分析MySQL binlog格式結構和解析過程,而在于debezium的架構設計。
Debezium is an open source distributed platform for change data capture.
這句話引用自debezium官網,可以看到,debezium的野心還是很大的,把自己定義為一個通用的CDC平臺,事實上,也確實如此,尤其是從0.8版本以來,開發者將大量精力投入到PostgreSQL模塊的開發,一方面引入SQL Server, Oracle, Db2, Cassandra等數據庫的支持,另一方面適配了Pulsar,Amazon Kineis,Google Pub/Sub等消息引擎,并且逐步重構,解耦和具體數據庫的綁定以及具體消息系統的依賴,向統一架構和云原生靠攏。
事實上,早期的Debezium是和Kafka Connect框架緊耦合的,Debezium是Kafka Connect的一個Source Plugin,并且主要適配MySQL,稍帶了MongoDB。
目前Debezium最新版是1.2,1.3已經進入beta階段。其實MySQL模塊在0.8版本已經基本定下來了,后面的變動只有0.9合入了DBZ-175,這是一個在線加表特性,僅作為內部實驗特性,并沒有在文檔上提及。我個人認為這個設計十分精彩,會在本文有篇幅專門討論。在后續的開發中,同為早期的MongoDB被徹底重構,但MySQL模塊還是保持原來的樣子。
Rebase MySQL connector to common framework used by the other connectors.
這個已經在Roadmap上掛了有段兒時間了,但可以預見到,短期內還不會有什么動作。很大一部分原因是,MySQL模塊的代碼中有大量的針對MySQL和Kafka Connect缺陷的額外處理,還有像DBZ-175這種統一架構還不支持的特性,另外由于MySQL的廣泛使用,多年來社區發現和修復了大量的場景下的bug,把一個久經驗證的模塊架構推倒是一件風險很大的事情。
后續的分析僅僅針對MySQL模塊的架構和代碼,基本上不會涉及新的統一架構。
上文提到,Debezium最初設計成一個Kafka Connect 的Source Plugin,目前開發者雖致力于將其與Kafka Connect解耦,但當前的代碼實現還未變動。下圖引自Debeizum官方文檔,可以看到一個Debezium在一個完整CDC系統中的位置。
Kafka Connect 為Source Plugin提供了一系列的編程接口,最主要的就是要實現SourceTask的poll方法,其返回List<SourceRecord>將會被以最少一次語義的方式投遞至Kafka。如果你想了解更多Kafka Connect的細節,請參閱我的另一篇文章:https://www.jianshu.com/p/538b2f0a7462
public abstract class SourceTask implements Task { ... public abstract List<SourceRecord> poll() throws InterruptedException; ... }
Reader體系構成了MySQL模塊中代碼的主線,我們的分析從Reader開始。
這里是Reader的整個繼承樹,我們先暫時忽略ParallelSnapshotReader,ReconcilingBinlogReader,他們是DBZ-175引入的東西。
從名字上應該可以看出,真正主要的是SnapshotReader和BinlogReader,分別實現了對MySQL數據的全量讀取和增量讀取,他們繼承于AbstractReader,里面封裝了共用邏輯,下圖是AbstractReader的內部設計。
可以看到,AbstractReader在實現時,并沒有直接將enqueue喂進來的record投遞進Kafka,而是通過一個內存阻塞隊列BlockingQueue進行了解耦,這種設計有諸多好處:
職責解耦
如上的圖中,在喂入BlockingQueue之前,要根據條件判斷是否接受該record;在向Kafka投遞record之前,判斷task的running狀態。這樣把同類的功能限定在特定的位置。
線程隔離
BlockingQueue是一個線程安全的阻塞隊列,通過BlockingQueue實現的生產者消費者模型,是可以跑在不同的線程里的,這樣避免局部的阻塞帶來的整體的干擾。如上圖中的右側,消費者會定期判斷running標志位,若running被stop信號置為了false,可以立刻停止整個task,而不會因MySQL IO阻塞延遲相應。
Single與Batch的互相轉化
Enqueue record是單條的投遞record,drain_to是批量的消費records。這個用法也可以反過來,實現batch到single的轉化。
還剩下兩個ChainedReader和TimedBlockingReader。
ChainedReader顧名思義,會把幾個Reader包裝起來,串行執行。
TimedBlockingReader就是簡單的sleep一段時間,它的存在是為了應對Kafka Connect rebalance的設計缺陷,在上文中我的另一篇文章中有提到。
如果你搭建過MySQL的主從同步,因該知道,建立從庫時,需要先導出全量數據(MySQL 8.0.x好像已經有了更便捷的方法),然后記錄binlog的位置,把全量數據導入從庫后,從binlog位置繼續增量同步,已保持數據的一致性。
可能你還知道阿里開源的另一個MySQL CDC工具canal,他只負責stream過程,并沒有處理snapshot過程,這也是debezium相較于canal的一個優勢。
對于Debezium來說,基本沿用了官方搭建從庫的這一思路,讓我們看下官方文檔描述的詳細步驟。(如果沒有額外說明,后面的討論僅針對Innodb引擎)
Grabs a global read lock that blocks writes by other database clients. The snapshot itself does not prevent other clients from applying DDL which might interfere with the connector’s attempt to read the binlog position and table schemas. The global read lock is kept while the binlog position is read before released in a later step.
Starts a transaction with repeatable read semantics to ensure that all subsequent reads within the transaction are done against the consistent snapshot.
Reads the current binlog position.
Reads the schema of the databases and tables allowed by the connector’s configuration.
Releases the global read lock. This now allows other database clients to write to the database.
Writes the DDL changes to the schema change topic, including all necessary DROP… and CREATE… DDL statements. This happens if applicable.
Scans the database tables and generates CREATE events on the relevant table-specific Kafka topics for each row.
Commits the transaction.
Records the completed snapshot in the connector offsets.
Debezium把這個過程分解成了9步,看上去好像比我們想的要復雜些。
在Debezium目前版本的實現中,這9步是單線程串行的,其中主要的耗時就在第7步,這一步其實就是使用最樸素的方式,通過jdbc使用select * from table [where ...]來實現的讀取全量數據,如果很多千萬級甚至更大的表,這一步的耗時是很長的。其實這一步是可以并行化的,在第1步中,已經獲取了全局鎖,在全局鎖釋放前,是可以開多個連接,并行的實現全量數據的拉去,極大的提升效率。
另外snapshot整個過程如果失敗,是無法恢復的,畢竟事務已經丟了,無法再讀取當時的快照,來保證數據的一致性。
Snapshot過程時間長和中斷不可恢復,再加上Kafka Connect 粗暴的rebalance策略,正是早期使用debezium的一大痛點。TimedBlockingReader的引入正是為了在一定程度上緩解這個問題。
ChainedReader ├── TimedBlockReader ├── SnapshotReader └── BinlogReader
當準備一次性提交多個同步任務時,因為每次任務提交都會觸發一次rebalance,在SnapshotReader和BinlogReader前插入一個TimedBlockReader,確保同步任務提交后不會立刻執行,等多個任務都提交完成時,集群穩定下來,才會開始并發執行。
特別的,snapshot和stream過程都是可選的,你也可以像canal一樣只從當前時刻開始監聽binlog,捕獲stream數據,具體配置請參考官方文檔。
下面我們關注一下stream過程,也就是binlog解析過程。(做數據同步binlog必須設為row模式)
相信能讀到這里的大多數同學都執行過以下命令,就是用MySQL官方的binlog工具解析binlog文件內容。仔細看,你會發現,這里面有庫名和表名,有每個字段的值,卻沒有字段名,換句話說,binlog里不包含schema信息!
mysqlbinlog --no-defaults --base64-output=decode-rows -vvv ~/Downloads/mysql-bin.001192 | less
#190810 12:00:20 server id 206195699 end_log_pos 8624 CRC32 0x46912d80 GTID last_committed=12 sequence_number=13 rbr_only=yes /*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/; SET @@SESSION.GTID_NEXT= '5358e6dc-d161-11e8-8a6c-7cd30ac4dc44:25115781'/*!*/; # at 8624 #190810 12:00:20 server id 206195699 end_log_pos 8687 CRC32 0xe14a2f5a Query thread_id=576127 exec_time=0 error_code=0 SET TIMESTAMP=1565409620/*!*/; BEGIN /*!*/; # at 8687 #190810 12:00:20 server id 206195699 end_log_pos 8775 CRC32 0xaf16fb7d Table_map: `risk_control`.`log_operation` mapped to number 39286 # at 8775 #190810 12:00:20 server id 206195699 end_log_pos 9055 CRC32 0x9bdc15ae Write_rows: table id 39286 flags: STMT_END_F ### INSERT INTO `risk_control`.`log_operation` ### SET ### @1=8166048 /* INT meta=0 nullable=0 is_null=0 */ ### @2='7b0d526124ba40f6ac71cfe1d0d90665' /* VARSTRING(160) meta=160 nullable=0 is_null=0 */ ### @3=17 /* INT meta=0 nullable=1 is_null=0 */ ### @4='2' /* VARSTRING(64) meta=64 nullable=1 is_null=0 */ ### @5='casign_end=;方法public void com.xxx.risk.service.impl.ActivitiEventServiceImpl.updateAuditStatus(java.lang.String,java.lang.String);參數{"auditStatus": 3}' /* VARSTRING(1020) meta=1020 nullable=0 is_null=0 */ ### @6='\x00\x01\x00\x16\x00\x0b\x00\x0b\x00\x05\x03\x00auditStatus' /* JSON meta=4 nullable=1 is_null=0 */ ### @7='2019-08-10 12:00:21' /* DATETIME(0) meta=0 nullable=0 is_null=0 */ ### @8='' /* VARSTRING(128) meta=128 nullable=0 is_null=0 */ ### @9='' /* VARSTRING(256) meta=256 nullable=0 is_null=0 */ # at 9055 #190810 12:00:20 server id 206195699 end_log_pos 9086 CRC32 0xbe19a1b1 Xid = 180175929 COMMIT/*!*/;
其實這種設計可以理解,作為一個高效的二進制格式,binlog里不存儲冗余度極高的列名可以很可觀的減少體積,并且,有了表名,表結構信息可以從MySQL information_schema表中拿到的,何必再存一份呢?
但是,debezium偷梁換柱,模擬從庫拉取binlog做解析,他并不是真正的從庫,是沒有information_schema表可以查的,只能從MySQL主庫查詢。但這個方式真的萬無一失嗎?
考慮下面的場景:
15:00 BinlogReader正常消費
15:05 Kafka Connect集群維護,暫停BinlogReader
15:10 表A修改,在第3列后增加了1列(新增列不一定在尾部)
15:15 Kafka Connect集群維護結束,恢復BinlogReader
這個場景中,BinlogReader在15:15恢復后,會繼續從15:05讀取并解析binlog,如果這時從MySQL讀取information_schema來獲取表A的schema信息,那么在15:05-15:10期間binlog和schema是不匹配的,也就無法解析出正確的數據。換句話說,如果debezium讀取binlog有延遲,這段時間主庫schema做了修改,那么讀取主庫information_schema的方案就會有問題了。
要解決這個問題,就要模擬information_schema機制,維護一份當前的schema快照,可這樣就夠了嗎?
回到前面提到的AbstractReader內部設計上,BinlogReader作為生產者,其將解析后的數據投遞到BlockingQueue中,如果在解析binlog過程中遇到了DDL語句(比如alter table add column ...),就會更新當前的schema快照。
這時,如果stop task,如上圖,BlockingQueue中還未被消費的records將被丟棄,如果包含schema修改之前解析出的record,那么下次binlog將從此處開始解析,而debezium存儲的schema快照卻已經對應了修改后的,也會照成binlog和schema的不匹配。在這種邊緣場景下,僅保存當前schema快照的方案就行不通了。當然,后面我們還會提到這種模式同樣不能滿足很多其他場景。
事已至此,我們只好使用終極方案,把每一次的schema變更都保存下來,構造一條完整的schema時間線,確保在解析任一時刻的binlog事件時,都能找到對應版本的schema快照。
Debezium中使用DatabaseHistory來實現該功能,功能已經滿足,不過實現的確是簡陋。MySQLDatabaseHistory會從同步任務啟動時,導出所有的create table語句(參見snapshot過程第6步),在此基礎上,追加記錄每一條DDL語句,debezium為這些DDL存儲提供了內存、文件、kafka topic實現,其中kafka topic必須設置過期策略為永不過期。
當要恢復到任意時刻的schema快照時,從頭開始,逐條解析所有的DDL,疊加修改,直到指定時刻前最后一條DDL。可以看到,這種實現方式的效率是比較低的,當任務持續數個月時,會累積大量的DDL(尤其是在阿里云RDS上,不知道阿里云改了什么,binlog里會產生海量的DDL),一次恢復可能需要數十分鐘乃至數個小時,并且若其中有一處DDL解析錯誤,會導致其后所有的快照都發生錯誤。開發者很早就意識到了這個問題,并且提出了一些改進想法,可能會在不久后有所進展。
看到這里,相信你已經可以理解,為什么有些商業的數據同步引擎對同步過程中的schema變更有所限制,要完備的支持各種情況,著實不是一件容易的事情。
我們前面已經多次提到了DBZ-175,現在我們我們開始討論這個精彩的設計。https://issues.redhat.com/browse/DBZ-175
注:該功能僅作為內部實驗特性,官方文檔未提及,有問題請參考JIRA討論或者閱讀調試源碼。
我們先來補充一些背景,debezium在同步數據過程中,允許通過table.whitelist和table.blacklist指定要同步的表。假設一開始,我們將table.whitelist配置為a,b兩張表,這兩張表完成了Snapshot階段,已經穩定的切換到Stream階段。這時,來了新的同步需求,要再同步c表,并且最好不干擾a,b兩張表的同步進度。那很自然的想法就是我再起一個新的同步任務來處理c表,長此以往,你會發現一個MySQL主庫上掛了很多個“從庫”,對MySQL主庫會照成一定壓力。所以,一個理想的方案便是:修改同步任務的table.whitelist后,debezium可以自動完成新增表的全量和增量同步,并且這個過程不會干擾原有的同步任務;當新老兩批同步任務進度相近時,合二為一,只使用同一個BinlogReader完成后續的stream同步。
簡單吧!怎么實現呢?就是我們前文暫時略過的ParallelSnapshotReader和ReconcilingBinlogReader。
首先描述一下在線加表后,整個Reader的結構。
ChainedReader ├── ParallelSnapshotReader │ ├── OldTablesBinlogReader │ └── ChainedReader │ ├── NewTablesSnapshotReader │ └── NewTablesBinlogReader ├── ReconcilingBinlogReader └── UnifiedBinlogReader
ParallelSnapshotReader就如其名字一樣,在保證不干擾OldTablesBinlogReader運行的情況下,并行的開始對新增表進行全量和增量同步;
當新增表進入stream階段后,OldTablesBinlogReader和NewTablesBinlogReader每一次拉取都會和對方作比較,當兩者的進度相差在一定時間內(默認時5分鐘)時,將兩者停止;
此時ParallelSnapshotReader退出,由ReconcilingBinlogReader將兩個BinlogReader進度同步,即將滯后者追平只領先者;
原有的兩個BinlogReader退出,新建的UnifiedBinlogReader從其位點繼續做新老所有表的stream解析,整個合并過程結束。
這段設計是我在翻閱了JIRA上數位開發者們歷經幾年的討論記錄后,結合代碼調試整理得出的。回想當年剛畢業的我,接到這個在線加表需求時,本以為是不可能實現的事情,直到發現了這個設計,不由得是感嘆和驚喜。
這只是一個初版的實現,在整個過程中,因為元數據的設計問題,并不支持schema的變更,可能正是由于這個原因,嚴謹的開發者們選擇不公開這項功能,僅作為內部實驗特性。
看完上述內容,你們對怎樣分析Debezium MySQL模塊設計有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。