What does DomainRepository do in go.cqrs? This article answers the question by reading the interface definition and its GetEventStore-backed implementation.
// DomainRepository is the interface that all domain repositories should implement.
type DomainRepository interface {
	//Loads an aggregate of the given type and ID
	Load(aggregateTypeName string, aggregateID string) (AggregateRoot, error)

	//Saves the aggregate.
	Save(aggregate AggregateRoot, expectedVersion *int) error
}
DomainRepository defines two methods: Load and Save.
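To show what the two-method interface gives a caller, here is a minimal sketch. Everything apart from the DomainRepository method signatures is an assumption made for illustration: the reduced AggregateRoot interface, the Account aggregate, the mapRepo store, and the Deposit function are hypothetical and are not part of go.cqrs. The sketch also stores aggregate state directly rather than events, which is a simplification.

```go
package main

import (
	"errors"
	"fmt"
)

// AggregateRoot is a reduced stand-in for the go.cqrs interface of the same name.
type AggregateRoot interface {
	AggregateID() string
}

// DomainRepository mirrors the two method signatures quoted in the article.
type DomainRepository interface {
	Load(aggregateTypeName string, aggregateID string) (AggregateRoot, error)
	Save(aggregate AggregateRoot, expectedVersion *int) error
}

// Account is a hypothetical aggregate used only for this example.
type Account struct {
	id      string
	Balance int
}

func (a *Account) AggregateID() string { return a.id }

// mapRepo is a hypothetical in-memory repository keyed by "type/id".
type mapRepo struct {
	items map[string]AggregateRoot
}

func (r *mapRepo) Load(aggregateTypeName, aggregateID string) (AggregateRoot, error) {
	agg, ok := r.items[aggregateTypeName+"/"+aggregateID]
	if !ok {
		return nil, errors.New("aggregate not found")
	}
	return agg, nil
}

func (r *mapRepo) Save(aggregate AggregateRoot, expectedVersion *int) error {
	// expectedVersion is ignored here; a real store would use it for a
	// concurrency check. The "Account" prefix is hard-coded because this
	// sketch only ever stores one aggregate type.
	r.items["Account/"+aggregate.AggregateID()] = aggregate
	return nil
}

// Deposit depends only on the interface, not on a concrete store.
func Deposit(repo DomainRepository, id string, amount int) error {
	agg, err := repo.Load("Account", id)
	if err != nil {
		return err
	}
	acct, ok := agg.(*Account)
	if !ok {
		return fmt.Errorf("unexpected aggregate type %T", agg)
	}
	acct.Balance += amount
	return repo.Save(acct, nil)
}

func main() {
	repo := &mapRepo{items: map[string]AggregateRoot{
		"Account/a1": &Account{id: "a1", Balance: 10},
	}}
	if err := Deposit(repo, "a1", 5); err != nil {
		fmt.Println("deposit failed:", err)
		return
	}
	agg, _ := repo.Load("Account", "a1")
	fmt.Println(agg.(*Account).Balance) // prints 15
}
```

Because Deposit only sees the interface, the in-memory store could be swapped for an event-store-backed repository without changing the calling code.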
// GetEventStoreCommonDomainRepo is an implementation of the DomainRepository
// that uses GetEventStore for persistence
type GetEventStoreCommonDomainRepo struct {
	eventStore         *goes.Client
	eventBus           EventBus
	streamNameDelegate StreamNamer
	aggregateFactory   AggregateFactory
	eventFactory       EventFactory
}

// Load will load all events from a stream and apply those events to an aggregate
// of the type specified.
//
// The aggregate type and id will be passed to the configured StreamNamer to
// get the stream name.
func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (AggregateRoot, error) {
	if r.aggregateFactory == nil {
		return nil, fmt.Errorf("The common domain repository has no Aggregate Factory.")
	}

	if r.streamNameDelegate == nil {
		return nil, fmt.Errorf("The common domain repository has no stream name delegate.")
	}

	if r.eventFactory == nil {
		return nil, fmt.Errorf("The common domain has no Event Factory.")
	}

	aggregate := r.aggregateFactory.GetAggregate(aggregateType, id)
	if aggregate == nil {
		return nil, fmt.Errorf("The repository has no aggregate factory registered for aggregate type: %s", aggregateType)
	}

	streamName, err := r.streamNameDelegate.GetStreamName(aggregateType, id)
	if err != nil {
		return nil, err
	}

	stream := r.eventStore.NewStreamReader(streamName)
	for stream.Next() {
		switch err := stream.Err().(type) {
		case nil:
			break
		case *url.Error, *goes.ErrTemporarilyUnavailable:
			return nil, &ErrRepositoryUnavailable{}
		case *goes.ErrNoMoreEvents:
			return aggregate, nil
		case *goes.ErrUnauthorized:
			return nil, &ErrUnauthorized{}
		case *goes.ErrNotFound:
			return nil, &ErrAggregateNotFound{AggregateType: aggregateType, AggregateID: id}
		default:
			return nil, &ErrUnexpected{Err: err}
		}

		event := r.eventFactory.GetEvent(stream.EventResponse().Event.EventType)

		//TODO: No test for meta
		meta := make(map[string]string)
		stream.Scan(event, &meta)
		if stream.Err() != nil {
			return nil, stream.Err()
		}
		em := NewEventMessage(id, event, Int(stream.EventResponse().Event.EventNumber))
		for k, v := range meta {
			em.SetHeader(k, v)
		}
		aggregate.Apply(em, false)
		aggregate.IncrementVersion()
	}

	return aggregate, nil
}

// Save persists an aggregate
func (r *GetEventStoreCommonDomainRepo) Save(aggregate AggregateRoot, expectedVersion *int) error {
	if r.streamNameDelegate == nil {
		return fmt.Errorf("The common domain repository has no stream name delagate.")
	}

	resultEvents := aggregate.GetChanges()

	streamName, err := r.streamNameDelegate.GetStreamName(typeOf(aggregate), aggregate.AggregateID())
	if err != nil {
		return err
	}

	if len(resultEvents) > 0 {
		evs := make([]*goes.Event, len(resultEvents))

		for k, v := range resultEvents {
			//TODO: There is no test for this code
			v.SetHeader("AggregateID", aggregate.AggregateID())
			evs[k] = goes.NewEvent("", v.EventType(), v.Event(), v.GetHeaders())
		}

		streamWriter := r.eventStore.NewStreamWriter(streamName)
		err := streamWriter.Append(expectedVersion, evs...)
		switch e := err.(type) {
		case nil:
			break
		case *goes.ErrConcurrencyViolation:
			return &ErrConcurrencyViolation{Aggregate: aggregate, ExpectedVersion: expectedVersion, StreamName: streamName}
		case *goes.ErrUnauthorized:
			return &ErrUnauthorized{}
		case *goes.ErrTemporarilyUnavailable:
			return &ErrRepositoryUnavailable{}
		default:
			return &ErrUnexpected{Err: e}
		}
	}

	aggregate.ClearChanges()

	for k, v := range resultEvents {
		if expectedVersion == nil {
			r.eventBus.PublishEvent(v)
		} else {
			em := NewEventMessage(v.AggregateID(), v.Event(), Int(*expectedVersion+k+1))
			r.eventBus.PublishEvent(em)
		}
	}

	return nil
}
GetEventStoreCommonDomainRepo defines the fields eventStore, eventBus, streamNameDelegate, aggregateFactory, and eventFactory. Its Load method first obtains the aggregate through r.aggregateFactory.GetAggregate, then gets the stream name through r.streamNameDelegate.GetStreamName(aggregateType, id), and then iterates over the events with r.eventStore.NewStreamReader, calling aggregate.Apply(em, false) and aggregate.IncrementVersion() for each one. Its Save method first obtains resultEvents through aggregate.GetChanges(), builds a goes.Event from each of them, writes them with streamWriter.Append, then calls aggregate.ClearChanges(), and finally publishes each event with r.eventBus.PublishEvent.
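The load-by-replay and save-by-append flow can be sketched without an event store at all. The following is a simplified in-memory illustration, not the go.cqrs implementation: the Event, Counter, and MemoryRepo types are hypothetical, a plain slice stands in for the event bus, and the expected-version rule (the number of the last event in the stream, -1 for an empty stream) is an assumption made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a hypothetical event: a signed amount added to a counter.
type Event struct{ Delta int }

// Counter is a hypothetical aggregate that tracks a running total.
type Counter struct {
	ID      string
	Total   int
	Version int     // number of the last replayed event; -1 when none
	changes []Event // uncommitted events
}

func NewCounter(id string) *Counter { return &Counter{ID: id, Version: -1} }

// Apply mutates state; isNew also records the event as an uncommitted change.
func (c *Counter) Apply(e Event, isNew bool) {
	c.Total += e.Delta
	if isNew {
		c.changes = append(c.changes, e)
	}
}

// ErrConcurrency is returned when the stream has moved past the expected version.
var ErrConcurrency = errors.New("concurrency violation")

// MemoryRepo keeps one event slice per stream.
type MemoryRepo struct {
	streams   map[string][]Event
	Published []Event // stands in for an event bus
}

// Load rebuilds the aggregate by replaying every stored event in order.
func (r *MemoryRepo) Load(id string) *Counter {
	c := NewCounter(id)
	for _, e := range r.streams[id] {
		c.Apply(e, false)
		c.Version++
	}
	return c
}

// Save appends uncommitted events, clears them, then publishes them.
func (r *MemoryRepo) Save(c *Counter, expectedVersion *int) error {
	stream := r.streams[c.ID]
	// Assumed rule: a non-nil expected version must match the last event number.
	if expectedVersion != nil && *expectedVersion != len(stream)-1 {
		return ErrConcurrency
	}
	r.streams[c.ID] = append(stream, c.changes...)
	published := c.changes
	c.changes = nil
	r.Published = append(r.Published, published...)
	return nil
}

func main() {
	repo := &MemoryRepo{streams: map[string][]Event{}}

	c := repo.Load("c1") // empty stream, so Version is -1
	c.Apply(Event{Delta: 3}, true)
	c.Apply(Event{Delta: 4}, true)
	v := c.Version
	if err := repo.Save(c, &v); err != nil {
		fmt.Println("save failed:", err)
		return
	}

	reloaded := repo.Load("c1")
	fmt.Println(reloaded.Total, reloaded.Version) // prints "7 1"

	stale := -1 // the stream now ends at event 1, so -1 is out of date
	reloaded.Apply(Event{Delta: 1}, true)
	fmt.Println(repo.Save(reloaded, &stale)) // prints "concurrency violation"
}
```

Passing a nil expectedVersion skips the check in this sketch, which parallels the nil branch in the Save method quoted above.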
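In summary, go.cqrs's DomainRepository defines the Load and Save methods, and GetEventStoreCommonDomainRepo implements the DomainRepository interface. Its Load method reads the events and applies them one by one with aggregate.Apply. Its Save method converts aggregate.GetChanges() into events, writes them with streamWriter.Append, calls aggregate.ClearChanges(), and finally calls r.eventBus.PublishEvent.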