您好,登錄后才能下訂單哦!
有很多人問過我這么一類問題:RabbitMQ如何確保消息可靠?很多時候,筆者的回答都是:說來話長的事情何來長話短說。的確,要確保消息可靠不只是單單幾句就能夠敘述明白的,包括Kafka也是如此。可靠并不是一個絕對的概念,曾經有人也留言說過類似全部磁盤損毀也會導致消息丟失,筆者戲答:還有機房被炸了也會導致消息丟失。可靠性是一個相對的概念,在條件合理的范圍內系統所能確保的多少個9的可靠性。一切盡可能的趨于完美而無法企及于完美。
我們可以盡可能的確保RabbitMQ的消息可靠。在詳細論述RabbitMQ的消息可靠性之前,我們先來回顧下消息在RabbitMQ中的經由之路。
如圖所示,從AMQP協議層面上來說:
消息先從生產者Producer出發到達交換器Exchange;
交換器Exchange根據路由規則將消息轉發對應的隊列Queue之上;
消息在隊列Queue上進行存儲;
消費者Consumer訂閱隊列Queue并進行消費。
我們對于消息可靠性的分析也從這四個階段來一一探討。
消息從生產者發出到達交換器Exchange,在這個過程中可以發生各種情況,生產者客戶端發送出去之后可以發生網絡丟包、網絡故障等造成消息丟失。一般情況下如果不采取措施,生產者無法感知消息是否已經正確無誤的發送到交換器中。如果消息在傳輸到Exchange的過程中發生失敗而可以讓生產者感知的話,生產者可以進行進一步的處理動作,比如重新投遞相關消息以確保消息的可靠性。
為此AMQP協議在建立之初就考慮到這種情況而提供了事務機制。RabbitMQ客戶端中與事務機制相關的方法有三個:channel.txSelect、channel.txCommit以及channel.txRollback。channel.txSelect用于將當前的信道設置成事務模式,channel.txCommit用于提交事務,而channel.txRollback用于事務回滾。在通過channel.txSelect方法開啟事務之后,我們便可以發布消息給RabbitMQ了,如果事務提交成功,則消息一定到達了RabbitMQ中,如果在事務提交執行之前由于RabbitMQ異常崩潰或者其他原因拋出異常,這個時候我們便可以將其捕獲,進而通過執行channel.txRollback方法來實現事務回滾。注意這里的RabbitMQ中的事務機制與大多數數據庫中的事務概念并不相同,需要注意區分。
事務確實能夠解決消息發送方和RabbitMQ之間消息確認的問題,只有消息成功被RabbitMQ接收,事務才能提交成功,否則我們便可在捕獲異常之后進行事務回滾,與此同時可以進行消息重發。但是使用事務機制的話會“吸干”RabbitMQ的性能,那么有沒有更好的方法既能保證消息發送方確認消息已經正確送達,又能基本上不帶來性能上的損失呢?從AMQP協議層面來看并沒有更好的辦法,但是RabbitMQ提供了一個改進方案,即發送方確認機制(publisher confirm)。
生產者將信道設置成confirm(確認)模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的唯一ID),這就使得生產者知曉消息已經正確到達了目的地了。RabbitMQ回傳給生產者的確認消息中的deliveryTag包含了確認消息的序號,此外RabbitMQ也可以設置channel.basicAck方法中的multiple參數,表示到這個序號之前的所有消息都已經得到了處理。
事務機制在一條消息發送之后會使發送端阻塞,以等待RabbitMQ的回應,之后才能繼續發送下一條消息。相比之下,發送方確認機制最大的好處在于它是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack(Basic.Nack)命令,生產者應用程序同樣可以在回調方法中處理該nack命令。
生產者通過調用channel.confirmSelect方法(即Confirm.Select命令)將信道設置為confirm模式,之后RabbitMQ會返回 Confirm.Select-Ok命令表示同意生產者將當前信道設置為confirm模式。所有被發送的后續消息都被ack或者nack一次,不會出現一條消息即被ack又被nack的情況。并且RabbitMQ也并沒有對消息被confirm的快慢做任何保證。
事務機制和publisher confirm機制兩者是互斥的,不能共存。如果企圖將已開啟事務模式的信道再設置為publisher confirm模式,RabbitMQ會報錯:{amqp_error, precondition_failed, “cannot switch from tx to confirm mode”, ‘confirm.select’},或者如果企圖將已開啟publisher confirm模式的信道在設置為事務模式的話,RabbitMQ也會報錯:{amqp_error, precondition_failed, “cannot switch from confirm to tx mode”, ‘tx.select’ }。
事務機制和publisher confirm機制確保的是消息能夠正確的發送至RabbitMQ,這里的“發送至RabbitMQ”的含義是指消息被正確的發往至RabbitMQ的交換器,如果此交換器沒有匹配的隊列的話,那么消息也將會丟失。所以在使用這兩種機制的時候要確保所涉及的交換器能夠有匹配的隊列。更進一步的講,發送方要配合mandatory參數或者備份交換器一起使用來提高消息傳輸的可靠性。
mandatory和immediate是channel.basicPublish方法中的兩個參數,它們都有當消息傳遞過程中不可達目的地時將消息返回給生產者的功能。而RabbitMQ提供的備份交換器(Alternate Exchange)可以將未能被交換器路由的消息(沒有綁定隊列或者沒有匹配的綁定)存儲起來,而不用返回給客戶端。
RabbitMQ 3.0版本開始去掉了對于immediate參數的支持,對此RabbitMQ官方解釋是:immediate參數會影響鏡像隊列的性能,增加代碼復雜性,建議采用TTL和DLX的方法替代。所以本文只簡單介紹mandatory和備份交換器。
當mandatory參數設為true時,交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列的話,那么RabbitMQ會調用Basic.Return命令將消息返回給生產者。當mandatory參數設置為false時,出現上述情形的話,消息直接被丟棄。 那么生產者如何獲取到沒有被正確路由到合適隊列的消息呢?這時候可以通過調用channel.addReturnListener來添加ReturnListener監聽器實現。使用mandatory參數的關鍵代碼如下所示:
channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP
.BasicProperties basicProperties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return返回的結果是:" + message);
}
});
上面代碼中生產者沒有成功的將消息路由到隊列,此時RabbitMQ會通過Basic.Return返回“mandatory test”這條消息,之后生產者客戶端通過ReturnListener監聽到了這個事件,上面代碼的最后輸出應該是“Basic.Return返回的結果是:mandatory test”。
生產者可以通過ReturnListener中返回的消息來重新投遞或者其它方案來提高消息的可靠性。
備份交換器,英文名稱Alternate Exchange,簡稱AE,或者更直白的可以稱之為“備胎交換器”。生產者在發送消息的時候如果不設置mandatory參數,那么消息在未被路由的情況下將會丟失,如果設置了mandatory參數,那么需要添加ReturnListener的編程邏輯,生產者的代碼將變得復雜化。如果你不想復雜化生產者的編程邏輯,又不想消息丟失,那么可以使用備份交換器,這樣可以將未被路由的消息存儲在RabbitMQ中,再在需要的時候去處理這些消息。 可以通過在聲明交換器(調用channel.exchangeDeclare方法)的時候添加alternate-exchange參數來實現,也可以通過策略的方式實現。如果兩者同時使用的話,前者的優先級更高,會覆蓋掉Policy的設置。
參考下圖,如果此時我們發送一條消息到normalExchange上,當路由鍵等于“normalKey”的時候,消息能正確路由到normalQueue這個隊列中。如果路由鍵設為其他值,比如“errorKey”,即消息不能被正確的路由到與normalExchange綁定的任何隊列上,此時就會發送給myAe,進而發送到unroutedQueue這個隊列。
備份交換器其實和普通的交換器沒有太大的區別,為了方便使用,建議設置為fanout類型,如若讀者想設置為direct或者topic的類型也沒有什么不妥。需要注意的是消息被重新發送到備份交換器時的路由鍵和從生產者發出的路由鍵是一樣的。備份交換器的實質就是原有交換器的一個“備胎”,所有無法正確路由的消息都發往這個備份交換器中,可以為所有的交換器設置同一個AE,不過這里需要提前確保的是AE已經正確的綁定了隊列,最好類型也是fanout的。如果備份交換器和mandatory參數一起使用,那么mandatory參數無效。
mandatory或者AE可以讓消息在路由到隊列之前得到極大的可靠性保障,但是消息存入隊列之后的可靠性又如何保證?
首先是持久化。持久化可以提高隊列的可靠性,以防在異常情況(重啟、關閉、宕機等)下的數據丟失。隊列的持久化是通過在聲明隊列時將durable參數置為true實現的,如果隊列不設置持久化,那么在RabbitMQ服務重啟之后,相關隊列的元數據將會丟失,此時數據也會丟失。正所謂“皮之不存,毛將焉附”,隊列都沒有了,消息又能存在哪里呢?隊列的持久化能保證其本身的元數據不會因異常情況而丟失,但是并不能保證內部所存儲的消息不會丟失。要確保消息不會丟失,需要將其設置為持久化。通過將消息的投遞模式(BasicProperties中的deliveryMode屬性)設置為2即可實現消息的持久化。
設置了隊列和消息的持久化,當RabbitMQ服務重啟之后,消息依舊存在。單單只設置隊列持久化,重啟之后消息會丟失;單單只設置消息的持久化,重啟之后隊列消失,既而消息也丟失。單單設置消息持久化而不設置隊列的持久化顯得毫無意義。
在持久化的消息正確存入RabbitMQ之后,還需要有一段時間(雖然很短,但是不可忽視)才能存入磁盤之中。RabbitMQ并不會為每條消息都做同步存盤(調用內核的fsync6方法)的處理,可能僅僅保存到操作系統緩存之中而不是物理磁盤之中。如果在這段時間內RabbitMQ服務節點發生了宕機、重啟等異常情況,消息保存還沒來得及落盤,那么這些消息將會丟失。
如果在Phase1中采用了事務機制或者publisher confirm機制的話,服務端的返回是在消息落盤之后執行的,這樣可以進一步的提高了消息的可靠性。但是即便如此也無法避免單機故障且無法修復(比如磁盤損毀)而引起的消息丟失,這里就需要引入鏡像隊列。鏡像隊列相當于配置了副本,絕大多數分布式的東西都有多副本的概念來確保HA。在鏡像隊列中,如果主節點(master)在此特殊時間內掛掉,可以自動切換到從節點(slave),這樣有效的保證了高可用性,除非整個集群都掛掉。雖然這樣也不能完全的保證RabbitMQ消息不丟失(比如機房被炸。。。),但是配置了鏡像隊列要比沒有配置鏡像隊列的可靠性要高很多,在實際生產環境中的關鍵業務隊列一般都會設置鏡像隊列。
進一步的從消費者的角度來說,如果在消費者接收到相關消息之后,還沒來得及處理就宕機了,這樣也算數據丟失。
為了保證消息從隊列可靠地達到消費者,RabbitMQ提供了消息確認機制(message acknowledgement)。消費者在訂閱隊列時,可以指定autoAck參數,當autoAck等于false時,RabbitMQ會等待消費者顯式地回復確認信號后才從內存(或者磁盤)中移去消息(實質上是先打上刪除標記,之后再刪除)。當autoAck等于true時,RabbitMQ會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正的消費到了這些消息。
采用消息確認機制后,只要設置autoAck參數為false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題,因為RabbitMQ會一直等待持有消息直到消費者顯式調用Basic.Ack命令為止。
當autoAck參數置為false,對于RabbitMQ服務端而言,隊列中的消息分成了兩個部分:一部分是等待投遞給消費者的消息;一部分是已經投遞給消費者,但是還沒有收到消費者確認信號的消息。如果RabbitMQ一直沒有收到消費者的確認信號,并且消費此消息的消費者已經斷開連接,則RabbitMQ會安排該消息重新進入隊列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。
RabbitMQ不會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否已經斷開,這么設計的原因是RabbitMQ允許消費者消費一條消息的時間可以很久很久。
如果消息消費失敗,也可以調用Basic.Reject或者Basic.Nack來拒絕當前消息而不是確認,如果只是簡單的拒絕那么消息會丟失,需要將相應的requeue參數設置為true,那么RabbitMQ會重新將這條消息存入隊列,以便可以發送給下一個訂閱的消費者。如果requeue參數設置為false的話,RabbitMQ立即會把消息從隊列中移除,而不會把它發送給新的消費者。
還有一種情況需要考慮:requeue的消息是存入隊列頭部的,即可以快速的又被發送給消費,如果此時消費者又不能正確的消費而又requeue的話就會進入一個無盡的循環之中。對于這種情況,筆者的建議是在出現無法正確消費的消息時不要采用requeue的方式來確保消息可靠性,而是重新投遞到新的隊列中,比如設定的死信隊列中,以此可以避免前面所說的死循環而又可以確保相應的消息不丟失。對于死信隊列中的消息可以用另外的方式來消費分析,以便找出問題的根本。
本文的重點是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹記這一點。同時我經過多年的收藏目前也算收集到了一套完整的學習資料,包括但不限于:分布式架構、高可擴展、高性能、高并發、Jvm性能調優、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個知識點高級進階干貨,希望對想成為架構師的朋友有一定的參考和幫助
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。