您好,登錄后才能下訂單哦!
前面研究過ES的get api
的整體思路,作為編寫ES插件時的借鑒。當時的重點在與理解整體流程,主要是shardOperation()
的方法內部的調用邏輯,就弱化了shards()
方法。實際上shards()
方法在理解ES的結構層面,作用更大一些。我們還是從get api
入手來理解shards()
。
先回顧一下get api
的使用流程:
添加文檔到ES:
curl -XPUT 'http://localhost:9200/test1/type1/1' -d '{"name":"hello"}'
根據文檔ID讀取數據:
curl -XGET 'http://localhost:9200/test1/type1/1'
使用很簡單。但是如果考慮到分布式,背后的邏輯就不簡單了。 假如ES集群有3個節點,數據所在的索引也有3個分片,每個分片一個副本。即index的設置如下:
{
"test1" : {
"settings" : {
"index" : {
"number_of_replicas" : "1",
"number_of_shards" : "3"
}
}
}
}
那么id為1的doc該分發到那個分片呢? 這個問題需要一篇詳細的博文解答,這里我們先簡單給一個結論:
默認情況下,ES會按照文檔id計算一個hash值, 采用的是Murmur3HashFunction,然后根據這個id跟分片數取模。實現代碼是MathUtils.mod(hash, indexMetaData.getNumberOfShards()); 最后的結果作為文檔所在的分片id,所以ES的分片標號是從0開始的。
不知存,焉知取。
再整理一下取數據的核心流程:
s1: 根據文檔id定位到數據所在分片。由于可以設為多個副本,所以一個分片會映射到多個節點。
s2: 根據分片節點的映射信息,選擇一個節點,去獲取數據。 這里重點關注的是節點的選擇方式,簡而言之,我們需要負載均衡,不然設置副本就沒有意義了。
上面兩步都關聯著一個核心的數據結構ClusterState
, 我們可以使用_cluster/state?pretty
來查看這個數據結構:
# http://localhost:9200/_cluster/state?pretty
{
"cluster_name" : "elasticsearch",
"version" : 4,
"state_uuid" : "b6B739p5SbanNLyKxTMHfQ",
"master_node" : "KnEE25tzRjaXblFJq5jqRA",
"blocks" : { },
"nodes" : {
"KnEE25tzRjaXblFJq5jqRA" : {
"name" : "Mysterio",
"transport_address" : "127.0.0.1:9300",
"attributes" : { }
}
},
"metadata" : {
"cluster_uuid" : "ZIl7g86YRiGv8Dqz4DCoAQ",
"templates" : { },
"indices" : {
"test1" : {
"state" : "open",
"settings" : {
"index" : {
"creation_date" : "1553995485603",
"uuid" : "U7v5t_T7RG6rNU3JlGCCBQ",
"number_of_replicas" : "1",
"number_of_shards" : "1",
"version" : {
"created" : "2040599"
}
}
},
"mappings" : { },
"aliases" : [ ]
}
}
},
"routing_table" : {
"indices" : {
"test1" : {
"shards" : {
"0" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "KnEE25tzRjaXblFJq5jqRA",
"relocating_node" : null,
"shard" : 0,
"index" : "test1",
"version" : 2,
"allocation_id" : {
"id" : "lcSHbfWDRyOKOhXAf3HXLA"
}
}, {
"state" : "UNASSIGNED",
"primary" : false,
"node" : null,
"relocating_node" : null,
"shard" : 0,
"index" : "test1",
"version" : 2,
"unassigned_info" : {
"reason" : "INDEX_CREATED",
"at" : "2019-03-31T01:24:45.845Z"
}
} ]
}
}
}
},
"routing_nodes" : {
"unassigned" : [ {
"state" : "UNASSIGNED",
"primary" : false,
"node" : null,
"relocating_node" : null,
"shard" : 0,
"index" : "test1",
"version" : 2,
"unassigned_info" : {
"reason" : "INDEX_CREATED",
"at" : "2019-03-31T01:24:45.845Z"
}
} ],
"nodes" : {
"KnEE25tzRjaXblFJq5jqRA" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "KnEE25tzRjaXblFJq5jqRA",
"relocating_node" : null,
"shard" : 0,
"index" : "test1",
"version" : 2,
"allocation_id" : {
"id" : "lcSHbfWDRyOKOhXAf3HXLA"
}
} ]
}
}
}
整個結構比較復雜,我們慢慢拆解, 一步步逐個擊破。 拆解的思路還是從使用場景入手。
"metadata" : {
"cluster_uuid" : "ZIl7g86YRiGv8Dqz4DCoAQ",
"templates" : { },
"indices" : {
"test1" : {
"state" : "open",
"settings" : {
"index" : {
"creation_date" : "1553995485603",
"uuid" : "U7v5t_T7RG6rNU3JlGCCBQ",
"number_of_replicas" : "1",
"number_of_shards" : "1",
"version" : {
"created" : "2040599"
}
}
},
"mappings" : { },
"aliases" : [ ]
}
}
}
即metadata中存儲了集群中每個索引的分片和副本數量, 索引的狀態, 索引的mapping, 索引的別名等。這種結構,能提供出來的功能就是根據索引名稱獲取索引元數據
, 代碼如下:
# OperationRouting.generateShardId()
IndexMetaData indexMetaData = clusterState.metaData().index(index);
if (indexMetaData == null) {
throw new IndexNotFoundException(index);
}
final Version createdVersion = indexMetaData.getCreationVersion();
final HashFunction hashFunction = indexMetaData.getRoutingHashFunction();
final boolean useType = indexMetaData.getRoutingUseType();
這里我們關注點就是clusterState.metaData().index(index)
這句代碼,它實現了根據索引名稱獲取索引元數據的功能
。 通過元數據中的分片數結合文檔id,我們就能定位出文檔所在的分片。 這個功能在Delete, Index, Get 三類API中都是必須的。 這里我們也能理解為什么ES的索引分片數量不能修改: 如果修改了,那么hash函數就沒法正確定位數據所在分片。
"routing_table" : {
"indices" : {
"test1" : {
"shards" : {
"0" : [ {
"state" : "STARTED",
"primary" : true,
"node" : "KnEE25tzRjaXblFJq5jqRA",
"relocating_node" : null,
"shard" : 0,
"index" : "test1",
"version" : 2,
"allocation_id" : {
"id" : "lcSHbfWDRyOKOhXAf3HXLA"
}
}, {
"state" : "UNASSIGNED",
"primary" : false,
"node" : null,
"relocating_node" : null,
"shard" : 0,
"index" : "test1",
"version" : 2,
"unassigned_info" : {
"reason" : "INDEX_CREATED",
"at" : "2019-03-31T01:24:45.845Z"
}
} ]
}
}
}
}
routing_table
存儲著每個索引的分片信息,通過這個結構,我們能清晰地了解如下的信息:
1. 索引分片在各個節點的分布
2. 索引分片是否為主分片
假如一個分片有2個副本,且都分配在不同的節點上,那么get api
一共有三個數據節點可供選擇, 選擇哪一個呢?這里暫時不考慮帶preference
參數。
為了使每個節點都能公平被選擇到,達到負載均衡的目的,這里用到了隨機數。參考RotateShuffer
/**
* Basic {@link ShardShuffler} implementation that uses an {@link AtomicInteger} to generate seeds and uses a rotation to permute shards.
*/
public class RotationShardShuffler extends ShardShuffler {
private final AtomicInteger seed;
public RotationShardShuffler(int seed) {
this.seed = new AtomicInteger(seed);
}
@Override
public int nextSeed() {
return seed.getAndIncrement();
}
@Override
public List<ShardRouting> shuffle(List<ShardRouting> shards, int seed) {
return CollectionUtils.rotate(shards, seed);
}
}
也就是說使用ThreadLocalRandom.current().nextInt()
生成隨機數作為種子, 然后取的時候依次旋轉。 Collections.rotate()
的效果可以用如下的代碼演示:
public static void main(String[] args) {
List<String> list = Lists.newArrayList("a","b","c");
int a = ThreadLocalRandom.current().nextInt();
List<String> l2 = CollectionUtils.rotate(list, a );
List<String> l3 = CollectionUtils.rotate(list, a+1);
System.out.println(l2);
System.out.println(l3);
}
-----
[b, c, a]
[c, a, b]
比如請求A得到的節點列表是[b,c,a], 那么請求B得到的節點列表是[c,a,b]。這樣就達到了負載均衡的目的。
routing_table
中存儲的是節點的id, 那么將請求發送到目標節點時,還需要知道節點的ip及端口等配置信息。 這些信息存儲在nodes
中。 "nodes" : {
"KnEE25tzRjaXblFJq5jqRA" : {
"name" : "Mysterio",
"transport_address" : "127.0.0.1:9300",
"attributes" : { }
}
}
通過這個nodes
獲取到節點信息后,就可以發送請求了,ES所有內部節點的通信都是基于transportService.sendRequest()
。
總結一下,本文基于get api
梳理了一下ES的ClusterState中的幾個核心結構: metadata
,nodes
, routing_table
。 還有一個routing_nodes
這里沒有用到。后面梳理清楚使用場景后再記錄。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。