您好,登錄后才能下訂單哦!
這篇文章主要介紹“最簡消息隊列的實現方法”,在日常操作中,相信很多人在最簡消息隊列的實現方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”最簡消息隊列的實現方法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
結合其他 mq
的使用經歷,基本的使用流程:
創建 producer
或 consumer
啟動 mq
生產消息/消費消息
對應到 queue
中,大致也是這個:
// 生產者創建工廠 producer := newMockedProducer() // 消費者創建工廠 consumer := newMockedConsumer() // 將生產者以及消費者的創建工廠函數傳遞給 NewQueue() q := queue.NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil })
我們看看 NewQueue
需要什么構建條件:
producer constructor
consumer constructor
將雙方的工廠函數傳遞給 queue
,由它去執行以及重試。
這兩個需要的目的是將生產者/消費者的構建和消息生產/消費都封裝在 mq
中,而且將生產者/消費者的整套邏輯交給開發者處理:
type ( // 開發者需要實現此接口 Producer interface { AddListener(listener ProduceListener) Produce() (string, bool) } ... // ProducerFactory定義了生成Producer的方法 ProducerFactory func() (Producer, error) )
其實也就是將生產者的邏輯交個開發者自己完成,mq
只負責生產者/消費者的消息傳遞和之間的調度。
工廠方法的設計,是將生產者本身和生產消息,這兩個任務都交給 queue
自己來做調度或者重試。
生產消息當然要回到生產者本身:
type mockedProducer struct { total int32 count int32 // 使用waitgroup來模擬任務的完成 wait sync.WaitGroup } // 實現 Producer interface 的方法:Produce() func (p *mockedProducer) Produce() (string, bool) { if atomic.AddInt32(&p.count, 1) <= p.total { p.wait.Done() return "item", true } time.Sleep(time.Second) return "", false }
queue
中的生產者編寫都必須實現:
Produce()
:由開發者編寫生產消息的邏輯
AddListener()
:生產者
和生產者類似:
type mockedConsumer struct { count int32 } func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) return nil }
啟動,然后驗證我們上述的生產者和消費者之間的數據是否傳輸成功:
func TestQueue(t *testing.T) { producer := newMockedProducer(rounds) consumer := newMockedConsumer() // 創建 queue q := NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil }) // 當生產者生產完畢,執行 Stop() 關閉生產端生產 go func() { producer.wait.Wait() // mq生產端停止生產,不是mq本身 Stop 運行 q.Stop() }() // 啟動 q.Start() // 驗證生產消費端是否消息消費完成 assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) }
以上就是 queue
最簡易的入門使用代碼。開發者可以根據自己的業務實際情況:自由定義生產者/消費者已經生產/消費邏輯。
![image-20210506224102836](/Users/dyhxl/Library/Application Support/typora-user-images/image-20210506224102836.png)
整體流程如上圖:
全體的通信都由 channel
進行
通過加入監聽器 listener
,以及事件觸發 event
,相當于將觸發器邏輯分離出來
生產者有 produceone
,這個是生產消息的邏輯,但是其中的 Produce()
是由開發者編寫【上面的 interface
中正是這個函數】
同理消費者,Consume()
基本的消息流動就入上圖以及上述描寫的,具體的代碼分析我們就留到下一篇,我們????分析里面,尤其是如何控制 channel
是整個設計的核心。
到此,關于“最簡消息隊列的實現方法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。