91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

ParallelStream使用的坑怎么解決

發布時間:2022-01-12 21:30:03 來源:億速云 閱讀:229 作者:iii 欄目:安全技術

今天小編給大家分享一下ParallelStream使用的坑怎么解決的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

比如下面的代碼片段,讓人閱讀的時候就像是讀詩一樣。但是一旦用不好,也是會要命的。

List<Integer> transactionsIds = widgets.stream()              .filter(b -> b.getColor() == RED)              .sorted((x,y) -> x.getWeight() - y.getWeight())              .mapToInt(Widget::getWeight)              .sum();

這段代碼有一個關鍵的函數,那就是stream。通過它,可以將一個普通的list,轉化為流,然后就可以使用類似于管道的方式對list進行操作。總之,用過的都說好。

對這些函數還不是太熟悉?可以參考:《到處是map、flatMap,啥意思?》

問題來了

假如我們把stream換成parallelStream,會發生什么情況?

根據字面上的意思,流會從串行 變成并行。

既然是并行,那用屁股想一想,就知道這里面肯定會有線程安全問題。不過我們這里討論的并不是要你使用線程安全的集合,這個話題太低級。現階段,知道在線程不安全的環境中使用線程安全的集合,已經是一個基本的技能。

這次踩坑的地方,是并行流的性能問題。

我們用代碼來說話。

下面的代碼,開啟了8個線程,這8個線程都在使用并行流進行數據計算。在執行的邏輯中,我們讓每個任務都sleep  1秒鐘,這樣就能夠模擬一些I/O請求的耗時等待。

使用stream,程序會在30秒后返回,但我們期望程序能夠在1秒多返回,因為它是并行流,得對得起這個稱號。

測試發現,我們等了好久,任務才執行完畢。

static void paralleTest() {     List<Integer> numbers = Arrays.asList(             0, 1, 2, 3, 4, 5, 6, 7, 8, 9,             10, 11, 12, 13, 14, 15, 16, 17, 18, 19,             20, 21, 22, 23, 24, 25, 26, 27, 28, 29     );     final long begin = System.currentTimeMillis();     numbers.parallelStream().map(k -> {         try {             Thread.sleep(1000);             System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());         } catch (InterruptedException e) {             e.printStackTrace();         }         return k;     }).collect(Collectors.toList()); }  public static void main(String[] args) { //    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start(); }

實際上,在不同的機器上執行,這段代碼花費的時間都不一樣。

既然是并行,那肯定得有個并行度。太低了,體現不到并行的能能力;太大了,又浪費了上下文切換的時間。我是很沮喪的發現,很多高級研發,將線程池的各種參數背的滾瓜爛熟,各種調優,竟然敢睜一只眼閉一只眼的在I/O密集型業務中用上parallelStream。

要了解這個并行度,我們需要查看具體的構造方法。在ForkJoinPool類中找到這樣的代碼。

try {  // ignore exceptions in accessing/parsing properties     String pp = System.getProperty         ("java.util.concurrent.ForkJoinPool.common.parallelism");     if (pp != null)         parallelism = Integer.parseInt(pp);     fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.threadFactory");     handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { }  if (fac == null) {     if (System.getSecurityManager() == null)         fac = defaultForkJoinWorkerThreadFactory;     else // use security-managed default         fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores     (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)     parallelism = 1; if (parallelism > MAX_CAP)     parallelism = MAX_CAP;

可以看到,并行度到底是多少,是由下面的參數來控制的。如果無法獲取這個參數,則默認使用 CPU個數-1 的并行度。

可以看到,這個函數是為了計算密集型業務去設計的。如果你喂給它一大堆任務,它就會由并行執行退變成類似于串行的效果。

-Djava.util.concurrent.ForkJoinPool.common.parallelism=N

即使你使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=N設置了一個初始值大小,它依然有問題。

因為,parallelism這個變量是final的,一旦設定,不允許修改。也就是說,上面的參數只會生效一次。

張三可能使用下面的代碼,設置了并行度大小為20。

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

李四可能用同樣的方式,設置了這個值為30。那實際在項目中用的是哪個值,那就得問JVM是怎么加載的類信息了。

這種方式并不太非常靠譜。

一種解決方式

我們可以通過提供外置的forkjoinpool,也就是改變提交方式,來實現不同類型的任務分離。

代碼如下所示,通過顯式的代碼提交,即可實現任務分離。

ForkJoinPool pool = new ForkJoinPool(30);  final long begin = System.currentTimeMillis(); try {     pool.submit(() ->             numbers.parallelStream().map(k -> {                 try {                     Thread.sleep(1000);                     System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 return k;             }).collect(Collectors.toList())).get(); } catch (InterruptedException e) {     e.printStackTrace(); } catch (ExecutionException e) {     e.printStackTrace(); }

這樣,不同的場景,就可以擁有不同的并行度。這種方式和CountDownLatch有異曲同工之妙,我們需要手動管理資源。

以上就是“ParallelStream使用的坑怎么解決”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

孟村| 巴南区| 昌吉市| 扶余县| 运城市| 宜兴市| 铁岭市| 北辰区| 会宁县| 青河县| 黎平县| 廉江市| 抚顺县| 清水县| 高密市| 永年县| 辰溪县| 白玉县| 龙游县| 石阡县| 阿拉善右旗| 长垣县| 镇巴县| 沛县| 新沂市| 利津县| 江口县| 永仁县| 昌吉市| 宣武区| 循化| 邢台县| 陈巴尔虎旗| 潞城市| 阳泉市| 桓仁| 大冶市| 招远市| 伊吾县| 当雄县| 灵宝市|