您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關Java 如何實現協程,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
協程(Coroutine)這個詞其實有很多叫法,比如有的人喜歡稱為纖程(Fiber),或者綠色線程(GreenThread)。其實究其本質,對于協程最直觀的解釋是線程的線程。雖然讀上去有點拗口,但本質上就是這樣。
協程的核心在于調度那塊由他來負責解決,遇到阻塞操作,立刻放棄掉,并且記錄當前棧上的數據,阻塞完后立刻再找一個線程恢復棧并把阻塞的結果放到這個線程上去跑,這樣看上去好像跟寫同步代碼沒有任何差別,這整個流程可以稱為coroutine,而跑在由coroutine負責調度的線程稱為Fiber。
java協程的實現
早期,在JVM上實現協程一般會使用kilim,不過這個工具已經很久不更新了,現在常用的工具是Quasar,而本文章會全部基于Quasar來介紹。
下面嘗試通過Quasar來實現類似于go語言的coroutine以及channel。
為了能有明確的對比,這里先用go語言實現一個對于10以內自然數分別求平方的例子。
func counter(out chan<- int) { for x := 0; x < 10; x++ { out <- x } close(out) } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v * v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { //定義兩個int類型的channel naturals := make(chan int) squares := make(chan int) //產生兩個Fiber,用go關鍵字 go counter(naturals) go squarer(squares, naturals) //獲取計算結果 printer(squares) }
上面這個例子,通過channel兩解耦兩邊的數據共享。對于這個channel,大家可以理解為Java里的SynchronousQueue。下面我直接上Quasar版JAVA代碼的,幾乎可以原封不動的復制go語言的代碼。
public class Example { private static void printer(Channel<Integer> in) throws SuspendExecution, InterruptedException { Integer v; while ((v = in.receive()) != null) { System.out.println(v); } } public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution { //定義兩個Channel Channel<Integer> naturals = Channels.newChannel(-1); Channel<Integer> squares = Channels.newChannel(-1); //運行兩個Fiber實現. new Fiber(() -> { for (int i = 0; i < 10; i++) naturals.send(i); naturals.close(); }).start(); new Fiber(() -> { Integer v; while ((v = naturals.receive()) != null) squares.send(v * v); squares.close(); }).start(); printer(squares); } }
兩者對比,看上去Java似好像更復雜些,沒辦法這就是Java的風格,而且這還是通過第三方的庫來實現的。
說到這里各位肯定對Fiber很好奇了。也許你會表示懷疑Fiber是不是如上面所描述的那樣,下面我們嘗試用Quasar建立一百萬個Fiber,看看內存占用多少,我先嘗試了創建百萬個Thread。
for (int i = 0; i < 1_000_000; i++) { new Thread(() -> { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
很不幸,直接報Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread,這是情理之中的。下面是通過Quasar建立百萬個Fiber。
public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution { int FiberNumber = 1_000_000; CountDownLatch latch = new CountDownLatch(1); AtomicInteger counter = new AtomicInteger(0); for (int i = 0; i < FiberNumber; i++) { new Fiber(() -> { counter.incrementAndGet(); if (counter.get() == FiberNumber) { System.out.println("done"); } Strand.sleep(1000000); }).start(); } latch.await(); }
我這里加了latch,阻止程序跑完就關閉,Strand.sleep其實跟Thread.sleep一樣,只是這里針對的是Fiber。
最終控制臺是可以輸出done的,說明程序已經創建了百萬個Fiber,設置Sleep是為了讓Fiber一直運行,從而方便計算內存占用。官方宣稱一個空閑的Fiber大約占用400Byte,那這里應該是占用400MB堆內存,但是這里通過jmap -heap pid顯示大約占用了1000MB,也就是說一個Fiber占用1KB。
Quasar是怎么實現Fiber的
其實Quasar實現的coroutine的方式與Go語言很像,只不過前者是使用框架來實現,而go語言則是語言內置的功能。
不過如果你熟悉了Go語言的調度機制的話,那么對于Quasar的調度機制就會好理解很多了,因為兩者有很多相似之處。
Quasar里的Fiber其實是一個continuation,他可以被Quasar定義的scheduler調度,一個continuation記錄著運行實例的狀態,而且會被隨時中斷,并且也會隨后在他被中斷的地方恢復。
Quasar其實是通過修改bytecode來達到這個目的,所以運行Quasar程序的時候,你需要先通過java-agent在運行時修改你的代碼,當然也可以在編譯期間這么干。go語言的內置了自己的調度器,而Quasar則是默認使用ForkJoinPool這個具有work-stealing功能的線程池來當調度器。work-stealing非常重要,因為你不清楚哪個Fiber會先執行完,而work-stealing可以動態的從其他的等等隊列偷一個context過來,這樣可以最大化使用CPU資源。
那這里你會問了,Quasar怎么知道修改哪些字節碼呢,其實也很簡單,Quasar會通過java-agent在運行時掃描哪些方法是可以中斷的,同時會在方法被調用前和調度后的方法內插入一些continuation邏輯,如果你在方法上定義了@Suspendable注解,那Quasar會對調用該注解的方法做類似下面的事情。
這里假設你在方法f上定義了@Suspendable,同時去調用了有同樣注解的方法g,那么所有調用f的方法會插入一些字節碼,這些字節碼的邏輯就是記錄當前Fiber棧上的狀態,以便在未來可以動態的恢復。(Fiber類似線程也有自己的棧)。在suspendable方法鏈內Fiber的父類會調用Fiber.park,這樣會拋出SuspendExecution異常,從而來停止線程的運行,好讓Quasar的調度器執行調度。這里的SuspendExecution會被Fiber自己捕獲,業務層面上不應該捕獲到。如果Fiber被喚醒了(調度器層面會去調用Fiber.unpark),那么f會在被中斷的地方重新被調用(這里Fiber會知道自己在哪里被中斷),同時會把g的調用結果(g會return結果)插入到f的恢復點,這樣看上去就好像g的return是f的local variables了,從而避免了callback嵌套。
上面說了一大堆,其實簡單點來講就是,想辦法讓運行中的線程棧停下來,然后讓Quasar的調度器介入。
JVM線程中斷的條件有兩個:
1、拋異常
2、return。
而在Quasar中,一般就是通過拋異常的方式來達到的,所以你會看到上面的代碼會拋出SuspendExecution。但是如果你真捕獲到這個異常,那就說明有問題了,所以一般會這么寫。
@Suspendable public int f() { try { // do some stuff return g() * 2; } catch(SuspendExecution s) { //這里不應該捕獲到異常. throw new AssertionError(s); } }
以上就是Java 如何實現協程,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。