91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何解決RabbitMq消息隊列Qos?Prefetch消息堵塞問題

發布時間:2022-02-07 10:44:24 來源:億速云 閱讀:234 作者:iii 欄目:開發技術

本篇內容介紹了“如何解決RabbitMq消息隊列Qos Prefetch消息堵塞問題”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

ConnectionFactory:創建connection的工廠類

Connection: 簡單理解為socket

Channel:和mq交互的接口,定義queue、exchange和綁定queue、exhange等接口都是它。

接下來就是和mq的交互類

exchange:簡單地看成路由,類型不是重點,看看官網即可

queue:客戶端監聽的是queue,而不是exchange,但是使用queue的前提要先將exchange和queue綁定。用過java queue工具類應該很容易上手,queue分為寫和讀,各自可以有自己頻率,寫得快讀得慢,容易堵塞;寫得慢讀得快又容易造成消費者的空閑。

Prefetc:一個重要卻容易被忽略的指標,也是這次遇到的問題。

prefetch與消息投遞

prefetch是指單一消費者最多能消費的unacked messages數目。

如何理解呢?

mq為每一個 consumer設置一個緩沖區,大小就是prefetch。每次收到一條消息,MQ會把消息推送到緩存區中,然后再推送給客戶端。當收到一個ack消息時(consumer 發出baseack指令),mq會從緩沖區中空出一個位置,然后加入新的消息。但是這時候如果緩沖區是滿的,MQ將進入堵塞狀態。

更具體點描述,假設prefetch值設為10,共有兩個consumer。也就是說每個consumer每次會從queue中預抓取 10 條消息到本地緩存著等待消費。同時該channel的unacked數變為20。而Rabbit投遞的順序是,先為consumer1投遞滿10個message,再往consumer2投遞10個message。如果這時有新message需要投遞,先判斷channel的unacked數是否等于20,如果是則不會將消息投遞到consumer中,message繼續呆在queue中。之后其中consumer對一條消息進行ack,unacked此時等于19,Rabbit就判斷哪個consumer的unacked少于10,就投遞到哪個consumer中。

我遇到的問題是一個粗心的程序員,在編寫代碼的時候,他對某些消息處理方式是這樣的

  if (success) {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    } else {
                        logger.error("######### The message is not delete from queue : {}", body);
                    }

首先他講ack機制設置為手動的,然后他的理解是如果處理成功的消息,就ack給MQ,期望MQ就可以刪除完成的數據。不然,保留數據再次被處理。

這里的誤區就是就是對ack的理解,失敗的時候,如果需要讓程序繼續處理,應該使用basicNack,并告訴mq將消息再次放入隊列

    if (success) {
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                        } else {
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                        }

對于客戶端意外宕機的情況,沒有ack服務器確實不會刪除掉數據,但是consumer重啟以后,對于服務器就是一個新的消費者了,也就是它的緩沖區又被重置為原來的n-prefetch,所以這個問題被粗心的小哥想當然地測試通過了。

prefetch的大小應該為多少

這篇文章給了很好的建議,我簡單地說一下我的理解。

理想狀況下,計算MQ SERVER 從緩沖區中拿到消息并推送到消費端,加上消費端處理完ack消息到MQ server,的時間,假設為100ms,其中消費端處理業務話費了10ms。

這里可以得出我們 prefetch = 100ms / 10ms = 10,也就是消息來回的總時間/業務處理的時間,這里要求我們 prefetch >= 10。一般計算這個時間不會太準確只能毛姑姑的,所以prefetch一般要大一點。但是這個值也不能太大,不然消費端就一只處于空閑狀態了。

所以如果你保證所有的消息都ack了,但是還是出現比較長時間的堵塞,你就或者加大一點prefetch,或者多加一些機器,或者減少業務處理的時間了。一開始建議采用或者,使用一個線程池來處理這些業務邏輯。

“如何解決RabbitMq消息隊列Qos Prefetch消息堵塞問題”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

丹阳市| 彝良县| 天峨县| 安泽县| 定南县| 句容市| 军事| 通城县| 亳州市| 瓦房店市| 城市| 潜山县| 水城县| 三河市| 贵州省| 怀仁县| 丹江口市| 巴马| 常山县| 彭州市| 文化| 永和县| 彰武县| 大邑县| 阳山县| 米脂县| 什邡市| 龙里县| 沂源县| 英山县| 汝城县| 平罗县| 申扎县| 迭部县| 潢川县| 泗洪县| 石河子市| 梁河县| 蕉岭县| 根河市| 阿拉善盟|