91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink1.8如何批量Sink到HBase

發布時間:2021-12-09 10:18:18 來源:億速云 閱讀:775 作者:小新 欄目:大數據

這篇文章主要為大家展示了“Flink1.8如何批量Sink到HBase”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“Flink1.8如何批量Sink到HBase”這篇文章吧。

  1. 實現背景:

  2.     消費Kafka數據寫入HBase時,單條處理效率太低。需要批量插入hbase,這里自定義時間窗口countWindowAll 實現100條hbase插入一次Hbase

  3. 前面我就不寫了 直接上核心代碼

/*每10秒一個處理窗口*/DataStream<List<Put>> putList = filterData.countWindowAll(Constants.windowCount).apply(new AllWindowFunction<String, List<Put>, GlobalWindow>() {    @Override    public void apply(GlobalWindow window, Iterable<String> message, Collector<List<Put>> out) throws Exception {        List<Put> putList=new ArrayList<Put>();        for (String value : message)        {            String rowKey=value.replace("::","_");            Put put = new Put(Bytes.toBytes(rowKey.toString()));            String[] column=value.split("::");            for (int i = 0; i < column.length; i++) {                put.addColumn(Bytes.toBytes(Constants.columnFamily),Bytes.toBytes(Constants.columnArray[i]),Bytes.toBytes(column[i]));            }            putList.add(put);        }        out.collect(putList);    }    }).setParallelism(4);
putList.addSink(new HBaseSinkFunction()).setParallelism(1);

這里sink需要繼承Flink的RichSinkFunction接口,實現其中的三個比較重要的函數:

1.open()任務開始只調用一次

2.invoke()每接收一條記錄調用一次,多條記錄調用多次

3.close()任務關閉只調用一次

寫HBase自定義Sink為

HBaseSinkFunction extends RichSinkFunction<List<Put>>{@Overridepublic void open(Configuration parameters) throws Exception {    super.open(parameters);    HbaseUtils.connectHbase();    TableName table=TableName.valueOf(Constants.tableNameStr);    Admin admin = HbaseUtils.connection.getAdmin();    if(!admin.tableExists(table)){        HTableDescriptor tableDescriptor = new HTableDescriptor(Constants.tableNameStr);        tableDescriptor.addFamily(new HColumnDescriptor(Constants.columnFamily));        admin.createTable(tableDescriptor);    }}@Overridepublic void invoke(List<Put> putList, Context context) throws Exception {    Table table=HbaseUtils.connection.getTable(TableName.valueOf(Constants.tableNameStr));    table.put(putList);}@Overridepublic void close() throws Exception {    super.close();    HbaseUtils.closeHBaseConnect();}}

以上是“Flink1.8如何批量Sink到HBase”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

乐陵市| 巴楚县| 怀化市| 舟曲县| 锡林浩特市| 丹东市| 包头市| 金坛市| 龙陵县| 晋宁县| 宜兰县| 新丰县| 新昌县| 河北区| 曲靖市| 徐州市| 潍坊市| 韶山市| 台前县| 台东市| 如东县| 简阳市| 高邑县| 醴陵市| 巴青县| 莱西市| 绥德县| 吉木萨尔县| 靖远县| 余干县| 威信县| 南开区| 花莲市| 广水市| 昭平县| 苍溪县| 岳西县| 正蓝旗| 平和县| 福海县| 汝城县|