您好,登錄后才能下訂單哦!
Rabbitmq如何保證不丟消息,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
背景介紹:
筆者最近研究了下rabbitmq,便很好奇它是怎么保證不丟失消息的呢?于是便整理了這篇文章來跟大家分享下,自己的理解,如有不準確的地方或者不同的意見,還請各位能夠給出反饋,我們可以討論,相互學習,相互成長。
基礎知識:
在開始探討這個問題之前,筆者還是覺得很有必要將rabbitmq的架構等基礎知識回顧下,如下所示:
對于使用rabbitmq的服務來說,主要由三部分構成,它們分別是:生產者,rabbitmq,消費者。這三者之間是通過網絡來進行通訊的,其中與生產者對應的是exchange,與消費者對應的是connection,而rabbitmq內部又由exchange,queue,connection三部分構成。
消息的流程:消息是由生產者生產了之后,上報給exchange,exchange綁定并存儲到queue中,再傳遞給最終的消費者手里。
如此以來,整個過程就分成了三大場景:
場景1: 生產者與exchange的上報消息,如何保證不丟失?
對于網絡通訊來說,解決丟數據最好的辦法就是,消息確認機制,而rabbitmq里面是通過兩個方式來保證:一種是事務機制,這個是在amqp協議層面保證的,具體操作如下所示:
RabbitMQ中與事務機制有關的方法有三個:txSelect(), txCommit()以及txRollback(), txSelect用于將當前channel設置成transaction模式,txCommit用于提交事務,txRollback用于回滾事務,在通過txSelect開啟事務之后,我們便可以發布消息給broker代理服務器了,如果txCommit提交成功了,則消息一定到達了broker了,如果在txCommit執行之前broker異常崩潰或者由于其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback回滾事務了。(備注:采用事務機制實現會降低RabbitMQ的消息吞吐量。)
步驟為:
1.client----發送----->Tx.Select2.broker----發送----->Tx.Select-Ok(之后publish)3.client------發送----->Tx.Commit4.broker------發送---->Tx.Commit-Ok
一種是confrim機制:
原理:消息響應機制,
生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會將消息寫入磁盤之后發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號。
confrim的優勢是,它是異步的,在生產者發送完一個消息之后,不必要等這個消息的返回,就可以繼續處理另外一個消息,等待消息的ack返回之后,再去處理前面發過的消息,類似于多路復用的做法。rabbitmq在收到之后,會回復ack,如果因為rabbitmq自身的問題導致的,會回復nack消息。
對于生產者來說,為了方便確認消息有沒有真正到達rabbitmq端,還需要在生產者端設置超時重發,畢竟網絡里面是可能丟失消息的。
confrim方式使用的API:
https://godoc.org/github.com/streadway/amqp#Channel.Confirm
場景2: 消費者從queue中獲取消息如何保證不丟失?
消息在到達了rabbitmq之后,會將數據保存到queue里面,queue是存到內存里面的,不過rabbitmq提供了持久化的操作,這個策略如下所示:
1.buffer大約1M左右,寫滿之后,就會寫到磁盤中。2.25ms超時時間,buffer不滿的話,超時也會寫到磁盤中。
盡管如此,也有可能會丟數據,特別是當rabbitmq在buffer沒有寫到磁盤的時候,就死掉了。不過rabbitmq也提供了鏡像隊列的方式,利用主備的方式來防止消息丟掉,不過當master和salve同時掛掉的話,還是會丟數據,只不過這種同時掛掉的概率會小很多。(筆者覺得,沒有百分之百的不丟消息,只是丟消息的概率變的很低而已。)
參考文章:https://blog.csdn.net/u013256816/article/details/60875666
場景3: rabbitmq內部如何保證不丟失消息?
對于消費者來說,同樣也是采用了消息響應的方式來防止消息不丟失,不過在這一層使用的是ack機制來處理,不過這里的ack可以設置成不等待ack和等待ack兩種,在這里我們使用的是設置ack。
消費者對于消息的響應通常有下面三個場景:
1.消費者在接收到rabbitmq的消息之后,等處理完消息之后,會主動回復ack消息,rabbitmq在收到ack之后,便會繼續給這個消費者分配下一個消息進行處理。
2.當然rabbitmq也可以回復unack消息,如此以來消息隊列下一次還會繼續將這個消息分配給消費者,來實現消息重處理。
3.消費者先把ack消息回復掉,然后在重新將這個消息放到rabbitmq之中,如此以來通過rabbitmq的隊列特性來實現,消息的重試,這里的重試,不是一直處理這一個消息,而是要等到隊列里面的消息排隊到它才行。
除此之外,rabbitmq還實現了批量的概念,通過qos來設置一次性分配給一個消費者的最大數量,讓消費者一次性消費一批消息,等到處理完了這一批消息,再去分配下一批消息給這一個消費者。
問題1:一旦消費者長時間不回復Ack消息或者消費者卡死了呢,這種場景如何處理?
理論上需要消費者需要實現一個超時處理機制,在一定時間內沒有處理完畢,需要超時回復ack或者unack消息給rabbitmq。
問題2:就算消費者有超時機制,可是一旦消費者在發送ack給rabbitmq的時候,消息丟失,rabbitmq這個消息一直收不到響應消息的話,會怎么辦呢?
rabbitmq提供了一個可以針對消費者掛掉之后的處理機制,在消費者掛掉之后,會探測出來,然后進行后續處理。
rabbitmq還有一個ttl的功能,可以針對消息隊列或者單個消息設置對應的ttl值,一旦ttl超時,消息就會變為dead message, 不會再分配給消費者。在這里我們可以采用這個策略,在消息變成死消息之后,我們可以讓生產者再次生產相同的消息放到rabbitmq當中,如果確定這個消息不在使用了,就直接丟棄這個消息。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。