您好,登錄后才能下訂單哦!
這篇文章主要講解了“怎么掌握Java LinkedBlockingQueue”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么掌握Java LinkedBlockingQueue”吧!
隊列在生活中隨處可見,醫院繳費需要排隊、做核酸需要排隊、汽車等紅綠燈需要排隊等等。
隊列是一個按照先來到就排在前面,后來到排在后面的數據結構,并且出隊的時候也是按照先來到先出隊。使用數組和鏈表進行實現。通常用于協調任務的執行和數據的交換。
LinkedBlockingQueue 是一個可選有界阻塞隊列,有界指的是隊列存在一個最大容量;阻塞指的是如果隊列已經滿了,想要往隊列繼續添加元素的話,那么這個操作將會被暫停,直到隊列中有空位才會繼續完成添加操作。如果隊列已經為空,想要從隊列中獲取元素,那么這個操作將會被暫停,直接隊列中存在元素才會繼續完成獲取操作。
LinkedBlockingQueue 內部使用鏈表作為元素的存儲結構。內部使用了兩個鎖,分別使用于存操作和取操作。
執行存取操作時,都必須先獲取鎖,才可以執行存取操作,保證 LinkedBlockingQueue 是線程安全。
LinkedBlockingQueue 通過兩個 Condition 條件隊列,一個 notFull 條件,一個 notEmpty 條件。在對隊列進行插入元素操作時,判斷當前隊列已經滿,則通過 notFull 條件將線程阻塞,直到其他線程通知該線程隊列可以繼續插入元素。在對隊列進行移除元素操作時,判斷當前隊列已經空,則通過 notEmpty 條件阻塞線程,直到其他線程通過該線程可以繼續獲取元素。
這樣保證線程的存取操作不會出現錯誤。避免隊列在滿時,丟棄插入的元素;也避免在隊列空時取到一個 null 值。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
無參構造函數中,默認使用 Integer.MAX_VALUE
作為隊列的最大容量。
有參構造函數中,可以自己指定隊列的最大容量,并且創建了頭節點和尾節點。那么 LinkedBlockingQueue 使用的是有頭單向鏈表。
private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; // 取鎖 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 存鎖 private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
并且在對象初始化時,創建了兩個鎖,分別使用于存操作和取操作。創建了兩個條件隊列,分別用于隊列空和滿的情況。
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
1.獲取鎖
2.判斷當前隊列是否已經滿了
如果隊列已經滿了,調用 notFull 條件隊列的 await() 方法,將該線程阻塞,暫停該線程的插入操作。避免內部溢出的問題。
如果沒有滿,則直接調用入隊函數 enqueue 插入到隊列末尾。
3.檢查此時隊列是否已滿
如果未滿,則調用 notFull 條件隊列的 signal() 方法,喚醒被阻塞在 notFull 條件隊列的線程。
4.解鎖
檢查插入元素前的隊列元素數量是否等于0
等于 0,則調用 notEmpty 條件隊列的 signal() 方法,通知其隊列現在不為空,可以喚醒阻塞線程獲取元素。
為什么需要調用 notFull 條件隊列的 signal() 方法? 因為隊列取操作和存操作所使用的鎖是不一樣的,那么就說明,一個線程執行存入操作時,其他線程是可以執行取出操作的。我們來看下面這個例子:
隊列總容量為 5,當前元素數量為5。線程 A 獲取了存鎖,想要插入了元素。但是因為隊列容量已滿,釋放鎖,并且加入到條件隊列中,等待被喚醒。
線程 B 獲取了存鎖,想要插入了元素。但是因為隊列容量已滿,釋放鎖,并且加入到條件隊列中,等待被喚醒。
線程 C 獲取了取鎖,取出了元素 1。并且通過 notFull 的 signal 方法喚醒條件隊列中被阻塞的線程 A。線程 A 被喚醒后加入到同步隊列中,但是此時還沒有競爭到鎖。
線程 D 獲取了取鎖,取出了元素 2。但是還沒有執行到喚醒阻塞線程的代碼。
線程 A 競爭到鎖,開始執行插入元素操作。將元素插入之后,檢查到隊列元素數量為 4,小于隊列的總容量,因此會執行 notFull 的 signal 方法喚醒條件隊列中被阻塞的線程 B。線程 B 被喚醒后加入到同步隊列中,開始競爭鎖。
線程 B 競爭到鎖,開始執行插入元素操作。將元素插入之后,檢查到隊列元素數量等于 5,不進行喚醒操作。
這樣做的目的是盡快的喚醒阻塞線程,可以更快的完成插入元素操作。因為線程存和取的操作相互之間并不是互斥的,而是獨立運行的,提高吞吐量。
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
1.獲得取鎖
2.判斷當前隊列是否為空
如果隊列沒有元素,調用 notEmpty 條件隊列的 await() 方法,將該線程阻塞,暫停該線程的獲取操作。避免獲取元素出錯。
如果不為空,則直接調用出隊函數 dequeue 移除隊列第一個元素,并返回給客戶端。
3.檢查此時隊列是否為空
如果不為空,則調用 notEmpty 條件隊列的 signal() 方法,喚醒被阻塞在 notEmpty 條件隊列的線程。
4.釋放鎖
5.檢查獲取元素前的隊列元素數量是否等于最大容量
等于最大容量,因為此時已經取出一個元素,因此隊列處于未滿的狀態,可以喚醒阻塞在 notFull 條件的線程,讓線程繼續插入元素。
步驟 3 的目的是盡快的喚醒阻塞線程,可以更快的完成取元素操作。提高吞吐量。可以嘗試自己畫出流程圖。
private void enqueue(Node<E> node) { last = last.next = node; }
入隊函數極其簡單,只要將最后一個元素的 next 指針指向當前元素即完成了插入操作。
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
我們前面說 LinkedBlockingQueue 使用的是有頭鏈表。頭節點只是作為一個標志,實際上并不是一個真正的元素。當獲取元素時,將頭節點的下一個節點作為頭節點,將原來的頭節點取消引用,被垃圾回收即可。
LinkedBlockingQueue 和 ArrayBlockingQueue 一樣適用于多個線程之間需要共享數據、協調任務執行的場景。因此可以總結出以下幾個應用場景:
線程池:線程池是一個常見的并發編程模型,它通過線程池中的線程執行任務。并且可以重復使用這些線程。在線程池中,可以使用 LinkedBlockingQueue 來存儲需要執行的任務,以此控制任務數量和執行順序。當線程池中的線程執行完任務之后,可以從 LinkedBlockingQueue 中取出下一個任務執行。
生產者-消費者:在生產者-消費者模型中,生產者負責生產數據,消費者負責對數據進行處理。在這種模式下,LinkedBlockingQueue 可以作為生產者與消費者之間的數據通道,保證線程安全和數據正確。
Nacos: Nacos 是一個動態服務發現、配置和服務管理平臺,它使用 LinkedBlockingQueue 來實現內部的任務隊列。
Tomcat:從 Tomcat 7 開始,請求隊列默認使用了 LinkedBlockingQueue 實現。
Hystrix: 一個流行的容錯框架,其默認使用 LinkedBlockingQueue 作為請求隊列。
感謝各位的閱讀,以上就是“怎么掌握Java LinkedBlockingQueue”的內容了,經過本文的學習后,相信大家對怎么掌握Java LinkedBlockingQueue這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。