您好,登錄后才能下訂單哦!
為了使用Beam,首先必須使用Beam SDKs其中一個SDK里面的類創建一個驅動程序。驅動程序定義了管道,包括所有的輸入,轉換以及輸出。它還為您的管道設置了執行選項(通常使用命令行選項傳遞)。這些包括管道運行器,又決定了管道運行的后端。
Beam SDK提供了許多簡化大規模分布式數據處理的機制的抽象。相同的Beam抽象在批處理和流數據源中都可以使用。當創建Beam管道時,您可以根據這些抽象思考數據處理任務。他們包括:
l 管道Pipeline: Pipeline
從頭到尾封裝整個數據處理任務。包括讀取輸入數據,轉換數據和寫入輸出數據。所有Beam驅動程序必須創建一個Pipeline
。創建Pipeline
時,還必須指定執行選項,告訴Pipeline
在哪里以及如何運行。
l PCollection: PCollection
表示Beam管道運行操作的分布式數據集。數據集可以是有界的,這意味著它來自像文件這樣的固定的源,或者是×××的,這意味著它來自訂閱或其他機制持續不斷更新的源。您的管道通常通過從外部數據源讀取數據創建PCollection
初始值,但也可以從驅動程序中的內存中的數據來創建PCollection
。因此,PCollections
是管道中每個步驟的輸入和輸出。
l 轉換Transform: Transform
代表管道中的數據處理操作或步驟。每個Transform
都需要一個或多個PCollection
對象作為輸入,在PCollection
對象的元素上執行您提供的處理函數,并生成一個或多個輸出PCollection
對象。
l I/O Source 和Sink: Beam提供Source
和Sink
APIs分別表示讀取和寫入數據。Source
封裝從一些外部來源(如云端文件存儲或訂閱流式數據源)將數據讀取到Beam管道所需的代碼。Sink
同樣封裝將PCollection
的元素寫入外部數據源所需的代碼。
一個典型的Beam驅動程序的工作原理如下:
l 創建一個Pipeline對象并設置管道執行選項,包括管道運行器。
l 為管道數據創建PCollection的初始值,使用Source API從外部源讀取數據,或使用Create轉換從內存中的數據構建PCollection。
l 對每個PCollection應用轉換。轉換可以改變,過濾,分組,分析或以其他方式處理PCollection中的元素。轉換會創建一個新輸出的PCollection,而不消費輸入的集合。典型的管道依次將后續轉換應用于每個新輸出的PCollection,直到處理完成。
l 輸出最終的轉換的PCollection,通常使用Sink API將數據寫入外部源。
l 使用指定的管道運行器運行管道。
當運行Beam驅動程序時,指定的管道運行器將根據創建的管道的工作流圖,基于您創建的PCollection對象已經應用的轉換。然后使用適當的分布式處理后端執行該圖形,成為后端異步的“作業”(或等效的)。
Pipeline
抽象封裝了數據處理任務的所有數據和步驟。Beam驅動程序通常從構建一個Pipeline對象開始,然后使用該對象作為創建管道數據集PCollection
和以及Transform
s操作基礎。
要使用Beam,驅動程序必須首先創建Beam SDK Pipeline
類的實例(通常在main()
函數中)。創建Pipeline
時,還需要設置一些配置選項。可以以編程方式設置管道的配置選項,但提前設置選項(或從命令行讀取)通常更容易,并在創建對象時將其傳遞給Pipeline
對象。
// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();
// Then create the pipeline.
Pipeline p = Pipeline.create(options);
使用管道選項來配置管道的不同方面,例如將要執行管道的管道運行器以及所選運行器所需的任何特定配置。管道選項可能包含諸如項目ID或存儲文件位置等信息。
當您在所選擇的運行程序上運行管道時,PipelineOptions的副本將可用于您的代碼。例如,可以從DoFn的上下文中讀取PipelineOptions。
PipelineOptions options = context.getPipelineOptions();
命令行參數設置PipelineOptions
可以通過創建PipelineOptions
對象并直接設置字段來配置管道,Beam SDK包含一個命令行解析器,可以使用此解析器解析命令行參數后設置PipelineOptions
字段。
要從命令行讀取選項,構建PipelineOptions
對象,如以下示例代碼所示:
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation().create();
將解釋遵循以下格式的命令行參數:
--<option>=<value>
注意:附加方法.withValidation
將檢查所需的命令行參數并驗證參數值。
以這種方式構建PipelineOptions
,可以將任何選項指定為命令行參數。
注意:該WordCount示例管道指明了在運行時如何使用命令行選項來設置管道選項。
除了標準PipelineOptions
之外,還可以添加自己的自定義選項。要添加自定義選項,請為每個選項定義一個帶有getter和setter方法的接口,如以下示例所示:
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
還可以指定描述和默認值,當用戶使用--help
作為命令行參數傳遞時會顯示它們。
使用注釋設置描述和默認值,如下所示:
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
建議在創建PipelineOptions對象時將自定義的接口注冊到PipelineOptionsFactory。當把自定義的接口注冊到PipelineOptionsFactory之后,--help可以找到自定義的選項接口,并將其添加到--help命令的輸出。PipelineOptionsFactory還將驗證自定義選項與所有其他注冊的選項是否兼容。
以下示例代碼顯示如何注冊自定義選項接口到PipelineOptionsFactory:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
現在管道可以接受--myCustomOption=value
作為一個命令行參數。
所述PCollection抽象表示潛在分布式,多元素的數據集。可以把PCollection
認為是“管道”的數據; Beam轉換使用PCollection
對象作為輸入和輸出。因此,如果要處理管道中的數據,則必須采用PCollection
的形式。
創建Pipeline
完畢后,需要先創建至少一個某種形式的PCollection
。創建的PCollection
作為管道中第一個操作的輸入。
創建PCollection
對象實例,可以通過使用Beam的Source API從外部數據源讀取數據,也可以在驅動程序中存儲在內存集合類中的數據。前者通常是生產環境下管道獲取數據的方式; Beam的Source API包含多種適配器,可以從外部來源(如大型的基于云的文件,數據庫或訂閱服務)中讀取。后者主要用于測試和調試的目的。
要從外部源讀取,需要使用Beam提供的I/O適配器之一。適配器的具體用途不同,但它們都讀取某些外部數據源,并返回PCollection
,它的元素表示該源中的數據記錄。
每個數據源適配器都有一個Read
轉換; 如果要讀取,必須將該轉換應用于Pipeline
對象本身。例如,讀取外部文本文件并返回其元素類型為String的PCollection
,每個String表示文本文件中的一行。如下是如何將TextIO.Read
應用到Pipeline
以便創建一個PCollection
:
public static void main(String[] args) {
// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// Create the PCollection 'lines' by applying a 'Read' transform.
PCollection<String> lines = p.apply(
"ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
}
請參閱I/O部分,了解有關如何閱讀Beam SDK支持各種數據源的更多信息。
要從內存中的Java Collection
創建PCollection
,可以使用Beam提供的Create
轉換。很像數據適配器Read
,可以將Create
直接應用于Pipeline
對象本身。
Create
接受Java Collection
和一個Coder
對象作為參數,在Coder
指定的Collection
中的元素如何編碼。
以下示例代碼顯示了如何從內存中的List
創建PCollection
:
public static void main(String[] args) {
// Create a Java Collection, in this case a List of Strings.
static final List<String> LINES = Arrays.asList(
"To be, or not to be: that is the question: ",
"Whether 'tis nobler in the mind to suffer ",
"The slings and arrows of outrageous fortune, ",
"Or to take arms against a sea of troubles, ");
// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// Apply Create, passing the list and the coder, to create the PCollection.
p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())
}
PCollection
由創建它的特定Pipeline
對象擁有; 多個管道不能共享同一個PCollection
。在某些方面,PCollection
的功能就像一個集合類。然而,PCollection
在幾個關鍵方面有所不同:
PCollection
的元素可以是任何類型的,但是必須都是相同的類型。然而,為了支持分布式處理,Beam需要能夠將每個單獨的元素編碼為字節串(byte string)(因此元素可以傳遞給分布式Worker)。Beam SDK提供了一種數據編碼機制,包括常用類型的內置編碼,以及根據需要指定的自定義編碼支持。
PCollection
是不可變的,一旦創建后,無法添加,刪除或更改單個元素。Beam轉換可以處理PCollection
中每個元素并生成新的管道數據(作為新的PCollection
),但不消費或修改原始的輸入集合。
PCollection
不支持隨機訪問單個元素。相反,Beam 轉換可以單獨考慮PCollection
的每個元素。
PCollection
是一個大的,不可變的元素的“包”。一個PCollection
可以包含的元素數量沒有上限; 任何給定的PCollection
也許適合單個機器上的內存,或者它可以表示非常大的持久存儲的分布式數據集。
PCollection
可以是有界的也可以是×××的。有界的 PCollection
代表已知的固定大小的數據集,而×××的PCollection
代表無限大小的數據集。PCollection
是有界還是×××取決于它代表的源數據集。從批量數據源(如文件或數據庫)讀取可創建有界的PCollection
。從流或持續更新的數據源(如Pub/Sub或Kafka)讀取會創建一個×××的PCollection
(除非明確告訴它不要)。
PCollection
有界(或×××)的性質影響Beam如何處理它的數據。可以使用批處理作業處理有界的PCollection
,可以一次讀取整個數據集,并在有限長度的作業中執行處理。必須使用連續運行的流式作業來處理×××的PCollection
,因為整個集合不會一次都可用于處理。
當對×××PCollection
的元素進行分組操作時,Beam需要一個稱為窗口的概念,將不斷更新的數據集劃分為有限大小的邏輯窗口。Beam將每個窗口處理作為一個bundle,隨著數據集的生成,處理將持續進行。這些邏輯窗口由與數據元素相關聯的一些特性(諸如時間戳)來確定。
PCollection
中的每個元素都具有與其相關聯的內在時間戳。每個元素的時間戳記最初由創建的PCollection
源分配。創建×××PCollection
的源通常會為每個新元素分配元素被讀取或添加時相對應的時間戳。
注意:PCollection
為固定數據集創建有界限的源也會自動分配時間戳,但最常見的行為是為每個元素分配相同的時間戳(Long.MIN_VALUE
)。
時間戳對于包含固有時間概念元素的PCollection
很有用。如果管道正在讀取一系列事件,如推文或其他社交媒體消息,則每個元素可能會將事件發布的時間用作元素時間戳。
可以手動將時間戳分配給PCollection
的某個元素,如果源不為元素分配時間戳。如果元素具有固有的時間戳,但是時間戳在元素本身的結構中(例如服務器日志條目中的“時間”字段),則需要執行此操作。Beam有轉換操作,其把PCollection
作為輸入且輸出與附加的時間戳完全相同的PCollection
; 有關如何執行此操作的詳細信息,請參閱分配時間戳。
在Beam的SDK中,轉換是管道中的操作。轉換采用PCollection
(或多個PCollection
)作為輸入,在該集合中的每個元素上執行指定的操作,并生成新的輸出PCollection
。要調用轉換,必須將其應用于輸入PCollection
。
Beam SDK包含許多不同的轉換,可以將其應用于管道的PCollection
。這些轉換包括通用的核心轉換,如ParDo或Combine。還包括SDK中包含的預編寫的復合變換,它們將一個或多個核心轉換組合在一個有用的處理模式中,例如計數或組合集合中的元素。還可以定義自己的更復雜的復合轉換,以適應管道確切的用例。
Beam SDK中的每個轉換都有通用的apply
方法。調用多個Beam轉換類似于方法鏈,但有一點不同之處:將轉換應用于輸入PCollection
,將轉換本身作為參數傳遞,操作返回輸出PCollection
。一般形式如下:
[Output PCollection] = [Input PCollection].apply([Transform])
由于Beam對PCollection
使用通用的apply
方法,因此可以依次鏈接轉換,也可以應用包含嵌套的其他轉換的轉換(在Beam SDK中稱為復合轉換)。
如何應用管道的轉換決定了管道的結構。最好方法就是把管道作為一個有向無環圖,其中節點是PCollection
s,邊是轉換。例如,可以以鏈式轉換的方式來創建順序管道,如下所示:
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])
上述管道的生成工作流程圖如下所示:[順序圖圖形] [Sequential Graph Graphic]
但是,請注意,轉換不消費或以其他方式更改輸入集合 - 記住,PCollection
根據定義是不可變的。這意味著您可以將多個轉換應用于同一個輸入PCollection
以創建分支管道,如下所示:
[Output PCollection 1] = [Input PCollection].apply([Transform 1])
[Output PCollection 2] = [Input PCollection].apply([Transform 2])
上述管道的生成工作流程圖如下所示:[分支圖圖形] [Branching Graph Graphic]
還可以構建自己的復合變換,即將多個子步驟嵌套在單個更大的變換中。復合變換對于構建可在許多不同地方使用的可重用的簡單步驟序列特別有用。
Beam SDK中的轉換提供了通用的處理框架,可以以函數對象(俗稱“用戶代碼”)的形式提供處理邏輯。用戶代碼應用于輸入的PCollection
的元素。用戶代碼的實例可能會由集中群的許多不同的Worker并行執行,具體取決于選擇執行Beam管道的管道運行器和后端。在每個Worker上運行的用戶代碼生成的輸出元素最終添加到轉換產生的最終輸出PCollection
中。
Beam提供以下轉換,每個轉換代表不同的處理范例:
l ParDo
l GroupByKey
l Combine
l Flatten 和 Partition
ParDo
是用于通用并行處理的Beam轉換。ParDo
的處理范例類似于map/shuffle/reduce形式的算法中的“Map”操作:一個ParDo
轉換考慮到了輸入PCollection
中的每個元素,在該元素上執行一些處理函數(用戶代碼),并發送0個,1個或多個元素到輸出PCollection
。
ParDo
可用于各種常見的數據處理操作,包括:
l 過濾數據集。可以使用ParDo
來過慮PCollection
中的每個元素,并將該元素輸出到新集合,或者將其丟棄。
l 格式化或轉換數據集中的每個元素。如果輸入PCollection
包含與想要的不同類型或格式的元素,則可以使用ParDo
對每個元素執行轉換并將結果輸出到新的PCollection
。
l 提取數據集中每個元素的部分數據。如果PCollection
中的記錄帶有多個字段,例如,可以使用 ParDo
解析出將想要的字段輸出到新的PCollection
。
l 對數據集中的每個元素執行計算。可以使用ParDo
對PCollection
中的每個元素或某些確定元素進行簡單或復雜的計算,并將結果輸出為新的PCollection
。
在這樣的角色中,ParDo
是管道中一個通用的中間步驟。可以使用它從一組原始輸入記錄中提取某些字段,或將原始輸入轉換為不同的格式; 也可以使用ParDo
將處理后的數據轉換為適合輸出的格式,如數據庫表行或可打印字符串。
應用ParDo
變換時,需要以DoFn
對象的形式提供用戶代碼。DoFn
是一個定義分布式處理功能的Beam SDK類。
當創建一個DoFn的子類時,請注意該子類應遵守4.3節編寫Beam轉換用戶代碼的要求。
像所有Beam轉換一樣,應用ParDo
轉換可以通過在輸入PCollection
上調用apply
方法和把ParDo
作為參數來傳遞,如以下示例代碼所示:
// The input PCollection of Strings.
PCollection<String> words = ...;
// The DoFn to perform on each element in the input PCollection.
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
// Apply a ParDo to the PCollection "words" to compute lengths for each word.
PCollection<Integer> wordLengths = words.apply(
ParDo.of(
new ComputeWordLengthFn())); // The DoFn to perform on each element, which
// we define above.
在示例中,輸入PCollection
包含String
值。應用一個ParDo
轉換,即指定一個函數(ComputeWordLengthFn
)來計算每個字符串的長度,并將結果輸出到一個新的PCollection
中,且它的元素為Integer
類型存儲每個字的長度的值。
傳遞給ParDo
的DoFn
對象包含應用于輸入集合中的元素的處理邏輯。當使用Beam時,通常寫出的最重要的代碼片就是這些DoFn
,是它們定義了管道的確切數據處理任務是什么。
注意:創建DoFn
時,請注意4.3節編寫Beam轉換用戶代碼的要求,并確保您的代碼遵循它們。
DoFn
一次處理輸入PCollection
中的一個元素。創建DoFn
的子類時,需要提供與輸入和輸出元素類型匹配的類型參數。如果DoFn
處理傳入String
類型的元素并生成Integer
類型的輸出集合的元素(如之前的示例ComputeWordLengthFn
),則類聲明將如下所示:
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
在DoFn
的子類中,編寫一個帶有@ProcessElement
注釋的方法,其中提供實際的處理邏輯。不需要從輸入集合中手動提取元素; Beam SDK可以處理。注釋為@ProcessElement
方法應該接受的對象類型ProcessContext
。該ProcessContext
對象允許訪問輸入元素和發出輸出元素的方法:
static class ComputeWordLengthFn extends DoFn<String, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
// Get the input element from ProcessContext.
String word = c.element();
// Use ProcessContext.output to emit the output element.
c.output(word.length());
}
}
注意:如果輸入PCollection
中的元素是鍵/值對,則可以分別使用ProcessContext.element().getKey()
或鍵訪問鍵或值ProcessContext.element().getValue()
。
給定的DoFn
實例通常被調用一次或多次來處理一些任意的元素組。但是,Beam不能保證確切的調用次數; 可以在給定的工作節點上多次調用它,以解決故障和重試。因此,您可以跨多個調用緩存信息到處理方法,但如果這樣做,請確保實現不依賴于調用數量。
在處理方法中,您還需要滿足一些不變性要求,以確保Beam和處理后端可以安全地序列化并緩存管道中的值。您的方法應符合以下要求:
您不應以任何方式修改ProcessContext.element()
或ProcessContext.sideInput()
(或從輸入集合傳入的元素)返回的元素。
使用ProcessContext.output()
或輸出值后ProcessContext.sideOutput()
,您不應以任何方式修改該值。
如果您的功能相對簡單,您可以ParDo
通過提供一個輕量級DoFn
的在線內容來簡化您的使用,作為匿名內部類實例。
下面是前面的例子,ParDo
用ComputeLengthWordsFn
,用DoFn
指定為匿名內部類的實例:
// The input PCollection.
PCollection<String> words = ...;
// Apply a ParDo with an anonymous DoFn to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
"ComputeWordLengths", // the transform name
ParDo.of(new DoFn<String, Integer>() { // a DoFn as an anonymous inner class instance
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().length());
}
}));
如果您ParDo
對輸出元素執行輸入元素的一對一映射,也就是說,對于每個輸入元素,它將應用一個產生一個輸出元素的函數,您可以使用更高級的變換。可以接受一個匿名的Java 8 lambda函數來進一步簡化。MapElementsMapElements
以下是使用以下示例:MapElements
// The input PCollection.
PCollection<String> words = ...;
// Apply a MapElements with an anonymous lambda function to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
MapElements.into(TypeDescriptors.integers())
.via((String word) -> word.length()));
注意:您可以使用Java 8 lambda函數與其他幾個波束轉換,其中包括Filter
,FlatMapElements
,和Partition
。
GroupByKey
是一個用于處理鍵/值對集合的波束變換。這是一個并行還原操作,類似于Map / Shuffle / Reduce-style算法的Shuffle階段。輸入GroupByKey
是表示多重映射的鍵/值對的集合,其中集合包含具有相同鍵但具有不同值的多個對。給定這樣的集合,您可以使用GroupByKey
收集與每個唯一鍵相關聯的所有值。
GroupByKey
是匯總具有共同點的數據的好方法。例如,如果您有一個存儲客戶訂單記錄的集合,則可能需要將來自相同郵政編碼的所有訂單分組在一起(其中鍵/值對的“鍵”是郵政編碼字段,而“值“是記錄的剩余部分)。
我們來看一下GroupByKey
簡單例子的機制,其中我們的數據集由文本文件中的單詞和出現的行號組成。我們想將所有共享相同單詞(鍵)的行號(值)組合在一起,讓我們看到文本中出現特定單詞的所有位置。
我們的輸入是一個PCollection
鍵/值對,其中每個單詞都是一個鍵,該值是該單詞出現在文件中的行號。以下是輸入集合中鍵/值對的列表:
cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
cat, 9
and, 6
...
GroupByKey
使用相同的密鑰收集所有值,并輸出一個新的對,其中包含唯一鍵和與輸入集合中的該關鍵字相關聯的所有值的集合。如果我們應用GroupByKey
到上面的輸入集合,輸出集合將如下所示:
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...
因此,GroupByKey
表示從多重映射(多個鍵到單個值)到單一映射(唯一鍵到值集合)的轉換。
CoGroupByKey
連接兩個或多個PCollection
具有相同鍵類型的鍵/值,然后發出一組KV<K, CoGbkResult>
對。設計您的流水線顯示使用連接的示例管道。
給出以下輸入集合:
// collection 1
user1, address1
user2, address2
user3, address3
// collection 2
user1, order1
user1, order2
user2, order3
guest, order4
...
CoGroupByKey
從所有PCollection
s中收集具有相同鍵的值,并輸出由唯一鍵和CoGbkResult
包含與該鍵相關聯的所有值的對象組成的新對。如果您應用于CoGroupByKey
上面的輸入集合,則輸出集合將如下所示:
user1, [[address1], [order1, order2]]
user2, [[address2], [order3]]
user3, [[address3], []]
guest, [[], [order4]]
...
關鍵/價值對的注意事項:根據您使用的語言和SDK,Beam代表鍵/值對略有不同。在Beam SDK for Java中,您可以使用類型對象來表示鍵/值對KV<K, V>
。在Python中,您使用2元組表示鍵/值對。
Combine
是一種用于組合數據中元素或值集合的波束變換。Combine
具有在整個PCollection
s 上工作的變體,并且一些組合PCollection
鍵/值對中的每個鍵的值。
應用Combine
變換時,必須提供包含用于組合元素或值的邏輯的函數。組合函數應該是可交換和關聯的,因為函數不一定在給定鍵的所有值上正確調用一次。由于輸入數據(包括值集合)可以分布在多個工作者之間,因此可以多次調用組合函數以在值集合的子集上執行部分組合。Beam SDK還提供了一些預構建的組合功能,用于常數數字組合操作,如sum,min和max。
簡單的組合操作(如和)通常可以實現為一個簡單的功能。更復雜的組合操作可能需要您創建一個CombineFn
具有與輸入/輸出類型不同的累加類型的子類。
以下示例代碼顯示了一個簡單的組合函數。
// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction.
public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
@Override
public Integer apply(Iterable<Integer> input) {
int sum = 0;
for (int item : input) {
sum += item;
}
return sum;
}
}
對于更復雜的組合函數,可以定義一個子類CombineFn
。您應該使用CombineFn
組合功能需要更復雜的累加器,必須執行額外的預處理或后處理,可能會更改輸出類型或將密鑰考慮在內。
一般組合操作由四個操作組成。創建子類時CombineFn
,必須通過覆蓋相應的方法來提供四個操作:
1. 創建累加器創建一個新的“本地”累加器。在示例情況下,取平均值,本地累加器跟蹤運行的值(我們的最終平均除法的分子值)和到目前為止的總和值(分母值)。它可以以分布式的方式被稱為任何次數。
2. Add Input將一個輸入元素添加到累加器,返回累加器值。在我們的例子中,它會更新總和并增加計數。也可以并行調用它。
3. 合并累加器將多個累加器合并到單個累加器中; 這是在最終計算之前如何組合多個累加器中的數據。在平均平均計算的情況下,表示劃分的每個部分的累加器被合并在一起。它的輸出可能再次被呼叫多次。
4. 提取輸出執行最終計算。在計算平均值的情況下,這意味著將所有值的組合和除以求和的數量。在最終合并的累加器上調用一次。
以下示例代碼顯示如何定義一個CombineFn
計算平均值的平均值:
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
public static class Accum {
int sum = 0;
int count = 0;
}
@Override
public Accum createAccumulator() { return new Accum(); }
@Override
public Accum addInput(Accum accum, Integer input) {
accum.sum += input;
accum.count++;
return accum;
}
@Override
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
merged.sum += accum.sum;
merged.count += accum.count;
}
return merged;
}
@Override
public Double extractOutput(Accum accum) {
return ((double) accum.sum) / accum.count;
}
}
如果您正在組合PCollection
鍵值對,則每鍵合并通常就足夠了。如果您需要根據密鑰更改組合策略(例如,某些用戶的MIN和其他用戶的MIN),則可以KeyedCombineFn
在組合策略中定義一個訪問密鑰。
使用全局組合將給定中的所有元素PCollection
轉換成單個值,在您的流水線中表示為新的PCollection
包含一個元素。以下示例代碼顯示了如何應用Beam提供的sum combine函數為一個PCollection
整數產生一個總和值。
// Sum.SumIntegerFn() combines the elements in the input PCollection.
// The resulting PCollection, called sum, contains one value: the sum of all the elements in the input PCollection.
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
Combine.globally(new Sum.SumIntegerFn()));
如果您的輸入PCollection
使用默認的全局窗口,則默認行為是返回PCollection
包含一個項目。該項的值來自在應用時指定的合并函數中的累加器Combine
。例如,提供的sum組合函數返回零值(空輸入的和),而min組合函數返回最大或無限值。
有Combine
,而不是返回一個空PCollection
當輸入為空,指定.withoutDefaults
當你申請你的Combine
變換,如下面的代碼示例:
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());
如果您PCollection
使用任何非全局窗口函數,則Beam不提供默認行為。應用時,您必須指定以下選項之一Combine
:
指定.withoutDefaults
輸入PCollection
集合中輸入中為空的窗口同樣為空。
指定.asSingletonView
,其中輸出立即轉換為a PCollectionView
,當為邊輸入時,將為每個空窗口提供默認值。通常情況下,如果將管道的結果Combine
用作后面的邊輸入,通常只需要使用此選項。
在創建密鑰分組的集合(例如,通過使用GroupByKey
轉換)之后,公共模式是將與每個密鑰相關聯的值的集合組合成單個合并的值。根據以前的例子GroupByKey
,一個鍵分組的PCollection
調用groupedWords
看起來像這樣:
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...
在上文中PCollection
,每個元素都有一個字符串鍵(例如“cat”)和一個可迭代的整數(在第一個元素中包含[1,5,9])。如果我們的管道的下一個處理步驟組合了這些值(而不是單獨考慮它們),則可以組合整數的迭代,以創建與每個鍵配對的單個合并值。這種模式,GroupByKey
然后合并值的集合相當于Beam的Combine PerKey轉換。Combine PerKey提供的組合函數必須是關聯縮減函數或子類CombineFn
。
// PCollection is grouped by key and the Double values associated with each key are combined into a Double.
PCollection<KV<String, Double>> salesRecords = ...;
PCollection<KV<String, Double>> totalSalesPerPerson =
salesRecords.apply(Combine.<String, Double, Double>perKey(
new Sum.SumDoubleFn()));
// The combined value is of a different type than the original collection of values per key.
// PCollection has keys of type String and values of type Integer, and the combined value is a Double.
PCollection<KV<String, Integer>> playerAccuracy = ...;
PCollection<KV<String, Double>> avgAccuracyPerPlayer =
playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
new MeanInts())));
Flatten
并且是存儲相同數據類型的對象的Beam變換。合并多個對象到一個單一的邏輯,和分割一個單一成固定數量的更小的集合。Partition
PCollectionFlattenPCollectionPCollectionPartitionPCollection
以下示例顯示如何應用Flatten
轉換來合并多個PCollection
對象。
// Flatten takes a PCollectionList of PCollection objects of a given type.
// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
缺省情況下,輸出編碼器PCollection
是一樣的編碼器,用于在第一PCollection
輸入PCollectionList
。但是,輸入PCollection
對象可以使用不同的編碼器,只要它們都以您所選擇的語言包含相同的數據類型即可。
當使用Flatten
合并PCollection
具有應用窗口策略的PCollection
對象時,要合并的所有對象必須使用兼容的窗口策略和窗口大小。例如,您合并的所有集合必須全部使用(假設)相同的5分鐘固定窗口或每30秒鐘啟動4分鐘滑動窗口。
如果您的管道嘗試使用不兼容的窗口Flatten
合并PCollection
對象,則IllegalStateException
當您的管道構建時,Beam會生成錯誤。
PartitionPCollection
根據您提供的分區功能劃分一個元素。分區功能包含確定如何將輸入元素分割PCollection
成每個生成的分區的邏輯PCollection
。分區數必須在圖形構建時確定。例如,您可以在運行時將分區數作為命令行選項傳遞(然后用于構建流水線圖),但是您無法確定中間管道中的分區數(基于以后計算的數據)例如,您的流水線圖是構建的)。
以下示例將a PCollection
分為百分位組。
// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.
// In this example, we define the PartitionFn in-line.
// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
students.apply(Partition.of(10, new PartitionFn<Student>() {
public int partitionFor(Student student, int numPartitions) {
return student.getPercentile() // 0..99
* numPartitions / 100;
}}));
// You can extract each partition from the PCollectionList using the get method, as follows:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。