您好,登錄后才能下訂單哦!
這篇文章主要介紹“如何解決RocketMQ消息消費異常”,在日常操作中,相信很多人在如何解決RocketMQ消息消費異常問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何解決RocketMQ消息消費異常”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
開發中在項目重啟時會重復消費消息,但其實消息已經消費過了。
然而通過源碼斷點MQClientInstance 定時任務正常,只是每次更新的offset都是原offet
由于是用的spring-boot整合的client,跟蹤consumer源碼,代碼在DefaultRocketMQListenerContainer.handleMessage方法中
然而一切正常,再往上跟蹤到DefaultMessageListenerConcurrently
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @SuppressWarnings("unchecked") @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); handleMessage(messageExt); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
首先在catch代碼塊點打斷點看看是不是有問題,結果發現并沒有走到這里,這就坑爹了,害我又從其它方面各種查原因,浪費了很多時間。后面一步一步調試,最終在 log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); 打日志這一步時拋出了異常,這尼瑪打個日志還能異常,還不是Exception的異常。。本來松了口氣以為找到了原因就好解決了,沒想到這才是剛剛開始。
java.lang.NoClassDefFoundError:Could not initialize class org.apache.rocketmq.common.message.MessageClientIDSetter
原因是在MessageClientExt類中調用getMsgId方法里,調用了MessageClientIDSetter.getUniqID(this)直接拋出的異常
從異常信息來看是MessageClientIDSetter 在初始化的時候出了問題
static { byte[] ip; try { ip = UtilAll.getIP(); } catch (Exception e) { ip = createFakeIP(); } LEN = ip.length + 2 + 4 + 4 + 2; ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4); tempBuffer.position(0); tempBuffer.put(ip); tempBuffer.position(ip.length); tempBuffer.putInt(UtilAll.getPid()); tempBuffer.position(ip.length + 2); tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); FIX_STRING = UtilAll.bytes2string(tempBuffer.array()); setStartTime(System.currentTimeMillis()); COUNTER = new AtomicInteger(0); }
發面是在ip = UtilAll.getIP();出了問題,然則并沒有到catch代碼塊,而是跳到了DefaultMqPushConsumerImpl類中,這里又一個坑爹的是異常塊沒有任何處理,看不到異常信息,好吧只能一步一步繼續斷點調試
最終在UtillAll類的ipV6Check方法執行到InetAddressValidator.getInstance();出了問題。
private static boolean ipV6Check(byte[] ip) { if (ip.length != 16) { throw new RuntimeException("illegal ipv6 bytes"); } InetAddressValidator validator = InetAddressValidator.getInstance(); return validator.isValidInet6Address(ipToIPv6Str(ip)); }
但是在本地調試這段代碼又沒有任何問題。因此只能在debug時調試,報的錯是classNotFound異常
從rocketMq的依賴來看他需要的版本是1.6
因此我們需要把1.3.1的版本移除
看著這密密麻麻的依賴關系,Idea還沒有查找功能,只能慢慢找了,最后功夫不復有心人,把依賴移除,重啟一切都好了
到此,關于“如何解決RocketMQ消息消費異常”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。