您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關JVM中Java和Scala并發性基礎是什么,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
處理器速度數十年來一直持續快速發展,并在世紀交替之際走到了終點。從那時起,處理器制造商更多地是通過增加核心來提高芯片性能,而不再通過增加時鐘速率來提高芯片性能。多核系統現在成為了從手機到企業服務器等所有設備的標準,而這種趨勢可能繼續并有所加速。開發人員越來越需要在他們的應用程序代碼中支持多個核心,這樣才能滿足性能需求。
在本系列文章中,您將了解一些針對 Java 和 Scala 語言的并發編程的新方法,包括 Java 如何將 Scala 和其他基于 JVM 的語言中已經探索出來的理念結合在一起。***期文章將介紹一些背景,通過介紹 Java 7 和 Scala 的一些***技術,幫助了解 JVM 上的并發編程的全景。您將了解如何使用 Java ExecutorService
和 ForkJoinPool
類來簡化并發編程。還將了解一些將并發編程選項擴展到純 Java 中的已有功能之外的基本 Scala 特性。在此過程中,您會看到不同的方法對并發編程性能有何影響。后續幾期文章將會介紹 Java 8 中的并發性改進和一些擴展,包括用于執行可擴展的 Java 和 Scala 編程的 Akka 工具包。
在 Java 平臺誕生之初,并發性支持就是它的一個特性,線程和同步的實現為它提供了超越其他競爭語言的優勢。Scala 基于 Java 并在 JVM 上運行,能夠直接訪問所有 Java 運行時(包括所有并發性支持)。所以在分析 Scala 特性之前,我首先會快速回顧一下 Java 語言已經提供的功能。
在 Java 編程過程中創建和使用線程非常容易。它們由 java.lang.Thread
類表示,線程要執行的代碼為 java.lang.Runnable
實例的形式。如果需要的話,可以在應用程序中創建大量線程,您甚至可以創建數千個線程。在有多個核心時,JVM 使用它們來并發執行多個線程;超出核心數量的線程會共享這些核心。
Java 從一開始就包含對線程和同步的支持。但在線程間共享數據的最初規范不夠完善,這帶來了 Java 5 的 Java 語言更新中的重大變化 (JSR-133)。Java Language Specification for Java 5 更正并規范化了 synchronized
和 volatile
操作。該規范還規定不變的對象如何使用多線程。(基本上講,只要在執行構造函數時不允許引用 “轉義”,不變的對象始終是線程安全的。)以前,線程間的交互通常需要使用阻塞的 synchronized
操作。這些更改支持使用 volatile
在線程間執行非阻塞協調。因此,在 Java 5 中添加了新的并發集合類來支持非阻塞操作 — 這與早期僅支持阻塞的線程安全方法相比是一項重大改進。
線程操作的協調難以讓人理解。只要從程序的角度讓所有內容保持一致,Java 編譯器和 JVM 就不會對您代碼中的操作重新排序,這使得問題變得更加復雜。例如:如果兩個相加操作使用了不同的變量,編譯器或 JVM 可以安裝與指定的順序相反的順序執行這些操作,只要程序不在兩個操作都完成之前使用兩個變量的總數。這種重新排序操作的靈活性有助于提高 Java 性能,但一致性只被允許應用在單個線程中。硬件也有可能帶來線程問題。現代系統使用了多種緩存內存級別,一般來講,不是系統中的所有核心都能同樣看到這些緩存。當某個核心修改內存中的一個值時,其他核心可能不會立即看到此更改。
由于這些問題,在一個線程使用另一個線程修改的數據時,您必須顯式地控制線程交互方式。Java 使用了特殊的操作來提供這種控制,在不同線程看到的數據視圖中建立順序。基本操作是,線程使用 synchronized
關鍵字來訪問一個對象。當某個線程在一個對象上保持同步時,該線程將會獲得此對象所獨有的一個鎖的獨占訪問。如果另一個線程已持有該鎖,等待獲取該鎖的線程必須等待,或者被阻塞,直到該鎖被釋放。當該線程在一個 synchronized
代碼塊內恢復執行時,Java 會保證該線程可以 “看到了” 以前持有同一個鎖的其他線程寫入的所有數據,但只是這些線程通過離開自己的 synchronized
鎖來釋放該鎖之前寫入的數據。這種保證既適用于編譯器或 JVM 所執行的操作的重新排序,也適用于硬件內存緩存。一個 synchronized
塊的內部是您代碼中的一個穩定性孤島,其中的線程可依次安全地執行、交互和共享信息。
在變量上對 volatile
關鍵字的使用,為線程間的安全交互提供了一種稍微較弱的形式。synchronized
關鍵字可確保在您獲取該鎖時可以看到其他線程的存儲,而且在您之后,獲取該鎖的其他線程也會看到您的存儲。volatile
關鍵字將這一保證分解為兩個不同的部分。如果一個線程向volatile
變量寫入數據,那么首先將會擦除它在這之前寫入的數據。如果某個線程讀取該變量,那么該線程不僅會看到寫入該變量的值,還會看到寫入的線程所寫入的其他所有值。所以讀取一個 volatile
變量會提供與輸入 一個 synchronized
塊相同的內存保證,而且寫入一個volatile
變量會提供與離開 一個 synchronized
塊相同的內存保證。但二者之間有很大的差別:volatile
變量的讀取或寫入絕不會受阻塞。
同步很有用,而且許多多線程應用程序都是在 Java 中僅使用基本的 synchronized
塊開發出來的。但協調線程可能很麻煩,尤其是在處理許多線程和許多塊的時候。確保線程僅在安全的方式下交互并 避免潛在的死鎖(兩個或更多線程等待對方釋放鎖之后才能繼續執行),這很困難。支持并發性而不直接處理線程和鎖的抽象,這為開發人員提供了處理常見用例的更好方法。
java.util.concurrent
分層結構包含一些集合變形,它們支持并發訪問、針對原子操作的包裝器類,以及同步原語。這些類中的許多都是為支持非阻塞訪問而設計的,這避免了死鎖的問題,而且實現了更高效的線程。這些類使得定義和控制線程之間的交互變得更容易,但他們仍然面臨著基本線程模型的一些復雜性。
java.util.concurrent
包中的一對抽象,支持采用一種更加分離的方法來處理并發性:Future<T>
接口、Executor
和ExecutorService
接口。這些相關的接口進而成為了對 Java 并發性支持的許多 Scala 和 Akka 擴展的基礎,所以更詳細地了解這些接口和它們的實現是值得的。
Future<T>
是一個 T
類型的值的持有者,但奇怪的是該值一般在創建 Future
之后才能使用。正確執行一個同步操作后,才會獲得該值。收到Future
的線程可調用方法來:
查看該值是否可用
等待該值變為可用
在該值可用時獲取它
如果不再需要該值,則取消該操作
Future
的具體實現結構支持處理異步操作的不同方式。
Executor
是一種圍繞某個執行任務的東西的抽象。這個 “東西” 最終將是一個線程,但該接口隱藏了該線程處理執行的細節。Executor
本身的適用性有限,ExecutorService
子接口提供了管理終止的擴展方法,并為任務的結果生成了 Future
。Executor
的所有標準實現還會實現ExecutorService
,所以實際上,您可以忽略根接口。
線程是相對重量級的資源,而且與分配并丟棄它們相比,重用它們更有意義。ExecutorService
簡化了線程間的工作共享,還支持自動重用線程,實現了更輕松的編程和更高的性能。ExecutorService
的 ThreadPoolExecutor
實現管理著一個執行任務的線程池。
并發性的實際應用常常涉及到需要與您的主要處理邏輯獨立的外部交互的任務(與用戶、存儲或其他系統的交互)。這類應用很難濃縮為一個簡單的示例,所以在演示并發性的時候,人們通常會使用簡單的計算密集型任務,比如數學計算或排序。我將使用一個類似的示例。
任務是找到離一個未知的輸入最近的已知單詞,其中的最近 是按照Levenshtein 距離 來定義的:將輸入轉換為已知的單詞所需的最少的字符增加、刪除或更改次數。我使用的代碼基于 Wikipedia 上的 Levenshtein 距離 文章中的一個示例,該示例計算了每個已知單詞的 Levenshtein 距離,并返回***匹配值(或者如果多個已知的單詞擁有相同的距離,那么返回結果是不確定的)。
清單 1 給出了計算 Levenshtein 距離的 Java 代碼。該計算生成一個矩陣,將行和列與兩個對比的文本的大小進行匹配,在每個維度上加 1。為了提高效率,此實現使用了一對大小與目標文本相同的數組來表示矩陣的連續行,將這些數組包裝在每個循環中,因為我只需要上一行的值就可以計算下一行。
/** * Calculate edit distance from targetText to known word. * * @param word known word * @param v0 int array of length targetText.length() + 1 * @param v1 int array of length targetText.length() + 1 * @return distance */ private int editDistance(String word, int[] v0, int[] v1) { // initialize v0 (prior row of distances) as edit distance for empty 'word' for (int i = 0; i < v0.length; i++) { v0[i] = i; } // calculate updated v0 (current row distances) from the previous row v0 for (int i = 0; i < word.length(); i++) { // first element of v1 = delete (i+1) chars from target to match empty 'word' v1[0] = i + 1; // use formula to fill in the rest of the row for (int j = 0; j < targetText.length(); j++) { int cost = (word.charAt(i) == targetText.charAt(j)) ? 0 : 1; v1[j + 1] = minimum(v1[j] + 1, v0[j + 1] + 1, v0[j] + cost); } // swap v1 (current row) and v0 (previous row) for next iteration int[] hold = v0; v0 = v1; v1 = hold; } // return final value representing best edit distance return v0[targetText.length()]; }
如果有大量已知詞匯要與未知的輸入進行比較,而且您在一個多核系統上運行,那么您可以使用并發性來加速處理:將已知單詞的集合分解為多個塊,將每個塊作為一個獨立任務來處理。通過更改每個塊中的單詞數量,您可以輕松地更改任務分解的粒度,從而了解它們對總體性能的影響。清單 2 給出了分塊計算的 Java 代碼,摘自 示例代碼 中的 ThreadPoolDistance
類。清單 2 使用一個標準的 ExecutorService
,將線程數量設置為可用的處理器數量。
private final ExecutorService threadPool; private final String[] knownWords; private final int blockSize; public ThreadPoolDistance(String[] words, int block) { threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); knownWords = words; blockSize = block; } public DistancePair bestMatch(String target) { // build a list of tasks for matching to ranges of known words List<DistanceTask> tasks = new ArrayList<DistanceTask>(); int size = 0; for (int base = 0; base < knownWords.length; base += size) { size = Math.min(blockSize, knownWords.length - base); tasks.add(new DistanceTask(target, base, size)); } DistancePair best; try { // pass the list of tasks to the executor, getting back list of futures List<Future<DistancePair>> results = threadPool.invokeAll(tasks); // find the best result, waiting for each future to complete best = DistancePair.WORST_CASE; for (Future<DistancePair> future: results) { DistancePair result = future.get(); best = DistancePair.best(best, result); } } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } return best; } /** * Shortest distance task implementation using Callable. */ public class DistanceTask implements Callable<DistancePair> { private final String targetText; private final int startOffset; private final int compareCount; public DistanceTask(String target, int offset, int count) { targetText = target; startOffset = offset; compareCount = count; } private int editDistance(String word, int[] v0, int[] v1) { ... } /* (non-Javadoc) * @see java.util.concurrent.Callable#call() */ @Override public DistancePair call() throws Exception { // directly compare distances for comparison words in range int[] v0 = new int[targetText.length() + 1]; int[] v1 = new int[targetText.length() + 1]; int bestIndex = -1; int bestDistance = Integer.MAX_VALUE; boolean single = false; for (int i = 0; i < compareCount; i++) { int distance = editDistance(knownWords[i + startOffset], v0, v1); if (bestDistance > distance) { bestDistance = distance; bestIndex = i + startOffset; single = true; } else if (bestDistance == distance) { single = false; } } return single ? new DistancePair(bestDistance, knownWords[bestIndex]) : new DistancePair(bestDistance); } }
清單 2 中的 bestMatch()
方法構造一個 DistanceTask
距離列表,然后將該列表傳遞給 ExecutorService
。這種對 ExecutorService
的調用形式將會接受一個 Collection<? extends Callable<T>>
類型的參數,該參數表示要執行的任務。該調用返回一個 Future<T>
列表,用它來表示執行的結果。ExecutorService
使用在每個任務上調用 call()
方法所返回的值,異步填寫這些結果。在本例中,T
類型為DistancePair
— 一個表示距離和匹配的單詞的簡單的值對象,或者在沒有找到惟一匹配值時近表示距離。
bestMatch()
方法中執行的原始線程依次等待每個 Future
完成,累積***的結果并在完成時返回它。通過多個線程來處理 DistanceTask
的執行,原始線程只需等待一小部分結果。剩余結果可與原始線程等待的結果并發地完成。
要充分利用系統上可用的處理器數量,必須為 ExecutorService
配置至少與處理器一樣多的線程。您還必須將至少與處理器一樣多的任務傳遞給ExecutorService
來執行。實際上,您或許希望擁有比處理器多得多的任務,以實現***的性能。這樣,處理器就會繁忙地處理一個接一個的任務,近在***才空閑下來。但是因為涉及到開銷(在創建任務和 future 的過程中,在任務之間切換線程的過程中,以及最終返回任務的結果時),您必須保持任務足夠大,以便開銷是按比例減小的。
圖 1 展示了我在使用 Oracle 的 Java 7 for 64-bit Linux® 的四核 AMD 系統上運行測試代碼時測量的不同任務數量的性能。每個輸入單詞依次與 12,564 個已知單詞相比較,每個任務在一定范圍的已知單詞中找到***的匹配值。全部 933 個拼寫錯誤的輸入單詞會重復運行,每輪運行之間會暫停片刻供 JVM 處理,該圖中使用了 10 輪運行后的***時間。從圖 1 中可以看出,每秒的輸入單詞性能在合理的塊大小范圍內(基本來講,從 256 到大于 1,024)看起來是合理的,只有在任務變得非常小或非常大時,性能才會極速下降。對于塊大小 16,384,***的值近創建了一個任務,所以顯示了單線程性能。
ThreadPoolDistance
性能Java 7 引入了 ExecutorService
的另一種實現:ForkJoinPool
類。ForkJoinPool
是為高效處理可反復分解為子任務的任務而設計的,它使用 RecursiveAction
類(在任務未生成結果時)或 RecursiveTask<T>
類(在任務具有一個 T
類型的結果時)來處理任務。RecursiveTask<T>
提供了一種合并子任務結果的便捷方式,如清單 3 所示。
RecursiveTask<DistancePair>
示例private ForkJoinPool threadPool = new ForkJoinPool(); private final String[] knownWords; private final int blockSize; public ForkJoinDistance(String[] words, int block) { knownWords = words; blockSize = block; } public DistancePair bestMatch(String target) { return threadPool.invoke(new DistanceTask(target, 0, knownWords.length, knownWords)); } /** * Shortest distance task implementation using RecursiveTask. */ public class DistanceTask extends RecursiveTask<DistancePair> { private final String compareText; private final int startOffset; private final int compareCount; private final String[] matchWords; public DistanceTask(String from, int offset, int count, String[] words) { compareText = from; startOffset = offset; compareCount = count; matchWords = words; } private int editDistance(int index, int[] v0, int[] v1) { ... } /* (non-Javadoc) * @see java.util.concurrent.RecursiveTask#compute() */ @Override protected DistancePair compute() { if (compareCount > blockSize) { // split range in half and find best result from bests in each half of range int half = compareCount / 2; DistanceTask t1 = new DistanceTask(compareText, startOffset, half, matchWords); t1.fork(); DistanceTask t2 = new DistanceTask(compareText, startOffset + half, compareCount - half, matchWords); DistancePair p2 = t2.compute(); return DistancePair.best(p2, t1.join()); } // directly compare distances for comparison words in range int[] v0 = new int[compareText.length() + 1]; int[] v1 = new int[compareText.length() + 1]; int bestIndex = -1; int bestDistance = Integer.MAX_VALUE; boolean single = false; for (int i = 0; i < compareCount; i++) { int distance = editDistance(i + startOffset, v0, v1); if (bestDistance > distance) { bestDistance = distance; bestIndex = i + startOffset; single = true; } else if (bestDistance == distance) { single = false; } } return single ? new DistancePair(bestDistance, knownWords[bestIndex]) : new DistancePair(bestDistance); } }
圖 2 顯示了清單 3 中的 ForkJoin
代碼與 清單 2 中的 ThreadPool
代碼的性能對比。ForkJoin
代碼在所有塊大小中穩定得多,僅在您只有單個塊(意味著執行是單線程的)時性能會顯著下降。標準的 ThreadPool
代碼僅在塊大小為 256 和 1,024 時會表現出更好的性能。
ThreadPoolDistance
與 ForkJoinDistance
的性能對比這些結果表明,如果可調節應用程序中的任務大小來實現***的性能,那么使用標準 ThreadPool
比 ForkJoin
更好。但請注意,ThreadPool
的 “***性能點” 取決于具體任務、可用處理器數量以及您系統的其他因素。一般而言,ForkJoin
以最小的調優需求帶來了優秀的性能,所以***盡可能地使用它。
Scala 通過許多方式擴展了 Java 編程語言和運行時,其中包括添加更多、更輕松的處理并發性的方式。對于初學者而言,Future<T>
的 Scala 版本比 Java 版本靈活得多。您可以直接從代碼塊中創建 future,可向 future 附加回調來處理這些 future 的完成。清單 4 顯示了 Scala future 的一些使用示例。該代碼首先定義了 futureInt()
方法,以便按需提供 Future<Int>
,然后通過三種不同的方式來使用 future。
Future<T>
示例代碼import ExecutionContext.Implicits.global val lastInteger = new AtomicInteger def futureInt() = future { Thread sleep 2000 lastInteger incrementAndGet } // use callbacks for completion of futures val a1 = futureInt val a2 = futureInt a1.onSuccess { case i1 => { a2.onSuccess { case i2 => println("Sum of values is " + (i1 + i2)) } } } Thread sleep 3000 // use for construct to extract values when futures complete val b1 = futureInt val b2 = futureInt for (i1 <- b1; i2 <- b2) yield println("Sum of values is " + (i1 + i2)) Thread sleep 3000 // wait directly for completion of futures val c1 = futureInt val c2 = futureInt println("Sum of values is " + (Await.result(c1, Duration.Inf) + Await.result(c2, Duration.Inf)))
清單 4 中的***個示例將回調閉包附加到一對 future 上,以便在兩個 future 都完成時,將兩個結果值的和打印到控制臺上。回調是按照創建它們的順序直接嵌套在 future 上,但是,即使更改順序,它們也同樣有效。如果在您附加回調時 future 已完成,該回調仍會運行,但無法保證它會立即運行。原始執行線程會在 Thread sleep 3000
行上暫停,以便在進入下一個示例之前完成 future。
第二個示例演示了使用 Scala for
comprehension 從 future 中異步提取值,然后直接在表達式中使用它們。for
comprehension 是一種 Scala 結構,可用于簡潔地表達復雜的操作組合(map
、filter
、flatMap
和 foreach
)。它一般與各種形式的集合結合使用,但 Scala future 實現了相同的單值方法來訪問集合值。所以可以使用 future 作為一種特殊的集合,一種包含最多一個值(可能甚至在未來某個時刻之前之后才包含該值)的集合。在這種情況下,for
語句要求獲取 future 的結果,并在表達式中使用這些結果值。在幕后,這種技術會生成與***個示例完全相同的代碼,但以線性代碼的形式編寫它會得到更容易理解的更簡單的表達式。和***個示例一樣,原始執行線程會暫停,以便在進入下一個示例之前完成 future。
第三個示例使用阻塞等待來獲取 future 的結果。這與 Java future 的工作原理相同,但在 Scala 中,一個獲取***等待時間參數的特殊Await.result()
方法調用會讓阻塞等待變得更為明顯。
清單 4 中的代碼沒有顯式地將 future 傳遞給 ExecutorService
或等效的對象,所以如果沒有使用過 Scala,那么您可能想知道 future 內部的代碼是如何執行的。答案取決于 清單 4 中最上面一行:import ExecutionContext.Implicits.global
。Scala API 常常為代碼塊中頻繁重用的參數使用 implicit
值。future { }
結構要求 ExecutionContext
以隱式參數的形式提供。這個 ExecutionContext
是 JavaExecutorService
的一個 Scala 包裝器,以相同方式用于使用一個或多個托管線程來執行任務。
除了 future 的這些基本操作之外,Scala 還提供了一種方式將任何集合轉換為使用并行編程的集合。將集合轉換為并行格式后,您在集合上執行的任何標準的 Scala 集合操作(比如 map
、filter
或 fold
)都會自動地盡可能并行完成。(本文稍后會在 清單 7 中提供一個相關示例,該示例使用 Scala 查找一個單詞的***匹配值。)
Java 和 Scala 中的 future 都必須解決錯誤處理的問題。在 Java 中,截至 Java 7,future 可拋出一個 ExecutionException
作為返回結果的替代方案。應用程序可針對具體的失敗類型而定義自己的 ExecutionException
子類,或者可連鎖異常來傳遞詳細信息,但這限制了靈活性。
Scala future 提供了更靈活的錯誤處理。您可以通過兩種方式完成 Scala future:成功時提供一個結果值(假設要求一個結果值),或者在失敗時提供一個關聯的 Throwable
。您也可以采用多種方式處理 future 的完成。在 清單 4 中,onSuccess
方法用于附加回調來處理 future 的成功完成。您還可以使用 onComplete
來處理任何形式的完成(它將結果或 throwable 包裝在一個 Try
中來適應兩種情況),或者使用 onFailure
來專門處理錯誤結果。Scala future 的這種靈活性擴展到了您可以使用 future 執行的所有操作,所以您可以將錯誤處理直接集成到代碼中。
這個 Scala Future<T>
還有一個緊密相關的 Promise<T>
類。future 是一個結果的持有者,該結果在某個時刻可能可用(或不可用 — 無法內在地確保一個 future 將完成)。future 完成后,結果是固定的,不會發生改變。promise 是這個相同契約的另一端:結果的一個一次性、可分配的持有者,具有結果值或 throwable 的形式。可從 promise 獲取 future,在 promise 上設置了結果后,就可以在該 future 上設置此結果。
現在您已熟悉一些基本的 Scala 并發性概念,是時候來了解一下解決 Levenshtein 距離問題的代碼了。清單 5 顯示了 Levenshtein 距離計算的一個比較符合語言習慣的 Scala 實現,該代碼基本上與 清單 1 中的 Java 代碼類似,但采用了函數風格。
val limit = targetText.length /** Calculate edit distance from targetText to known word. * * @param word known word * @param v0 int array of length targetText.length + 1 * @param v1 int array of length targetText.length + 1 * @return distance */ def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = { val length = word.length @tailrec def distanceByRow(rnum: Int, r0: Array[Int], r1: Array[Int]): Int = { if (rnum >= length) r0(limit) else { // first element of r1 = delete (i+1) chars from target to match empty 'word' r1(0) = rnum + 1 // use formula to fill in the rest of the row for (j <- 0 until limit) { val cost = if (word(rnum) == targetText(j)) 0 else 1 r1(j + 1) = min(r1(j) + 1, r0(j + 1) + 1, r0(j) + cost); } // recurse with arrays swapped for next row distanceByRow(rnum + 1, r1, r0) } } // initialize v0 (prior row of distances) as edit distance for empty 'word' for (i <- 0 to limit) v0(i) = i // recursively process rows matching characters in word being compared to find best distanceByRow(0, v0, v1) }
清單 5 中的代碼對每個行值計算使用了尾部遞歸 distanceByRow()
方法。此方法首先檢查計算了多少行,如果該數字與檢查的單詞中的字符數匹配,則返回結果距離。否則會計算新的行值,然后遞歸地調用自身來計算下一行(將兩個行數組包裝在該進程中,以便正確地傳遞新的***的行值)。Scala 將尾部遞歸方法轉換為與 Java while
循環等效的代碼,所以保留了與 Java 代碼的相似性。
但是,此代碼與 Java 代碼之間有一個重大區別。清單 5 中的 for
comprehension 使用了閉包。閉包并不總是得到了當前 JVM 的高效處理(參閱Why is using for/foreach on a Range slow?,了解有關的詳細信息),所以它們在該計算的最里層循環上增加了大量開銷。如上所述,清單 5 中的代碼的運行速度沒有 Java 版本那么快。清單 6 重寫了代碼,將 for
comprehension 替換為添加的尾部遞歸方法。這個版本要詳細得多,但執行效率與 Java 版本相當。
val limit = targetText.length /** Calculate edit distance from targetText to known word. * * @param word known word * @param v0 int array of length targetText.length + 1 * @param v1 int array of length targetText.length + 1 * @return distance */ def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = { val length = word.length @tailrec def distanceByRow(row: Int, r0: Array[Int], r1: Array[Int]): Int = { if (row >= length) r0(limit) else { // first element of v1 = delete (i+1) chars from target to match empty 'word' r1(0) = row + 1 // use formula recursively to fill in the rest of the row @tailrec def distanceByColumn(col: Int): Unit = { if (col < limit) { val cost = if (word(row) == targetText(col)) 0 else 1 r1(col + 1) = min(r1(col) + 1, r0(col + 1) + 1, r0(col) + cost) distanceByColumn(col + 1) } } distanceByColumn(0) // recurse with arrays swapped for next row distanceByRow(row + 1, r1, r0) } } // initialize v0 (prior row of distances) as edit distance for empty 'word' @tailrec def initArray(index: Int): Unit = { if (index <= limit) { v0(index) = index initArray(index + 1) } } initArray(0) // recursively process rows matching characters in word being compared to find best distanceByRow(0, v0, v1) }
清單 7 給出的 Scala 代碼執行了與 清單 2 中的 Java 代碼相同的阻塞的距離計算。bestMatch()
方法找到由 Matcher
類實例處理的特定單詞塊中與目標文本最匹配的單詞,使用尾部遞歸 best()
方法來掃描單詞。*Distance
類創建多個 Matcher
實例,每個對應一個單詞塊,然后協調匹配結果的執行和組合。
class Matcher(words: Array[String]) { def bestMatch(targetText: String) = { val limit = targetText.length val v0 = new Array[Int](limit + 1) val v1 = new Array[Int](limit + 1) def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = { ... } @tailrec /** Scan all known words in range to find best match. * * @param index next word index * @param bestDist minimum distance found so far * @param bestMatch unique word at minimum distance, or None if not unique * @return best match */ def best(index: Int, bestDist: Int, bestMatch: Option[String]): DistancePair = if (index < words.length) { val newDist = editDistance(words(index), v0, v1) val next = index + 1 if (newDist < bestDist) best(next, newDist, Some(words(index))) else if (newDist == bestDist) best(next, bestDist, None) else best(next, bestDist, bestMatch) } else DistancePair(bestDist, bestMatch) best(0, Int.MaxValue, None) } } class ParallelCollectionDistance(words: Array[String], size: Int) extends TimingTestBase { val matchers = words.grouped(size).map(l => new Matcher(l)).toList def shutdown = {} def blockSize = size /** Find best result across all matchers, using parallel collection. */ def bestMatch(target: String) = { matchers.par.map(m => m.bestMatch(target)). foldLeft(DistancePair.worstMatch)((a, m) => DistancePair.best(a, m)) } } class DirectBlockingDistance(words: Array[String], size: Int) extends TimingTestBase { val matchers = words.grouped(size).map(l => new Matcher(l)).toList def shutdown = {} def blockSize = size /** Find best result across all matchers, using direct blocking waits. */ def bestMatch(target: String) = { import ExecutionContext.Implicits.global val futures = matchers.map(m => future { m.bestMatch(target) }) futures.foldLeft(DistancePair.worstMatch)((a, v) => DistancePair.best(a, Await.result(v, Duration.Inf))) } }
清單 7 中的兩個 *Distance
類顯示了協調 Matcher
結果的執行和組合的不同方式。ParallelCollectionDistance
使用前面提到的 Scala 的并行集合 feature 來隱藏并行計算的細節,只需一個簡單的 foldLeft
就可以組合結果。
DirectBlockingDistance
更加明確,它創建了一組 future,然后在該列表上為每個結果使用一個 foldLeft
和嵌套的阻塞等待。
清單 7 中的兩個 *Distance
實現都是處理 Matcher
結果的合理方法。(它們不僅合理,而且非常高效。示例代碼 包含我在試驗中嘗試的其他兩種實現,但未包含在本文中。)在這種情況下,性能是一個主要問題,所以圖 3 顯示了這些實現相對于 Java ForkJoin
代碼的性能。
ForkJoinDistance
與 Scala 替代方案的性能對比圖 3 顯示,Java ForkJoin
代碼的性能比每種 Scala 實現都更好,但 DirectBlockingDistance
在 1,024 的塊大小下提供了更好的性能。兩種 Scala 實現在大部分塊大小下,都提供了比 清單 1 中的 ThreadPool
代碼更好的性能。
這些性能結果僅是演示結果,不具權威性。如果您在自己的系統上運行計時測試,可能會看到不同的性能,尤其在使用不同數量的核心的時候。如果希望為距離任務獲得***的性能,那么可以實現一些優化:可以按照長度對已知單詞進行排序,首先與長度和輸入相同的單詞進行比較(因為編輯距離總是不低于與單詞長度之差)。或者我可以在距離計算超出之前的***值時,提前退出計算。但作為一個相對簡單的算法,此試驗公平地展示了兩種并發操作是如何提高性能的,以及不同的工作共享方法的影響。
在性能方面,清單 7 中的 Scale 控制代碼與 清單 2 和 清單 3 中的 Java 代碼的對比結果很有趣。Scala 代碼短得多,而且(假設您熟悉 Scala!)比 Java 代碼更清晰。Scala 和 Java 可很好的相互操作,您可以在本文的 完整示例代碼 中看到:Scala 代碼對 Scala 和 Java 代碼都運行了計時測試,Java 代碼進而直接處理 Scala 代碼的各部分。得益于這種輕松的互操作性,您可以將 Scala 引入現有的 Java 代碼庫中,無需進行通盤修改。最初使用 Scala 為 Java 代碼實現高水平控制常常很有用,這樣您就可以充分利用 Scala 強大的表達特性,同時沒有閉包或轉換的任何重大性能影響。
清單 7 中的 ParallelCollectionDistance
Scala 代碼的簡單性非常具有吸引力。使用此方法,您可以從代碼中完全抽象出并發性,從而編寫類似單線程應用程序的代碼,同時仍然獲得多個處理器的優勢。幸運的是,對于喜歡此方法的簡單性但又不愿意或無法執行 Scala 開發的人而言,Java 8 帶來了一種執行直接的 Java 編程的類似特性。
現在您已經了解了 Java 和 Scala 并發性操作的基礎知識,Java 8 的許多改動您看起來可能都很熟悉(Scala 并發性特性中使用的許多相同的概念都包含在 Java 8 中),所以您很快就能夠在普通的 Java 代碼中使用一些 Scala 技術。
以上就是JVM中Java和Scala并發性基礎是什么,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。