您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“大數據開發中Flink-CEP怎么用”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“大數據開發中Flink-CEP怎么用”這篇文章吧。
總結就是:輸入-規則-輸出
就是單事件的自關聯,其實匹配的也是時間序列的
(1)定義 復合事件處理(Complex Event Processing,CEP)是一種基于動態環境中事件流的分析技術,事件在這 里通常是有意義的狀態變化,通過分析事件間的關系,利用過濾、關聯、聚合等技術,根據事件間的時序關系和聚合 關系制定檢測規則,持續地從事件流中查詢出符合要求的事件序列,最終分析得到更復雜的復合事件
(2)特征 CEP的特征如下: 目標:從有序的簡單事件流中發現一些高階特征; 輸入:一個或多個簡單事件構成的事件流; 處 理:識別簡單事件之間的內在聯系,多個符合一定規則的簡單事件構成復雜事件; 輸出:滿足規則的復雜事件
(3)功能
CEP用于分析低延遲、頻繁產生的不同來源的事件流。CEP可以幫助在復雜的、不相關的時間流中找出有 意義的模式和復雜的關系,以接近實時或準實時的獲得通知或組織一些行為。 CEP支持在流上進行模式匹配,根據模 式的條件不同,分為連續的條件或不連續的條件;模式的條件允許有時間的限制,當條件范圍內沒有達到滿足的條件 時,會導致模式匹配超時。 看起來很簡單,但是它有很多不同的功能: ① 輸入的流數據,盡快產生結果; ② 在2個 事件流上,基于時間進行聚合類的計算; ③ 提供實時/準實時的警告和通知; ④ 在多樣的數據源中產生關聯分析模 式; ⑤ 高吞吐、低延遲的處理 市場上有多種CEP的解決方案,例如Spark、Samza、Beam等,但他們都沒有提供專 門的庫支持。然而,Flink提供了專門的CEP庫。
(4)主要組件 Flink為CEP提供了專門的Flink CEP library
它包含如下組件:Event Stream、Pattern定義、Pattern檢測和生成Alert。 首先,開發人員要在DataStream流上定義出模 式條件,之后Flink CEP引擎進行模式檢測,必要時生成警告。
(1)個體模式(Individual Patterns) 組成復雜規則的每一個單
獨的模式定義,就是個體模式。
start.times(3).where(_.behavior.startsWith(‘fav’))
(2)組合模式(Combining Patterns,也叫模式序列) 很多個體模式組合起來,就形成了整個的模式序列。 模式序列
必須以一個初始模式開始:
val start = Pattern.begin(‘start’)
(3)模式組(Group of Pattern) 將一個模式序列作為條件嵌套在個體模式里,成為一組模式
個體模式包括單例模式和循環模式。單例模式只接收一個事件,而循環模式可以接收多個事件,
(1)量詞 可以在一個個體模式后追加量詞,也就是指定循環次數。
// 匹配出現4次 start.time(4) // 匹配出現0次或4次 start.time(4).optional // 匹配出現2、3或4次 start.time(2,4) // 匹配出現2、3或4次,并且盡可能多地重復匹配 start.time(2,4).greedy // 匹配出現1次或多次 start.oneOrMore // 匹配出現0、2或多次,并且盡可能多地重復匹配 start.timesOrMore(2).optional.greedy
(2)條件 每個模式都需要指定觸發條件,作為模式是否接受事件進入的判斷依據。CEP中的個體模式主要通過調 用.where()、.or()和.until()來指定條件。按不同的調用方式,可以分成以下幾類: ① 簡單條件 通過.where()方法對事 件中的字段進行判斷篩選,決定是否接收該事件
start.where(event=>event.getName.startsWith(“foo”))
② 組合條件 將簡單的條件進行合并;or()方法表示或邏輯相連,where的直接組合就相當于與and。 Pattern.where(event => …/some condition/).or(event => /or condition/) ③ 終止條件 如果使用了oneOrMore或者oneOrMore.optional,建議使用.until()作為終止條件,以便清理狀態。 ④ 迭代條件 能夠對模式之前所有接收的事件進行處理;調用.where((value,ctx) => {…}),可以調用 ctx.getEventForPattern(“name”)
(1)嚴格近鄰
所有事件按照嚴格的順序出現,中間沒有任何不匹配的事件,由.next()指定。例如對于模式“a next b”,事件序列“a,c,b1,b2”沒有匹配。 (2)寬松近鄰 允許中間出現不匹配的事件,由.followedBy()指定。例如對于模 式“a followedBy b”,事件序列“a,c,b1,b2”匹配為{a,b1}。 (3)非確定性寬松近鄰 進一步放寬條件,之前已經匹配過 的事件也可以再次使用,由.followedByAny()指定。例如對于模式“a followedByAny b”,事件序列“a,c,b1,b2”匹配為 {ab1},{a,b2}。 除了以上模式序列外,還可以定義“不希望出現某種近鄰關系”: .notNext():不想讓某個事件嚴格緊 鄰前一個事件發生。 .notFollowedBy():不想讓某個事件在兩個事件之間發生。 需要注意:
所有模式序列必須以.begin()開始;
模式序列不能以.notFollowedBy()結束;
“not”類型的模式不能被optional所修飾;
可以為模式指定時間約束,用來要求在多長時間內匹配有效。 next.within(Time.seconds(10))
定要查找的模式序列后,就可以將其應用于輸入流以檢測潛在匹配。調用CEP.pattern(),給定輸入流和模式,就能 得到一個PatternStream。
val input:DataStream[Event] = … val pattern:Pattern[Event,_] = … val patternStream:PatternStream[Event]=CEP.pattern(input,pattern)
創建PatternStream之后,就可以應用select或者flatSelect方法,從檢測到的事件序列中提取事件了。 select()方法 需要輸入一個select function作為參數,每個成功匹配的事件序列都會調用它。 select()以一個 Map[String,Iterable[IN]]來接收匹配到的事件序列,其中key就是每個模式的名稱,而value就是所有接收到的事件的 Iterable類型。
def selectFn(pattern : Map[String,Iterable[IN]]):OUT={ val startEvent = pattern.get(“start”).get.next val endEvent = pattern.get(“end”).get.next OUT(startEvent, endEvent) }
flatSelect通過實現PatternFlatSelectFunction實現與select相似的功能。唯一的區別就是flatSelect方法可以返回多條 記錄,它通過一個Collector[OUT]類型的參數來將要輸出的數據傳遞到下游
當一個模式通過within關鍵字定義了檢測窗口時間時,部分事件序列可能因為超過窗口長度而被丟棄;為了能夠處理
這些超時的部分匹配,select和flatSelect API調用允許指定超時處理程序。
Flink CEP 開發流程:
DataSource 中的數據轉換為 DataStream;
定義 Pattern,并將 DataStream 和 Pattern 組合轉換為 PatternStream;
PatternStream 經過 select、process 等算子轉換為 DataStraem;
再次轉換的 DataStream 經過處理后,sink 到目標庫。
select方法: SingleOutputStreamOperator<PayEvent> result = patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<PayEvent, PayEvent>() { @Override public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws Exception { return map.get("begin").get(0); } }, new PatternSelectFunction<PayEvent, PayEvent>() { @Override public PayEvent select(Map<String, List<PayEvent>> map) throws Exception { return map.get("pay").get(0); } });
對檢測到的模式序列應用選擇函數。對于每個模式序列,調用提供的{@link PatternSelectFunction}。模式選擇函數
只能產生一個結果元素。
對超時的部分模式序列應用超時函數。對于每個部分模式序列,調用提供的{@link PatternTimeoutFunction}。模式
超時函數只能產生一個結果元素。
您可以在使用相同的{@link OutputTag}進行select操作的{@link SingleOutputStreamOperator}上獲得由{@link
SingleOutputStreamOperator}生成的{@link SingleOutputStreamOperator}生成的超時數據流。
@param timedOutPartialMatchesTag 標識端輸出超時模式的@link OutputTag}
@param patternTimeoutFunction 為超時的每個部分模式序列調用的模式超時函數。
@param patternSelectFunction 為每個檢測到的模式序列調用的模式選擇函數。
@param 產生的超時元素的類型
@param 結果元素的類型
return {@link DataStream},其中包含產生的元素和在邊輸出中產生的超時元素。
DataStream<PayEvent> sideOutput = result.getSideOutput(orderTimeoutOutput);
獲取{@link DataStream},該{@link DataStream}包含由操作發出到指定{@link OutputTag}的邊輸出的元素
DataSource 中的數據轉換為 DataStream;watermark、keyby
定義 Pattern,并將 DataStream 和 Pattern 組合轉換為 PatternStream;
PatternStream 經過 sele ct、process 等算子轉換為 DataStream;
再次轉換的 DataStream 經過處理后,sink 到目標庫
FlinkCEP在運行時會將用戶的邏輯轉化成這樣的一個NFA Graph (nfa對象) 所以有限狀態機的工作過程,就是從開始狀態,根據不同的輸入,自動進行狀態轉換的過程
上圖中的狀態機的功能,是檢測二進制數是否含有偶數個 0。從圖上可以看出,輸入只有 1 和 0 兩種。從 S1 狀態開 始,只有輸入 0 才會轉換到 S2 狀態,同樣 S2 狀態下只有輸入 0 才會轉換到 S1。所以,二進制數輸入完畢,如果滿 足最終狀態,也就是最后停在 S1 狀態,那么輸入的二進制數就含有偶數個 0 大數據開發
以上是“大數據開發中Flink-CEP怎么用”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。