您好,登錄后才能下訂單哦!
這篇文章主要講解了“DAG實現任務調度以及優化”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“DAG實現任務調度以及優化”吧!
github:https://github.com/smartxing/algorithm
1 有向圖的構建
DAG dag = new DAG(); dag.addVertex("A"); dag.addVertex("B"); dag.addVertex("C"); dag.addVertex("D"); dag.addEdge("A", "B"); dag.addEdge("A", "C"); System.out.println(dag);
2 拓撲排序檢測圖中是否有環
public boolean isCircularity() { Set<Object> set = inDegree.keySet(); //入度表 Map<Object, AtomicInteger> inDegree = set.stream().collect(Collectors .toMap(k -> k, k -> new AtomicInteger(this.inDegree.get(k).size()))); //入度為0的節點 Set sources = getSources(); LinkedList<Object> queue = new LinkedList(); queue.addAll(sources); while (!queue.isEmpty()) { Object o = queue.removeFirst(); outDegree.get(o) .forEach(so -> { if (inDegree.get(so).decrementAndGet() == 0) { queue.add(so); } }); } return inDegree.values().stream().filter(x -> x.intValue() > 0).count() > 0; }
3 stage優化
eg 如果任務存在如下的關系 , task1 執行完后執行 task2 ,task2 執行完后執行task3 ... Task1 -> Task2 -> Task3 -> Task4 這些task 本來就要串行執行的 可以把這些task 打包在一塊 減少線程上下文的切換 eg : 復雜一點的DAG: /** * H * \ * G * \ * A -> B * \ * C- D -E - F-> J * * * * 優化后得 ==> * * (H,G) * \ * A -> B * \ * (C,D,E) - (F,J) * */ 詳見chain方法: 關鍵代碼如下 private void chain_(Set sources, final LinkedHashSetMultimap foutChain, final LinkedHashSetMultimap finChain) { sources.forEach(sourceNode -> { ArrayList<Object> maxStage = Lists.newArrayList(); findMaxStage(sourceNode, maxStage); if (maxStage.size() > 1) { //存在需要合并的stage addVertex(foutChain, finChain, maxStage);//添加一個新節點 Object o = maxStage.get(maxStage.size() - 1); //最后一個節點 reChain_(foutChain, finChain, maxStage, o); } if (maxStage.size() == 1) { //不存在需要合并的stage addVertex(foutChain, finChain, sourceNode);//添加一個新節點 Set subNodes = outDegree.get(sourceNode); addSubNodeage(foutChain, finChain, sourceNode, subNodes); } }); } 4 測試DAG 執行 測試程序: 詳見 DAGExecTest 1 新建一個task 只打印一句話 public static class Task implements Runnable { private String taskName; public Task(final String taskName) { this.taskName = taskName; } @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("i am running my name is " + taskName + " finish ThreadID: " + Thread.currentThread().getId()); } public String getTaskName() { return taskName; } @Override public String toString() { return taskName; } } 2 構建DAG DAG dag = DAG.create(); Task a = new Task("a"); Task b = new Task("b"); Task c = new Task("c"); Task d = new Task("d"); Task e = new Task("e"); Task f = new Task("f"); Task g = new Task("g"); Task h = new Task("h"); Task j = new Task("j"); dag.addVertex(a); dag.addVertex(b); dag.addVertex(c); dag.addVertex(d); dag.addVertex(e); dag.addVertex(f); dag.addVertex(g); dag.addVertex(h); dag.addVertex(j); dag.addEdge(h, g); dag.addEdge(g, b); dag.addEdge(a, b); dag.addEdge(b, f); dag.addEdge(c, d); dag.addEdge(d, e); dag.addEdge(e, f); dag.addEdge(f, j); 構建完成后如圖 * H * \ * G * \ * A -> B * \ * C- D -E - F-> J 3 stage 切分 DAG chain = dag.chain(); 執行完圖入下: * (H,G) * \ * A -> B * \ * (C,D,E) - (F,J) 4 執行 DAG DAGExecTest 最終結果打印如下如下: 可以發現有3個Stage stage1 包含3個task task分別在不同的線程里面執行 其中c-d-e g-c f-j是經過優化的在同一個線程里面執行,減少了不必要的上下文切換 i am running my name is a finish ThreadID: 10 i am running my name is c finish ThreadID: 11 i am running my name is h finish ThreadID: 12 i am running my name is d finish ThreadID: 11 i am running my name is g finish ThreadID: 12 i am running my name is e finish ThreadID: 11 stage 結束 : task detached:a, task chain c-d-e task chain h-g ----------------------------------------------- i am running my name is b finish ThreadID: 14 stage 結束 : task detached:b, ----------------------------------------------- i am running my name is f finish ThreadID: 11 i am running my name is j finish ThreadID: 11 stage 結束 : task chain f-j 測試執行關鍵代碼如下: chain.execute(col -> { Set set = (Set) col; List<CompletableFuture> completableFutures = Lists.newArrayList(); StringBuilder sb = new StringBuilder(); set.stream().forEach(x -> { if (x instanceof Task) { CompletableFuture<Void> future = CompletableFuture.runAsync((Task) x, executorService); completableFutures.add(future); sb.append(" task detached:" + ((Task) x).getTaskName()).append(","); } if (x instanceof List) { List<Task> taskList = (List) x; CompletableFuture<Void> future = CompletableFuture.runAsync(()-> taskList.forEach(Task::run)); completableFutures.add(future); sb.append( " task chain " + Joiner.on("-").join(taskList.stream().map(Task::getTaskName).collect(Collectors.toList()))); } }); CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join(); System.out.println("stage 結束 : " + sb.toString()); System.out.println("-----------------------------------------------"); });
感謝各位的閱讀,以上就是“DAG實現任務調度以及優化”的內容了,經過本文的學習后,相信大家對DAG實現任務調度以及優化這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。