您好,登錄后才能下訂單哦!
這篇文章主要講解了“Golang怎么監聽日志文件并發送到kafka中”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Golang怎么監聽日志文件并發送到kafka中”吧!
go-ini
,sarama
,tail
其中:
go-ini
:用于讀取配置文件,統一管理配置項,有利于后其的維護
sarama
:是一個go操作kafka的客戶端。目前我用于向kefka發送消息
tail
:類似于linux的tail命令了,讀取文件的后幾行。如果文件有追加數據,會檢測到。就是通過它來監聽日志文件
可視化工具:
offsetexplorer
:是kafka的可視化工具,這里用來查看消息是否投遞成功
加載配置,初始化sarama
和kafka
。
起一個的協程,利用tail
不斷去監聽日志文件的變化。
主協程中一直阻塞等待tail
發送消息,兩者通過一個管道通訊。一旦主協程接收到新日志,組裝格式,然后發送到kafka中
環境的話,確保zookeeper
和kafka
正常運行。因為還沒有使用sarama
讀取數據,使用offsetexplorer
來查看任務是否真的投遞成功了。
serve來存放寫tail
服務類和sarama
服務類,conf存放ini配置文件
main函數為程序入口
main函數做的有:構建配置結構體,映射配置文件。調用和初始化tail
,srama
服務。
package main import ( "fmt" "sarama/serve" "github.com/go-ini/ini" ) type KafkaConfig struct { Address string `ini:"address"` ChannelSize int `ini:"chan_size"` } type TailConfig struct { Path string `ini:"path"` Filename string `ini:"fileName"` // 如果是結構體,則指明分區名 Children `ini:"tailfile.children"` } type Config struct { KafkaConfig `ini:"kafka"` TailConfig `ini:"tailfile"` } type Children struct { Name string `ini:"name"` } func main() { // 加載配置 var cfg = new(Config) err := ini.MapTo(cfg, "./conf/go-conf.ini") if err != nil { fmt.Print(err) } // 初始化kafka ks := &serve.KafukaServe{} // 啟動kafka消息監聽。異步 ks.InitKafka([]string{cfg.KafkaConfig.Address}, int64(cfg.KafkaConfig.ChannelSize)) // 關閉主協程時,關閉channel defer ks.Destruct() // 初始化tail ts := &serve.TailServe{} ts.TailInit(cfg.TailConfig.Path + "/" + cfg.TailConfig.Filename) // 阻塞 ts.Listener(ks.MsgChan) }
有3個方法 :
InitKafka
,組裝配置項以及初始化接收消息的管道,
Listener
,監聽管道消息,收到消息后,將消息組裝,發送到kafka
Destruct
, 關閉管道
package serve import ( "fmt" "github.com/Shopify/sarama" ) type KafukaServe struct { MsgChan chan string //err error } func (ks *KafukaServe) InitKafka(addr []string, chanSize int64) { // 讀取配置 config := sarama.NewConfig() // 1. 初始化生產者配置 config.Producer.RequiredAcks = sarama.WaitForAll // 選擇分區 config.Producer.Partitioner = sarama.NewRandomPartitioner // 成功交付的信息 config.Producer.Return.Successes = true ks.MsgChan = make(chan string, chanSize) go ks.Listener(addr, chanSize, config) } func (ks *KafukaServe) Listener(addr []string, chanSize int64, config *sarama.Config) { // 連接kafka var kafkaClient, _ = sarama.NewSyncProducer(addr, config) defer kafkaClient.Close() for { select { case content := <-ks.MsgChan: // msg := &sarama.ProducerMessage{ Topic: "weblog", Value: sarama.StringEncoder(content), } partition, offset, err := kafkaClient.SendMessage(msg) if err != nil { fmt.Println(err) } fmt.Println("分區,偏移量:") fmt.Println(partition, offset) fmt.Println("___") } } } func (ks *KafukaServe) Destruct() { close(ks.MsgChan) }
主要包括了兩個方法:
TailInit
初始化,組裝tail
配置。Listener
Listener
,保存kafka
服務類初始化之后的管道。監聽日志文件,如果有新日志,就往管道里發送
package serve import ( "fmt" "github.com/hpcloud/tail" ) type TailServe struct { tails *tail.Tail } func (ts *TailServe) TailInit(filenName string) { config := tail.Config{ ReOpen: true, Follow: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}, MustExist: false, Poll: true, } // 打開文件開始讀取數據 ts.tails, _ = tail.TailFile(filenName, config) // if err != nil { // fmt.Println("tails %s failed,err:%v\n", filenName, err) // return nil, err // } fmt.Println("啟動," + filenName + "監聽") } func (ts *TailServe) Listener(MsgChan chan string) { for { msg, ok := <-ts.tails.Lines if !ok { // todo fmt.Println("數據接收失敗") return } fmt.Println(msg.Text) MsgChan <- msg.Text } } // 測試案例 func Demo() { filename := `E:\xx.log` config := tail.Config{ ReOpen: true, Follow: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}, MustExist: false, Poll: true, } // 打開文件開始讀取數據 tails, err := tail.TailFile(filename, config) if err != nil { fmt.Println("tails %s failed,err:%v\n", filename, err) return } var ( msg *tail.Line ok bool ) fmt.Println("啟動") for { msg, ok = <-tails.Lines if !ok { fmt.Println("tails file close reopen,filename:$s\n", tails.Filename) } fmt.Println("msg:", msg.Text) } }
感謝各位的閱讀,以上就是“Golang怎么監聽日志文件并發送到kafka中”的內容了,經過本文的學習后,相信大家對Golang怎么監聽日志文件并發送到kafka中這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。