您好,登錄后才能下訂單哦!
本篇內容主要講解“Elasticsearch跨集群數據遷移怎么實現”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Elasticsearch跨集群數據遷移怎么實現”吧!
方案 | elasticsearch-dump | reindex | snapshot | logstash |
---|---|---|---|---|
基本原理 | 邏輯備份,類似mysqldump將數據一條一條導出后再執行導入 | reindex 是 Elasticsearch 提供的一個 API 接口,可以把數據從一個集群遷移到另外一個集群 | 從源集群通過Snapshot API 創建數據快照,然后在目標集群中進行恢復 | 從一個集群中讀取數據然后寫入到另一個集群 |
網絡要求 | 集群間互導需要網絡互通,先導出文件再通過文件導入集群則不需要網絡互通 | 網絡需要互通 | 無網絡互通要求 | 網絡需要互通 |
遷移速度 | 慢 | 快 | 快 | 一般 |
適合場景 | 適用于數據量小的場景 | 適用于數據量大,在線遷移數據的場景 | 適用于數據量大,接受離線數據遷移的場景 | 適用于數據量一般,近實時數據傳輸 |
配置復雜度 | 中等 | 簡單 | 復雜 | 中等 |
創建 mapping:
PUT dumpindex { "mappings": { "properties": { "name": { "type": "text" }, "age": { "type": "integer" } } } }
插入數據:
POST _bulk {"index":{"_index":"dumpindex"}} {"name":"tom","age":18} {"index":{"_index":"dumpindex"}} {"name":"jack","age":19} {"index":{"_index":"dumpindex"}} {"name":"bob","age":20}
elasticsearch-dump是一款開源的ES數據遷移工具, github地址: https://github.com/taskrabbit/elasticsearch-dump
elasticsearch-dump使用node.js開發,可使用npm包管理工具直接安裝:
npm install elasticdump -g
也可以之間通過啟動制作好的 elasticsearch-dump docker 容器來運行,需要通過 -v
參數掛載宿主機的目錄到容器中
docker run --rm -ti -v /root/elasticsearch-dump:/tmp elasticdump/elasticsearch-dump \ 加上命令
例如:
docker run --rm -ti -v /root/elasticsearch-dump:/tmp elasticdump/elasticsearch-dump \ --input=/tmp/dumpindex.json \ #這里input的文件是容器中的文件路徑,宿主機上對應的就是/root/elasticsearch-dump/dumpindex.json文件 --output=http://192.168.1.67:9200/dumpindex \ --type=data
通過以下命令將 Elasticsearch 中的數據導出到 dumpindex_data.json 文件中。
docker run --rm -ti -v /root/elasticsearch-dump:/tmp elasticdump/elasticsearch-dump \ --input=http://192.168.1.171:9200/dumpindex \ --output=/tmp/dumpindex_data.json \ --type=data
查看文件內容,包含了索引的數據信息:
[root@elastic1]# cat /root/elasticsearch-dump/dumpindex_data.json {"_index":"dumpindex","_type":"_doc","_id":"q28kPngB8Nd5nYNvOgHd","_score":1,"_source":{"name":"tom","age":18}} {"_index":"dumpindex","_type":"_doc","_id":"rG8kPngB8Nd5nYNvOgHd","_score":1,"_source":{"name":"jack","age":19}} {"_index":"dumpindex","_type":"_doc","_id":"rW8kPngB8Nd5nYNvOgHd","_score":1,"_source":{"name":"bob","age":20}}
另外還需要導出索引的 mapping,如果直接將前面的數據到新的 Elasticsearch 集群,新集群會根據數據自動生成 mapping,有可能和源集群的 mapping 不一致:
docker run --rm -ti -v /root/elasticsearch-dump:/tmp elasticdump/elasticsearch-dump \ --input=http://192.168.1.171:9200/dumpindex \ --output=/tmp/dumpindex_mapping.json \ --type=mapping
查看導出的 mapping 文件內容:
[root@elastic1 ~]# cat /root/elasticsearch-dump/dumpindex_mapping.json {"dumpindex":{"mappings":{"properties":{"age":{"type":"integer"},"name":{"type":"text"}}}}}
首先導入 mapping 信息:
docker run --rm -ti -v /root/elasticsearch-dump:/tmp elasticdump/elasticsearch-dump \ --input=/tmp/dumpindex_mapping.json \ --output=http://192.168.1.67:9200/dumpindex \ --type=mapping
然后導入數據:
docker run --rm -ti -v /root/elasticsearch-dump:/tmp elasticdump/elasticsearch-dump \ --input=/tmp/dumpindex_data.json \ --output=http://192.168.1.67:9200/dumpindex \ --type=data
查看新集群上該索引的mapping信息,和源集群的一致:
GET dumpindex/_mapping #輸出結果 { "dumpindex" : { "mappings" : { "properties" : { "age" : { "type" : "integer" }, "name" : { "type" : "text" } } } } }
查看新集群的數據,也和源集群的一致:
GET dumpindex/_search #輸出結果 { "took" : 3, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 3, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "dumpindex", "_type" : "_doc", "_id" : "rW8kPngB8Nd5nYNvOgHd", "_score" : 1.0, "_source" : { "name" : "bob", "age" : 20 } }, { "_index" : "dumpindex", "_type" : "_doc", "_id" : "rG8kPngB8Nd5nYNvOgHd", "_score" : 1.0, "_source" : { "name" : "jack", "age" : 19 } }, { "_index" : "dumpindex", "_type" : "_doc", "_id" : "q28kPngB8Nd5nYNvOgHd", "_score" : 1.0, "_source" : { "name" : "tom", "age" : 18 } } ] } }
打開 Kibana 界面,創建 Index Pattern,然后在 Discover 中就可以看到該索引。
然后創建一個 Save Search 任務:
創建完任務后,選擇生成 CSV 文件:
可以在 Reports 中下載生成的 CSV 文件:
查看導出的 CSV 文件:
? cat dumpindex.csv "_id","_index","_score","_type",age,name q28kPngB8Nd5nYNvOgHd,dumpindex,0,"_doc",18,tom rG8kPngB8Nd5nYNvOgHd,dumpindex,0,"_doc",19,jack rW8kPngB8Nd5nYNvOgHd,dumpindex,0,"_doc",20,bob
通過 elasticsearch-dump 命令導出成 CSV 文件:
docker run --rm -ti -v /root/elasticsearch-dump:/tmp elasticdump/elasticsearch-dump \ --input=http://192.168.1.171:9200/dumpindex \ --output="csv:///tmp/dumpindex.csv"
查看導出的 CSV 文件:
[root@elastic1 ~]# cat /root/elasticsearch-dump/dumpindex.csv name,age,@id,@index,@type tom,18,q28kPngB8Nd5nYNvOgHd,dumpindex,_doc jack,19,rG8kPngB8Nd5nYNvOgHd,dumpindex,_doc bob,20,rW8kPngB8Nd5nYNvOgHd,dumpindex,_doc
這里需要注意的是,通過 elasticsearch-dump 命令導出的 CSV 文件可以直接用該命令導入 Elasticsearch。但是通過 Kibana 導出的 CSV 文件需要先將第一行(表頭)的 "_id","_index","_score","_type" 修改成自定義的其他字段(elasticsearch-dump 是改成了@開頭)才可以進行導入(因為這些字段是 Elasticsearch 內置的字段)。
docker run --rm -ti -v /root/elasticsearch-dump:/tmp elasticdump/elasticsearch-dump \ --input "csv:///tmp/dumpindex.csv" \ --output=http://192.168.1.67:9200/dumpindex \ --csvSkipRows 1 #第一行(表頭)不作為數據導入
查看導入后的數據,可以看到之前的 _id 等字段,其實變成了 @id,索引真正的 _id 是改變了的。因此不推薦使用通過 CSV 的方式導入導出數據。
GET dumpindex/_search #輸出結果 { "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "dumpindex", "_type" : "_doc", "_id" : "W9OmPngBcQzbxUdL_4fB", "_score" : 1.0, "_source" : { "name" : "bob", "age" : "20", "@id" : "rW8kPngB8Nd5nYNvOgHd", "@index" : "dumpindex", "@type" : "_doc" } }, { "_index" : "dumpindex", "_type" : "_doc", "_id" : "XNOmPngBcQzbxUdL_4fB", "_score" : 1.0, "_source" : { "name" : "jack", "age" : "19", "@id" : "rG8kPngB8Nd5nYNvOgHd", "@index" : "dumpindex", "@type" : "_doc" } } ] } }
前面將 Elasticsearch 集群中的數據導出文件,然后再通過文件將數據導入新的 Elasticsearch 集群的做法適合兩個集群間網絡不通的情況。如果兩個集群的網絡相通,可以通過下面更簡便的方式直接在兩個集群間互導數據:
先導出mapping到新集群
docker run --rm -ti elasticdump/elasticsearch-dump \ --input=http://192.168.1.171:9200/dumpindex \ --output=http://192.168.1.67:9200/dumpindex \ --type=mapping
然后導出數據到新集群:
docker run --rm -ti elasticdump/elasticsearch-dump \ --input=http://192.168.1.171:9200/dumpindex \ --output=http://192.168.1.67:9200/dumpindex \ --type=data
可以通過查詢語句過濾要遷移的數據:
docker run --rm -ti elasticdump/elasticsearch-dump \ --input=http://192.168.1.171:9200/dumpindex \ --output=http://192.168.1.67:9200/dumpindex \ --searchBody="{\"query\":{\"match\":{\"name\": \"tom\"}}}"
查看新集群的數據:
GET dumpindex/_search #輸出結果 { "took" : 2, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "dumpindex", "_type" : "_doc", "_id" : "q28kPngB8Nd5nYNvOgHd", "_score" : 1.0, "_source" : { "name" : "tom", "age" : 18 } } ] } }
multielasticdump 在 elasticdump 的基礎上做了一層封裝,可以同時 fork 出多個子線程(默認情況下是主機的 CPU 數量)并行對多個索引進行操作。
--input
必須是URL,--output
必須是文件名,也就是說只能將數據從 Elasticsearch 導出到文件中。導出的文件默認包含索引的 data,mapping,setting,template。
正則表達式匹配并導出 Elasticsearch 中 dumpindex.*
的索引:
docker run --rm -ti -v /root/elasticsearch-dump:/tmp elasticdump/elasticsearch-dump \ multielasticdump \ --direction=dump \ --match='dumpindex.*' \ #支持通過正則表達式匹配索引 --input=http://192.168.1.171:9200 \ --output=/tmp
查看導出的文件:
[root@elastic1 ~]# ll elasticsearch-dump/total 32 -rw-r--r--. 1 root root 343 Mar 17 13:35 dumpindex2.json -rw-r--r--. 1 root root 149 Mar 17 13:35 dumpindex2.mapping.json -rw-r--r--. 1 root root 187 Mar 17 13:35 dumpindex2.settings.json -rw-r--r--. 1 root root 1581 Mar 17 13:35 dumpindex2.template.json -rw-r--r--. 1 root root 337 Mar 17 13:35 dumpindex.json -rw-r--r--. 1 root root 92 Mar 17 13:35 dumpindex.mapping.json -rw-r--r--. 1 root root 186 Mar 17 13:35 dumpindex.settings.json -rw-r--r--. 1 root root 1581 Mar 17 13:35 dumpindex.template.json
可以通過 elasticdump 將文件數據導入 Elasticsearch 中。
如果 Elasticsearch 需要認證用戶名密碼,可以通過如下方式指定:
--input=http://username:password@192.168.1.171:9200/my_index
首先需要在目標 Elasticsearch 集群中配置白名單,編輯 elasticsearch.yml 文件,然后重新啟動集群:
reindex.remote.whitelist: 192.168.1.171:9200
在目標集群上執行 reindex 命令:
POST _reindex { "source": { "remote": { "host": "http://192.168.1.171:9200" #如果需要用戶認證 #"username": "user", #"password": "pass", }, "index": "kibana_sample_data_flights" # 也支持通過查詢語句過濾 }, "dest": { "index": "kibana_sample_data_flights" } }
篩選出 reindex 任務:
GET _tasks?actions=*reindex
查詢具體 reindex 任務的執行情況:
GET _tasks/pMrJwVGSQcSgeTZdh71QRw:1413
Snapshot API 是 Elasticsearch 用于對數據進行備份和恢復的一組 API 接口,可以通過 Snapshot API 進行跨集群的數據遷移,原理就是從源 Elasticsearch 集群創建數據快照,然后在目標 Elasticsearch 集群中進行恢復。
創建快照前先要注冊 Repository , 一個 Repository 可以包含多份快照文件, Repository 主要有以下幾種類型:
fs: 共享文件系統,將快照文件存放于文件系統中 url: 指定文件系統的URL路徑,支持協議:http,https,ftp,file,jar s3: AWS S3對象存儲,快照存放于S3中,以插件形式支持 hdfs: 快照存放于hdfs中,以插件形式支持 azure: 快照存放于azure對象存儲中,以插件形式支持 gcs: 快照存放于google cloud對象存儲中,以插件形式支持
我們這里選擇共享文件系統的方式作為 Repository,首先部署一臺 NFS 服務器,用于文件共享。
安裝 NFS:
yum install -y nfs-utils systemctlenable nfs.service --now
創建相關目錄:
mkdir /home/elasticsearch/snapshot chmod 777 /home/elasticsearch/snapshot
編輯 NFS 配置文件 /etc/exports:
# rw 讀寫權限,sync 同步寫入硬盤 /home/elasticsearch/snapshot 192.168.1.0/24(rw,sync)
修改完成后重新 NFS 服務:
systemctl restart nfs
編輯 /etc/fstab文件,添加如下內容:
192.168.1.65:/home/elasticsearch/snapshot /home/elasticsearch/snapshot/ nfs defaults 0 0
編輯完成后執行 mount 命令掛載:
[root@elastic1 ~]# mount -a # 查看掛載點 [root@elastic1 ~]# df -hT Filesystem Type Size Used Avail Use% Mounted on ... 192.168.1.65:/home/elasticsearch/snapshot nfs 142G 39M 142G 1% /home/elasticsearch/snapshot
編輯 elasticsearch.yml 文件,添加如下內容:
path.repo: ["/home/elasticsearch/snapshot"]
添加完成后重啟 Elasticsearch,然后通過如下命令可以驗證是否添加 repo 目錄成功:
GET _cluster/settings?include_defaults&filter_path=*.path.repo # 輸出結果 { "defaults" : { "path" : { "repo" : [ "/home/elasticsearch/snapshot" ] } } }
注冊一個名為 dumpindex 的 Repository:
PUT /_snapshot/my_fs_backup { "type": "fs", "settings": { "location": "/home/elasticsearch/snapshot/dumpindex", #可以就寫dumpindex,相對路徑 "compress": true } }
查看 RRepository 在各個節點上的注冊情況:
POST _snapshot/my_fs_backup/_verify #輸出結果 { "nodes" : { "MjS0guiLSMq3Oouh008uSg" : { "name" : "elastic3" }, "V-UXoQMkQYWi5RvkjcO_yw" : { "name" : "elastic2" }, "9NPH3gJoQAWfgEovS8ww4w" : { "name" : "elastic4" }, "gdUSuXuhQ7GvPogi0RqvDw" : { "name" : "elastic1" } } }
indices
:做快照的索引。
wait_for_completion=true
:是否等待完成快照后再響應,如果為true會等快照完成后才響應。(默認為false,不等快照完成立即響應)
ignore_unavailable
: 設置為true時,當創建快照時忽略不存在的索引。
include_global_state
: 設置為false時,當某個索引所有的主分片不是全部的都可用時,可以完成快照。
通過如下命令指定對 dumpindex 索引做快照:
PUT _snapshot/my_fs_backup/snapshot_1?wait_for_completion=true { "indices": "dumpindex", "ignore_unavailable": true, "include_global_state": false } # 輸出結果 { "snapshot" : { "snapshot" : "snapshot_1", "uuid" : "cTvmz15pQzedDE-fHbzsCQ", "version_id" : 7110199, "version" : "7.11.1", "indices" : [ "dumpindex" ], "data_streams" : [ ], "include_global_state" : false, "state" : "SUCCESS", "start_time" : "2021-03-17T14:33:20.866Z", "start_time_in_millis" : 1615991600866, "end_time" : "2021-03-17T14:33:21.067Z", "end_time_in_millis" : 1615991601067, "duration_in_millis" : 201, "failures" : [ ], "shards" : { "total" : 1, "failed" : 0, "successful" : 1 } } }
在前面注冊 Repository 的目錄下可以看到生成了相關的快照文件:
[elasticsearch@elastic1 ~]$ ll /home/elasticsearch/snapshot/dumpindex/ total 16 -rw-rw-r--. 1 elasticsearch elasticsearch 439 Mar 17 22:18 index-0 -rw-rw-r--. 1 elasticsearch elasticsearch 8 Mar 17 22:18 index.latest drwxrwxr-x. 3 elasticsearch elasticsearch 36 Mar 17 22:18 indices -rw-rw-r--. 1 elasticsearch elasticsearch 193 Mar 17 22:18 meta-cTvmz15pQzedDE-fHbzsCQ.dat -rw-rw-r--. 1 elasticsearch elasticsearch 252 Mar 17 22:18 snap-cTvmz15pQzedDE-fHbzsCQ.dat
和在源集群注冊的方式一樣,修改 elasicsearch.yml 配置文件,并且通過下面命令注冊 Repository:
PUT _snapshot/my_fs_backup { "type": "fs", "settings": { "location": "/home/elasticsearch/snapshot/dumpindex", "compress": true } }
將源集群生成的快照文件拷貝到目標集群的 Repository 目錄下:
[elasticsearch@elastic1 ~]$ scp -r /home/elasticsearch/snapshot/dumpindex/* elasticsearch@192.168.1.67:/home/elasticsearch/snapshot/dumpindex/
在目標集群上查看快照信息:
GET _snapshot/my_fs_backup/snapshot_1 # 輸出結果 { "snapshots" : [ { "snapshot" : "snapshot_1", "uuid" : "cTvmz15pQzedDE-fHbzsCQ", "version_id" : 7110199, "version" : "7.11.1", "indices" : [ "dumpindex" ], "data_streams" : [ ], "include_global_state" : false, "state" : "SUCCESS", "start_time" : "2021-03-17T14:33:20.866Z", "start_time_in_millis" : 1615991600866, "end_time" : "2021-03-17T14:33:21.067Z", "end_time_in_millis" : 1615991601067, "duration_in_millis" : 201, "failures" : [ ], "shards" : { "total" : 1, "failed" : 0, "successful" : 1 } } ] }
將快照導入目標集群的 dumpindex 索引中:
POST _snapshot/my_fs_backup/snapshot_1/_restore { "indices": "dumpindex" }
查看索引數據,可以看到和源集群的一致:
{ "took" : 3, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 3, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "dumpindex", "_type" : "_doc", "_id" : "q28kPngB8Nd5nYNvOgHd", "_score" : 1.0, "_source" : { "name" : "tom", "age" : 18 } }, { "_index" : "dumpindex", "_type" : "_doc", "_id" : "rG8kPngB8Nd5nYNvOgHd", "_score" : 1.0, "_source" : { "name" : "jack", "age" : 19 } }, { "_index" : "dumpindex", "_type" : "_doc", "_id" : "rW8kPngB8Nd5nYNvOgHd", "_score" : 1.0, "_source" : { "name" : "bob", "age" : 20 } } ] } }
Logstash支持從一個 Elasticsearch 集群中讀取數據然后寫入到另一個 Elasticsearch 集群:
編輯 conf/logstash.conf文件:
input { elasticsearch { hosts => ["http://192.168.1.171:9200"] index => "dumpindex" #如果需要用戶認證 #user => "username" #password => "password" } } output { elasticsearch { hosts => ["http://192.168.1.67:9200"] index => "dumpindex" } }
啟動 Logstash:
[elasticsearch@es1 logstash-7.11.1]$ bin/logstash -f config/logstash.conf
在目標集群上查看 dumpindex 索引數據,可以看到和源集群一致:
GET dumpindex/_search # 輸出結果 { "took" : 3, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 3, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "dumpindex", "_type" : "_doc", "_id" : "jrfpQHgBERNPF_kwE-Jk", "_score" : 1.0, "_source" : { "@version" : "1", "name" : "tom", "@timestamp" : "2021-03-17T15:58:39.423Z", "age" : 18 } }, { "_index" : "dumpindex", "_type" : "_doc", "_id" : "j7fpQHgBERNPF_kwE-Jk", "_score" : 1.0, "_source" : { "@version" : "1", "name" : "jack", "@timestamp" : "2021-03-17T15:58:39.440Z", "age" : 19 } }, { "_index" : "dumpindex", "_type" : "_doc", "_id" : "kLfpQHgBERNPF_kwE-Jk", "_score" : 1.0, "_source" : { "@version" : "1", "name" : "bob", "@timestamp" : "2021-03-17T15:58:39.440Z", "age" : 20 } } ] } }
到此,相信大家對“Elasticsearch跨集群數據遷移怎么實現”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。