
How Connectors Connect to Elasticsearch

Published: 2021-12-16 17:43:18  Source: Yisu Cloud  Views: 146  Author: Qiran  Category: Big Data

This article explains how to connect to Elasticsearch with Flink Connectors. The material is quite practical, so it is shared here for study; hopefully you will take something away from it. Without further ado, let's get into it.

Using the Flink DataStream Connectors, we connect to an Elasticsearch index and perform data stream input (source) and output (sink) operations.

Example Environment

java.version: 1.8.x
flink.version: 1.11.1
elasticsearch: 6.x

Sample data source (available from the project on Gitee)

Flink Series: Setting Up the Development Environment and Data

Example module (pom.xml)

Flink Series: DataStream Connectors and Example Modules
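The example module is linked above rather than listed. For reference, a minimal sketch of the dependencies it would need might look like the following; the artifact names and versions are assumptions based on the stated environment (Flink 1.11.1, Elasticsearch 6.x), not copied from the project's actual pom.xml:

```xml
<!-- Illustrative only: connector and JSON dependencies inferred from the examples below -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.6</version>
</dependency>
```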

Data Stream Input

DataStreamSource.java

package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Map;
/**
 * @Description Reads data from Elasticsearch and emits it into a DataStream
 */
public class DataStreamSource {
    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<TUser> dataStream = env.addSource(new RichSourceFunction<TUser>(){
            private RestClientBuilder builder = null;
            //Called once when the job starts; used to open/prepare the connection
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                builder = RestClient.builder(new HttpHost("192.168.110.35", 9200, "http"));
            }
            //Execute the query and wrap each hit into a TUser
            @Override
            public void run(SourceContext<TUser> ctx) throws Exception {
                Gson gson = new Gson();
                RestHighLevelClient client = null;
                //Match query: sex == 1
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
                sourceBuilder.query(QueryBuilders.matchQuery("sex", 1));
                //Specify the index and type
                SearchRequest request = new SearchRequest();
                request.types("doc");
                request.indices("flink_demo");
                request.source(sourceBuilder);
                try {
                    client = new RestHighLevelClient(builder);
                    SearchResponse response = client.search(request, new Header[]{});
                    SearchHits hits = response.getHits();
                    System.out.println("Query returned " + hits.getTotalHits() + " hits");
                    for (SearchHit searchHit : hits) {
                        Map<String,Object> dataMap = searchHit.getSourceAsMap();
                        TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
                        ctx.collect(user);
                    }
                    //Get by ID (alternative)
//                    GetRequest request = new GetRequest( "flink_demo","doc","NeMaoXQBElQ9wTD5MOfB");
//                    client = new RestHighLevelClient(builder);
//                    GetResponse getResponse = client.get(request, new Header[]{});
//                    Map<String,Object> dataMap = getResponse.getSourceAsMap();
//                    TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
//                    ctx.collect(user);
                }catch(IOException ioe){
                    ioe.printStackTrace();
                }finally {
                    if (client != null){
                        client.close();
                    }
                }
            }
            //Called when the job is cancelled
            @Override
            public void cancel() {
                try {
                    super.close();
                } catch (Exception e) {
                }
                builder = null;
            }
        });
        dataStream.print();
        env.execute("flink es to data job");
    }

}
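Both listings depend on a TUser POJO from the sample project that is not reproduced in the article. A minimal hypothetical reconstruction is sketched below; the field names are inferred from the setters called in DataStreamSink further down, and the real class may differ:

```java
import java.io.Serializable;

// Hypothetical reconstruction of the sample project's TUser POJO.
// Gson maps Elasticsearch _source fields onto these names; Serializable
// is implemented because Flink serializes records between operators.
public class TUser implements Serializable {
    private Integer id;
    private String name;
    private Integer age;
    private Integer sex;
    private String address;
    private Long createTimeSeries;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
    public Integer getSex() { return sex; }
    public void setSex(Integer sex) { this.sex = sex; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
    public Long getCreateTimeSeries() { return createTimeSeries; }
    public void setCreateTimeSeries(Long createTimeSeries) { this.createTimeSeries = createTimeSeries; }
}
```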

Data Stream Output

DataStreamSink.java

package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Description Writes the DataStream into Elasticsearch
 */
public class DataStreamSink {

    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(2);
        //1. Configure the Elasticsearch connection
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("192.168.110.35", 9200, "http"));
        //Create the ElasticsearchSink builder
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String user, RuntimeContext ctx, RequestIndexer indexer) {
                        Gson gson = new Gson();
                        Map<String,Object> map = gson.fromJson(user, Map.class);
                        indexer.add(Requests.indexRequest()
                                .index("flink_demo")
                                .type("doc")
                                .source(map));
                    }
                }
        );
        // Maximum number of actions to buffer before a bulk request is sent;
        // a value of 1 would flush after every element instead of buffering
        esSinkBuilder.setBulkFlushMaxActions(10);
        //Maximum buffered data size (in MB) before flushing
        esSinkBuilder.setBulkFlushMaxSizeMb(500);
        //Flush interval (ms), after which the buffer is flushed regardless of the number or size of buffered actions
        esSinkBuilder.setBulkFlushInterval(4000);

        //2. Write data into the stream
        //Build the sample record
        TUser user = new TUser();
        user.setId(9);
        user.setName("wang1");
        user.setAge(23);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(System.currentTimeMillis());
        DataStream<String> input = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value));
        //3. Write the data into Elasticsearch
        input.addSink(esSinkBuilder.build());
        env.execute("flink data to es job");
    }

}
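What actually lands in the flink_demo index is the Gson serialization of TUser, parsed back into a Map for the IndexRequest. As an illustration only (the real job uses Gson; this hand-formatted string is an assumption about the resulting field layout), the indexed document looks roughly like this:

```java
// Illustrative only: previews the JSON document shape the sink indexes into
// flink_demo/doc. The real job produces this via Gson.
public class EsDocPreview {
    static String toJson(int id, String name, int age, int sex, String address, long createTimeSeries) {
        // Mirrors the TUser fields set in DataStreamSink
        return String.format(
            "{\"id\":%d,\"name\":\"%s\",\"age\":%d,\"sex\":%d,\"address\":\"%s\",\"createTimeSeries\":%d}",
            id, name, age, sex, address, createTimeSeries);
    }

    public static void main(String[] args) {
        // Same sample record as in DataStreamSink
        System.out.println(toJson(9, "wang1", 23, 1, "CN", System.currentTimeMillis()));
    }
}
```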

Data Display

(figure: screenshot of the query results in Elasticsearch)

That covers how Connectors connect to Elasticsearch. Some of these points are likely to come up in everyday work; hopefully this article taught you something new. For more details, follow the Yisu Cloud industry news channel.
