您好,登錄后才能下訂單哦!
這篇文章主要講解了“Java常用的并發容器”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Java常用的并發容器”吧!
ConcurrentHashMap:并發版HashMap
CopyOnWriteArrayList:并發版ArrayList
CopyOnWriteArraySet:并發Set
ConcurrentLinkedQueue:并發隊列(基于鏈表)
ConcurrentLinkedDeque:并發隊列(基于雙向鏈表)
ConcurrentSkipListMap:基于跳表的并發Map
ConcurrentSkipListSet:基于跳表的并發Set
ArrayBlockingQueue:阻塞隊列(基于數組)
LinkedBlockingQueue:阻塞隊列(基于鏈表)
LinkedBlockingDeque:阻塞隊列(基于雙向鏈表)
PriorityBlockingQueue:線程安全的優先隊列
SynchronousQueue:讀寫成對的隊列
LinkedTransferQueue:基于鏈表的數據交換隊列
DelayQueue:延時隊列
最常見的并發容器之一,可以用作并發場景下的緩存。底層依然是哈希表,但在JAVA 8中有了不小的改變,而JAVA 7和JAVA 8都是用的比較多的版本,因此經常會將這兩個版本的實現方式做一些比較(比如面試中)。
一個比較大的差異就是,JAVA 7中采用分段鎖來減少鎖的競爭,JAVA 8中放棄了分段鎖,采用CAS(一種樂觀鎖),同時為了防止哈希沖突嚴重時退化成鏈表(沖突時會在該位置生成一個鏈表,哈希值相同的對象就鏈在一起),會在鏈表長度達到閾值(8)后轉換成紅黑樹(比起鏈表,樹的查詢效率更穩定)。
并發版ArrayList,底層結構也是數組,和ArrayList不同之處在于:當新增和刪除元素時會創建一個新的數組,在新的數組中增加或者排除指定對象,最后用新增數組替換原來的數組。
適用場景:由于讀操作不加鎖,寫(增、刪、改)操作加鎖,因此適用于讀多寫少的場景。
局限:由于讀的時候不會加鎖(讀的效率高,就和普通ArrayList一樣),讀取的當前副本,因此可能讀取到臟數據。如果介意,建議不用。
看看源碼感受下:
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { final transient ReentrantLock lock = new ReentrantLock(); private transient volatile Object[] array; // 添加元素,有鎖 public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); // 修改時加鎖,保證并發安全 try { Object[] elements = getArray(); // 當前數組 int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); // 創建一個新數組,比老的大一個空間 newElements[len] = e; // 要添加的元素放進新數組 setArray(newElements); // 用新數組替換原來的數組 return true; } finally { lock.unlock(); // 解鎖 } } // 讀元素,不加鎖,因此可能讀取到舊數據 public E get(int index) { return get(getArray(), index); } }
基于CopyOnWriteArrayList實現(內含一個CopyOnWriteArrayList成員變量),也就是說底層是一個數組,意味著每次add都要遍歷整個集合才能知道是否存在,不存在時需要插入(加鎖)。
適用場景:在CopyOnWriteArrayList適用場景下加一個,集合別太大(全部遍歷傷不起)。
基于鏈表實現的并發隊列,使用樂觀鎖(CAS)保證線程安全。因為數據結構是鏈表,所以理論上是沒有隊列大小限制的,也就是說添加數據一定能成功。
基于雙向鏈表實現的并發隊列,可以分別對頭尾進行操作,因此除了先進先出(FIFO),也可以先進后出(FILO),當然先進后出的話應該叫它棧了。
SkipList即跳表,跳表是一種空間換時間的數據結構,通過冗余數據,將鏈表一層一層索引,達到類似二分查找的效果
類似HashSet和HashMap的關系,ConcurrentSkipListSet里面就是一個ConcurrentSkipListMap,就不細說了。
基于數組實現的可阻塞隊列,構造時必須制定數組大小,往里面放東西時如果數組滿了便會阻塞直到有位置(也支持直接返回和超時等待),通過一個鎖ReentrantLock保證線程安全。
用offer操作舉個例子:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 讀寫共用此鎖,線程間通過下面兩個Condition通信 * 這兩個Condition和lock有緊密聯系(就是lock的方法生成的) * 類似Object的wait/notify */ final ReentrantLock lock; /** 隊列不為空的信號,取數據的線程需要關注 */ private final Condition notEmpty; /** 隊列沒滿的信號,寫數據的線程需要關注 */ private final Condition notFull; // 一直阻塞直到有東西可以拿出來 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } // 在尾部插入一個元素,隊列已滿時等待指定時間,如果還是不能插入則返回 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 鎖住 try { // 循環等待直到隊列有空閑 while (count == items.length) { if (nanos <= 0) return false;// 等待超時,返回 // 暫時放出鎖,等待一段時間(可能被提前喚醒并搶到鎖,所以需要循環判斷條件) // 這段時間可能其他線程取走了元素,這樣就有機會插入了 nanos = notFull.awaitNanos(nanos); } enqueue(e);//插入一個元素 return true; } finally { lock.unlock(); //解鎖 } }
乍一看會有點疑惑,讀和寫都是同一個鎖,那要是空的時候正好一個讀線程來了不會一直阻塞嗎?
答案就在notEmpty、notFull里,這兩個出自lock的小東西讓鎖有了類似synchronized + wait + notify的功能。
基于鏈表實現的阻塞隊列,想比與不阻塞的ConcurrentLinkedQueue,它多了一個容量限制,如果不設置默認為int最大值。
類似LinkedBlockingQueue,但提供了雙向鏈表特有的操作。
構造時可以傳入一個比較器,可以看做放進去的元素會被排序,然后讀取的時候按順序消費。某些低優先級的元素可能長期無法被消費,因為不斷有更高優先級的元素進來。
一個虛假的隊列,因為它實際上沒有真正用于存儲元素的空間,每個插入操作都必須有對應的取出操作,沒取出時無法繼續放入。
一個簡單的例子感受一下:
import java.util.concurrent.*; public class Main { public static void main(String[] args) { SynchronousQueue<Integer> queue = new SynchronousQueue<>(); new Thread(() -> { try { // 沒有休息,瘋狂寫入 for (int i = 0; ; i++) { System.out.println("放入: " + i); queue.put(i); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { // 咸魚模式取數據 while (true) { System.out.println("取出: " + queue.take()); Thread.sleep((long) (Math.random() * 2000)); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } /* 輸出: 放入: 0 取出: 0 放入: 1 取出: 1 放入: 2 取出: 2 放入: 3 取出: 3 */
可以看到,寫入的線程沒有任何sleep,可以說是全力往隊列放東西,而讀取的線程又很不積極,讀一個又sleep一會。輸出的結果卻是讀寫操作成對出現。
JAVA中一個使用場景就是Executors.newCachedThreadPool(),創建一個緩存線程池。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0, // 核心線程為0,沒用的線程都被無情拋棄 Integer.MAX_VALUE, // 最大線程數理論上是無限了,還沒到這個值機器資源就被掏空了 60L, TimeUnit.SECONDS, // 閑置線程60秒后銷毀 new SynchronousQueue<Runnable>()); // offer時如果沒有空閑線程取出任務,則會失敗,線程池就會新建一個線程 }
實現了接口TransferQueue,通過transfer方法放入元素時,如果發現有線程在阻塞在取元素,會直接把這個元素給等待線程。如果沒有人等著消費,那么會把這個元素放到隊列尾部,并且此方法阻塞直到有人讀取這個元素。和SynchronousQueue有點像,但比它更強大。
可以使放入隊列的元素在指定的延時后才被消費者取出,元素需要實現Delayed接口。
感謝各位的閱讀,以上就是“Java常用的并發容器”的內容了,經過本文的學習后,相信大家對Java常用的并發容器這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。