91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

FlinkSQL中窗口的功能及實例用法

發布時間:2021-09-14 15:38:00 來源:億速云 閱讀:214 作者:chen 欄目:大數據

這篇文章主要介紹“FlinkSQL中窗口的功能及實例用法”,在日常操作中,相信很多人在FlinkSQL中窗口的功能及實例用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”FlinkSQL中窗口的功能及實例用法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

前言

時間語義,要配合窗口操作才能發揮作用。最主要的用途,當然就是開窗口、根據時間段做計算了。下面我們就來看看 Table API 和 SQL  中,怎么利用時間字段做窗口操作。在 Table API 和 SQL 中,主要有兩種窗口:Group Windows 和 Over  Windows

一、分組窗口(Group Windows) 分組窗口(Group  Windows)會根據時間或行計數間隔,將行聚合到有限的組(Group)中,并對每個組的數據執行一次聚合函數。 Table API 中的 Group  Windows 都是使用.window(w:GroupWindow)子句定義的,并且必須由 as 子句指定一個別名。為了按窗口對表進行分組,窗口的別名必須在  group by 子句中,像常規的分組字段一樣引用。例子:

val table = input .window([w: GroupWindow] as 'w) .groupBy('w, 'a) .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)

Table API 提供了一組具有特定語義的預定義 Window 類,這些類會被轉換為底層DataStream 或 DataSet 的窗口操作。

Table API 支持的窗口定義,和我們熟悉的一樣,主要也是三種:滾動(Tumbling)、滑動(Sliding和 會話(Session)。

1.1 滾動窗口

滾動窗口(Tumbling windows)要用 Tumble 類來定義,另外還有三個方法:

  • over:定義窗口長度

  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段

  • as:別名,必須出現在后面的 groupBy 中

實現案例

1.需求

設置滾動窗口為10秒鐘統計id出現的次數。

2.數據準備

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

3.代碼實現

package windows  import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble} import org.apache.flink.types.Row  /**  * @Package Windows  * @File :FlinkSQLTumBlingTie.java  * @author 大數據老哥  * @date 2020/12/25 21:58  * @version V1.0  *          設置滾動窗口  */ object FlinkSQLTumBlingTie {   def main(args: Array[String]): Unit = {     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1)     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)      val settings = EnvironmentSettings.newInstance()       .useBlinkPlanner()       .inStreamingMode()       .build()     val tableEnv = StreamTableEnvironment.create(env, settings)      // 讀取數據     val inputPath = "./data/sensor.txt"     val inputStream = env.readTextFile(inputPath)          // 先轉換成樣例類類型(簡單轉換操作)     val dataStream = inputStream       .map(data => {         val arr = data.split(",")         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)       })       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L       })      val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)     // 注冊表     tableEnv.createTemporaryView("sensor", sensorTable)     // table 實現     val resultTable = sensorTable       .window(Tumble over 10.seconds on 'ts as 'tw) // 每10秒統計一次,滾動時間窗口       .groupBy('id, 'tw)       .select('id, 'id.count, 'tw.end)     //sql 實現     val sqlTable = tableEnv.sqlQuery(       """         |select         |id,         |count(id) ,         |tumble_end(ts,interval '10' second)         |from sensor         |group by         |id,         |tumble(ts,interval '10' second)         |""".stripMargin)      /***      * .window(Tumble over 10.minutes on 'rowtime as 'w) (事件時間字段 rowtime)      * .window(Tumble over 10.minutes on 'proctime as 'w)(處理時間字段 proctime)      * .window(Tumble over 10.minutes on 'proctime as 'w) (類似于計數窗口,按處理時間排序,10 行一組)      */     resultTable.toAppendStream[Row].print("talbe")     sqlTable.toRetractStream[Row].print("sqlTable")          env.execute("FlinkSQLTumBlingTie")   }    case class SensorReading(id: String, timestamp: Long, temperature: Double)  }

運行結果

FlinkSQL中窗口的功能及實例用法

1.2 滑動窗口

滑動窗口(Sliding windows)要用 Slide 類來定義,另外還有四個方法:

  • over:定義窗口長度

  • every:定義滑動步長

  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段

  • as:別名,必須出現在后面的 groupBy 中

實現案例

1.需求描述

設置窗口大小為10秒鐘設置滑動距離為5秒鐘,統計id的出現的次數。

2.數據準備

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

3.實現代碼

package windows  import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.{EnvironmentSettings, Slide, Table} import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row import windows.FlinkSQLTumBlingTie.SensorReading  /**  * @Package windows  * @File :FlinkSQLSlideTime.java  * @author 大數據老哥  * @date 2020/12/27 22:19  * @version V1.0  *          滑動窗口  */ object FlinkSQLSlideTime {   def main(args: Array[String]): Unit = {     //構建運行環境     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1) // 設置分區為1 方便后面測試     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件時間      val settings = EnvironmentSettings.newInstance()       .useBlinkPlanner()       .inStreamingMode()       .build()     // 創建表env     val tableEnv = StreamTableEnvironment.create(env, settings)      // 讀取數據     val inputPath = "./data/sensor.txt"     val inputStream = env.readTextFile(inputPath)      // 先轉換成樣例類類型(簡單轉換操作)     val dataStream = inputStream       .map(data => {         val arr = data.split(",")         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)       })       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L       })      val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)     // 注冊表     tableEnv.createTemporaryView("sensor", sensorTable)     // table API 實現     val tableApi = sensorTable.window(Slide over 10.seconds every 5.seconds on 'ts as 'w)       .groupBy('w, 'id)       .select('id, 'id.count, 'w.end)     val tableSql = tableEnv.sqlQuery(       """         |select         |id,         |count(id),         |HOP_END(ts,INTERVAL '10' SECOND, INTERVAL '5' SECOND )as w         |from sensor         |group by         |HOP(ts,INTERVAL '10' SECOND, INTERVAL '5' SECOND),id         |""".stripMargin)      tableApi.toAppendStream[Row].print("tableApi")     tableSql.toAppendStream[Row].print("tableSql")     /** .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w) (事件時間字段 rowtime) .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) (處理時間字段 proctime)  .window(Slide over 10.rows every 5.rows on 'proctime as 'w) (類似于計數窗口,按處理時間排序,10 行一組)    **/     env.execute("FlinkSQLSlideTime")   } }

4.運行結果

FlinkSQL中窗口的功能及實例用法

1.3 會話窗口

會話窗口(Session windows)要用 Session 類來定義,另外還有三個方法:

  • withGap:會話時間間隔

  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段

  • as:別名,必須出現在后面的 groupBy 中實現案例

1.需求描述

設置一個session 為10秒鐘 統計id的個數

2.準備數據

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

3.編寫代碼

package windows  import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.{EnvironmentSettings, Session, Table} import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row import windows.FlinkSQLTumBlingTie.SensorReading  /**  * @Package windows  * @File :FlinkSqlSessionTime.java  * @author 大數據老哥  * @date 2020/12/27 22:52  * @version V1.0  */ object FlinkSqlSessionTime {   def main(args: Array[String]): Unit = {     //構建運行環境     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1) // 設置分區為1 方便后面測試     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件時間      val settings = EnvironmentSettings.newInstance()       .useBlinkPlanner()       .inStreamingMode()       .build()     // 創建表env     val tableEnv = StreamTableEnvironment.create(env, settings)      // 讀取數據     val inputPath = "./data/sensor.txt"     val inputStream = env.readTextFile(inputPath)      // 先轉換成樣例類類型(簡單轉換操作)     val dataStream = inputStream       .map(data => {         val arr = data.split(",")         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)       })       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L       })      val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)     // 注冊表     tableEnv.createTemporaryView("sensor", sensorTable)      val tableApi = sensorTable.       window(Session withGap 10.seconds on 'ts as 'w)       .groupBy('id, 'w)       .select('id, 'id.count, 'w.end)     val tableSQL = tableEnv.sqlQuery(       """         |SELECT         |id,         |COUNT(id),         |SESSION_END(ts, INTERVAL '10' SECOND) AS w         |FROM sensor         |GROUP BY         |id,         |SESSION(ts, INTERVAL '10' SECOND)         |""".stripMargin)     tableApi.toAppendStream[Row].print("tableApi")     tableSQL.toAppendStream[Row].print("tableSQL")      /**      * .window(Session withGap 10.minutes on 'rowtime as 'w) 事件時間字段 rowtime)      * .window(Session withGap 10.minutes on 'proctime as 'w) 處理時間字段 proctime)      */     env.execute("FlinkSqlSessionTime")   } }

4.運行結果

FlinkSQL中窗口的功能及實例用法

二、 Over Windows

Over window 聚合是標準 SQL 中已有的(Over 子句),可以在查詢的 SELECT 子句中定義。Over  window 聚合,會針對每個輸入行,計算相鄰行范圍內的聚合。Over windows使用.window(w:overwindows*)子句定義,并在  select()方法中通過別名來引用。例子:

val table = input .window([w: OverWindow] as 'w) .select('a, 'b.sum over 'w, 'c.min over 'w)

Table API 提供了 Over 類,來配置 Over 窗口的屬性。可以在事件時間或處理時間,以及指定為時間間隔、或行計數的范圍內,定義 Over  windows。

無界的 over window 是使用常量指定的。也就是說,時間間隔要指定 UNBOUNDED_RANGE,或者行計數間隔要指定  UNBOUNDED_ROW。而有界的 over window 是用間隔的大小指定的。

2.1 無界的 over window

// 無界的事件時間 over window (時間字段 "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) //無界的處理時間 over window (時間字段"proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w) // 無界的事件時間 Row-count over window (時間字段 "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) //無界的處理時間 Row-count over window (時間字段 "rowtime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

2.2 有界的 over window

// 有界的事件時間 over window (時間字段 "rowtime",之前 1 分鐘) .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w) // 有界的處理時間 over window (時間字段 "rowtime",之前 1 分鐘) .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w) // 有界的事件時間 Row-count over window (時間字段 "rowtime",之前 10 行) .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w) // 有界的處理時間 Row-count over window (時間字段 "rowtime",之前 10 行) .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

2.3 代碼練習

我們可以綜合學習過的內容,用一段完整的代碼實現一個具體的需求。例如,統計每個sensor每條數據,與之前兩行數據的平均溫度。

數據準備

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

代碼分析:

package windows  import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.{EnvironmentSettings, Over, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row  /** * @Package windows * @File :FlinkSqlTumBlingOverTime.java * @author 大數據老哥 * @date 2020/12/28 21:45 * @version V1.0 */ object FlinkSqlTumBlingOverTime {  def main(args: Array[String]): Unit = {    // 構建運行環境    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.setParallelism(1) // 設置并行度為1方便后面進行測試    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 設置事件時間     val settings = EnvironmentSettings.newInstance()      .useBlinkPlanner()      .inStreamingMode()      .build()    //構建table Env    val tableEnv = StreamTableEnvironment.create(env, settings)     // 讀取數據    val inputPath = "./data/sensor.txt"    val inputStream = env.readTextFile(inputPath)    // 先轉換成樣例類類型(簡單轉換操作)    // 解析數據 封裝成樣例類    val dataStream = inputStream      .map(data => {        val arr = data.split(",")        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)      })      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L      })    // 將數據注冊成一張臨時表    val dataTable = tableEnv.fromDataStream(dataStream,'id, 'temperature, 'timestamp.rowtime as 'ts)    tableEnv.createTemporaryView("sensor",dataTable)    var tableRes= dataTable.window( Over partitionBy 'id orderBy  'ts preceding 2.rows as 'ow)     .select('id,'ts,'id.count over 'ow, 'temperature.avg over 'ow)    var tableSql= tableEnv.sqlQuery(      """        |select        |id,        |ts,        |count(id) over ow,        |avg(temperature) over ow        |from sensor        |window ow as(        | partition by id        | order by ts        | rows between 2 preceding and current row        |)        |""".stripMargin)     tableRes.toAppendStream[Row].print("tableRes")    tableSql.toAppendStream[Row].print("tableSql")    env.execute("FlinkSqlTumBlingOverTime")  }  case class SensorReading(id: String, timestamp: Long, temperature: Double)  }

FlinkSQL中窗口的功能及實例用法

運行結果

到此,關于“FlinkSQL中窗口的功能及實例用法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

巩义市| 梁平县| 许昌县| 科技| 天津市| 迭部县| 桃源县| 全椒县| 呼图壁县| 佛冈县| 台南市| 阳春市| 阿勒泰市| 成武县| 惠水县| 板桥市| 乌兰察布市| 漳浦县| 光山县| 崇阳县| 边坝县| 南部县| 荣昌县| 赫章县| 巴中市| 广饶县| 夏津县| 论坛| 泾源县| 靖西县| 芜湖市| 门头沟区| 长寿区| 义乌市| 金平| 都匀市| 白城市| 石林| 泰和县| 德阳市| 顺义区|