您好,登錄后才能下訂單哦!
Redis是一個開源的使用ANSI C語言編寫、支持網絡、可基于內存亦可持久化的日志型、Key-Value數據庫,并提供多種語言的API。從2010年3月15日起,Redis的開發工作由VMware主持。
在Redis中使用隊列
像任何消息代理一樣,Redis需要以正確的順序發送消息。 可以根據消息的年齡或某些其他預定義的優先級等級發送消息。
為了存儲這些未決消息,Redis開發人員需要隊列數據結構。 Redisson是使用Redis和Java進行分布式編程的框架,它提供了許多分布式數據結構(包括隊列)的實現。
Redisson通過提供Java API使Redis開發更加容易。 Redisson不需要開發人員學習Redis命令,而是包括所有眾所周知的Java接口,例如Queue和BlockingQueue。 Redisson還處理Redis中繁瑣的幕后工作,例如連接管理,故障轉移處理和數據序列化。
基于Redis的分布式Java隊列
Redisson提供了Java中基本隊列數據結構的多個基于Redis的實現,每種實現都有不同的功能。 這使你可以選擇最適合你目的的隊列類型。
下面,我們將使用Redisson Java框架討論六種不同類型的基于Redis的分布式隊列。
隊列
Redisson中的RQueue對象實現了java.util.Queue接口。 隊列用于需要從最早的最早的元素開始處理(也稱為“先進先出”或FIFO)的情況。
與普通Java一樣,可以使用peek()方法檢查RQueue的第一個元素,或者使用poll()方法檢查和刪除RQueue的第一個元素:
RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
阻塞隊列
Redisson中的RBlockingQueue對象實現了java.util.BlockingQueue接口。
BlockingQueues是阻塞線程的隊列,這些線程試圖從空隊列中進行輪詢,或者試圖在已滿的隊列中插入元素。 該線程將被阻塞,直到另一個線程將一個元素插入到空隊列中,或從完整隊列中輪詢第一個元素為止。
下面的示例代碼演示了RBlockingQueue的正確實例化和使用。 特別是,你可以使用參數指定對象將等待線程變得可用的時間來調用poll()方法:
RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
在故障轉移或重新連接到Redis服務器的過程中,將自動重新預訂poll(),pollFromAny(),pollLastAndOfferFirstTo()和take()Java方法。
BoundedBlockingQueue
Redisson中的RBoundedBlockingQueue
對象實現了有界的阻塞隊列結構。 有界阻塞隊列是容量已受限制(即有限)的阻塞隊列。
以下代碼演示了如何在Redisson中實例化和使用RBoundedBlockingQueue。 trySetCapacity()方法用于嘗試設置阻塞隊列的容量。 trySetCapacity()返回布爾值“ true”或“ false”,這取決于是否成功設置了容量或是否已經設置了容量:
RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
queue.trySetCapacity(2);
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// will be blocked until free space available in queue
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
延遲排隊
Redisson中的RDelayedQueue對象允許你在Redis中實現延遲隊列。 當使用諸如指數補償的策略將消息傳遞給消費者時,這可能會很有用。 每次嘗試發送郵件失敗后,重試之間的時間將成倍增加。
在與元素一起指定的延遲之后,延遲隊列中的每個元素將被轉移到目標隊列。 此目標隊列可以是實現RQueue接口的任何隊列,例如RBlockingQueue或RBoundedBlockingQueue。
RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
// move object to destinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to destinationQueue in 1 minute
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
在不再需要隊列之后,通過使用destroy()方法銷毀延遲的隊列是一個好主意。 但是,如果要關閉Redisson,則沒有必要。
PriorityQueue
Redisson中的RPriorityQueue對象實現了java.util.Queue接口。 優先級隊列是不是按元素的使用期限而是按照與每個元素相關聯的優先級排序的隊列。
如下面的示例代碼所示,RPriorityQueue使用比較器對隊列中的元素進行排序:
RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // set object comparator
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.poll();
PriorityBlockingQueue
Redisson中的RPriorityBlockingQueue對象結合了RPriorityQueue和RBlockingQueue的功能。 與RPriorityQueue一樣,RPriorityBlockingQueue也使用Comparator對隊列中的元素進行排序。
RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // set object comparator
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.take();
在故障轉移或重新連接到Redis服務器的過程中,將自動重新預訂poll(),pollLastAndOfferFirstTo()和take()Java方法。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。