您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關如何分析Java的Fork/Join并發框架,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
今天我就把自己對Fork/Join一些淺顯的理解記錄下來。
1. Fork/Join是什么
Oracle的官方給出的定義是:Fork/Join框架是一個實現了ExecutorService接口的多線程處理器。它可以把一個大的任務劃分為若干個小的任務并發執行,充分利用可用的資源,進而提高應用的執行效率。
Fork/Join實現了ExecutorService,所以它的任務也需要放在線程池中執行。它的不同在于它使用了工作竊取算法,空閑的線程可以從滿負荷的線程中竊取任務來幫忙執行。(我個人理解的工作竊取大意就是:由于線程池中的每個線程都有一個隊列,而且線程間互不影響。那么線程每次都從自己的任務隊列的頭部獲取一個任務出來執行。如果某個時候一個線程的任務隊列空了,而其余的線程任務隊列中還有任務,那么這個線程就會從其他線程的任務隊列中取一個任務出來幫忙執行。就像偷取了其他人的工作一樣)
Fork/Join框架的核心是繼承了AbstractExecutorService的ForkJoinPool類,它保證了工作竊取算法和ForkJoinTask的正常工作。
2. Fork/Join的基本用法
(1)Fork/Join基類
上文已經提到,Fork/Join就是要講一個大的任務分割成若干小的任務,所以***步當然是要做任務的分割,大致方式如下:
if (這個任務足夠小){ 執行要做的任務 } else { 將任務分割成兩小部分 執行兩小部分并等待執行結果 }
要實現FrokJoinTask我們需要一個繼承了RecursiveTask或RecursiveAction的基類,并根據自身業務情況將上面的代碼放入基類的coupute方法中。RecursiveTask和RecursiveAction都繼承了FrokJoinTask,它倆的區別就是RecursiveTask有返回值而RecursiveAction沒有。下面是我做的一個選出字符串列表中還有"a"的元素的Demo:
@Override protected List<String> compute() { // 當end與start之間的差小于閾值時,開始進行實際篩選 if (end - this.start < threshold) { List<String> temp = list.subList(this.start, end); return temp.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList()); } else { // 如果當end與start之間的差大于閾值時 // 將大任務分解成兩個小任務。 int middle = (this.start + end) / 2; ForkJoinTest left = new ForkJoinTest(list, this.start, middle, threshold); ForkJoinTest right = new ForkJoinTest(list, middle, end, threshold); // 并行執行兩個“小任務” left.fork(); right.fork(); // 把兩個“小任務”的結果合并起來 List<String> join = left.join(); join.addAll(right.join()); return join; } }
(2)執行類
做好了基類就可以開始調用了,調用時首先我們需要Fork/Join線程池ForkJoinPool,然后向線程池中提交一個ForkJoinTask并得到結果。ForkJoinPool的submit方法的入參是一個ForkJoinTask,返回值也是一個ForkJoinTask,它提供一個get方法可以獲取到執行結果。
代碼如下:
ForkJoinPool pool = new ForkJoinPool(); // 提交可分解的ForkJoinTask任務 ForkJoinTask<List<String>> future = pool.submit(forkJoinService); System.out.println(future.get()); // 關閉線程池 pool.shutdown();
就這樣我們就完成了一個簡單的Fork/Join的開發。
提示:Java8中java.util.Arrays的parallelSort()方法和java.util.streams包中封裝的方法也都用到了Fork/Join。(細心的讀者可能注意到我在Fork/Join中也有用到stream,所以其實這個Fork/Join是多余的,因為stream已經實現了Fork/Join,不過這只是一個Demo展示,沒有任何實際用處也就無所謂了)
附完整代碼以便以后參考:
1. 定義抽象類(用于拓展,此例中沒有實際作用,可以不定義此類):
import java.util.concurrent.RecursiveTask; /** * Description: ForkJoin接口 * Designer: jack * Date: 2017/8/3 * Version: 1.0.0 */ public abstract class ForkJoinService<T> extends RecursiveTask<T>{ @Override protected abstract T compute(); }
2. 定義基類
import java.util.List; import java.util.stream.Collectors; /** * Description: ForkJoin基類 * Designer: jack * Date: 2017/8/3 * Version: 1.0.0 */ public class ForkJoinTest extends ForkJoinService<List<String>> { private static ForkJoinTest forkJoinTest; private int threshold; //閾值 private List<String> list; //待拆分List private ForkJoinTest(List<String> list, int threshold) { this.list = list; this.threshold = threshold; } @Override protected List<String> compute() { // 當end與start之間的差小于閾值時,開始進行實際篩選 if (list.size() < threshold) { return list.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList()); } else { // 如果當end與start之間的差大于閾值時,將大任務分解成兩個小任務。 int middle = list.size() / 2; List<String> leftList = list.subList(0, middle); List<String> rightList = list.subList(middle, list.size()); ForkJoinTest left = new ForkJoinTest(leftList, threshold); ForkJoinTest right = new ForkJoinTest(rightList, threshold); // 并行執行兩個“小任務” left.fork(); right.fork(); // 把兩個“小任務”的結果合并起來 List<String> join = left.join(); join.addAll(right.join()); return join; } } /** * 獲取ForkJoinTest實例 * @param list 待處理List * @param threshold 閾值 * @return ForkJoinTest實例 */ public static ForkJoinService<List<String>> getInstance(List<String> list, int threshold) { if (forkJoinTest == null) { synchronized (ForkJoinTest.class) { if (forkJoinTest == null) { forkJoinTest = new ForkJoinTest(list, threshold); } } } return forkJoinTest; } }
3. 執行類
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; /** * Description: Fork/Join執行類 * Designer: jack * Date: 2017/8/3 * Version: 1.0.0 */ public class Test { public static void main(String args[]) throws ExecutionException, InterruptedException { String[] strings = {"a", "ah", "b", "ba", "ab", "ac", "sd", "fd", "ar", "te", "se", "te", "sdr", "gdf", "df", "fg", "gh", "oa", "ah", "qwe", "re", "ty", "ui"}; List<String> stringList = new ArrayList<>(Arrays.asList(strings)); ForkJoinPool pool = new ForkJoinPool(); ForkJoinService<List<String>> forkJoinService = ForkJoinTest.getInstance(stringList, 20); // 提交可分解的ForkJoinTask任務 ForkJoinTask<List<String>> future = pool.submit(forkJoinService); System.out.println(future.get()); // 關閉線程池 pool.shutdown(); } }
關于如何分析Java的Fork/Join并發框架就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。