您好,登錄后才能下訂單哦!
這篇文章主要講解了“Elasticsearch reindex及Java使用sliceScorll優化查詢的方法”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Elasticsearch reindex及Java使用sliceScorll優化查詢的方法”吧!
Reindex會將一個索引的數據復制到另一個已存在的索引,但是并不會復制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。
簡單實例如下
POST _reindex { "source": { "remote": { "host": "http://otherhost:9200", // 遠程es的ip和port列表 "socket_timeout": "1m", "connect_timeout": "10s" // 超時時間設置 }, "index": "my_index_name", // 源索引名稱 "query": { // 滿足條件的數據 "match": { "test": "data" } } }, "dest": { "index": "dest_index_name" // 目標索引名稱 } }
具體詳細的使用參考
ElasticSearch 6.3版本 Document APIs之Reindex API
elasticsearch 基礎 —— ReIndex
在java中對于reindexapi沒有找到,于是作者采用了別名轉換和全Index查詢加上bulk插入的方式對于索引進行遷移。
但是轉移數據實在太慢,所以使用了slice對scorll查詢進行優化
多線程reindex
具體開啟線程數根據Index分片數進行調整,最好和主分片數相同,本例子為五個分片,同時還使用了別名轉換對索引進行無縫銜接避免數據正常插入讀取
//建新索引 createUserRecordIndex(newIndexName, typeName); //篩選時間 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); RangeQueryBuilder rangeQueryBuilder; rangeQueryBuilder = QueryBuilders.rangeQuery("createTime") .gte(DateUtil.format(DateUtil.parse(createBeginDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT)) .lte(DateUtil.format(DateUtil.parse(createEndDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT)); boolQueryBuilder.must(rangeQueryBuilder); try { //多線程處理查詢請求 List<Future> list = new ArrayList<>(); for (int i = 0; i < 5; i++) { SliceBuilder sliceBuilder = new SliceBuilder(i, 5); SearchResponse response = EsBuildersServiceUtil.getESClient() .prepareSearch(userRecordAlias) .setTypes(userRecordType) .setQuery(boolQueryBuilder) .setSize(1000).setScroll(new TimeValue(10000)) .slice(sliceBuilder) .execute() .actionGet(); SliceQuery sliceQuery = new SliceQuery(newIndexName, typeName, response); Future submit = threadPoolTaskExecutor.submit(sliceQuery); list.add(submit); } for (Future future : list) { future.get(); } } catch (Exception e) { log.error("reindex error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_INDEX_CONVERT_ERROR); } try { //別名轉換 EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().removeAlias(oldIndexName, userRecordAlias).execute().actionGet(); EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().addAlias(newIndexName, userRecordAlias).execute().actionGet(); } catch (Exception e) { log.error(" convertAlias error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_ALIASES_CONVERT_ERROR); }
slice線程
class SliceQuery implements Callable { private String newIndexName; private String typeName; private SearchResponse response; private SliceQuery(String newIndexName, String typeName, SearchResponse response) { this.newIndexName = newIndexName; this.typeName = typeName; this.response = response; } @Override public Void call() { //獲取總數量 long totalCount = response.getHits().getTotalHits(); //計算總次數,每次搜索數量為分片數*設置的size大小 int page = (int) totalCount / 1000; operateRecordList(response, newIndexName, typeName); for (int i = 0; i < page; i++) { //再次發送請求,并使用上次搜索結果的ScrollId response = EsBuildersServiceUtil.getESClient().prepareSearchScroll(response.getScrollId()) .setScroll(new TimeValue(10000)).execute() .actionGet(); operateRecordList(response, newIndexName, typeName); } return null; } }
批量插入
/** * 從查詢數據中獲取并批量插入Index * * @param response * @param indexName * @param typeName */ private void operateRecordList(SearchResponse response, String indexName, String typeName) { try { SearchHits hits = response.getHits(); List<AddUserRecordRequest> list = new ArrayList<>(); for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); list.add(JSON.parseObject(sourceAsString, AddUserRecordRequest.class)); } //批量插入 saveBulkRecord(list, indexName, typeName); } catch (Exception e) { log.error("operateRecordList error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR); } } /** * 批量插入 * * @param list * @param indexName * @param typeName */ private void saveBulkRecord(List<AddUserRecordRequest> list, String indexName, String typeName) { try { BulkRequestBuilder bulkRequest = EsBuildersServiceUtil.getESClient().prepareBulk(); for (AddUserRecordRequest recordRequest : list) { JSONObject json = JSONObject.fromObject(recordRequest); bulkRequest.add(EsBuildersServiceUtil.getESClient() .prepareIndex(indexName, typeName) .setSource(json)); } if (list.size() > 0) { bulkRequest.execute().actionGet(); } } catch (Exception e) { log.error("saveBulkRecord error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR); } }
感謝各位的閱讀,以上就是“Elasticsearch reindex及Java使用sliceScorll優化查詢的方法”的內容了,經過本文的學習后,相信大家對Elasticsearch reindex及Java使用sliceScorll優化查詢的方法這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。