您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“elasticsearch索引創建create index集群matedata更新的方法”,內容詳細,步驟清晰,細節處理妥當,希望這篇“elasticsearch索引創建create index集群matedata更新的方法”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
創建索引需要創建索引并且更新集群index matedata,這一過程在MetaDataCreateIndexService的createIndex方法中完成。這里會提交一個高優先級,AckedClusterStateUpdateTask類型的task。索引創建需要即時得到反饋,異常這個task需要返回,會超時,而且這個任務的優先級也非常高。
下面具體看一下它的execute方法,這個方法會在master執行任務時調用,這個方法非常長,主要完成以下三個功能:更新合并request,template中的mapping和setting,調用indiceService創建索引,對創建后的索引添加mapping。這一系列功能完成后,合并完成后生成新的matedata,并更新集群狀態,完成了索引的創建。
代碼如下所示:
@Override public ClusterState execute(ClusterState currentState) throws Exception { boolean indexCreated = false; String removalReason = null; try { //檢查request的合法性,1.5版本主要檢查index名字是否合法,如不能含有某些字符,另外就是集群節點版本 validate(request, currentState); for (Alias alias : request.aliases()) {//檢查別稱是否合法 aliasValidator.validateAlias(alias, request.index(), currentState.metaData()); } // 查找索引模板 List<IndexTemplateMetaData> templates = findTemplates(request, currentState, indexTemplateFilter); Map<String, Custom> customs = Maps.newHashMap(); // add the request mapping Map<String, Map<String, Object>> mappings = Maps.newHashMap(); Map<String, AliasMetaData> templatesAliases = Maps.newHashMap(); List<String> templateNames = Lists.newArrayList(); //取出request中的mapping配置,雖然mapping可以后面添加,多數情況創建索引的時候還是會附帶著mapping,在request中mapping是一個map for (Map.Entry<String, String> entry : request.mappings().entrySet()) { mappings.put(entry.getKey(), parseMapping(entry.getValue())); } //一些預設如warm等 for (Map.Entry<String, Custom> entry : request.customs().entrySet()) { customs.put(entry.getKey(), entry.getValue()); } // 將找到的template和request中的mapping合并 for (IndexTemplateMetaData template : templates) { templateNames.add(template.getName()); for (ObjectObjectCursor<String, CompressedString> cursor : template.mappings()) { if (mappings.containsKey(cursor.key)) { XContentHelper.mergeDefaults(mappings.get(cursor.key), parseMapping(cursor.value.string())); } else { mappings.put(cursor.key, parseMapping(cursor.value.string())); } } // 合并custom for (ObjectObjectCursor<String, Custom> cursor : template.customs()) { String type = cursor.key; IndexMetaData.Custom custom = cursor.value; IndexMetaData.Custom existing = customs.get(type); if (existing == null) { customs.put(type, custom); } else { IndexMetaData.Custom merged = IndexMetaData.lookupFactorySafe(type).merge(existing, custom); customs.put(type, merged); } } //處理合并別名 for (ObjectObjectCursor<String, AliasMetaData> cursor : template.aliases()) { AliasMetaData aliasMetaData = cursor.value; //if an alias with same name came with the create index request itself, // ignore this one taken from the index template if (request.aliases().contains(new Alias(aliasMetaData.alias()))) { continue; } //if an alias with same name was already processed, ignore this one if (templatesAliases.containsKey(cursor.key)) { continue; } //Allow templatesAliases to be templated by replacing a token with the name of the index that we are applying it to if (aliasMetaData.alias().contains("{index}")) { String templatedAlias = aliasMetaData.alias().replace("{index}", request.index()); aliasMetaData = AliasMetaData.newAliasMetaData(aliasMetaData, templatedAlias); } aliasValidator.validateAliasMetaData(aliasMetaData, request.index(), currentState.metaData()); templatesAliases.put(aliasMetaData.alias(), aliasMetaData); } } // 合并完template和request,現在開始處理配置基本的mapping,合并邏輯跟之前相同,只是mapping來源不同 File mappingsDir = new File(environment.configFile(), "mappings"); if (mappingsDir.isDirectory()) { // first index level File indexMappingsDir = new File(mappingsDir, request.index()); if (indexMappingsDir.isDirectory()) { addMappings(mappings, indexMappingsDir); } // second is the _default mapping File defaultMappingsDir = new File(mappingsDir, "_default"); if (defaultMappingsDir.isDirectory()) { addMappings(mappings, defaultMappingsDir); } } //處理index的配置(setting) ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder(); //加入模板中的setting for (int i = templates.size() - 1; i >= 0; i--) { indexSettingsBuilder.put(templates.get(i).settings()); } // 加入request中的mapping,request中設置會覆蓋模板中的設置 indexSettingsBuilder.put(request.settings()); //處理shard,shard數量不能小于1,因此這里需要特殊處理,如果沒有則要使用默認值 if (request.index().equals(ScriptService.SCRIPT_INDEX)) { indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1)); } else { if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) { if (request.index().equals(riverIndexName)) { indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1)); } else { indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); } } } if (request.index().equals(ScriptService.SCRIPT_INDEX)) { indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 0)); indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all"); } else { if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) { if (request.index().equals(riverIndexName)) { indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); } else { indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); } } } //處理副本 if (settings.get(SETTING_AUTO_EXPAND_REPLICAS) != null && indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS) == null) { indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS)); } if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) { DiscoveryNodes nodes = currentState.nodes(); final Version createdVersion = Version.smallest(version, nodes.smallestNonClientNodeVersion()); indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion); } if (indexSettingsBuilder.get(SETTING_CREATION_DATE) == null) { indexSettingsBuilder.put(SETTING_CREATION_DATE, System.currentTimeMillis()); } indexSettingsBuilder.put(SETTING_UUID, Strings.randomBase64UUID()); //創建setting Settings actualIndexSettings = indexSettingsBuilder.build(); // 通過indiceservice創建索引 indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id()); indexCreated = true; //如果創建成功這里就可以獲取到對應的indexservice,否則會拋出異常 IndexService indexService = indicesService.indexServiceSafe(request.index()); //獲取mappingService試圖放置mapping MapperService mapperService = indexService.mapperService(); // 為索引添加mapping,首先是默認mapping if (mappings.containsKey(MapperService.DEFAULT_MAPPING)) { try { mapperService.merge(MapperService.DEFAULT_MAPPING, new CompressedString(XContentFactory.jsonBuilder().map(mappings.get(MapperService.DEFAULT_MAPPING)).string()), false); } catch (Exception e) { removalReason = "failed on parsing default mapping on index creation"; throw new MapperParsingException("mapping [" + MapperService.DEFAULT_MAPPING + "]", e); } } for (Map.Entry<String, Map<String, Object>> entry : mappings.entrySet()) { if (entry.getKey().equals(MapperService.DEFAULT_MAPPING)) { continue; } try { // apply the default here, its the first time we parse it mapperService.merge(entry.getKey(), new CompressedString(XContentFactory.jsonBuilder().map(entry.getValue()).string()), true); } catch (Exception e) { removalReason = "failed on parsing mappings on index creation"; throw new MapperParsingException("mapping [" + entry.getKey() + "]", e); } } //添加request中的別稱 IndexQueryParserService indexQueryParserService = indexService.queryParserService(); for (Alias alias : request.aliases()) { if (Strings.hasLength(alias.filter())) { aliasValidator.validateAliasFilter(alias.name(), alias.filter(), indexQueryParserService); } } for (AliasMetaData aliasMetaData : templatesAliases.values()) { if (aliasMetaData.filter() != null) { aliasValidator.validateAliasFilter(aliasMetaData.alias(), aliasMetaData.filter().uncompressed(), indexQueryParserService); } } // 以下更新Index的matedata, Map<String, MappingMetaData> mappingsMetaData = Maps.newHashMap(); for (DocumentMapper mapper : mapperService.docMappers(true)) { MappingMetaData mappingMd = new MappingMetaData(mapper); mappingsMetaData.put(mapper.type(), mappingMd); } final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings); for (MappingMetaData mappingMd : mappingsMetaData.values()) { indexMetaDataBuilder.putMapping(mappingMd); } for (AliasMetaData aliasMetaData : templatesAliases.values()) { indexMetaDataBuilder.putAlias(aliasMetaData); } for (Alias alias : request.aliases()) { AliasMetaData aliasMetaData = AliasMetaData.builder(alias.name()).filter(alias.filter()) .indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build(); indexMetaDataBuilder.putAlias(aliasMetaData); } for (Map.Entry<String, Custom> customEntry : customs.entrySet()) { indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue()); } indexMetaDataBuilder.state(request.state()); //matedata更新完畢,build新的matedata final IndexMetaData indexMetaData; try { indexMetaData = indexMetaDataBuilder.build(); } catch (Exception e) { removalReason = "failed to build index metadata"; throw e; } indexService.indicesLifecycle().beforeIndexAddedToCluster(new Index(request.index()), indexMetaData.settings()); //更新集群的matedata,將新build的indexmatadata加入到metadata中 MetaData newMetaData = MetaData.builder(currentState.metaData()) .put(indexMetaData, false) .build(); logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}", request.index(), request.cause(), templateNames, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet()); //阻塞集群,更新matadata ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); if (!request.blocks().isEmpty()) { for (ClusterBlock block : request.blocks()) { blocks.addIndexBlock(request.index(), block); } } if (request.state() == State.CLOSE) { blocks.addIndexBlock(request.index(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK); } ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build(); if (request.state() == State.OPEN) { RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable()) .addAsNew(updatedState.metaData().index(request.index())); RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build()); updatedState = ClusterState.builder(updatedState).routingResult(routingResult).build(); } removalReason = "cleaning up after validating index on master"; return updatedState; } finally { if (indexCreated) { // Index was already partially created - need to clean up indicesService.removeIndex(request.index(), removalReason != null ? removalReason : "failed to create index"); } } } }); }
以上就是創建index的create方法,方法中主要進行了兩個動作:合并更新index的matadata和創建index。更新合并matadata的過程都在上面的代碼中體現了。
創建索引是調用indiceSerivice構建一個guice的injector,這個injector包含了Index的所有功能(如分詞,相似度等)。同時會將其存儲到indiceService中,以一個map的格式存儲Map<String, Tuple<IndexService, Injector>> indices。運行中的集群每次對某個索引的操作都首先從indice中獲取對應的IndexService。
這一部分代碼如下所示:
public synchronized IndexService createIndex(String sIndexName, @IndexSettings Settings settings, String localNodeId) throws ElasticsearchException { if (!lifecycle.started()) { throw new ElasticsearchIllegalStateException("Can't create an index [" + sIndexName + "], node is closed"); } Index index = new Index(sIndexName); //檢測index是否已經存在 if (indices.containsKey(index.name())) { throw new IndexAlreadyExistsException(index); } indicesLifecycle.beforeIndexCreated(index, settings); logger.debug("creating Index [{}], shards [{}]/[{}]", sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS)); Settings indexSettings = settingsBuilder() .put(this.settings) .put(settings) .classLoader(settings.getClassLoader()) .build(); //構建index對應的injector ModulesBuilder modules = new ModulesBuilder(); modules.add(new IndexNameModule(index)); modules.add(new LocalNodeIdModule(localNodeId)); modules.add(new IndexSettingsModule(index, indexSettings)); modules.add(new IndexPluginsModule(indexSettings, pluginsService)); modules.add(new IndexStoreModule(indexSettings)); modules.add(new IndexEngineModule(indexSettings)); modules.add(new AnalysisModule(indexSettings, indicesAnalysisService)); modules.add(new SimilarityModule(indexSettings)); modules.add(new IndexCacheModule(indexSettings)); modules.add(new IndexFieldDataModule(indexSettings)); modules.add(new CodecModule(indexSettings)); modules.add(new MapperServiceModule()); modules.add(new IndexQueryParserModule(indexSettings)); modules.add(new IndexAliasesServiceModule()); modules.add(new IndexGatewayModule(indexSettings, injector.getInstance(Gateway.class))); modules.add(new IndexModule(indexSettings)); Injector indexInjector; try { indexInjector = modules.createChildInjector(injector); } catch (CreationException e) { throw new IndexCreationException(index, Injectors.getFirstErrorFailure(e)); } catch (Throwable e) { throw new IndexCreationException(index, e); } IndexService indexService = indexInjector.getInstance(IndexService.class); indicesLifecycle.afterIndexCreated(indexService); //將Indexservice和IndexInjector加入到indice map中 indices = newMapBuilder(indices).put(index.name(), new Tuple<>(indexService, indexInjector)).immutableMap(); return indexService; }
以上方法就是具體創建索引的過程,它是在master上操作的,同時它是同步方法。這樣才能保證集群的Index創建一致性,因此這也會導致之前所說的大量創建創建索引時候的速度瓶頸。但是創建大量索引的動作是不常見的,需要盡量避免。創建一個索引對于一個集群來說就是開啟對于該索引的各種操作,因此這里通過guice將索引的各個功能模塊注入,并獲得index操作的接口類Indexservice。如果這個方法執行成功,則可以合并template及request中的mapping,并且向剛創建的索引添加合并后的mapping,最后構建新的matadata,并將集群新的matadata發送給各個節點完成索引創建。
讀到這里,這篇“elasticsearch索引創建create index集群matedata更新的方法”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。