您好,登錄后才能下訂單哦!
java中的消息隊列怎么利用多線程實現?相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
1、定義一個隊列緩存池:
//static修飾的成員變量和成員方法獨立于該類的任何對象。也就是說,它不依賴類特定的實例,被類的所有實例共享。 private static List<Queue> queueCache = new LinkedList<Queue>();
2、定義隊列緩沖池最大消息數,如果達到該值,那么隊列檢入將等待檢出低于該值時繼續進行。
private Integer offerMaxQueue = 2000;
3、定義檢出線程,如果隊列緩沖池沒有消息,那么檢出線程會線程等待中
new Thread(){ public void run(){ while(true){ String ip = null; try { synchronized (queueCache) { Integer size = queueCache.size(); if(size==0){ //隊列緩存池沒有消息,等待。。。。 queueCache.wait(); } Queue queue = queueCache.remove(0); if(isIpLock(queueStr)){//假若這個是一個多應用的分布式系統,那么這個判斷應該是分布式鎖,這里說的鎖不是線程停止,而是跳過該消息,滯后處理 queueCache.add(queue);該queue重新加入隊列緩沖池,滯后處理, continue; }else{ ;//這里是處理該消息的操作。 } size = queueCache.size(); if(size<offerMaxQueue&&size>=0){ queueCache.notifyAll();//在隊列緩存池不超過最大值的前提下,假若檢入正在等待中,那么那么讓他們排隊檢入。 } } } catch (Exception e) { e.printStackTrace(); }finally{ try {//檢出該消息隊列的鎖 unIpLock(queueStr); } catch (Execption e) {//捕獲異常,不能讓線程掛掉 e.printStackTrace(); } } } }.start();
4、檢入隊列
synchronized (queueCache) { while(true){ Integer size = queueCache.size(); if(size>=offerMaxQueue){ try { queueCache.wait(); continue;//繼續執行等待中的檢入任務。 } catch (InterruptedException e) { e.printStackTrace(); } }//IF if(size<=offerMaxQueue&&size>0){ queueCache.notifyAll(); } break;//檢入完畢 }//while }
5、鎖方法實現
/** * 鎖 * @param ip * @return * @throws */ public Boolean isLock(String queueStr) { return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1; } //解鎖 public void unIpLock(String queueStr) { if(ip!=null){ this.redisManager.del(queueStr+"_lock"); // lock.unlock(); } }
看完上述內容,你們掌握java中的消息隊列怎么利用多線程實現的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。