91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

FLINK SIDDHI ADDON 學習筆記

發布時間:2020-07-17 13:51:17 來源:網絡 閱讀:16216 作者:zhanjia 欄目:大數據

SIDDHI 是一款功能強大的CEP 引擎,具有自己的DSL,豐富的模式匹配功能和可擴展性, 感謝陳浩同學提供了SIDDHI和FLINK的整合功能 https://github.com/haoch/flink-siddhi 本文主要介紹了這個ADDON的一些實現思路

  1. 將FLINK STREAM 轉化為 SIDDHI STREAM 定義

??用法: SiddhiCEP.registerStream(streamName, FlinkDataStream, fieldNames)

??通過 FlinkDataStream.getType 獲得流對象的類型定義.registerStream方法會構造一個 SiddhiStreamSchema 對象,根據流對象的類型定義,自動獲取每個field對應的數據類型存在內部的fieldTypes數組中.

??SiddhiStreamSchema 內部會創建一個Siddhi StreamDefinition對象, StreamDefinition的attribute的定義根據fieldNames + fieldTypes 來添加.SiddhiTypeFactory.getAttributeType 負責將Flink 的數據類型映射為Siddhi的數據類型, 并可自動生成一段Define Stream的定義(見 SiddhiStreamSchema.getStreamDefinitionExpression 方法) define stream [streamName] ([fieldName 1] [fieldType 1], ...[fieldName n] [fieldType n])

??SiddhiStreamSchema 包括一個StreamSerializer: 用于將DataStream 中的對象轉化為 Siddhi Stream 的輸入(Object Array):
????如果流對象是一個簡單類型 Atomic Type 直接將流對象放到 ARRAY中
????如果流對象是Tuple 類型,直接將Tuple 中前N個值放到ARRAY中
????如果流對象是Pojo或者CaseClass類型,直接根據每個fieldName 獲取Class對應的屬性放到ARRAY中

  1. 串聯FLINK STREAM 和 SIDDHI STREAM

??SiddhiStream: 抽象的Stream基類

??convertDataStream 將原始的FLINK流轉化為Tuple類型的流,Tuple的第一個元素為StreamId, 第二個元素為原來流中的數據,支持普通Stream 和 KeyedStream

??ExecutionSiddhiStream: 構建 SiddhiOperatorContext 并調用SiddhiStreamFactory.createDataStream 創建了集成Siddhi的 DataStream. DataStream的類型為Tuple的子類.SiddhiTypeFactory.getTupleTypeInformation: 其核心思路是通過Siddhi輸出Stream的StreamDefinition獲得其Attribute的定義,再通過 TypeInfoParser.parse構造Flink Tuple 類型的定義

??ExecutableStream 根據Siddhi query 創建ExecutionSiddhiStream對象
??SingleSiddhiStream, UnionSiddhiStream: ExecutableStream 的子類,支持Fluent Style的鏈式調用. UnionSiddhiStream 調用了DataStream.union 方法

??SiddhiStreamFactory.createDataStream 通過 FLINK DataStream的transform方法使用了自定義的StreamOperator: SiddhiStreamOperator. 在 AbstractSiddhiOperator 的 setup 方法中創建SiddhiManager 和 SiddhiAppRuntime 并注冊了 InputHandler 和 OutputCallback (StreamOutputHandler)

??SiddhiStreamOperator.processElement 需要處理兩種場景:
????Flink TimeCharacteristic = ProcessingTime: 先調用StreamSerializer將數據轉化為Object Array, 再直接調用InputHandler.send將數據發送給Siddhi處理
????Flink TimeCharacteristic = EventTime: 緩存接收到的StreamRecord 到內部的priorityQueue中,直到收到Watermark, 將priorityQueue中小于watermark的StreamRecord一次發送給Siddhi處理

  StreamOutputHandler:根據Output的TypeInfo將Siddhi Event 轉化為 Flink StreamRecord. 再轉發到SiddhiStreamOperator的Output

  1. CHECKPOINT

??SiddhiStreamOperator中保留了兩種State信息,一種是priorityQueue中保存的由于watermark未發送給Siddhi的消息. 另一種是Siddhi本身的State, 通過SiddhiAppRuntime.snapshot() 獲得

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

刚察县| 佛冈县| 乾安县| 岳普湖县| 德庆县| 小金县| 昭平县| 离岛区| 方山县| 华坪县| 遂昌县| 嘉鱼县| 长垣县| 商丘市| 斗六市| 辛集市| 惠州市| 阿拉善左旗| 哈尔滨市| 安岳县| 循化| 渭源县| 新郑市| 新余市| 中卫市| 德钦县| 大埔县| 杨浦区| 开阳县| 漳州市| 白河县| 哈尔滨市| 泸水县| 许昌市| 阿图什市| 新干县| 马公市| 绥化市| 乐昌市| 晋宁县| 五常市|