您好,登錄后才能下訂單哦!
如何進行instance模塊的源碼解析,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
instance模塊下面也分為三個子模塊,core、manager、spring。
其中,core是instance的核心邏輯 。
而manager和spring只是兩種不同的instance配置讀取方式,manager通過http請求讀取admin的配置,spring通過配置文件的方式讀取。
主要控制邏輯我們在deployer模塊源碼分析中提到過,就是在CanalController類 的instanceGenerator,配置參數是canal.instance.global.mode
源碼如下
代碼不多,就兩個接口,兩個類。
這個接口只有一個方法
具體實現就是開頭提到的兩種,PlainCanalInstanceGenerator和SpringCanalInstanceGenerator,分別在manager子模塊和spring子模塊中實現。
具體選擇就是開頭的那個canalController里面根據canal.instance.global.mode來選擇。
先看一張官方文檔的圖,這個前面的文章已經分析過了。
server代表一個canal-server運行實例,對應于一個jvm。server內部可以有多個instance。
Instance內部有4個主要組件:
在這個接口中,就定義了獲取4個組件的方法,以及新版本增加的mqProducer的配置信息(mqProducer在server模塊解析中介紹過了,可以回頭去看看)
我們簡單看下4個組件接口的各個實現類。
CanalEventParser接口實現類(paser模塊):
CanalEventSink接口實現類(sink模塊):
EntryEventSink:普通的單個parser的sink操作,進行數據過濾,加工,分發
GroupEventSink:用于分庫分表的場景,對應GroupEventParser的數據解析,然后實現基于歸并排序的sink處理
CanalEventStore接口實現類(store模塊):
MemoryEventStoreWithBuffer:基于內存實現存儲store
CanalMetaManager(meta模塊):
ZooKeeperMetaManager:將元數據存存儲到zk中
MemoryMetaManager:將元數據存儲到內存中
MixedMetaManager:組合memory + zookeeper的使用模式
PeriodMixedMetaManager:基于定時刷新的策略的mixed實現
FileMixedMetaManager:先寫內存,然后定時刷新數據到File
關于這些實現的具體細節,我們在相應模塊的源碼分析時,進行講解。目前只需要知道,一些組件有多種實現,因此組合工作方式有多種。
AbstractCanalInstance是canalInstance的抽象類,維護了相關組件的引用。
同時,也實現了canalInstance接口的subscribeChange方法。
這個抽象類有兩個實現,CanalInstanceWithManager 和 CanalInstanceWithSpring。
AbstractCanalInstance的初始化過程都是在實現類中完成的。
如果選擇admin控制模式,那就是在CanalInstanceWithManager中完成;如果是spring模式,就在CanalInstanceWithSpring中完成。
這里有個小發現:
仔細看下實際代碼調用我們發現,CanalInstanceWithManager是給ManagerCanalInstanceGenerator使用的,而這個generator實際上沒有被使用到。如果使用admin模式,本文開頭我們就看到了,使用了PlainCanalInstanceGenerator。PlainCanalInstanceGenerator里面的generate方法實現,其實跟SpringCanalInstanceGenerator差不多。就是從遠端admin拉到配置,然后替換系統變量,然后再從spring的beanfactory中構建具體的實例。
AbstractCanalInstance類實現了CanalInstance接口的subscribeChange方法。
我們看到,如果訂閱關系發生變化,就做一些操作,這里看的話,主要就是更新了一下filter。
filter規定了需要訂閱哪些庫,哪些表。
啟動沒什么特別的邏輯,就是按照順序依次啟動各個組件。
順序為 metaManager -> alarmHandler -> eventStore -> eventSink -> eventParser。
啟動順序主要跟依賴關系有關,元信息相關的管理跟所有都有關,所以metaManager最先啟動,其他的按照彼此之間的關系一一啟動。
這里我們發現,在啟動eventParser的時候做了特殊處理,分別是beforeStartEventParser 和 afterStartEventParser。我們在2.3.4專門講一下。
stop也沒什么特殊的,就是依次關閉各個組件。
關閉的順序就是start的逆過程。
這里就不貼代碼了。
在start和stop方法中的eventParser前后都有特殊的處理,start的beforeStartEventParser 和 afterStartEventParser,Stop的beforeStopEventParser 和 afterStopEventParser。
這個其實跟eventParser的設計有關。
每個EventParser都會關聯兩個內部組件
CanalLogPositionManager : 記錄binlog 最后一次解析成功位置信息,主要是描述下一次canal啟動的位點 CanalHAController: 控制 EventParser 的鏈接主機管理,判斷當前該鏈接哪個mysql數據庫
所以這兩個beforexxx、afterxxxx方法做的主要是CanalLogPositionManager和CanalHAController的啟停工作。
可以看到AbstractCanalInstance除了負責啟動和停止其內部組件,就沒有其他工作了。
eventParser在AbstractCanalInstance中啟動后,就會自行開啟多線程任務dump數據,通過eventSink投遞給eventStore。
而對eventStore的操作邏輯,實際上都是在CanalServerWithEmbedded中完成的,我們可以回顧一下CanalServerWithEmbedded中 getWithoutAck( ) 的相關邏輯。
包括:
根據clientIdentity的destination獲取對應的instance
獲取到流式數據中的最后一批獲取的位置positionRanges(跟batchId有關聯,就是上面那個map里面的)
從cananlEventStore里面獲取binlog,轉化為event。一般是從最后的一個batchId位置開始,如果之前沒有batchId,那么就從cursor記錄的消費位點開始;如果cursor為空,那只能從eventStore的第一條消息開始。(這里幾個位置關系再想一想,跟ack有關,畫個圖)
event轉化為entry,并生成新的batchId,組合成message返回給客戶端
所以,其實這里只是簡單的啟動和停止,組件的交互邏輯是在CanalServerWithEmbedded中get出instance的各個組件來進行實現的。
前面提到了,PlainCanalInstanceGenerator里面的generate方法實現,其實跟SpringCanalInstanceGenerator差不多。就是從遠端admin拉到配置,然后替換系統變量,然后再從spring的beanfactory中構建具體的實例。
所以我們重點關注spring子模塊的配置方式即可。
就下面四個類
基于spring容器啟動canal實例,方便獨立于manager啟動。
繼承了AbstractCanalInstance,其實就是一系列組件的setter方法,就不貼源碼了。
具體配置是基于spring的xml來做的.
當我們配置加載方式為spring時,創建的CanalInstance實例類型都是CanalInstanceWithSpring。canal將會尋找本地的spring配置文件來創建instance實例。canal默認提供了以下幾種spring配置文件:
四個配置文件中,對CanalInstanceWithSpring都采用了同樣的配置方式:
當然,具體每個組件的ref在不同配置文件中有所不同。
最主要的就是metaManager 和eventParser 這兩個配置有所不同,可能在內存、文件或zk進行存儲。
eventStore 、和eventSink 定義都是相同的,eventStore目前的開源版本中eventStore只有一種基于內存的實現,eventSink其作用是eventParser和eventStore的連接器,進行數據過濾,加工,分發的工作。不涉及存儲,也就沒有必要針對內存、file、或者zk進行區分。
這個是具體創建instance的邏輯。
順便看下PlainCanalInstanceGenerator里面的實現,就是多了從遠端拉取配置,然后用PropertyPlaceholderConfigurer進行了變量替換,然后還是用beanFactory來獲取實例。
com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer繼承了org.springframework.beans.factory.config.PropertyPlaceholderConfigurer,設置動態properties,替換掉本地properties。
其實這個模塊的東西比較少,沒有什么特別復雜的邏輯。
我們來回顧下幾個問題
主要有基于spring和基于遠端配置兩種方式,前者的實現在,后者的實現在PlainCanalInstanceGenerator
PlainCanalInstanceGenerator中使用了spring的PropertyPlaceholderConfigurer來覆蓋配置
包括了parser、sink、store、metamanager等組件,但是只負責了啟動和停止邏輯,具體交互邏輯還是在CanalServerWithEmbedded中實現的。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。