您好,登錄后才能下訂單哦!
本篇內容介紹了“事件驅動和CQRS有什么作用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
一、前言:從物流詳情開始
二、領域事件
2.1、建模領域事件
2.2、領域事件代碼解讀
2.3、領域事件的存儲
2.3.1、單獨的EventStore
2.3.2、與業務數據一起存儲
2.4、領域事件如何發布
2.4.1、由領域聚合發送領域事件
2.4.2、事件總線VS消息中間件
三、Saga分布式事務
3.1、Saga概要
3.2、Saga實現
3.2.1、協同式(choreography)
3.2.2、編排式(orchestration)
3.2.3、補償策略
四、CQRS
五、自治服務和系統
六、結語
大家對物流跟蹤都不陌生,它詳細記錄了在什么時間發生了什么,并且數據作為重要憑證是不可變的。我理解其背后的價值有這么幾個方面:業務方可以管控每個子過程、知道目前所處的環節;另一方面,當需要追溯時候僅僅通過每一步的記錄就可以回放整個歷史過程。
我在之前的文章中提出過“軟件項目也是人類社會生產關系的范疇,只不過我們所創造的勞動成果看不見摸不著而已”。所以我們可以借鑒物流跟蹤的思路來開發軟件項目,把復雜過程拆解為一個個步驟、子過程、狀態,這和我們事件劃分是一致的,這就是事件驅動的典型案例。
領域事件(Domain Events)是領域驅動設計(Domain Driven Design,DDD)中的一個概念,用于捕獲我們所建模的領域中所發生過的事情。
領域事件本身也作為通用語言(Ubiquitous Language)的一部分成為包括領域專家在內的所有項目成員的交流用語。
比如在前述的跨境物流例子中,貨品達到保稅倉以后需要分派工作人員進行分揀分包,那么“貨品已到達保稅倉”便是一個領域事件。
首先,從業務邏輯來說該事件關系到整個流程的成功或者失敗;同時又將觸發后續子流程;而對于業務方來說,該事件也是一個標志性的里程碑,代表自己的貨品就快配送到自己手中。
所以通常來說,一個領域事件具有以下幾個特征:較高的業務價值,有助于形成完整的業務閉環,將導致進一步的業務操作。這里還要強調一點,領域事件具有明確的邊界。
比如:如果你建模的是餐廳的結賬系統,那么此時的“客戶已到達”便不是你關心的重點,因為你不可能在客戶到達時就立即向對方要錢,而“客戶已下單”才是對結賬系統有用的事件。
在建模領域事件時,我們應該根據限界上下文中的通用語言來命名事件及屬性。如果事件由聚合上的命令操作產生,那么我們通常根據該操作方法的名字來命名領域事件。
對于上面的例子“貨品已到達保稅倉”,我們將發布與之對應的領域事件
GoodsArrivedBondedWarehouseEvent(當然在明確的界限上下文中也可以去掉聚合的名字,直接建模為ArrivedBondedWarehouseEvent,這都是命名方面的習慣)。
事件的名字表明了聚合上的命令方法在執行成功之后所發生的事情,換句話說待定項以及不確定的狀態是不能作為領域事件的。
一個行之有效的方法是畫出當前業務的狀態流轉圖,包含前置操作以及引起的狀態變更,這里表達的是已經變更完成的狀態所以我們不用過去時態表示,比如刪除或者取消,即代表已經刪除或者已經取消。
然后對于其中的節點進行事件建模。如下圖是文件云端存儲的業務,我們分別對預上傳、上傳完成確認、刪除等環節建模“過去時”事件,PreUploadedEvent、ConfirmUploadedEvent、RemovedEvent。
package domain.event; import java.util.Date; import java.util.UUID; public class DomainEvent { /** * 領域事件還包含了唯一ID, * 但是該ID并不是實體(Entity)層面的ID概念, * 而是主要用于事件追溯和日志。 * 如果是數據庫存儲,該字段通常為唯一索引。 */ private final String id; /** * 創建時間用于追溯,另一方面不管使用了 * 哪種事件存儲都有可能遇到事件延遲, * 我們通過創建時間能夠確保其發生順序。 */ private final Date occurredOn; public DomainEvent() { this.id = String.valueOf(UUID.randomUUID()); this.occurredOn = new Date(); } }
在創建領域事件時,需要注意2點:
領域事件本身應該是不變的(Immutable);
領域事件應該攜帶與事件發生時相關的上下文數據信息,但是并不是整個聚合根的狀態數據。例如,在創建訂單時可以攜帶訂單的基本信息,而對于用戶更新訂單收貨地址事件AddressUpdatedEvent事件,只需要包含訂單、用戶以及新的地址等信息即可。
public class AddressUpdatedEvent extends DomainEvent { //通過userId+orderId來校驗訂單的合法性; private String userId; private String orderId; //新的地址 private Address address; //略去具體業務邏輯 }
事件的不可變性與可追溯性都決定了其必須要持久化的原則,我們來看看常見的幾種方案。
有的業務場景中會創建一個單獨的事件存儲中心,可能是Mysql、Redis、Mongo、甚至文件存儲等。這里以Mysql舉例,business_code、event_code用來區分不同業務的不同事件,具體的命名規則可以根據實際需要。
這里需要注意該數據源與業務數據源不一致的場景,我們要確保當業務數據更新以后事件能夠準確無誤的記錄下來,實踐中盡量避免使用分布式事務,或者盡量避免其跨庫的場景,否則你就得想想如何補償了。千萬要避免,用戶更新了收貨地址,但是AddressUpdatedEvent事件保存失敗。
總的原則就是對分布式事務Say No,無論如何,我相信方法總比問題多,在實踐中我們總可以想到解決方案,區別在于該方案是否簡潔、是否做到了解耦。
# 考慮是否需要分表,事件存儲建議邏輯簡單 CREATE TABLE `event_store` ( `event_id` int(11) NOT NULL auto increment, `event_code` varchar(32) NOT NULL, `event_name` varchar(64) NOT NULL, `event_body` varchar(4096) NOT NULL, `occurred_on` datetime NOT NULL, `business_code` varchar(128) NOT NULL, UNIQUE KEY (`event id`) ) ENGINE=InnoDB COMMENT '事件存儲表';
在分布式架構中,每個模塊都做的相對比較小,準確的說是“自治”。如果當前業務數據量較小,可以將事件與業務數據一起存儲,用相關標識區分是真實的業務數據還是事件記錄;或者在當前業務數據庫中建立該業務自己的事件存儲,但是要考慮到事件存儲的量級必然大于真實的業務數據,考慮是否需要分表。
這種方案的優勢:數據自治;避免分布式事務;不需要額外的事件存儲中心。當然其劣勢就是不能復用。
/* * 一個關于比賽的充血模型例子 * 貧血模型會構造一個MatchService,我們這里通過模型來觸發相應的事件 * 本例中略去了具體的業務細節 */ public class Match { public void start() { //構造Event.... MatchEvent matchStartedEvent = new MatchStartedEvent(); //略去具體業務邏輯 DefaultDomainEventBus.publish(matchStartedEvent); } public void finish() { //構造Event.... MatchEvent matchFinishedEvent = new MatchFinishedEvent(); //略去具體業務邏輯 DefaultDomainEventBus.publish(matchFinishedEvent); } //略去Match對象基本屬性 }
微服務內的領域事件可以通過事件總線或利用應用服務實現不同聚合之間的業務協同。即微服務內發生領域事件時,由于大部分事件的集成發生在同一個線程內,不一定需要引入消息中間件。但一個事件如果同時更新多個聚合數據,按照 DDD“一個事務只更新一個聚合根”的原則,可以考慮引入消息中間件,通過異步化的方式,對微服務內不同的聚合根采用不同的事務
我們看看如何使用 Saga 模式維護數據一致性?
Saga 是一種在微服務架構中維護數據一致性的機制,它可以避免分布式事務所帶來的問題。
一個 Saga 表示需要更新的多個服務中的一個,即Saga由一連串的本地事務組成。每一個本地事務負責更新它所在服務的私有數據庫,這些操作仍舊依賴于我們所熟悉的ACID事務框架和函數庫。
模式:Saga
通過使用異步消息來協調一系列本地事務,從而維護多個服務之間的數據一致性。
請參閱(強烈建議):https://microservices.io/patterns/data/saga.html
Saga與TCC相比少了一步Try的操作,TCC無論最終事務成功失敗都需要與事務參與方交互兩次。而Saga在事務成功的情況下只需要與事務參與方交互一次, 如果事務失敗,需要額外進行補償回滾。
每個Saga由一系列sub-transaction Ti 組成;
每個Ti 都有對應的補償動作Ci,補償動作用于撤銷Ti造成的結果;
可以看到,和TCC相比,Saga沒有“預留”動作,它的Ti就是直接提交到庫。
Saga的執行順序有兩種:
success:T1, T2, T3, ..., Tn ;
failure:T1, T2, ..., Tj, Cj,..., C2, C1,其中0 < j < n;
所以我們可以看到Saga的撤銷十分關鍵,可以說使用Saga的難點就在于如何設計你的回滾策略。
通過上面的例子我們對Saga有了初步的體感,現在來深入探討下如何實現。當通過系統命令啟動Saga時,協調邏輯必須選擇并通知第一個Saga參與方執行本地事務。一旦該事務完成,Saga協調選擇并調用下一個Saga參與方。
這個過程一直持續到Saga執行完所有步驟。如果任何本地事務失敗,則 Saga必須以相反的順序執行補償事務。以下幾種不同的方法可用來構建Saga的協調邏輯。
把 Saga 的決策和執行順序邏輯分布在 Saga的每一個參與方中,它們通過交換事件的方式來進行溝通。
(引用于《微服務架構設計模式》相關章節)
Order服務創建一個Order并發布OrderCreated事件。
Consumer服務消費OrderCreated事件,驗證消費者是否可以下訂單,并發布ConsumerVerified事件。
Kitchen服務消費OrderCreated事件,驗證訂單,在CREATE_PENDING狀態下創建故障單,并發布TicketCreated事件。
Accounting服務消費OrderCreated事件并創建一個處于PENDING狀態的Credit CardAuthorization。
Accounting服務消費TicketCreated和ConsumerVerified事件,向消費者的信用卡收費,并發布信用卡授權失敗事件。
Kitchen服務使用信用卡授權失敗事件并將故障單的狀態更改為REJECTED。
訂單服務消費信用卡授權失敗事件,并將訂單狀態更改為已拒絕。
把Saga的決策和執行順序邏輯集中在一個Saga編排器類中。Saga 編排器發出命令式消息給各個 Saga 參與方,指示這些參與方服務完成具體操作(本地事務)。類似于一個狀態機,當參與方服務完成操作以后會給編排器發送一個狀態指令,以決定下一步做什么。
(引用于《微服務架構設計模式》相關章節)
我們來分析一下執行流程
Order Service首先創建一個Order和一個創建訂單控制器。之后,路徑的流程如下:
Saga orchestrator向Consumer Service發送Verify Consumer命令。
Consumer Service回復Consumer Verified消息。
Saga orchestrator向Kitchen Service發送Create Ticket命令。
Kitchen Service回復Ticket Created消息。
Saga協調器向Accounting Service發送授權卡消息。
Accounting服務部門使用卡片授權消息回復。
Saga orchestrator向Kitchen Service發送Approve Ticket命令。
Saga orchestrator向訂單服務發送批準訂單命令。
之前的描述中我們說過Saga最重要的是如何處理異常,狀態機還定義了許多異常狀態。如上面的6就會發生失敗,觸發AuthorizeCardFailure,此時我們就要結束訂單并把之前提交的事務進行回滾。這里面要區分哪些是校驗性事務、哪些是需要補償的事務。
一個Saga由三種不同類型的事務組成:可補償性事務(可以回滾,因此有一個補償事務);關鍵性事務(這是 Saga的成敗關鍵點,比如4賬戶代扣);以及可重復性事務,它不需要回滾并保證能夠完成(比如6更新狀態)。
在Create Order Saga 中,createOrder()、createTicket()步驟是可補償性事務且具有撤銷其更新的補償事務。
verifyConsumerDetails()事務是只讀的,因此不需要補償事務。authorizeCreditCard()事務是這個 Saga的關鍵性事務。如果消費者的信用卡可以授權,那么這個Saga保證完成。approveTicket()和approveRestaurantOrder()步驟是在關鍵性事務之后的可重復性事務。
認真拆解每個步驟、然后評估其補償策略尤為重要,正如你看到的,每種類型的事務在對策中扮演著不同的角色。
前面講述了事件的概念,又分析了Saga如何解決復雜事務,現在我們來看看CQRS為什么在DDD中廣泛被采用。除了讀寫分離的特征以外,我們用事件驅動的方式來實踐Command邏輯能有效降低業務的復雜度。
當你明白如何建模事件、如何規避復雜事務,明白什么時候用消息中間件、什么時候采用事件總線,才能理解為什么是CQRS、怎么正確應用。
下面是我們項目中的設計,這里為什么會出現Read/Write Service,是為了封裝調用,service內部是基于聚合發送事件。因為我發現在實際項目中,很多人都會第一時間問我要XXXService而不是XXX模型,所以在DDD沒有完全普及的項目中建議大家采取這種居中策略。這也符合咱們的解耦,對方依賴我的抽象能力,然而我內部是基于DDD還是傳統的流程代碼對其是無關透明的。
我們先來看看事件以及處理器的時序關系。
這里還是以文件云端存儲業務為例,下面是一些處理器的核心代碼。注釋行是對代碼功能、用法以及擴展方面的解讀,請認真閱讀。
package domain; import domain.event.DomainEvent; import domain.handler.event.DomainEventHandler; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class DomainRegistry { private Map<String, List<DomainEventHandler>> handlerMap = new HashMap<String, List<DomainEventHandler>>(); private static DomainRegistry instance; private DomainRegistry() { } public static DomainRegistry getInstance() { if (instance == null) { instance = new DomainRegistry(); } return instance; } public Map<String, List<DomainEventHandler>> getHandlerMap() { return handlerMap; } public List<DomainEventHandler> find(String name) { if (name == null) { return null; } return handlerMap.get(name); } //事件注冊與維護,register分多少個場景根據業務拆分, //這里是業務流的核心。如果多個事件需要維護前后依賴關系, //可以維護一個priority邏輯 public void register(Class<? extends DomainEvent> domainEvent, DomainEventHandler handler) { if (domainEvent == null) { return; } if (handlerMap.get(domainEvent.getName()) == null) { handlerMap.put(domainEvent.getName(), new ArrayList<DomainEventHandler>()); } handlerMap.get(domainEvent.getName()).add(handler); //按照優先級進行事件處理器排序 。。。 } }
文件上傳完畢事件的例子。
package domain.handler.event; import domain.DomainRegistry; import domain.StateDispatcher; import domain.entity.meta.MetaActionEnums; import domain.event.DomainEvent; import domain.event.MetaEvent; import domain.repository.meta.MetaRepository; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; /** * @Description:一個事件操作的處理器 * 我們混合使用了Saga的兩種模式,外層事件交互; * 對于單個復雜的事件內部采取狀態流轉實現。 */ @Component public class MetaConfirmUploadedHandler implements DomainEventHandler { @Resource private MetaRepository metaRepository; public void handle(DomainEvent event) { //1.我們在當前的上下文中定義個ThreadLocal變量 //用于存放事件影響的聚合根信息(線程共享) //2.當然如果有需要額外的信息,可以基于event所 //攜帶的信息構造Specification從repository獲取 // 代碼示例 // metaRepository.queryBySpecification(SpecificationFactory.build(event)); DomainEvent domainEvent = metaRepository.load(); //此處是我們的邏輯 。。。。 //對于單個操作比較復雜的,可以使用狀態流轉進一步拆分 domainEvent.setStatus(nextState); //在事件觸發之后,仍需要一個狀態跟蹤器來解決大事務問題 //Saga編排式 StateDispatcher.dispatch(); } @PostConstruct public void autoRegister() { //此處可以更加細分,注冊在哪一類場景中,這也是事件驅動的強大、靈活之處。 //避免了if...else判斷。我們可以有這樣的意識,一旦你的邏輯里面充斥了大量 //switch、if的時候來看看自己注冊的場景是否可以繼續細分 DomainRegistry.getInstance().register(MetaEvent.class, this); } public String getAction() { return MetaActionEnums.CONFIRM_UPLOADED.name(); } //適用于前后依賴的事件,通過優先級指定執行順序 public Integer getPriority() { return PriorityEnums.FIRST.getValue(); } }
事件總線邏輯
package domain; import domain.event.DomainEvent; import domain.handler.event.DomainEventHandler; import java.util.List; public class DefaultDomainEventBus { public static void publish(DomainEvent event, String action, EventCallback callback) { List<DomainEventHandler> handlers = DomainRegistry.getInstance(). find(event.getClass().getName()); handlers.stream().forEach(handler -> { if (action != null && action.equals(handler.getAction())) { Exception e = null; boolean result = true; try { handler.handle(event); } catch (Exception ex) { e = ex; result = false; //自定義異常處理 。。。 } finally { //write into event store saveEvent(event); } //根據實際業務處理回調場景,DefaultEventCallback可以返回 if (callback != null) { callback.callback(event, action, result, e); } } }); } }
DDD中強調限界上下文的自治特性,事實上,從更小的粒度來看,對象仍然需要具備自治的這四個特性,即:最小完備、自我履行、穩定空間、獨立進化。其中自我履行是重點,因為不強依賴外部所以穩定、因為穩定才可能獨立進化。這就是六邊形架構在DDD中較為普遍的原因。
本文所講述的事件、Saga、CQRS的方案均可以單獨使用,可以應用到你的某個method、或者你的整個package。項目中我們并不一定要實踐一整套CQRS,只要其中的某些思想解決了我們項目中的某個問題就足夠了。
也許你現在已經磨刀霍霍,準備在項目中實踐一下這些技巧。不過我們要明白“每一個硬幣都有兩面性”,我們不僅看到高擴展、解耦的、易編排的優點以外,仍然要明白其所帶來的問題。利弊分析以后再去決定如何實現才是正確的應對之道。
這類編程模式有一定的學習曲線;
基于消息傳遞的應用程序的復雜性;
處理事件的演化有一定難度;
刪除數據存在一定難度;
查詢事件存儲庫非常有挑戰性。
不過我們還是要認識到在其適合的場景中,六邊形架構以及DDD戰術將加速我們的領域建模過程,也迫使我們從嚴格的通用語言角度來解釋一個領域,而不是一個個需求。任何更強調核心域而不是技術實現的方式都可以增加業務價值,并使我們獲得更大的競爭優勢。
“事件驅動和CQRS有什么作用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。