91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink sql怎么實時計算當天pv寫入mysql

發布時間:2021-09-16 12:41:15 來源:億速云 閱讀:287 作者:chen 欄目:大數據

這篇文章主要講解了“flink sql怎么實時計算當天pv寫入mysql”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“flink sql怎么實時計算當天pv寫入mysql”吧!

首先我們還是使用datagen生成測試數據,隨機生成一些用戶id

     String sourceSql = "CREATE TABLE datagen (\n" +
               " userid int,\n" +
               " proctime as PROCTIME()\n" +
               ") WITH (\n" +
               " 'connector' = 'datagen',\n" +
               " 'rows-per-second'='100',\n" +
               " 'fields.userid.kind'='random',\n" +
               " 'fields.userid.min'='1',\n" +
               " 'fields.userid.max'='100'\n" +
               ")";

定義mysql的sink,這里mysql是作為了一個upsert的sink,所以必須要一個主鍵,在mysql建表的時候我們指定了當天的日期作為主鍵,mysql ddl如下

CREATE TABLE `pv` (
 `day_str` varchar(100) NOT NULL,
 `pv` bigint(10) DEFAULT NULL,
 PRIMARY KEY (`day_str`)
)

Flink中的ddl要和mysql中對的上,也要指定主鍵。

    String mysqlsql = "CREATE TABLE pv (\n" +
               "  day_str STRING,\n" +
               "  pv bigINT,\n" +
               "  PRIMARY KEY (day_str) NOT ENFORCED\n" +
               ") WITH (\n" +
               "   'connector' = 'jdbc',\n" +
               "   'username' = 'root',\n" +
               "   'password' = 'root',\n" +
               "   'url' = 'jdbc:mysql://localhost:3306/test',\n" +
               "   'table-name' = 'pv'\n" +
               ")";

接下來我們寫一個簡單的查詢:

     tEnv.executeSql("insert into pv SELECT DATE_FORMAT(proctime, 'yyyy-MM-dd') as day_str, count(*) \n" +
               "FROM datagen \n" +
               "GROUP BY DATE_FORMAT(proctime, 'yyyy-MM-dd')");

可能對于以前一直做批處理的同學來說會感到疑惑,對于流式處理來說,group by將會返回一個可撤回流(RetractStream),轉化成datastream,將會得到一個Tuple2<Boolean, T>對象,這個對象第一個字段如果是false表示數據要撤回,true表示數據是我們新添加的,第二個字段是實際的數據。在這里,我們將這個實時更新的結果寫入到了mysql。這樣mysql表,每天就會只有一個數據,系統會不斷地更新pv字段。

flink sql怎么實時計算當天pv寫入mysql

類似的需求我們還可以使用flink的窗口來實現,定義一個窗口周期是一天的窗口,然后自定義一個觸發器,比如每秒鐘觸發一次,然后將結果輸出寫入第三方sink。

感謝各位的閱讀,以上就是“flink sql怎么實時計算當天pv寫入mysql”的內容了,經過本文的學習后,相信大家對flink sql怎么實時計算當天pv寫入mysql這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

普安县| 武鸣县| 上杭县| 长子县| 东明县| 建昌县| 顺平县| 台中市| 永川市| 沂南县| 秦安县| 郧西县| 拜城县| 芜湖市| 永平县| 静安区| 荔浦县| 昌平区| 宜丰县| 潼南县| 娄烦县| 南康市| 莆田市| 泰顺县| 湖口县| 铜鼓县| 锡林浩特市| 嘉义县| 措勤县| 水富县| 孝感市| 长乐市| 青河县| 呈贡县| 汤阴县| 孟村| 金溪县| 介休市| 余庆县| 靖江市| 遵义县|