您好,登錄后才能下訂單哦!
本篇內容主要講解“怎么使用nsq消息中間件”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“怎么使用nsq消息中間件”吧!
組成
nsq是一款輕量級的消息中間件,查看nsq官網給出的解釋,可知nsq的組成和分工:
nsqd is the daemon that receives, queues, and delivers messages to clients.
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.
從上面可以看出:
nsqd是一個守護進程,負責與客戶端打交道,負責緩存來自客戶端的消息
nsqd可以作為一個單實例獨自運行,通常在nsqlookupd實例的協同下組成集群(集群場景下,nsqd能用于發現topics和channels)
nsqd監聽兩個TCP端口,分別用于客戶端(默認4150)和HTTP API(默認4151),另外可監聽用于HTTPS的端口
nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.
從上面可以看出:
nsqlookupd是一個守護進程,負責管理拓撲信息,可供客戶端查詢得到nsqd節點(nsqd廣播topics和channels信息)
nsqlookupd提供兩種接口,TCP接口(默認4160)被nsqd用來發送廣播,HTTP接口(默認4161)被客戶端用于發現nsqd和連接nsqadmin
nsqadmin is a Web UI to view aggregated cluster stats in realtime and perform various administrative tasks.
從上面可以看出:
nsqadmin是一個后臺管控Web進程,可實時瀏覽集群狀態,可發起多種管理任務(nsqadmin依賴nsqlookupd來處理用戶操作)
安裝
這里為了快速搭建,使用docker compose方式安裝(docker-compose.yaml見附件)
拷貝docker-compose.yaml到虛擬機,相關命令如下:
分別啟動 nsqlookupd/nsqadmin/nsqd,對應三個容器和端口映射
瀏覽器中可打開 http://192.168.1.91:32770 訪問 nsqadmin(虛擬機IP為192.168.1.91)
測試
package main import ( "bufio" "fmt" "github.com/bitly/go-nsq" "nsq-demo/src/config" "os" ) var producer *nsq.Producer func InitProducer(addr string) { var err error producer, err = nsq.NewProducer(addr, nsq.NewConfig()) if err != nil { panic(err) } fmt.Println("connect to ", producer.String()) } func Publish(topic, msg string) error { if producer == nil {// check producer return fmt.Errorf("producer is nil") } if msg == "" {// void empty msg return nil } return producer.Publish(topic, []byte(msg))// publish msg } func main() { InitProducer(config.Nsqd01) running := true reader := bufio.NewReader(os.Stdin) for running { data, _, _ := reader.ReadLine() command := string(data) if command == "stop" { running = false } for err := Publish(config.Topic, command); err != nil; err = Publish(config.Topic, command) { config.ExchangeNsqdIPs() InitProducer(config.Nsqd01) } } producer.Stop() }// producer直連nsqd后,接收來自控制臺的輸入,然后將消息發送給nsqd
package main import ( "fmt" "github.com/bitly/go-nsq" "nsq-demo/src/config" "time" ) type MyConsumer struct{} func (*MyConsumer) HandleMessage(msg *nsq.Message) error {// implementation Handler interface fmt.Println("receive from ", msg.NSQDAddress, "msg:", string(msg.Body)) return nil } func InitConsumer(topic, channel, addr string) { conf := nsq.NewConfig() conf.LookupdPollInterval = time.Second c, err := nsq.NewConsumer(topic, channel, conf) if err != nil { panic(err) } c.SetLogger(nil, 0)// set system log c.AddHandler(&MyConsumer{})// set Hander to handle msg //if err := c.ConnectToNSQLookupd(addr); err != nil { // panic(err) //} //if err := c.ConnectToNSQDs(config.GetNsqdIPs()); err != nil { // panic(err) //} if err := c.ConnectToNSQD(config.Nsqd01); err != nil { panic(err) } } func main() { InitConsumer(config.Topic, config.Channel, config.Lookupd) select {} }// consumer直連nsqd后,通過自定義的Handler來處理消息
附錄
version: '3' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "4160" # for the nsqd - "4161" # for the nsqadmin nsqd: image: nsqio/nsq command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 # connect to nsqlookupd depends_on: - nsqlookupd ports: - "4150" # for clients - "4151" # for the HTTP API nsqadmin: image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 # connect to nsqlookupd depends_on: - nsqlookupd ports: - "4171"# docker-compose.yaml of simple nsq
到此,相信大家對“怎么使用nsq消息中間件”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。