91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink sql-clent MATCH_RECOGNIZE kafka 例子

發布時間:2020-08-21 03:11:40 來源:網絡 閱讀:1120 作者:大海之中 欄目:大數據

環境 flink1.7.2

  1. 增加flink1.7.2 的lib 中的jar, 否則會報類找不到

    avro-1.8.2.jar            
    flink-connector-kafka-0.10_2.12-1.7.2.jar  
    flink-connector-kafka-base_2.12-1.7.2.jar  
    flink-json-1.7.2.jar         
    kafka-clients-0.11.0.0.jar
    flink-avro-1.7.2.jar      
    flink-connector-kafka-0.11_2.12-1.7.2.jar  
    flink-core-1.7.2.jar                       
    flink-python_2.12-1.7.2.jar  log4j-1.2.17.jar
    flink-cep_2.12-1.7.2.jar  
    flink-connector-kafka-0.9_2.12-1.7.2.jar   
    flink-dist_2.12-1.7.2.jar                  
    flink-table_2.12-1.7.2.jar   
    slf4j-log4j12-1.7.15.jar
  2. 修改 sql-client-defaults.yaml 中的table 值
tables:
  - name: myTable
    type: source
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: im-message-topic2
      startup-mode: earliest-offset
      properties:
        - key: bootstrap.servers
          value: kafkaip:9092
        - key: group.id
          value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(sessionId STRING, fromUid STRING, toUid STRING, chatType STRING, type STRING,msgId STRING, msg STRING, timestampSend STRING)"
    schema:
      - name: sessionId
        type: STRING
      - name: fromUid
        type: STRING
      - name: toUid
        type: STRING        
      - name: chatType
        type: STRING
      - name: type
        type: STRING
      - name: msgId
        type: STRING
      - name: msg
        type: STRING            
      - name: rowTime
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "timestampSend"
          watermarks:
            type: "periodic-bounded"
            delay: "60"
      - name: procTime
        type: TIMESTAMP
        proctime: true
  1. 運行
./bin/sql-client.sh embedded
 select * from myTable;

flink  sql-clent  MATCH_RECOGNIZE kafka 例子

然后使用 MATCH_RECOGNIZE 的sql

SELECT  *   FROM myTable    MATCH_RECOGNIZE 
(   PARTITION BY sessionId   ORDER BY rowTime   MEASURES  
 e2.procTime as answerTime,  LAST(e1.procTime) as customer_event_time,  
 e2.fromUid as empUid,    
 e1.procTime as askTime,                       
 1 as total_talk           
 ONE ROW PER MATCH   AFTER MATCH SKIP TO LAST e2   
 PATTERN (e1 e2)   DEFINE   e1 as e1.type = 'yonghu',    
 e2 as e2.type = 'guanjia'   );

上面是使用sql-client 不用謝代碼,當然也可以寫代碼,下面是對應的程序

public static void main(String[] arg) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment  tableEnv = TableEnvironment.getTableEnvironment(env);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        tableEnv.connect(new Kafka()
                    .version("0.11")
                    .topic("im-message-topic3")
                    //.property("zookeeper.connect","")
                    .property("bootstrap.servers","kafkaip:9092")
                    .startFromEarliest()
                    .sinkPartitionerRoundRobin()//Flink分區隨機映射到kafka分區

            ).withFormat(new Json()
                    .failOnMissingField(false)
                    .deriveSchema()

            ).withSchema(new Schema()
                    .field("sessionId", Types.STRING).from("sessionId")
                    .field("fromUid", Types.STRING).from("fromUid")
                    .field("toUid", Types.STRING).from("toUid")
                    .field("chatType", Types.STRING).from("chatType")
                    .field("type", Types.STRING).from("type")
                    .field("msgId", Types.STRING).from("msgId")
                    .field("msg", Types.STRING).from("msg")
//                  .field("timestampSend", Types.SQL_TIMESTAMP)
                    .field("rowtime", Types.SQL_TIMESTAMP)
                    .rowtime(new Rowtime()
                            .timestampsFromField("timestampSend")
                            .watermarksPeriodicBounded(1000)
                    )
                    .field("proctime", Types.SQL_TIMESTAMP).proctime()
            ).inAppendMode().registerTableSource("myTable");

        Table tb2 = tableEnv.sqlQuery(
                "SELECT " +
                        "answerTime, customer_event_time, empUid, noreply_counts, total_talk " +
                        "FROM myTable" +
                        " " +
                        "MATCH_RECOGNIZE ( " +
                        "PARTITION BY sessionId " +
                        "ORDER BY rowtime " +
                        "MEASURES " +
                        "e2.rowtime as answerTime, "+
                        "LAST(e1.rowtime) as customer_event_time, " +
                        "e2.fromUid as empUid, " +
                        "1 as noreply_counts, " +
                        "e1.rowtime as askTime," +                      
                        "1 as total_talk " +          
                        "ONE ROW PER MATCH " +
                        "AFTER MATCH SKIP TO LAST e2 " +
                        "PATTERN (e1 e2) " +
                        "DEFINE " +
                        "e1 as e1.type = 'yonghu', " +
                        "e2 as e2.type = 'guanjia' " +
                        ")"+
                        ""
                );

           DataStream<Row> appendStream =tableEnv.toAppendStream(tb2, Row.class);

            System.out.println("schema is:");
            tb2.printSchema();

        appendStream.writeAsText("/usr/local/whk", WriteMode.OVERWRITE);

        logger.info("stream end");  

        Table tb3 = tableEnv.sqlQuery("select  sessionId, type  from myTable");
        DataStream<Row> temp =tableEnv.toAppendStream(tb3, Row.class);
        tb3.printSchema();
        temp.writeAsText("/usr/local/whk2", WriteMode.OVERWRITE);

        env.execute("msg test");    

    }

大功告成,其實里面坑很多。

注意:如果使用了 TimeCharacteristic.EventTime, 請不用再使用procTime。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

怀柔区| 新平| 上林县| 武乡县| 西林县| 长治县| 介休市| 芮城县| 湟源县| 沐川县| 萝北县| 四川省| 通城县| 南京市| 车致| 岳阳县| 余姚市| 霍城县| 咸阳市| 滨海县| 临夏县| 繁昌县| 兴义市| 安远县| 合水县| 灵石县| 新建县| 正安县| 临西县| 禹州市| 驻马店市| 鹤山市| 丁青县| 赤壁市| 肥城市| 马鞍山市| 友谊县| 龙游县| 阿克苏市| 荔波县| 三门县|