您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關利用JCTools怎么實現Java并發程序,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
傳統上,在可變共享狀態下工作的多線程代碼使用鎖來確保數據一致性和發布(一個線程所做的更改對另一個線程可見)。
這種方法有許多缺點:
線程在試圖獲取鎖時可能會被阻塞,在另一個線程的操作完成之前不會取得任何進展—這有效地防止了并行性
鎖爭用越重,JVM處理調度線程、管理爭用和等待線程隊列的時間就越多,實際工作就越少
如果涉及多個鎖,并且它們以錯誤的順序獲取/釋放,則可能出現死鎖
優先級反轉的危險是可能的——高優先級線程被鎖定,試圖獲得由低優先級線程持有的鎖
大多數情況下,使用粗粒度鎖會嚴重損害并行性—細粒度鎖需要更仔細的設計,增加鎖開銷,并且更容易出錯
另一種方法是使用非阻塞算法,即任何線程的故障或掛起都不會導致另一個線程的故障或掛起的算法。
如果所涉及的線程中至少有一個能夠在任意時間段內取得進展,即在處理過程中不會出現死鎖,則非阻塞算法是無鎖的。
此外,如果保證每個線程的進程,這些算法是無等待的。
下面是一個非阻塞堆棧示例,它定義了基本狀態:
public class ConcurrentStack<E> { AtomicReference<Node<E>> top = new AtomicReference<Node<E>>(); private static class Node <E> { public E item; public Node<E> next; // standard constructor } }
還有一些API方法:
public void push(E item){ Node<E> newHead = new Node<E>(item); Node<E> oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while(!top.compareAndSet(oldHead, newHead)); } public E pop() { Node<E> oldHead; Node<E> newHead; do { oldHead = top.get(); if (oldHead == null) { return null; } newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; }
我們可以看到,該算法使用細粒度比較和交換(CAS)指令,并且是無鎖的(即使多個線程調用top.compareAndSet()同時,它們中的一個保證會成功)但不能無等待,因為不能保證CAS最終會對任何特定線程成功。
首先,讓我們將JCTools依賴項添加到pom.xml文件:
<dependency> <groupId>org.jctools</groupId> <artifactId>jctools-core</artifactId> <version>2.1.2</version> </dependency>
請注意,Maven Central上提供了最新的可用版本。
該庫提供了許多隊列以在多線程環境中使用,即一個或多個線程以線程安全的無鎖方式寫入隊列,一個或多個線程以線程安全的無鎖方式從隊列中讀取。
所有隊列實現的通用接口是org.jctools.queues.MessagePassingQueue。
隊列類型
所有隊列都可以根據其生產者/消費者策略進行分類:
單個生產者,單個消費者–此類類使用前綴Spsc命名,例如SpscArrayQueue
單個生產者,多個消費者–使用Spmc前綴,例如SpmcArrayQueue
多個生產者,單個消費者-使用Mpsc前綴,例如MpscArrayQueue
多個生產者、多個消費者—使用Mpmc前綴,例如MpmcArrayQueue
需要注意的是,在內部沒有策略檢查,也就是說,如果使用不正確,隊列可能會無聲地發生故障。
例如,下面的測試從兩個線程填充單個生產者隊列并通過,即使不能保證使用者看到來自不同生產者的數據:
SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2); Thread producer1 = new Thread(() -> queue.offer(1)); producer1.start(); producer1.join(); Thread producer2 = new Thread(() -> queue.offer(2)); producer2.start(); producer2.join(); Set<Integer> fromQueue = new HashSet<>(); Thread consumer = new Thread(() -> queue.drain(fromQueue::add)); consumer.start(); consumer.join(); assertThat(fromQueue).containsOnly(1, 2);
總結以上分類,以下是JCTools隊列列表:
SpscArrayQueue–單個生產者,單個消費者,在內部使用一個數組,限制容量
SpscLinkedQueue–單個生產者,單個消費者,內部使用鏈表,未綁定容量
SpscChunkedArrayQueue–單生產商、單消費者,從初始容量開始,一直增長到最大容量
SpscGrowableArrayQueue–單生產者、單消費者,從初始容量開始,一直增長到最大容量。這與SpscChunkedArrayQueue是相同的契約,唯一的區別是內部塊管理。建議使用SpscChunkedArrayQueue,因為它有一個簡化的實現
SpscUnboundedArrayQueue–單個生產者,單個消費者,在內部使用數組,未綁定容量
SpmcArrayQueue–單個生產者、多個使用者,在內部使用一個陣列,限制容量
MpscArrayQueue—多個生產者、單個消費者在內部使用一個陣列,限制容量
MpscLinkedQueue–多個生產者,單個消費者,在內部使用鏈表,未綁定容量
MpmcArrayQueue—多個生產者、多個消費者在內部使用一個陣列,限制容量
前面提到的所有隊列都使用sun.misc.Unsafe. 然而,隨著java9和JEP-260的出現,這個API在默認情況下變得不可訪問。
因此,有其他隊列使用java.util.concurrent.atomic.AtomicLongFieldUpdater(公共API,性能較差)而不是sun.misc.Unsafe.
它們是從上面的隊列生成的,它們的名稱中間插入了單詞Atomic,例如SpscChunkedAtomicArrayQueue或MpmcAtomicArrayQueue。
如果可能,建議使用“常規”隊列,并且僅在sun.misc.Unsafe像Hot Java9+和JRockit一樣被禁止/無效。
所有JCTools隊列也可能具有最大容量或未綁定。當隊列已滿且受容量限制時,它將停止接受新元素。
在以下示例中,我們:
填滿隊列
確保在此之后停止接受新元素
從中排出,并確保之后可以添加更多元素
請注意,為了可讀性,刪除了幾個代碼語句。
SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16); CountDownLatch startConsuming = new CountDownLatch(1); CountDownLatch awakeProducer = new CountDownLatch(1); Thread producer = new Thread(() -> { IntStream.range(0, queue.capacity()).forEach(i -> { assertThat(queue.offer(i)).isTrue(); }); assertThat(queue.offer(queue.capacity())).isFalse(); startConsuming.countDown(); awakeProducer.await(); assertThat(queue.offer(queue.capacity())).isTrue(); }); producer.start(); startConsuming.await(); Set<Integer> fromQueue = new HashSet<>(); queue.drain(fromQueue::add); awakeProducer.countDown(); producer.join(); queue.drain(fromQueue::add); assertThat(fromQueue).containsAll( IntStream.range(0, 17).boxed().collect(toSet()));
JCTools還提供了一些非隊列數據結構。
它們都列在下面:
NonBlockingHashMap–一個無鎖的ConcurrentHashMap替代方案,具有更好的伸縮性和通常更低的突變成本。它是實現sun.misc.Unsafe,因此,不建議在Java9+或JRockit環境中使用此類
NonBlockingHashMapLong–與NonBlockingHashMap類似,但使用基本長鍵
NonBlockingHashSet–一個簡單的包裝器,圍繞著像JDK的java.util.Collections.newSetFromMap()一樣的NonBlockingHashMap
NonBlockingIdentityHashMap–與NonBlockingHashMap類似,但按標識比較鍵。
NonBlockingSetInt–一個多線程位向量集,實現為一個原始long數組。在無聲自動裝箱的情況下工作無效
讓我們使用JMH來比較JDK的ArrayBlockingQueue和JCTools隊列的性能。JMH是Sun/Oracle JVM gurus提供的一個開源微基準框架,它保護我們不受編譯器/JVM優化算法的不確定性的影響。
請注意,為了提高可讀性,下面的代碼段遺漏了幾個語句。
public class MpmcBenchmark { @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) public volatile String implementation; public volatile Queue<Long> queue; @Benchmark @Group(GROUP_NAME) @GroupThreads(PRODUCER_THREADS_NUMBER) public void write(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && !queue.offer(1L)) { // intentionally left blank } } @Benchmark @Group(GROUP_NAME) @GroupThreads(CONSUMER_THREADS_NUMBER) public void read(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && queue.poll() == null) { // intentionally left blank } } }
結果:
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op
我們可以看到,MpmcArrayQueue的性能略好于MpmcAtomicArrayQueue,而ArrayBlockingQueue的速度慢了兩倍。
使用JCTools有一個重要的缺點——不可能強制正確使用庫類。例如,考慮在我們的大型成熟項目中開始使用MpscArrayQueue的情況(注意,必須有一個使用者)。
不幸的是,由于項目很大,有可能有人出現編程或配置錯誤,現在從多個線程讀取隊列。這個系統看起來像以前一樣工作,但現在有可能消費者錯過了一些信息。這是一個真正的問題,可能會有很大的影響,是很難調試。
理想情況下,應該可以運行具有特定系統屬性的系統,該屬性強制JCTools確保線程訪問策略。例如,本地/測試/暫存環境(而不是生產環境)可能已啟用它。遺憾的是,JCTools沒有提供這樣的屬性。
另一個需要考慮的問題是,盡管我們確保JCTools比JDK的對應工具快得多,但這并不意味著我們的應用程序獲得了與我們開始使用自定義隊列實現時相同的速度。大多數應用程序不會在線程之間交換很多對象,而且大多是I/O綁定的。
以上就是利用JCTools怎么實現Java并發程序,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。