您好,登錄后才能下訂單哦!
這篇文章主要介紹“rabbitmq的事務機制”,在日常操作中,相信很多人在rabbitmq的事務機制問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”rabbitmq的事務機制”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
rabbitmq事務機制:
1:通過事務機制實現
1:channel.txSelect()聲明啟動事務模式;
2 : channel.txComment()提交事務;
3:channel.txRollback()回滾事務;
try { channel.txSelect(); // 聲明事務 // 發送消息 channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); channel.txCommit(); // 提交事務 } catch (Exception e) { channel.txRollback(); } finally { channel.close(); conn.close(); }
2:通過發送方確認 publisher confirm 機制實現。
Confirm發送方確認模式使用和事務類似,也是通過設置Channel進行發送方確認的。 Confirm的三種實現方式: 方式一:channel.waitForConfirms()普通發送方確認模式; 方式二:channel.waitForConfirmsOrDie()批量確認模式; 方式三:channel.addConfirmListener()異步監聽發送方確認模式; 方式一:普通Confirm模式 // 創建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啟發送方確認模式 channel.confirmSelect(); String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); if (channel.waitForConfirms()) { System.out.println("消息發送成功" ); } 看代碼可以知道,我們只需要在推送消息之前,channel.confirmSelect()聲明開啟發送方確認模式,再使用channel.waitForConfirms()等待消息被服務器確認即可。 方式二:批量Confirm模式 // 創建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啟發送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } channel.waitForConfirmsOrDie(); //直到所有信息都發布,只要有一個未確認就會IOException System.out.println("全部執行完成"); 以上代碼可以看出來channel.waitForConfirmsOrDie(),使用同步方式等所有的消息發送之后才會執行后面代碼,只要有一個消息未被確認就會拋出IOException異常。 方式三:異步Confirm模式 // 創建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啟發送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } //異步監聽確認和未確認的消息 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未確認消息,標識:" + deliveryTag); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(String.format("已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple)); } });
rabbitmq 消息分發
RabbitMQ 隊列擁有多個消費者時 ,隊列收到的消息將以輪詢 (round-robin )的分發方發送給消費者。每條消息只會發送給訂閱列表里的一個消費者。這種方式非常適合擴展,而它是專門為并發程序設計的。如果現在負載加重,那么只需要創建更多的消費者來消費處理消息即可。 很多時候輪詢的分發機制也不是那么優雅。默認情況下,如果有 個消費者,那么 RabbitM會將第 條消息分發給第 m%n (取余的方式)個消費者, RabbitMQ 不管消費者是否消費并己 經確認 (Basic.Ack) 了消息。試想一下,如果某些消費者任務繁重,來不及消費那么多的消 息,而某些其他消費者由于某些原因(比如業務邏輯簡單、機器性能卓越等)很快地處理完了 所分配到的消息,進而進程空閑,這樣就會造成整體應用吞吐量的下降。 那么該如何處理這種情況呢?這里就要用到 channel.basicQos(int prefetchCoun這個方法,如前面章節所述, channel.basicQos 方法允許限制信道上的消費者所能保持的最未確認消息的數量。 舉例說明,在訂閱消費隊列之前,消費端程序調用了 channel.basicQos(5) ,之后閱了某個隊列進行消費。 RabbitM 會保存一個消費者的列表,每發送一條消息都會為對應的費者計數,如果達到了所設定的上限,那么 RabbitMQ 就不會向這個消費者再發送任何消息。 直到消費者確認了某條消息之后 RabbitMQ 將相應的計數減1,之后消費者可以繼續接收消息, 直到再次到達計數上限。這種機制可以類比于 TCP!IP中的"滑動窗口" 注意要點: Basic.Qos 的使用對于拉模式的消費方式無效. channel.basicQos 有三種類型的重載方法: (1) void basicQos(int prefetchCount) throws IOException; (2) void basicQos( nt prefetchCount , boo1ean globa1) throws IOExcepti(3) void basicQos(int prefetchSize , int prefetchCount , boo1ean global) IOException ; 前面介紹的都只用到了 prefetchCount 這個參數,當 prefetchCount 設置 沒有上限。還有 prefetchSize 這個參數表示消費者所能接收未確認消息的總體大小的上單位為 ,設置為 則表示沒有上限。 對于 個信道來說,它可以同時消費多個隊列,當設置了 prefetchCount 大于 時信道需要和各個隊列協調以確保發送的消息都沒有超過所限定的 prefetchCount 的值,RabbitM 的性能降低,尤其是這些隊列分散在集群中的多個 Broker 節點之中。Rabbit提升相關的性能,在 AMQPO-9-1 協議之上重新定義了 global 這個參數,對比如表 4- 所4-1 global 參數的對比 global 參數 false:信道上新的消費者需要遵從 prefetchCount 的限定值true:信道上所有的消費者都需要遵從 prefetchCount的限定值
rabbitmq 消息順序消費:
rabbitmq重發,死信隊列,延時,及網絡閃斷都會造成生產順序亂序,故不支持順序消費。
可以在業務上自定義排序值,當接受到的排序值與下一個預估要消費的值不一致是,就等待。
rabbitmq 消息可靠性保障:
消息可靠傳輸一般是業務系統接入消息中間件時首要考慮的問題,一般消息中間件的消傳輸保障分為三個層級。 {> At most once: 最多一次。消息可能會丟失,但絕不會重復傳輸 At least once: 最少一次。消息絕不會丟失,但可能會重復傳輸。 Exactly once: 恰好一次。每條消息肯定會被傳輸一次且僅傳輸一次。 RabbitMQ 支持其中的"最多一次 "和"最少一次"。其中"最少 次"投遞實現需要考以下這個幾個方面的內容: (1)消息生產者需要開啟事務機制或者 publisher confirm 機制,以確保消 息可以可靠地輸到 RabbitMQ 中。 (2) 消息生產者需要配合使用 mandatory 參數或者備份交換器來確保消息能夠從交換路由到隊列中,進而能夠保存下來而不會被丟棄。 3) 消息和隊列都需要進行持久化處理,以確保 RabbitMQ 服務器在遇到異常情況時不? 90 ? Rabbi{MQ 進階 造成消息丟失 (4) 消費者在消費消息的同時需要將 autoAck 設置為 false ,然后通過手動確認的方式去 確認己經正確消費的消息,以避免在消費端引起不必要的消息丟失。 "最多 次"的方式就無須考慮以上那些方面,生產者隨意發送,消費者隨意消費,不過這 樣很難確保消息不會丟失 "恰好 次"是 RabbitMQ 目前無法保障的。考慮這樣一種情況,消費者在消費完一條消息 之后向 RabbitMQ 發送確認 Basic.Ack 命令,此時由于網絡斷開或者其他原因造成 RabbitMQ 并沒有收到這個確認命令,那么 RabbitMQ 不會將此條消息標記刪除。在重新建立連接之后, 消費者還是會消費到這 條消息,這就造成了重復消費。再考慮 種情況,生產者在使用 ublisher confirm 機制的時候,發送完 條消息等待 RabbitMQ 返回確認通知,此時網絡斷開, 生產者捕獲到異常情況,為了確保消息可靠性選擇重新發送,這樣 RabbitMQ 中就有兩條同樣 的消息,在消費的時候,消費者就會重復消費 那么 RabbitMQ 有沒有去重的機制來保證"恰好一次"呢?答案是并沒有,不僅是 RabbitMQ 目前大多數 流的消息中間件都沒有消息去重機制,也不保障"恰好 次"。去重處理 般是在 業務客戶端實現,比如引入 GUID (Globally Unique Identifier) 的概念。針對 GUID ,如果從客 戶端的角度去 ,那么 要引入集中式緩存,必然會增加依賴復雜度,另外緩存的大小也難以 界定 建議在實際生產環境中,業務方根據自身的業務特性進行去重,比如業務消息本身具備 等'性,或者借助 Redis 等其他產品進行去重處理。
到此,關于“rabbitmq的事務機制”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。