您好,登錄后才能下訂單哦!
小編給大家分享一下Apache Flink Table API和SQL API的外部數據源是什么,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
“ Apache Flink的Table API與SQL API實現對接外部數據源的方式是Table connector直連外部數據源,Tabel connector將外部的數據源讀取到Flink中進行計算或者把Apache Flink的結果寫入到外部數據源之中。”
Apache Flink的Table API與SQL API在之前已經提到了Table Source與Table Sink的方式對實現數據源的注冊于結果的寫入等。Apache Flink已經基本實現了對于的數據源的注冊于寫入的需求,但是仍然不夠靈活,用戶更加希望的可能是通過配置的方式實現對于不同數據源的選擇,并且實現數據的寫入。所以Apache Flink又提供了Table connector。
Table connector的實現不僅僅能夠更好的API與SQL Client,同時Table connector類似與實現了CREATE TABLE 的方式對數據進行了注冊,能夠實現再不修改代碼的情況下實現把數據寫入不同的位置。Tabel connector方法指定了需要連接Table connector對應的Descriptor,withFormat方式指定了輸出或輸入的文件格式(csv,JSON,Parquet等)。withSchema方法指定了注冊在TableEnvironment中的表結構。
tableEnv.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode() //更新模式
.registerTableSource("MyTable")
通過以上方式注冊數據源后,我們可以直接把數據讀取或者寫入到該表中。也就是說我們可以實現類似于像操作DBMS表的數據一樣操作該表進行數據讀取與寫入。
在當前的很多云場景中,類似于阿里云,華為云等均提供了實時計算的產品,產品使用SQL的方式進行注冊,注冊指定云廠商的其他產品與存儲格式等 withSchema ,指定讀取的數據結構 withSchema 與更新的模式等即可實現數據的讀取或計算結果的寫入。
CREATE TABLE datahub_stream(
name VARCHAR,
age BIGINT,
birthday BIGINT
) WITH (
type='產品',
endPoint='...',
project='...',
topic='...'
...
);
我們在使用時涉及的步驟也大致分為這些,如下代碼。connect方法指定了連接對應的Desciriptor,withFormat指定輸出或輸入的文件格式,例如JSON,CSV,Avro等。withSchema用以指定注冊在TableEnvironment中的表結構。inAppendMode指定了數據的更新模式。最終通過registerTableSource方法將本次外部數據源注冊到TableEnvironment中,用以進行查詢計算。
talbEnv.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.registerTableSource("TableName")
以上是“Apache Flink Table API和SQL API的外部數據源是什么”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。