This article shares how Flink Connectors connect to Elasticsearch. It's a practical topic, and hopefully you'll take something away from it, so without further ado, let's get started.
The examples use Flink DataStream Connectors to connect to an index in the Elasticsearch search engine and demonstrate both data-stream input (reading from the index) and output (writing to it).
Example environment
java.version: 1.8.x
flink.version: 1.11.1
elasticsearch: 6.x
Example data source (project download on Gitee)
Flink Series: Setting Up the Development Environment and Data
Example module (pom.xml)
Flink Series: DataStream Connectors and Example Modules
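The full pom.xml ships with the module linked above; the piece that matters here is the Elasticsearch connector dependency. A minimal sketch, assuming the Elasticsearch 6.x connector matching the environment above (the exact artifact name and Scala suffix are assumptions to verify against your build):

<!-- Assumed coordinates for the Flink Elasticsearch 6.x connector (Flink 1.11, Scala 2.11) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.11.1</version>
</dependency>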
Data stream input (source)
DataStreamSource.java
package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Map;

/**
 * @Description Reads documents from Elasticsearch and emits them into a DataStream.
 */
public class DataStreamSource {

    /**
     * Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<TUser> dataStream = env.addSource(new RichSourceFunction<TUser>() {

            private RestClientBuilder builder = null;

            // Called once when the job starts; prepares the connection to Elasticsearch
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                builder = RestClient.builder(new HttpHost("192.168.110.35", 9200, "http"));
            }

            // Runs the query and emits each hit into the stream
            @Override
            public void run(SourceContext<TUser> ctx) throws Exception {
                Gson gson = new Gson();
                RestHighLevelClient client = null;
                // Match query: sex == 1
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
                sourceBuilder.query(QueryBuilders.matchQuery("sex", 1));
                // Target index and mapping type
                SearchRequest request = new SearchRequest();
                request.types("doc");
                request.indices("flink_demo");
                request.source(sourceBuilder);
                try {
                    client = new RestHighLevelClient(builder);
                    SearchResponse response = client.search(request, new Header[]{});
                    SearchHits hits = response.getHits();
                    System.out.println("The query returned " + hits.getTotalHits() + " hits");
                    for (SearchHit hit : hits) {
                        Map<String, Object> dataMap = hit.getSourceAsMap();
                        TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
                        ctx.collect(user);
                    }
                    // Alternative: look up a single document by ID
                    // GetRequest request = new GetRequest("flink_demo", "doc", "NeMaoXQBElQ9wTD5MOfB");
                    // client = new RestHighLevelClient(builder);
                    // GetResponse getResponse = client.get(request, new Header[]{});
                    // Map<String, Object> dataMap = getResponse.getSourceAsMap();
                    // TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
                    // ctx.collect(user);
                } catch (IOException ioe) {
                    ioe.printStackTrace();
                } finally {
                    if (client != null) {
                        client.close();
                    }
                }
            }

            // Called when the job is cancelled
            @Override
            public void cancel() {
                try {
                    super.close();
                } catch (Exception e) {
                    // ignore
                }
                builder = null;
            }
        });
        dataStream.print();
        env.execute("flink es to data job");
    }
}
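Both the source above and the sink below use Gson to map documents onto the TUser POJO from the sample project. The real class ships with the Gitee project; the sketch below is a hypothetical reconstruction inferred solely from the getters and setters these examples call:

package com.flink.examples;

/**
 * Hypothetical sketch of the TUser POJO; the actual class lives in the sample project.
 * Fields are inferred from the setters used in DataStreamSink and the Gson mapping above.
 */
public class TUser {
    private Integer id;
    private String name;
    private Integer age;
    private Integer sex;
    private String address;
    private Long createTimeSeries;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
    public Integer getSex() { return sex; }
    public void setSex(Integer sex) { this.sex = sex; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
    public Long getCreateTimeSeries() { return createTimeSeries; }
    public void setCreateTimeSeries(Long createTimeSeries) { this.createTimeSeries = createTimeSeries; }

    // toString so that dataStream.print() produces readable output
    @Override
    public String toString() {
        return "TUser{id=" + id + ", name='" + name + "', age=" + age
                + ", sex=" + sex + ", address='" + address
                + "', createTimeSeries=" + createTimeSeries + "}";
    }
}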
Data stream output (sink)
DataStreamSink.java
package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Description Writes a DataStream to Elasticsearch.
 */
public class DataStreamSink {

    /**
     * Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(2);

        // 1. Configure the Elasticsearch connection
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("192.168.110.35", 9200, "http"));

        // Create the ElasticsearchSink builder; each incoming JSON string becomes an index request
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String user, RuntimeContext ctx, RequestIndexer indexer) {
                        Gson gson = new Gson();
                        Map<String, Object> map = gson.fromJson(user, Map.class);
                        indexer.add(Requests.indexRequest()
                                .index("flink_demo")
                                .type("doc")
                                .source(map));
                    }
                }
        );
        // Maximum number of buffered actions per bulk request; the sink flushes once this many elements have accumulated
        esSinkBuilder.setBulkFlushMaxActions(10);
        // Maximum size (in MB) of buffered data before a flush
        esSinkBuilder.setBulkFlushMaxSizeMb(500);
        // Flush interval (in milliseconds), regardless of the number or size of buffered actions
        esSinkBuilder.setBulkFlushInterval(4000);

        // 2. Build the input stream from a sample element
        TUser user = new TUser();
        user.setId(9);
        user.setName("wang1");
        user.setAge(23);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(System.currentTimeMillis());
        DataStream<String> input = env.fromElements(user)
                .map((MapFunction<TUser, String>) value -> new Gson().toJson(value));

        // 3. Write the stream to Elasticsearch
        input.addSink(esSinkBuilder.build());
        env.execute("flink data to es job");
    }
}
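By default the sink fails the job when a bulk request cannot be completed. The builder also accepts a failure handler via setFailureHandler; for example, Flink ships RetryRejectedExecutionFailureHandler, which re-queues requests that Elasticsearch rejected because its bulk queue was full. A brief sketch of wiring it in before calling build():

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

// Re-add requests rejected due to a saturated bulk thread pool instead of failing the job
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());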
Data display
That covers how Connectors connect to Elasticsearch. Some of these points are likely to come up in day-to-day work; hopefully this article has taught you something new.