您好,登錄后才能下訂單哦!
作者:秦偉
1.ELK的背景介紹與應用場景
在項目應用運行的過程中,往往會產生大量的日志,我們往往需要根據日志來定位分析我們的服務器項目運行情況與BUG產生位置。一般情況下直接在日志文件中tailf、 grep、awk 就可以獲得自己想要的信息。但在規模較大的場景中,此方法效率低下,面臨問題包括日志量過大、文本搜索太慢、如何多維度查詢。這就需要對服務器上的日志收集匯總。常見解決思路是建立集中式日志收集系統,將所有節點上的日志統一收集,管理,訪問。
一般大型系統往往是一種分布式部署的架構,不同的服務模塊部署在不同的服務器上,問題出現時,大部分情況需要根據問題暴露的關鍵信息,定位到具體的服務器和服務模塊,所以構建一套集中式日志系統,可以提高定位問題的效率。一個完整的集中式日志系統,需要包含以下幾個主要特點:
收集-能夠采集多種來源的日志數據,服務日志與系統日志。
傳輸-能夠穩定的把日志數據傳輸到中央系統
存儲-如何存儲日志數據,持久化數據。
分析-可以支持 UI 分析,界面化定制查看日志操作。
ELK提供了一整套解決方案,并且都是開源軟件,之間互相配合使用,完美銜接,高效的滿足了很多場合的應用。是目前主流的一種日志系統。
2.ELK簡介:
ELK是三個開源軟件的縮寫,分別表示:Elasticsearch, Logstash, Kibana , 它們都是開源軟件。其中后來新增了一個FileBeat。
即ELK主要由Elasticsearch(搜索)、Logstash(收集與分析)和Kibana(展示)三部分組件組成;
其中各組件說明如下:
Filebeat:輕量級數據收集引擎。早期的ELK架構中使用Logstash收集、解析日志,但是Logstash對內存、cpu、io等資源消耗比較高。如果用它來對服務器進行日志收集,將加重服務器的負載。相比 Logstash,Beats所占系統的CPU和內存幾乎可以忽略不計,所以filebeat作為一個輕量級的日志收集處理工具(Agent),它可以用來替代Logstash,由于其占用資源少,所以更適合于在各個服務器上搜集日志后傳輸給Logstash,這也是官方推薦的一種做法。【收集日志】
Logstash:數據收集處理引擎。支持動態的從各種數據源搜集數據,并對數據進行過濾、分析、豐富、統一格式等操作,然后存儲以供后續使用。【對日志進行過濾、分析】
Elasticsearch:分布式搜索引擎。是基于Lucene的開源分布式搜索服務器,具有高可伸縮、高可靠、易管理等特點。可以用于全文檢索、結構化檢索和分析,并能將這三者結合起來。【搜集、分析、存儲數據】
Kibana:可視化平臺。它能夠搜索、展示存儲在 Elasticsearch 中索引數據。使用它可以很方便的用圖表、表格、地圖展示和分析數據。【圖形化展示日志】
結合以上,常見的日志系統架構圖如下:
如上圖所示,日志文件分別由filebeat在服務器上進行收集,收集的日志文件匯總到logstash上并對文件數據進行過濾、分析、豐富、統一格式等操作,然后發送到Elasticsearch,進一步對日志進行結構化檢索和分析,并存儲下來,最后由kibana進行展示。
這只是日志系統的一種最初級結構,生產環境中需要對此結構進行進一步的優化。
3.ELK系統的部署(配置將在生產環境架構中進行說明)
環境:CentOS7.5部署ELK 7版本
準備工作: CentOS7.5
elasticsearch7, logstash7, kibana7
關閉防火墻和SELinux并更新yum源(非必須):yum -y update
本次分布式部署的ELK版本為2019年五月份左右最新發布的版本,更新了許多新特性,后面將詳細說明。在安裝上面本次極大簡化了安裝步驟,可以源碼,組件安裝,本次采用YUM+插件docker安裝,能達到相同的效果。
3.1Filebeat安裝與配置
#!/bin/bash
rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
cat>/etc/yum.repos.d/elk-elasticsearch.repo<<EOF
[elastic-7.x]
name=Elasticrepository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
yum -yinstall filebeat
systemctl enablefilebeat
systemctl startfilebeat
3.2 整套軟件:elasticsearch,Logstash,Kibana和Filebeat的安裝(分開部署請分別安裝三個組件)
##########關于安裝環境,也可以參照以下二種方式,也比較簡便#################
----即通過RPM包的方式,此處給出Filebeat的方式,其他組件雷同
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.0.0-x86_64.rpm
sudo rpm -vifilebeat-7.0.0-x86_64.rpm
----Logstash實例另一種方式(優先采用)
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.rpm
yum install -y java
yum install -y logstash-7.0.0.rpm
服務啟動前必看,由于配置文件在下一章節給出,此處是前提準備,等配置完成后再啟動
3.3 關于elasticsearch需要重點關注的地方
1.elasticsearch在分詞方面,需要添加中文分詞的插件。在其安裝代碼的plugins目錄,即/usr/share/elasticsearch/plugins,需要增加中文分詞插件。
[root@192-168-108-35plugins]# ll
total 3192
[root@192-168-108-35 plugins]# pwd
/usr/share/elasticsearch/plugins
[root@192-168-108-35 plugins]# wget https://codeload.github.com/medcl/elasticsearch-analysis-ik/tar.gz/v7.0.0
下載后重啟時需要刪除插件源文件,否則報錯如下:
Caused by: java.nio.file.FileSystemException:
/usr/share/elasticsearch/plugins/v7.0.0/plugin-descriptor.properties: Not adirectory
所以安裝完成后必須刪除源文件v7.0.0
2.elasticsearch服務的啟動問題與前提準備
-- 首先在安裝完畢后會生成很多文件,包括配置文件日志文件等等,下面幾個是最主要的配置文件路徑
/etc/elasticsearch/elasticsearch.yml # els的配置文件
/etc/elasticsearch/jvm.options # JVM相關的配置,內存大小等等
/etc/elasticsearch/log4j2.properties # 日志系統定義
/usr/share/elasticsearch # elasticsearch 默認安裝目錄
/var/lib/elasticsearch # 數據的默認存放位置
-- 創建用于存放數據與日志的目錄
數據文件會隨著系統的運行飛速增長,所以默認的日志文件與數據文件的路徑不能滿足我們的需求,那么手動創建日志與數據文件路徑
mkdir -p /data/elkdata
mkdir -p /data/elklogs
-- JVM配置 (7.0版本針對此做出優化,可以基本保障溢出問題,但最好設置一下)
由于Elasticsearch是Java開發的,所以可以通過/etc/elasticsearch/jvm.options配置文件來設定JVM的相關設定。如果沒有特殊需求按默認即可。
不過其中還是有兩項最重要的-Xmx1g與-Xms1gJVM的最大最小內存。如果太小會導致Elasticsearch剛剛啟動就立刻停止。太大會拖慢系統本身。
vim /etc/elasticsearch/jvm.options #JVM最大、最小使用內存
-Xms1g
-Xmx1g
-- 使用ROOT賬戶執行命令
elasticsearch的相關配置已經完成,下面需要啟動elasticsearch集群。但是由于安全的考慮,elasticsearch不允許使用root用戶來啟動,所以需要創建一個新的用戶,并為這個賬戶賦予相應的權限來啟動elasticsearch集群。
創建ES運行用戶
passwd elasticsearch
同時修改ES目錄權限,以下操作都是為了賦予es用戶操作權限
-- 安裝源碼文件目錄
[root@192-168-108-35 share]# chown -R elk :elk/usr/share/elasticsearch/
[root@192-168-108-35 share]# chown -R elk :elk /var/log/elasticsearch/
[root@192-168-108-35 share]# chown -R elk :elk /data
-- 運行常見的報錯信息
[1]文件數目不足
elk soft memlock unlimited
elk hard memlock unlimited
elk soft nofile 65536
elk hard nofile 131072
退出用戶重新登錄,使配置生效
[2]: max virtual memoryareas vm.max_map_count [65530] is too low, increase to at least [262144]
解決辦法:
在 /etc/sysctl.conf文件最后添加一行
vm.max_map_count=262144
即可永久修改
su elk
./bin/elasticsearch
后臺運行ES
可以加入-p 命令 讓es在后臺運行, -p 參數 記錄進程ID為一個文件
./bin/elasticsearch -p ./elasticsearch-pid -d
一般情況下,直接執行一下命令即可
./bin/elasticsearch-d
結束進程
kill -SIGTERM {pid}
kill -9 `ps -ef |grep elasticsearch|awk '{print $2}'
驗證一下服務是否正常
curl -i "http://192.168.60.200:9200"
4.安裝elasticsearch-head插件(支持前端界面查看數據)
安裝docker鏡像或者通過github下載elasticsearch-head項目都是可以的,1或者2兩種方式選擇一種安裝使用即可
【1】使用docker的集成好的elasticsearch-head
docker容器下載成功并啟動以后,運行瀏覽器打開http://localhost:9100/
【2】使用git安裝elasticsearch-head
檢查端口是否起來
netstat -antp |grep 9100
瀏覽器訪問測試是否正常
http://IP:9100/
【注意】由于elasticsearch-head:5鏡像對elasticsearch的7版本好像適配性不夠,所以部分顯示可能會有空白。推薦另外一個鏡像lmenezes/cerebro,下載后執行docker run -d -p 9000:9000lmenezes/cerebro就可以在9000端口查看了。
最后,filebeat logstash kibana可以在配置文件后,正常啟動,如果需要切換用戶的話,也可以參照上面。本次這三個組件全部默認用root用戶啟動。由于分開部署,與ES互不影響。
附:kafka三節點集群搭建
環境準備
1.zookeeper集群環境,本次采用Kafka自帶的Zookeeper環境。
kafka是依賴于zookeeper注冊中心的一款分布式消息對列,所以需要有zookeeper單機或者集群環境。
2.三臺服務器:
192.168.108.200 ELK-kafka-cluster
192.168.108.165 ELK-kafka-salve1
192.168.108.103 ELK-kafka-salve2
3.下載kafka安裝包
在 http://kafka.apache.org/downloads 中下載,目前最新版本的kafka已經到2.2.0,這里下載的是kafka_2.11-2.2.0.tgz
安裝kafka集群
1.上傳壓縮包到三臺服務器解壓縮到/opt/目錄下
tar -zxvf kafka_2.11-2.2.0.tgz -C /opt/
ls -s kafka_2.11-2.2.0 kafka
2.修改server.properties (/opt/kafka/config目錄下)
############################# Server Basics#############################
broker.id=0
######################## Socket Server Settings########################
listeners=PLAINTEXT://192.168.108.200:9092
advertised.listeners=PLAINTEXT://192.168.108.200:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics#############################
log.dirs=/var/log/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
######################## Internal Topic Settings#########################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
######################### Log Retention Policy########################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper#############################
zookeeper.connect=192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
######################## Group CoordinatorSettings ##########################
group.initial.rebalance.delay.ms=0
3.拷貝兩份到192.168.108.165和192.168.108.103
[root@192.168.108.165 config]# catserver.properties
broker.id=1
listeners=PLAINTEXT://192.168.108.165:9092
advertised.listeners=PLAINTEXT://192.168.108.165:9092
[root@k8s-n3 config]# cat server.properties
broker.id=2
listeners=PLAINTEXT://192.168.108.103:9092
advertised.listeners=PLAINTEXT://192.168.108.103:9092
然后添加環境變量 在/etc/profile 中添加
export ZOOKEEPER_HOME=/opt/kafka
export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile 重載生效
4.修改zookeeper.properties:
1.設置連接參數,添加如下配置
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
2.設置broker Id的服務地址
server.0=192.168.108.200:2888:3888
server.1=192.168.108.165:2888:3888
server.2=192.168.108.103:2888:3888
總結就在Kafka源碼目錄的/kafka/config/zookeeper.properties文件設置如下
dataDir=/opt/zookeeper # 這時需要在/opt/zookeeper文件夾下,新建myid文件,把broker.id填寫進去,本次本節點為0。
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
server.0=192.168.108.200:2888:3888
server.1=192.168.108.165:2888:3888
server.2=192.168.108.103:2888:3888
然后在zookeeper數據目錄添加id配置(dataDir=/opt/zookeeper)
在zookeeper.properties的數據目錄中創建myid文件
在各臺服務的zookeeper數據目錄添加myid文件,寫入服務broker.id屬性值,如這里的目錄是/usr/local/zookeeper/data
第一臺broker.id為0的服務到該目錄下執行:echo 0 > myid
[root@192-168-108-165 kafka]# cd /opt/zookeeper/
[root@192-168-108-165 zookeeper]# ls
myid version-2
[root@192-168-108-165 zookeeper]# cat myid
1
[root@192-168-108-165 zookeeper]#
集群啟動
集群啟動:cd /opt/kafka
先分別啟動zookeeper
kafka使用到了zookeeper,因此你要首先啟動一個zookeeper服務,如果你沒有zookeeper服務。kafka中打包好了一個簡潔版的單節點zookeeper實例。
kafka啟動時先啟動zookeeper,再啟動kafka;關閉時相反,先關閉kafka,再關閉zookeep
前臺啟動:bin/zookeeper-server-start.sh config/zookeeper.properties
[root@192-168-108-200 ~]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODENAME
java 5197 root 98u IPv6 527171 0t0 TCP *:eforward(LISTEN)
可以看到已經啟動
后臺啟動:bin/zookeeper-server-start.sh-daemon config/zookeeper.properties
再分別啟動kafka
前臺啟動:bin/kafka-server-start.sh config/server.properties
[root@192-168-108-200 ~]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 5519 root 157u IPv6 528511 0t0 TCP 192-168-108-200:XmlIpcRegSvc (LISTEN)
java 5519 root 167u IPv6 528515 0t0 TCP 192-168-108-200:XmlIpcRegSvc->192.168.108.191:56656(ESTABLISHED)
java 5519 root 169u IPv6 528516 0t0 TCP192-168-108-200:XmlIpcRegSvc->192.168.108.187:49368 (ESTABLISHED)
java 5519 root 176u IPv6 528096 0t0 TCP192-168-108-200:48062->192-168-108-200:XmlIpcRegSvc (ESTABLISHED)
java 5519 root 177u IPv6 528518 0t0 TCP192-168-108-200:XmlIpcRegSvc->192-168-108-200:48062 (ESTABLISHED)
java 5519 root 178u IPv6 528097 0t0 TCP192-168-108-200:XmlIpcRegSvc->192.168.108.187:49370 (ESTABLISHED)
后臺啟動:bin/kafka-server-start.sh -daemon config/server.properties
服務啟動腳本
cd /opt/kafka
kill -9 ps -ef |grep kafka|awk '{print $2}'
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemonconfig/server.properties
Zookeeper+Kafka集群測試
/opt/kafka/bin
1.創建topic:
kafka-topics.sh --create--zookeeper
192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181--replication-factor 3 --partitions 3 --topic test
2.顯示topic
kafka-topics.sh --describe--zookeeper 192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181--topic test
[root@192-168-108-200 bin]# kafka-topics.sh--describe --zookeeper192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181 --topic tyun
Topic:tyun PartitionCount:3 ReplicationFactor:3Configs:
Topic: tyun Partition: 0 Leader: 1 Replicas: 1,0,2Isr: 1,0,2
Topic: tyun Partition: 1 Leader: 2 Replicas: 2,1,0Isr: 2,1,0
Topic: tyun Partition: 2 Leader: 0 Replicas: 0,2,1Isr: 0,2,1
PartitionCount:partition個數 ??
ReplicationFactor:副本個數 ??
Partition:partition編號,從0開始遞增 ??
Leader:當前partition起作用的broker.id ??
Replicas: 當前副本數據所在的broker.id,是一個列表,排在最前面的其作用??
Isr:當前kakfa集群中可用的broker.id列表
3.列出topic
kafka-topics.sh --list --zookeeper192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181
test
創建 producer(生產者);
kafka-console-producer.sh --broker-list192.168.108.200:9092 --topic test
hello
創建 consumer(消費者)
kafka-console-consumer.sh --bootstrap-server192.168.108.200:9092 --topic test --from-beginning
hello
4.查看寫入kafka集群中的消息(重要命令,判斷日志是否寫入Kafka的重要依據)
bin/kafka-console-consumer.sh--bootstrap-server 192.168.108.200:9092 --topic tiops --from-beginning
5.刪除 Topic----命令標記刪除后,再次刪除對應的數據目錄
bin/kafka-topics.sh--delete --zookeeper master:2181,slave1:2181,slave2:2181 --topic topic_name
若 delete.topic.enable=true
直接徹底刪除該 Topic。
若delete.topic.enable=false
如果當前 Topic 沒有使用過即沒有傳輸過信息:可以徹底刪除。
如果當前 Topic 有使用過即有過傳輸過信息:并沒有真正刪除 Topic 只是把這個 Topic 標記為刪除(marked for deletion),重啟 Kafka Server 后刪除。
注:delete.topic.enable=true 配置信息位于配置文件 config/server.properties 中(較新的版本中無顯式配置,默認為 true)。
整個系統一共含有10臺主機(filebeat部署在客戶端,不計算在內),其中Logstash有四臺,Elasticsearch有二臺,Kafka集群三臺,kibana一臺并配置Nginx代理。
架構解釋:
(1)首先用戶通過nginx代理訪問ELK日志統計平臺,這里的Nginx可以設置界面密碼。
(2)Nginx將請求轉發到kibana
(3)kibana到Elasticsearch中去獲取數據,這里的Elasticsearch是兩臺做的集群,日志數據會隨機保存在任意一臺Elasticsearch服務器。
(4)Logstash2從Kafka中取出數據并發送到Elasticsearch中。
(5)Kafka服務器做日志數據的持久化保存,避免web服務器日志量過大的時候造成的數據收集與保存不一致而導致日志丟失,其中Kafka可以做集群,然后再由Logstash服務器從Kafka持續的取出數據。
(6)logstash3從Filebeat取出的日志信息,并放入Kafka中進行保存。
(7)Filebeat在客戶端進行日志的收集。
注1:【Kafka的加入原因與作用】
整個架構加入Kafka,是為了讓整個系統更好的分層,Kafka作為一個消息流處理與持久化存儲軟件,能夠幫助我們在主節點上屏蔽掉多個從節點之間不同日志文件的差異,負責管理日志端(從節點)的人可以專注于向 Kafka里生產數據,而負責數據分析聚合端的人則可以專注于從 Kafka內消費數據。所以部署時要把Kafka加進去。
而且使用Kafka進行日志傳輸的原因還在于其有數據緩存的能力,并且它的數據可重復消費,Kafka本身具有高可用性,能夠很好的防止數據丟失,它的吞吐量相對來說比較好并且使用廣泛。可以有效防止日志丟失和防止logsthash掛掉。綜合來說:它均衡了網絡傳輸,從而降低了網絡閉塞,尤其是丟失數據的可能性,
注2:【雙層的Logstash作用】
這里為什么要在Kafka前面增加二臺logstash呢?是因為在大量的日志數據寫入時,容易導致數據的丟失和混亂,為了解決這一問題,增加二臺logstash可以通過類型進行匯總分類,降低數據傳輸的臃腫。
如果只有一層的Logstash,它將處理來自不同客戶端Filebeat收集的日志信息匯總,并且進行處理分析,在一定程度上會造成在大規模日志數據下信息的處理混亂,并嚴重加深負載,所以有二層的結構進行負載均衡處理,并且職責分工,一層匯聚簡單分流,一層分析過濾處理信息,并且內層都有二臺Logstash來保障服務的高可用性,以此提升整個架構的穩定性。
接下來分別說明原理與各個組件之間的交互(配置文件)。
1.Filebeat與Logstash-collect連接配置
這里為了方便記憶,把此處的Logstash稱為Logstash-collect,首先看Filebeat的配置文件:
其中,filebeat配置輸出到logstash:5044端口,在logstash上啟動5044端口作為logstash與filebeat的通信agent,而logstash本身服務起在9600端口.
[root@192-168-108-191 logstash]# ss -lnt
State Recv-Q Send-Q Local Address:Port Peer Address:Port
LISTEN 0 128 :::5044 :::
LISTEN 0 50 ::ffff:192.168.108.191:9600 :::
vim /etc/filebeat/filebeat.yml # 對幾個重要配置進行設置如下
type: log
enabled: true
paths:
exclude_lines: ['^DBG']
output.logstash:
hosts:["192.168.108.191:5044", "192.168.108.87:5044"] # 發往二臺Logstash-collect
loadbalance: true
worker: 2
#output.elasticsearch:
#hosts: ["http://192.168.108.35:9200"]
#username: "elk"
#password: ""
#setup.kibana:
#host: "192.168.108.182:5601"
注1:【Filebeat與Logstash-collect的日志消息流轉】
注意事項:Logstash-collect怎么接收到Filebeat發送的消息,并再次發向下一級呢?
不僅僅配置發往二臺Logstash-collect的ip地址加端口,還要注意fields字段,可以理解為是一個Key,當Logstash-collect收到多個Key時,它可以選擇其中一個或者多個Key來進行下一級發送,達到日志消息的轉發。
在使用了6.2.3版本的ELK以后,如果使用if [type]配置,則匹配不到在filebeat里面使用document_type定義的字符串。因為6.0版本以上已經取消了document_type的定義。如果要實現以上的配置只能使用如下配置:
fields:
service: filebeat
service : filebeat都是自己定義的,定義完成后使用Logstash的if 判斷,條件為if [fields][service] == "filebeat".就可以了,具體可以看下面的轉發策略。
注2:【Filebeat負載平衡主機的輸出】
這里還有一點需要說明一下,就是關于Filebeat與Logstash-collect連接的負載均衡設置。
logstash是一個無狀態的流處理軟件。logstash怎么集群配置只能橫向擴展,然后自己用配置管理工具分發,因為他們內部并沒有交流的。
Filebeat提供配置選項,可以使用它來調整負載平衡時發送消息到多個主機。要啟用負載均衡,您指定loadbalance的值為true。
output.logstash:
hosts: ["192.168.108.191:5044","192.168.108.87:5044"] # 發往二臺Logstash-collect
loadbalance:true
worker: 2
loadbalance: false # 消息只是往一個logstash里發,如果這個logstash掛了,就會自動將數據發到另一個logstash中。(主備模式)
loadbalance: true # 如果為true,則將數據均分到各個logstash中,掛了就不發了,往存活的logstash里面發送。
即logstash地址如果為一個列表,如果loadbalance開啟,則負載到里表中的服務器,當一個logstash服務器不可達,事件將被分發到可到達的logstash服務器(雙活模式)
loadbalance選項可供Redis,Logstash, Elasticsearch輸出。Kafka的輸出可以在其自身內部處理負載平衡。
同時每個主機負載平衡器還支持多個workers。默認是1。如果你增加workers的數量, 將使用額外的網絡連接。workers參與負載平衡的總數=主機數量*workers。
在這個小節,配置總體結構,與日志數據流向如下圖:
2.Logstash-collect與Kafka連接配置
首先看/etc/logstash/logstash.yml文件,作為Logstash-collect的公共配置文件,我們需要做一下的更改:
vim/etc/logstash/logstash.yml
path.data: /var/lib/logstash #數據存放路徑
path.config: /etc/logstash/conf.d #配置文件的讀取路徑
path.logs: /var/log/logstash #日志文件的保存路徑
pipeline.workers: 2 # 默認為CPU的核數
http.host: "192.168.108.186"
http.port: 9600-9700 # logstash will pick up thefirst available ports
注意上面配置文件的讀取路徑 /etc/logstash/conf.d,在這個文件夾里面,我們可以設置Logstash的輸入輸出。logstash支持把配置寫入文件/etc/logstash/conf.d/xxx.conf,然后通過讀取配置文件來采集數據。
logstash收集日志基本流程:input–>codec–>filter–>codec–>output ,所以配置文件可以設置輸入輸出與過濾的基本格式如下:(這里暫時忽略filter,因為這一層的Logstash主要匯聚數據,暫不分析匹配)
關于codec => json # json處理
logstash最終會把數據封裝成json類型,默認會添加@timestamp時間字段、host主機字段、type字段。原消息數據會整個封裝進message字段。如果數據處理過程中,用戶解析添加了多個字段,則最終結果又會多出多個字段。也可以在數據處理過程中移除多個字段,總之,logstash最終輸出的數據格式是json格式。所以數據經過Logstash會增加額外的字段,可以選擇過濾。
Logstash主要由 input,filter,output三個組件去完成采集數據。
input {
指定輸入
}
filter {
}
output {
指定輸出
解釋說明如下:
input
input組件負責讀取數據,可以采用file插件讀取本地文本文件,stdin插件讀取標準輸入數據,tcp插件讀取網絡數據,log4j插件讀取log4j發送過來的數據等等。本次用beats插件讀取Filebeat發送過來的日志消息。
filter
filter插件負責過濾解析input讀取的數據,可以用grok插件正則解析數據,date插件解析日期,json插件解析json等等。
output
output插件負責將filter處理過的數據輸出。可以用elasticsearch插件輸出到es,redis插件輸出到redis,stdout插件標準輸出,kafka插件輸出到kafka等等
在實際的Tiops平臺日志處理流程中,配置如下:
二個Logstash-collect均采用此相同的配置
在/etc/logstash/conf.d目錄下,創建filebeat-kafka.conf文件,內容如下:
input {
beats {
port => 5044 # logstash上啟動5044端口作為logstash與filebeat的通信agent
codec => json # json處理
}
}
#此處附加Redis作為緩存消息的配置,本次采用Kafka,所以此部分暫時注釋
#output {
#}
output {
if [fields][service] == "filebeat"{ # 參照第一部分,Logstash-collect怎么接收到Filebeat發送的消息,并再次發向下一級呢?這里就是取filebeat的Fields字段的Key值,進行向下發送
kafka {
bootstrap_servers =>"192.168.108.200:9092,192.168.108.165:9092,192.168.108.103:9092" # kafka集群配置,全部IP:端口都要加上
topic_id => "tiops" # 設置Kafka的topic,這樣發送到kafka中時,kafka就會自動創建該topic
}
}
}
在這個小節,配置總體結構與日志數據流向如下圖:
3.Kafka與Logstash-grok連接配置
在上面第二部分,我們已經成功把數據從Logstash-collect發送到Kafka,而Kafka本身的輸入輸出并沒有配置。而是Logstash-collect指定了一個topic,這并不代表kafka不需要配置參數,只是kafka本身不需要配置pull/push目標參數,它被動接受
別的生產者發送過來的數據,因為Logstash-collect指定了Kafka集群的IP地址和端口。那么數據將會被發送到Kafka中,那么Kafka如何處理這些日志數據,那么這時候就需要Kafka配置消息處理參數了。
這個我們現在暫時先給出參數,至于具體原因涉及到MQ的高可用、消息過期策略、如何保證消息不重復消費、如何保證消息不丟失、如何保證消息按順序執行以及消息積壓在消息隊列里怎么辦等問題,我們之后詳細分析并解決。
以下是Kafka集群中,某一臺的配置文件,在Kafka源碼目錄的/kafka/config/server.properties文件設置如下:(加粗的為重要配置文件)
broker.id=0
listeners=PLAINTEXT://192.168.108.200:9092
advertised.listeners=PLAINTEXT://192.168.108.200:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181 # zookeeper 集群
delete.topic.enable=true
同時kafka是依賴于zookeeper注冊中心的一款分布式消息隊列,所以需要有zookeeper單機或者集群環境。本次采用Kafka自帶的Zookeeper環境。
在Kafka源碼目錄的/kafka/config/zookeeper.properties文件設置如下:
dataDir=/opt/zookeeper # 這時需要在/opt/zookeeper文件夾下,新建myid文件,把broker.id填寫進去,本次本節點為0。
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
server.0=192.168.108.200:2888:3888
server.1=192.168.108.165:2888:3888
server.2=192.168.108.103:2888:3888
其他注意事項,請看之前的Kafka集群安裝步驟,這里主要給出集群某一節點的配置文件參數。
到目前為止,我們配置了kafka的參數,對消息進行了處理(持久化...),接下來就需要對Kafka中的消息進行下一步的傳輸,即傳送到Logstash-grok。
這里需要注意的是:并不是Kafka主動把消息發送出去的,當然Kafka也支持這種操作,但是在這里,采用的是Logstash-grok作為消費者,主動拉取Kafka中的消息,來進行消費。
所以就需要像第二步Logstash-collect那樣進行輸入輸出設置,這里Logstash-grok不僅僅配置輸入輸出,最重要的是它的過濾filter作用。
在/etc/logstash/conf.d目錄下,創建logstash-es.conf文件,這里給出Logstash-grok中一個的內容如下(二個Logstash-grok配置一樣):
#input {
#}
input{
kafka{
bootstrap_servers =>["192.168.108.200:9092,192.168.108.165:9092,192.168.108.103:9092"] #配置拉取的kafka集群消息地址
group_id => "tyun" # 設置group_id,對于這個消費組而言會有一個消費 offset,消費掉是 auto_commit 部分控制的,這個參數會定期將你消費的 offset 提交到 kafka,下次啟動的時候已經消費過的就不會重復消費了;
auto_offset_reset =>"earliest"
consumer_threads => "3" #多個實例的consumer_threads數之和應該等于topic分區數近似達到logstash多實例的效果。
decorate_events => "false"
topics => ["tiops"] # kafka topic 名稱,獲取指定topic內的消息
type => "log"
codec => json
}
}
filter { # 此處Logstash最主要的過濾作用,可以自定義正則表達式,選擇性輸出消息
grok {
match => {
#截取
"message" =>"(?<LOG>(?<=architecture=x86_64}|containerized=true})(.*)/?)"
}
}
}
output { #過濾后的消息,發送到ES集群
elasticsearch {
hosts =>["192.168.108.35:9200","192.168.108.194:9200"]
codec => json
index =>"tiops-%{+YYYY.MM.dd}" # 自定義索引名稱
}
在這個小節,配置總體結構與日志數據流向如下圖:
4.Logstash-grok與Elasticsearch(ES)集群連接配置
ElasticSearch集群
在第三步中,我們已經將Logstash-grok過濾后的消息,發送到了ES集群,這里的ES集群被動接受發送過來的消息,即ES集群本身不需要配置輸入源,已經由其他組件發送決定了。
ES的配置將集中體現在集群自身的配置上。
Elasticsearch 可以橫向擴展至數百(甚至數千)的服務器節點,同時可以處理PB級數據
Elasticsearch 天生就是分布式的,并且在設計時屏蔽了分布式的復雜性。
ES功能:(1)分布式的搜索引擎和數據分析引擎
(2)全文檢索,結構化檢索,數據分析
(3)對海量數據進行近實時的處理
安裝后的ES集群,配置在/etc/elasticsearch/elasticsearch.yml文件,集群中,各個節點配置大體相同,不同之處下面有紅字標出,主要就是節點名稱與網絡監聽地址,需要每個節點自行按實際填寫。
cluster.name: elk-cluster #集群名稱
node.name: elk-node1 #節點名稱,一個集群之內節點的名稱不能重復
path.data: /data/elkdata #數據路徑
path.logs: /data/elklogs #日志路徑
network.host: 192.168.108.35 #網絡監聽地址,可以訪問elasticsearch的ip, 默認只有本機
http.port: 9200 #用戶訪問查看的端口,9300是ES組件訪問使用
discovery.seed_hosts: ["192.168.108.35","192.168.108.194"] 單播(配置一臺即可,生產可以使用組播方式)
cluster.initial_master_nodes: ["192.168.108.35"] # Master
http.cors.enabled: true # 這二個是ES插件的web顯示設置,head插件防止跨域
http.cors.allow-origin: "*"
可見集群配置中最重要的兩項是node.name與network.host,每個節點都必須不同。其中node.name是節點名稱主要是在Elasticsearch自己的日志加以區分每一個節點信息。
discovery.seed_hosts是集群中的節點信息,可以使用IP地址、可以使用主機名(必須可以解析)。
當ElasticSearch的節點啟動后,它會利用多播(multicast)(或者單播),如果用戶更改了配置)尋找集群中的其它節點,并與之建立連接。(節點發現)
這里沒有配置分片的數目,在ES的7版本中,默認為1,這個包括replicas可后來自定義設置。
在這個小節,配置總體結構與日志數據流向如下圖:
5.Elasticsearch(ES)集群與Kibana連接配置
Kibana是一個開源的分析和可視化平臺,和Elasticsearch一起工作時,就可以用Kibana來搜索,查看,并和存儲在Elasticsearch索引中的數據進行交互。
可以輕松地執行高級數據分析,并且以各種圖標、表格和地圖的形式可視化數據。
同時Kibana使得理解大量數據變得很容易。它簡單的、基于瀏覽器的界面使你能夠快速創建和共享動態儀表板,實時顯示Elasticsearch查詢的變化。
在第四步中,我們已經將ES集群中的數據進行了處理,建立了索引(相當于MySQL的database概念),并且分片、副本等都存在,而且處理后的數據更加方便于檢索。
Kibana的配置在/etc/kibana/kibana.yml文件中,如下:
server.port: 5601 #監聽端口
server.host: "192.168.108.182" #監聽IP地址,建議內網ip
#kibana.index: ".newkibana"
elasticsearch.hosts: ["http://192.168.108.35:9200","http://192.168.108.194:9200"] # ES機器的IP,這里的所有host必須來自于同一個ES集群
#xpack.security.enabled: false #添加這條,這條是配置kibana的安全機制,暫時關閉。
可以看出,這里的Kibana會主動去ES集群中去拉取數據,最后在瀏覽器Web端進行實時展示。
這時,我們打開瀏覽器界面如下,看看最終的結果:
注:Nginx代理配置,可以通過自定義域名訪問并設置密碼,提升安全性。(可選)
upstream kibana_server {
server 192.168.108.182:5601weight=1 max_fails=3 fail_timeout=60;
}
server {
listen 80;
server_name www.kibana.com;
auth_basic "RestrictedAccess";
auth_basic_user_file/etc/nginx/conf.d/htpasswd.users;
location / {
proxy_pass http://kibana_server;
proxy_http_version 1.1;
在這個小節,配置總體結構與日志數據流向如下圖:
6.總結
再將日志流向的配置文件輸入輸出標注一下:
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。