您好,登錄后才能下訂單哦!
這篇文章主要介紹“Stream流水線的實現原理是什么”,在日常操作中,相信很多人在Stream流水線的實現原理是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Stream流水線的實現原理是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
前面我們已經學會如何使用Stream API
Lambda表達式和函數接口的關系。
Java集合框架(Collections)新加入的方法
Stream API基本用法
Stream規約操作用法
用起來真的很爽,但簡潔的方法下面似乎隱藏著無盡的秘密,如此強大的API是如何實現的呢?比如Pipeline是怎么執行的,每次方法調用都會導致一次迭代嗎?自動并行又是怎么做到的,線程個數是多少?本節我們學習Stream流水線的原理,這是Stream實現的關鍵所在。
首先回顧一下容器執行Lambda表達式的方式,以ArrayList.forEach()
方法為例,具體代碼如下:
我們看到ArrayList.forEach()
方法的主要邏輯就是一個for循環,在該for循環里不斷調用action.accept()
回調方法完成對元素的遍歷。這完全沒有什么新奇之處,回調方法在Java GUI的監聽器中廣泛使用。Lambda表達式的作用就是相當于一個回調方法,這很好理解。?
Stream API中大量使用Lambda表達式作為回調方法,但這并不是關鍵。理解Stream我們更關心的是另外兩個問題:流水線和自動并行。使用Stream或許很容易寫入如下形式的代碼:
上述代碼求出以字母A開頭的字符串的最大長度,一種直白的方式是為每一次函數調用都執一次迭代,這樣做能夠實現功能,但效率上肯定是無法接受的。類庫的實現著使用流水線(Pipeline)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中盡可能多的執行用戶指定的操作。為講解方便我們匯總了Stream的所有操作。
Stream上的所有操作分為兩類:中間操作和結束操作,中間操作只是一種標記,只有結束操作才會觸發實際計算。中間操作又可以分為無狀態的(Stateless)和有狀態的(Stateful),無狀態中間操作是指元素的處理不受前面元素的影響,而有狀態的中間操作必須等到所有元素處理之后才知道最終結果,比如排序是有狀態操作,在讀取所有元素之前并不能確定排序結果;結束操作又可以分為短路操作和非短路操作,短路操作是指不用處理全部元素就可以返回結果,比如找到第一個滿足條件的元素。之所以要進行如此精細的劃分,是因為底層對每一種情況的處理方式不同。為了更好的理解流的中間操作和終端操作,可以通過下面的兩段代碼來看他們的執行過程。
image.png
輸出為:A1B1C1 A2B2C2 A3B3C3 中間操作是懶惰的,也就是中間操作不會對數據做任何操作,直到遇到了最終操作。而最終操作,都是比較熱情的。他們會往前回溯所有的中間操作。也就是當執行到最后的forEach操作的時候,它會回溯到它的上一步中間操作,上一步中間操作,又會回溯到上上一步的中間操作,...,直到最初的第一步。第一次forEach執行的時候,會回溯peek 操作,然后peek會回溯更上一步的limit操作,然后limit會回溯更上一步的peek操作,頂層沒有操作了,開始自上向下開始執行,輸出:A1B1C1 第二次forEach執行的時候,然后會回溯peek 操作,然后peek會回溯更上一步的limit操作,然后limit會回溯更上一步的peek操作,頂層沒有操作了,開始自上向下開始執行,輸出:A2B2C2
... 當第四次forEach執行的時候,然后會回溯peek 操作,然后peek會回溯更上一步的limit操作,到limit的時候,發現limit(3)這個job已經完成,這里就相當于循環里面的break操作,跳出來終止循環。
再來看第二段代碼:
image.png
輸出為:A1 A2 A3 A4 A5 A6 A7B7C7 A8B8C8 A9B9C9 第一次forEach執行的時候,會回溯peek操作,然后peek會回溯更上一步的skip操作,skip回溯到上一步的peek操作,頂層沒有操作了,開始自上向下開始執行,執行到skip的時候,因為執行到skip,這個操作的意思就是跳過,下面的都不要執行了,也就是就相當于循環里面的continue,結束本次循環。輸出:A1
第二次forEach執行的時候,會回溯peek操作,然后peek會回溯更上一步的skip操作,skip回溯到上一步的peek操作,頂層沒有操作了,開始自上向下開始執行,執行到skip的時候,發現這是第二次skip,結束本次循環。輸出:A2
...
第七次forEach執行的時候,會回溯peek操作,然后peek會回溯更上一步的skip操作,skip回溯到上一步的peek操作,頂層沒有操作了,開始自上向下開始執行,執行到skip的時候,發現這是第七次skip,已經大于6了,它已經執行完了skip(6)的job了。這次skip就直接跳過,繼續執行下面的操作。輸出:A7B7C7
...直到循環結束。
仍然考慮上述求最長字符串的程序,一種直白的流水線實現方式是為每一次函數調用都執一次迭代,并將處理中間結果放到某種數據結構中(比如數組,容器等)。具體說來,就是調用filter()
方法后立即執行,選出所有以A開頭的字符串并放到一個列表list1中,之后讓list1傳遞給mapToInt()
方法并立即執行,生成的結果放到list2中,最后遍歷list2找出最大的數字作為最終結果。程序的執行流程如如所示:
這樣做實現起來非常簡單直觀,但有兩個明顯的弊端:
迭代次數多。迭代次數跟函數調用的次數相等。
頻繁產生中間結果。每次函數調用都產生一次中間結果,存儲開銷無法接受。
這些弊端使得效率底下,根本無法接受。如果不使用Stream API我們都知道上述代碼該如何在一次迭代中完成,大致是如下形式:
image.png
采用這種方式我們不但減少了迭代次數,也避免了存儲中間結果,顯然這就是流水線,因為我們把三個操作放在了一次迭代當中。只要我們事先知道用戶意圖,總是能夠采用上述方式實現跟Stream API等價的功能,但問題是Stream類庫的設計者并不知道用戶的意圖是什么。如何在無法假設用戶行為的前提下實現流水線,是類庫的設計者要考慮的問題。
我們大致能夠想到,應該采用某種方式記錄用戶每一步的操作,當用戶調用結束操作時將之前記錄的操作疊加到一起在一次迭代中全部執行掉。沿著這個思路,有幾個問題需要解決:
用戶的操作如何記錄?
操作如何疊加?
疊加之后的操作如何執行?(后續專門針對問題點分析)
執行后的結果(如果有)在哪里?(后續專門針對問題點分析)
注意這里使用的是“操作(operation)”一詞,指的是“Stream中間操作”的操作,很多Stream操作會需要一個回調函數(Lambda表達式),因此一個完整的操作是<數據來源,操作,回調函數>構成的三元組。Stream中使用Stage的概念來描述一個完整的操作,并用某種實例化后的PipelineHelper來代表Stage,將具有先后順序的各個Stage連到一起,就構成了整個流水線。跟Stream相關類和接口的繼承關系圖示。
還有IntPipeline, LongPipeline, DoublePipeline沒在圖中畫出,這三個類專門為三種基本類型(不是包裝類型)而定制的,跟ReferencePipeline是并列關系。圖中Head用于表示第一個Stage,即調用調用諸如Collection.stream()方法產生的Stage,很顯然這個Stage里不包含任何操作;StatelessOp和StatefulOp分別表示無狀態和有狀態的Stage,對應于無狀態和有狀態的中間操作。
Stream流水線組織結構示意圖如下:
圖中通過Collection.stream()
方法得到Head也就是stage0,緊接著調用一系列的中間操作,不斷產生新的Stream。這些Stream對象以雙向鏈表的形式組織在一起,構成整個流水線,由于每個Stage都記錄了前一個Stage和本次的操作以及回調函數,依靠這種結構就能建立起對數據源的所有操作。這就是Stream記錄操作的方式。
以上只是解決了操作記錄的問題,要想讓流水線起到應有的作用我們需要一種將所有操作疊加到一起的方案。你可能會覺得這很簡單,只需要從流水線的head開始依次執行每一步的操作(包括回調函數)就行了。這聽起來似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底執行了哪種操作,以及回調函數是哪種形式。換句話說,只有當前Stage本身才知道該如何執行自己包含的動作。這就需要有某種協議來協調相鄰Stage之間的調用關系。
這種協議由Sink接口完成,Sink接口包含的方法如下表所示:
有了上面的協議,相鄰Stage之間調用就很方便了,每個Stage都會將自己的操作封裝到一個Sink里,前一個Stage只需調用后一個Stage的accept()
方法即可,并不需要知道其內部是如何處理的。當然對于有狀態的操作,Sink的begin()
和end()
方法也是必須實現的。比如Stream.sorted()是一個有狀態的中間操作,其對應的Sink.begin()方法可能創建一個盛放結果的容器,而accept()方法負責將元素添加到該容器,最后end()負責對容器進行排序。對于短路操作,Sink.cancellationRequested()
也是必須實現的,比如Stream.findFirst()是短路操作,只要找到一個元素,cancellationRequested()就應該返回true,以便調用者盡快結束查找。Sink的四個接口方法常常相互協作,共同完成計算任務。實際上Stream API內部實現的的本質,就是如何重寫Sink的這四個接口方法。
有了Sink對操作的包裝,Stage之間的調用問題就解決了,執行時只需要從流水線的head開始對數據源依次調用每個Stage對應的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一種可能的Sink.accept()方法流程是這樣的:
Sink接口的其他幾個方法也是按照這種[處理->轉發]的模型實現。下面我們結合具體例子看看Stream的中間操作是如何將自身的操作包裝成Sink以及Sink是如何將處理結果轉發給下一個Sink的。先看Stream.map()方法:
上述代碼看似復雜,其實邏輯很簡單,就是將回調函數mapper包裝到一個Sink當中。由于Stream.map()是一個無狀態的中間操作,所以map()方法返回了一個StatelessOp內部類對象(一個新的Stream),調用這個新Stream的opWripSink()方法將得到一個包裝了當前回調函數的Sink。
再來看一個復雜一點的例子。Stream.sorted()方法將對Stream中的元素進行排序,顯然這是一個有狀態的中間操作,因為讀取所有元素之前是沒法得到最終順序的。拋開模板代碼直接進入問題本質,sorted()方法是如何將操作封裝成Sink的呢?sorted()一種可能封裝的Sink代碼如下:
上述代碼完美的展現了Sink的四個接口方法是如何協同工作的:
首先begin()方法告訴Sink參與排序的元素個數,方便確定中間結果容器的的大小;
之后通過accept()方法將元素添加到中間結果當中,最終執行時調用者會不斷調用該方法,直到遍歷所有元素;
最后end()方法告訴Sink所有元素遍歷完畢,啟動排序步驟,排序完成后將結果傳遞給下游的Sink;
如果下游的Sink是短路操作,將結果傳遞給下游時不斷詢問下游cancellationRequested()是否可以結束處理。
本文詳細介紹了Stream流水線的組織方式,后續會持續針對Stream流水線的執行過程進行詳細介紹。學習本文將有助于理解原理并寫出正確的Stream代碼,同時打消你對Stream API效率方面的顧慮。如你所見,Stream API實現如此巧妙,即使我們使用外部迭代手動編寫等價代碼,也未必更加高效。
到此,關于“Stream流水線的實現原理是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。