您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“InfluxDB如何使用”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“InfluxDB如何使用”這篇文章吧。
  InfluxDB 是用Go語言編寫的一個開源分布式時序、事件和指標數據庫,無需外部依賴。 與 Elasticsearch 有些類似。
  功能:
     - 基于時間序列,支持與時間有關的相關函數(如最大,最小,求和等);
     - 可度量性:你可以實時對大量數據進行計算;
     - 基于事件:它支持任意的事件數據基于事件:它支持任意的事件數據。
  主要特點:
     - 無結構(無模式):可以是任意數量的列
     - 可拓展的,支持min, max, sum, count, mean, median 等一系列函數,方便統計支持min, max, sum, count, mean, median 等一系列函數,方便統計
     - 原生的HTTP支持,內置HTTP API原生的HTTP支持,內置HTTP API
     - 強大的類SQL語法強大的類SQL語法
     - 自帶管理界面,方便使用自帶管理界面,方便使用
  InfluxDB與傳統數據庫的比較:
  接下來通過一個insert操作,展開對InfluxDB獨有概念的介紹,在 InfluxDB 中,我們可以粗略的將要存入的一條數據看作一個虛擬的 key 和其對應的 value(field value),格式如下:
insert cpu_usage,host=server01,region=us-west value=0.64 1434055562000000000`
  虛擬的 key 包括以下幾個部分: database, retention policy, measurement, tag sets, field name, timestamp。
database:數據庫名,在 在 InfluxDB 中可以創建多個數據庫,不同數據庫中的數據文件是隔離存放的 。
retention policy : 存儲策略,用于設置數據保留的時間,每個數據庫剛開始會自動創建一個默認的存儲策略 autogen,數據保留時間為永久,之后用戶可以自己設置
measurement :類似于關系型數據庫中的表。
tag sets : tags 在 InfluxDB 中會按照字典序排序 ,例如: host=server01,region=us-west 和 host=server02,region=us-west 就是兩個不同的 tag set 。
tag :標簽, 在InfluxDB中,tag是一個非常重要的部分,表名+tag一起作為數據庫的索引,是“key-value”的形式 。
ield name: 例如上面數據中的 value 就是 fieldName,InfluxDB 中支持一條數據中插入多個 fieldName 。
timestamp : 每一條數據都需要指定一個時間戳,在 TSM 存儲引擎中會特殊對待,以為了優化后續的查詢操作 。
  Point 由時間戳(time)、數據(field)、標簽(tags)組成。
   Point相當于傳統數據庫里的一行數據 , 如下表所示:
  Series 相當于是 InfluxDB 中一些數據的集合,在同一個 database 中,retention policy、measurement、tag sets 完全相同的數據同屬于一個 series,同一個 series 的數據在物理上會按照時間順序排列存儲在一起。
   Shard 在 InfluxDB 中是一個比較重要的概念,它和 retention policy 相關聯。每一個存儲策略下會存在許多 shard,每一個 shard 存儲一個指定時間段內的數據,并且不重復 ; 例如 : 7點-8點 的數據落入 shard0 中,8點-9點的數據則落入 shard1 中。每一個 shard 都對應一個底層的 tsm 存儲引擎,有獨立的 cache、wal、tsm file。
   TSM 存儲引擎主要由幾個部分組成:cache、wal、tsm file、compactor 。
    1)Cache :cache 相當于是 LSM Tree 中的 memtabl。插入數據時,實際上是同時往 cache 與 wal 中寫入數據,可以認為 cache 是 wal 文件中的數據在內存中的緩存。當 InfluxDB 啟動時,會遍歷所有的 wal 文件,重新構造 cache,這樣即使系統出現故障,也不會導致數據的丟失。
   cache 中的數據并不是無限增長的,有一個 maxSize 參數用于控制當 cache 中的數據占用多少內存后就會將數據寫入 tsm 文件。如果不配置的話,默認上限為 25MB,每當 cache 中的數據達到閥值后,會將當前的 cache 進行一次快照,之后清空當前 cache 中的內容,再創建一個新的 wal 文件用于寫入,剩下的 wal 文件最后會被刪除,快照中的數據會經過排序寫入一個新的 tsm 文件中 。
     2)WAL:wal 文件的內容與內存中的 cache 相同,其作用就是為了持久化數據,當系統崩潰后可以通過 wal 文件恢復還沒有寫入到 tsm 文件中的數據 。
     3) TSM File : 單個 tsm file 大小最大為 2GB,用于存放數據 。
     4) Compactor:compactor 組件在后臺持續運行,每隔 1 秒會檢查一次是否有需要壓縮合并的數據 。
   主要進行兩種操作 :
    - 一種是 cache 中的數據大小達到閥值后,進行快照,之后轉存到一個新的 tsm 文件中 。
    - 另外一種就是合并當前的 tsm 文件,將多個小的 tsm 文件合并成一個,使每一個文件盡量達到單個文件的最大大小,減少文件的數量,并且一些數據的刪除操作也是在這個時候完成 。
官網地址:https://dl.influxdata.com
#1.本地下載 wget https://dl.influxdata.com/influxdb/releases/influxdb-1.1.0.x86_64.rpm yum localinstall influxdb-1.1.0.x86_64.rpm #2.在線yum 安裝 #2.1配置yum源 cat <<EOF | sudo tee /etc/yum.repos.d/influxdb.repo [influxdb] name = InfluxDB Repository - RHEL \$releasever baseurl = https://repos.influxdata.com/rhel/\$releasever/\$basearch/stable enabled = 1 gpgcheck = 1 gpgkey = https://repos.influxdata.com/influxdb.key EOF #2.2 安裝 yum install go sudo yum install influxdb
#啟動服務 systemctl start influxdb.service #查看服務是否正常 systemctl status influxdb #查看服務對應進程 ps aux | grep influx
Ps:
(1)8086端口:HTTP API的端口
(2)8088:備份和恢復時使用,默認是8088
  由于是使用yum 安裝的所以安裝后,Influx的目錄會分布在 /usr/bin 、 /var/lib/influxdb/ 、 /etc/influxdb/ 下,下面我們一一介紹:
(1)/usr/bin 該目錄下是存放相應命令操作的目錄:
influxd influxdb服務器 influx influxdb命令行客戶端 influx_inspect 查看工具 influx_stress 壓力測試工具 influx_tsm 數據庫轉換工具(將數據庫從b1或bz1格式轉換為tsm1格式)
(2)/var/lib/influxdb/ 存放數據的目錄
data 存放最終存儲的數據,文件以.tsm結尾 meta 存放數據庫元數據 wal 存放預寫日志文件
(3)/etc/influxdb/influxdb.conf 存放配置文件的目錄
  influxdb.conf 就是 influxdb的配置文件。
Ps:在閱讀配置參數的時,最好是對該數據庫各個概念以及原理有一些了解,再去細看配置參數。
#編輯配置文件 vim /etc/influxdb/influxdb.conf #以下=后面的都是默認值 reporting-disabled = false -- 該選項用于上報influxdb的使用信息給InfluxData公司 bind-address = "127.0.0.1:8088" -- 備份恢復時使用,默認值為8088 [meta]下 dir = "/var/lib/influxdb/meta" -- meta數據存放目錄 retention-autocreate = true -- 用于控制默認存儲策略 logging-enabled = true -- 是否開啟meta日志 [data]下 dir = "/var/lib/influxdb/data" -- 最終數據(TSM文件)存儲目錄 wal-dir = "/var/lib/influxdb/wal" -- 預寫日志存儲目錄 query-log-enabled = true -- 是否開啟tsm引擎查詢日志 cache-max-memory-size = "1g" -- 用于限定shard最大值,大于該值時會拒絕寫入 cache-snapshot-memory-size = "25m" -- 用于設置快照大小,大于該值時數據會刷新到tsm文件 cache-snapshot-write-cold-duration = "10m" -- tsm1引擎 snapshot寫盤延遲 compact-full-write-cold-duration = "4h" -- tsm文件在壓縮前可以存儲的最大時間 max-series-per-database = 1000000 -- 限制數據庫的級數,該值為0時取消限制 trace-logging-enabled = false -- 是否開啟trace日志 [coordinator] 下 write-timeout = "10s" -- 寫操作超時時間 max-concurrent-queries = 0 -- 最大并發查詢數,0無限制 query-timeout = "0s" -- 查詢操作超時時間,0無限制 log-queries-after = "0s" -- 慢查詢超時時間,0無限制 max-select-point = 0 -- SELECT語句可以處理的最大點數(points)0無限制 max-select-series = 0 -- SELECT語句可以處理的最大級數(series),0無限制 max-select-buckets = 0 -- SELECT語句可以處理的最大"GROUP BY time()"的時間周期,0無限制 [retention]下 ,舊數據的保留策略 enabled = true -- 是否開啟該模塊 check-interval = "30m" -- 檢查時間間隔 [http] 下,influxdb的http接口配置 enabled = true -- 是否開啟該模塊 bind-address = ":8086" --綁定地址 auth-enabled = false -- 是否開啟認證 log-enabled = true -- 是否開啟日志 max-row-limit = 0 -- 配置查詢返回最大行數 max-connection-limit = 0 -- 配置最大連接數,0無限制
以上是常見操作配置,具體細節請參考:
https://www.cnblogs.com/MikeZhang/p/InfluxDBInstall20170206.html
Ps:以下操作可能與關系型數據庫操作不同,如果對Influx不了解,請先閱讀Influx介紹之后,在繼續往下閱讀。
#進入Influx數據庫 [root@iZbp19ujl2isnn8zc1hqirZ ~]# influx > show databases #顯示所有數據庫 > create database tes #創建數據庫 > drop database test #刪除數據庫 > use test #進入數據庫 > insert disk_free,hostname=server01 value=442221834240i #創建&& 插入數據 > select * from disk_free #查詢數據 > show measurement #顯示庫中的所有表 > drop measurement disk_free #刪除表
我們發現以上猛如虎的操作中,進入沒有類似create table的命令,這是為什么呢?
  原來是因為: InfluxDB中沒有顯示的創建表的語句,只能通過insert數據的房還是來建立新表 。
insert disk_free,hostname=server01 value=442221834240i -- 剖析以上命令的含義 disk_free 就是表名,hostname 是索引(tag),value=xx 是記錄值(field),記錄值可以有多個,系統自帶追加時間戳。 -- 也可以手動添加時間戳 insert disk_free,hostname=server01 value=442221834240i 1435362189575692182
  介紹: InfluxDB 是沒有提供直接刪除數據記錄的方法,但是提供數據保存策略,主要用于指定數據保留時間,超過指定時間,就刪除這部分數據。
#查看當前數據庫中的Retention Policies >show retention policies on test name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 false
解釋:
  - name:名稱,此示例名稱為 default。
  - duration:持續時間,0代表無限制。
  - shardGroupDuration:shardGroup的存儲時間,shardGroup是InfluxDB的一個基本儲存結構,應該大于這個時間的數據在查詢效率上應該有所降低。
  - replicaN:全稱是replication,副本個數。
  - default:是否是默認策略。
#創建新的Retention Policies > create retention policy "rp_name" on "test" duration 3w replication 1 default #修改Retention Policies > alter retention policy "rp_name" on "test" duration 30d default > show retention policies on test name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 false rp_name 720h0m0s 24h0m0s 1 true #刪除Retention Policies drop retention policy "rp_name" on "test"
創建語句剖析:
create retention policy "rp_name" on "test" duration 3w replication 1 default
  - rp_name:保存策略名稱
  - test:所針對的數據庫
  - 3w : 保存3周,3周之前的數據將被刪除,influxdb 具備各種事件參數,持續時間必須至少為1小時;比如:h(小時)、d(天)、w(星期) 。
  - replication : 副本個數,一般為1即可。
  介紹: InfluxDB 的連續查詢是在數據庫中自動定時啟動的一組語句,語句中必須包含 select 關鍵字 和 group by time() 關鍵字。 InfluxDB 會將查詢結果放在指定的數據表中。
  目的:使用連續查詢是最優的降低采樣率的方式,連續查詢和存儲策略搭配使用將會大大降低 InfluxDB 的系統占用量。而且使用連續查詢后,數據會存放到指定的數據表中,這樣就為以后統計不同精度的數據提供了方便。
  創建語句:
CREATE CONTINUOUS QUERY <cq_name> ON <database_name> [RESAMPLE [EVERY <interval>] [FOR <interval>]] BEGIN SELECT <function>(<stuff>)[,<function>(<stuff>)] INTO <different_measurement> FROM <current_measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<stuff>] END
  舉例:
CREATE CONTINUOUS QUERY wj_30m ON test BEGIN SELECT mean(connected_clients), MEDIAN(connected_clients), MAX(connected_clients), MIN(connected_clients) INTO redis_clients_30m FROM redis_clients GROUP BY ip,port,time(30m) --解釋: 在test數據庫中新建了一個名為 wj_30m 的連續查詢,每三十分鐘取一個 connected_clients 字段的平均值、中位值、最大值、最小值從redis_clients表中并且插入到redis_clients_30m表中,使用的數據保留策略都是default。
  連續查詢的其他操作:
#查看庫中的連續查詢 > show continuous queries name: _internal name query ---- ----- name: test name query ---- ----- #刪除Continuous Queries > drop continuous query <cq_name> on <database_name>
  用戶管理:
#以xxx用戶登錄 $influx -username useer -password abcd #顯示所有用戶 > show users user admin ---- ----- zy true #創建普通用戶 > CREATE USER "username" WITH PASSWORD 'password' #創建管理員用戶 > CREATE USER "admin" WITH PASSWORD 'admin' WITH ALL PRIVILEGES #為用戶設置密碼 > SET PASSWORD FOR <username> = '<password>' #刪除用戶 > DROP USER "username"
  權限設置:
#為一個已有用戶授權管理員權限 > GRANT ALL PRIVILEGES TO <username> #取消用戶權限 > REVOKE ALL PRIVILEGES FROM <username> #展示用戶在不同數據庫上的權限 > SHOW GRANTS FOR <user_name>
   關于Influxdb支持兩種方式:類SQL查詢和Http接口查詢:
-- 類SQL查詢(詢最新的三條數據) SELECT * FROM weather ORDER BY time DESC LIMIT 3 #Http接口查詢 $curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=test" --data-urlencode "q=SELECT * FROM weather ORDER BY time DESC LIMIT 3"
這里小編以maven項目的結構,測試關于InfluxDB數據庫的增刪改查。
<!-- InfluxDB 需要的jar包 --> <dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.5</version> </dependency>
InfluxDBUtils:
import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; import org.influxdb.dto.Point; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import java.util.Map; /** * * Created with IntelliJ IDEA. * * User: ZZY * * Date: 2019/11/15 * * Time: 10:10 * * Description: */ public class InfluxDBConnect { private String username;//用戶名 private String password;//密碼 private String openurl;//連接地址 private String database;//數據庫 private InfluxDB influxDB; public InfluxDBConnect(String username, String password, String openurl, String database){ this.username = username; this.password = password; this.openurl = openurl; this.database = database; } /**連接時序數據庫;獲得InfluxDB**/ public InfluxDB getConnect(){ if(influxDB==null){ influxDB=InfluxDBFactory.connect(openurl,username,password); influxDB.createDatabase(database); } return influxDB; } /** * 設置數據保存策略 * defalut 策略名 /database 數據庫名/ 30d 數據保存時限30天/ 1 副本個數為1/ 結尾DEFAULT 表示 設為默認的策略 */ public void setRetentionPolicy(){ String command=String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", "defalut", database, "30d", 1); this.query(command); } /** * 查詢 * @param command 查詢語句 * @return */ public QueryResult query(String command){ return influxDB.query(new Query(command,database)); } /** * 插入 * @param measurement 表 * @param tags 標簽 * @param fields 字段 */ public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields){ Point.Builder builder =Point.measurement(measurement); builder.tag(tags); builder.fields(fields); influxDB.write(database,"",builder.build()); } /** * 刪除 * @param command 刪除語句 * @return 返回錯誤信息 */ public String deleteMeasurementData(String command){ QueryResult query = influxDB.query(new Query(command, database)); return query.getError(); } /** * 創建數據庫 * @param dbName */ public void createDB(String dbName){ influxDB.createDatabase(dbName); } /** * 刪除數據庫 * @param dbName */ public void deleteDB(String dbName){ influxDB.deleteDatabase(dbName); } }
pojo:
import java.io.Serializable; /** * * Created with IntelliJ IDEA. * * User: ZZY * * Date: 2019/11/15 * * Time: 10:07 * * Description: */ public class CodeInfo implements Serializable { private static final long serialVersionUID = 1L; private Long id; private String name; private String code; private String descr; private String descrE; private String createdBy; private Long createdAt; private String time; private String tagCode; private String tagName; public static long getSerialVersionUID() { return serialVersionUID; } } //set and get method ...
測試:
import org.influxdb.InfluxDB; import org.influxdb.dto.QueryResult; import java.util.*; /** * * Created with IntelliJ IDEA. * * User: ZZY * * Date: 2019/11/15 * * Time: 11:45 * * Description: 測試influxDB的增刪改查 */ public class Client { public static void main(String[] args) { String username = "admin";//用戶名 String password = "admin";//密碼 String openurl = "http://192.168.254.100:8086";//連接地址 String database = "test";//數據庫 InfluxDBConnect influxDBConnect = new InfluxDBConnect(username, password, openurl, database); influxDBConnect.getConnect(); //insertInfluxDB(influxDBConnect); testQuery(influxDBConnect); } //向Measurement中插入數據 public static void insertInfluxDB(InfluxDBConnect influxDB) { Map<String, String> tags = new HashMap<String, String>(); Map<String, Object> fields = new HashMap<String, Object>(); List<CodeInfo> list = new ArrayList<CodeInfo>(); CodeInfo info1 = new CodeInfo(); info1.setId(1L); info1.setName("BANKS"); info1.setCode("ABC"); info1.setDescr("中國農業銀行"); info1.setDescrE("ABC"); info1.setCreatedBy("system"); info1.setCreatedAt(new Date().getTime()); CodeInfo info2 = new CodeInfo(); info2.setId(2L); info2.setName("BANKS"); info2.setCode("CCB"); info2.setDescr("中國建設銀行"); info2.setDescrE("CCB"); info2.setCreatedBy("system"); info2.setCreatedAt(new Date().getTime()); list.add(info1); list.add(info2); String measurement = "sys_code"; for (CodeInfo info : list) { tags.put("TAG_CODE", info.getCode()); tags.put("TAG_NAME", info.getName()); fields.put("ID", info.getId()); fields.put("NAME", info.getName()); fields.put("CODE", info.getCode()); fields.put("DESCR", info.getDescr()); fields.put("DESCR_E", info.getDescrE()); fields.put("CREATED_BY", info.getCreatedBy()); fields.put("CREATED_AT", info.getCreatedAt()); influxDB.insert(measurement, tags, fields); } } //查詢Measurement中的數據 public static void testQuery(InfluxDBConnect influxDB) { String command = "select * from sys_code"; QueryResult results = influxDB.query(command); if (results == null) { return; } for(QueryResult.Result result:results.getResults()){ List<QueryResult.Series> series = result.getSeries(); for(QueryResult.Series serie :series){ System.out.println("serie:"+serie.getName()); //表名 Map<String, String> tags =serie.getTags(); if(tags !=null){ System.out.println("tags:-------------------------"); tags.forEach((key, value)->{ System.out.println(key + ":" + value); }); } System.out.println("values:-----------------------"); List<List<Object>> values = serie.getValues(); //列出每個serie中所有的列--value 列為全大寫 List<String> columns =serie.getColumns(); //列出每個serie中所有的列 for(List<Object> list : values){ for(int i=0; i< list.size(); i++){ String propertyName = setColumns(columns.get(i));//字段名 Object value =list.get(i); System.out.println(value.toString()); } } System.out.println("columns:"); for(String column:columns){ System.out.println(column); } } } } //刪除Measurement中的數據 public static void deletMeasurementData(InfluxDBConnect influxDB){ String command = "delete from sys_code where TAG_CODE='ABC'"; String err =influxDB.deleteMeasurementData(command); System.out.println(err); } private static String setColumns(String column){ System.out.println(column); String[] cols = column.split("_"); StringBuffer sb = new StringBuffer(); for(int i=0; i< cols.length; i++){ String col = cols[i].toLowerCase(); if(i != 0){ String start = col.substring(0, 1).toUpperCase(); String end = col.substring(1).toLowerCase(); col = start + end; } sb.append(col); } System.out.println(sb.toString()); return sb.toString(); } }
(1)普通導出
$influx_inspect export -datadir "/var/lib/influxdb/data" -waldir "/var/lib/influxdb/wal" -out "test_sys" -database "test" -start 2019-07-21T08:00:01Z #命令解釋 influx_inspect export -datadir "/data/influxdb/data" # 勿動,influxdb 默認的數據存儲位置 -waldir "/data/influxdb/wal" # 勿動,influxdb 默認的數據交換位置 -out "telemetry_vcdu_time" # 導出數據文件的文件名 -database telemetry_vcdu_time # 指定要導出數據的數據庫 -start 2019-07-21T08:00:01Z # 指定要導出的數據的起始時間
此時在當前目錄下會出現一個名為test_sys的文件,查看文件內容:
(2)導出成CSV格式文件
$influx -database 'test' -execute 'select * from sys_code' -format='csv' > sys_code.csv
此時在當前目錄下就多出一個sys_code.csv的文件,查看文件內容:
$influx -import -path=telemetry_sat_time -precision=ns #命令解釋 influx -import # 無參,勿動 -path=telemetry_sat_time # 指定導入數據的文件 -precision=ns # 指定導入數據的時間精度
以上是“InfluxDB如何使用”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。