您好,登錄后才能下訂單哦!
這篇文章主要講解了“如何接入異步任務及使用log”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何接入異步任務及使用log”吧!
日常任務開放中,我們會有很多異步、批量、定時、延遲任務要處理,go-zero中有 go-queue
,推薦使用 go-queue
去處理,go-queue
本身也是基于 go-zero
開發的,其本身是有兩種模式:
dq
: 依賴于 beanstalkd
,分布式,可存儲,延遲、定時設置,關機重啟可以重新執行,消息會丟失,使用非常簡單,go-queue中使用了redis setnx保證了每個消息只被消費一次,使用場景主要是用來做日常任務使用
kq
:依賴于 kafka
,這個就不多介紹啦,大名鼎鼎的 kafka
,使用場景主要是做日志用
我們主要說一下dq,kq使用也一樣的,只是依賴底層不同,如果沒使用過beanstalkd,沒接觸過beanstalkd的可以先google一下,使用起來還是挺容易的。
我在jobs下使用goctl新建了一個message-job.api服務
info( title: //消息任務 desc: // 消息任務 author: "Mikael" email: "13247629622@163.com" ) type BatchSendMessageReq {} type BatchSendMessageResp {} service message-job-api { @handler batchSendMessageHandler // 批量發送短信 post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp) }
因為不需要使用路由,所以handler下的routes.go被我刪除了,在handler下新建了一個jobRun.go,內容如下:
package handler import ( "fishtwo/lib/xgo" "fishtwo/app/jobs/message/internal/svc" ) /** * @Description 啟動job * @Author Mikael * @Date 2021/1/18 12:05 * @Version 1.0 **/ func JobRun(serverCtx *svc.ServiceContext) { xgo.Go(func() { batchSendMessageHandler(serverCtx) //...many job }) }
其實xgo.Go就是 go batchSendMessageHandler(serverCtx)
,封裝了一下go攜程,防止野生goroutine panic
然后修改一下啟動文件message-job.go
package main import ( "flag" "fmt" "fishtwo/app/jobs/message/internal/config" "fishtwo/app/jobs/message/internal/handler" "fishtwo/app/jobs/message/internal/svc" "github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/rest" ) var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file") func main() { flag.Parse() var c config.Config conf.MustLoad(*configFile, &c) ctx := svc.NewServiceContext(c) server := rest.MustNewServer(c.RestConf) defer server.Stop() handler.JobRun(ctx) fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) server.Start() }
主要是handler.RegisterHandlers(server, ctx) 修改為handler.JobRun(ctx)
接下來,我們就可以引入dq了,首先在etc/xxx.yaml下添加dqConf
..... DqConf: Beanstalks: - Endpoint: 127.0.0.1:7771 Tube: tube1 - Endpoint: 127.0.0.1:7772 Tube: tube2 Redis: Host: 127.0.0.1:6379 Type: node
我這里本地用不同端口,模擬開了2個節點,7771、7772
在internal/config/config.go添加配置解析對象
type Config struct { .... DqConf dq.DqConf }
修改handler/batchsendmessagehandler.go
package handler import ( "context" "fishtwo/app/jobs/message/internal/logic" "fishtwo/app/jobs/message/internal/svc" "github.com/tal-tech/go-zero/core/logx" ) func batchSendMessageHandler(ctx *svc.ServiceContext){ rootCxt:= context.Background() l := logic.NewBatchSendMessageLogic(context.Background(), ctx) err := l.BatchSendMessage() if err != nil{ logx.WithContext(rootCxt).Error("【JOB-ERR】 : %+v ",err) } }
修改logic下batchsendmessagelogic.go,寫我們的consumer消費邏輯
package logic import ( "context" "fishtwo/app/jobs/message/internal/svc" "fmt" "github.com/tal-tech/go-zero/core/logx" ) type BatchSendMessageLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic { return BatchSendMessageLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, } } func (l *BatchSendMessageLogic) BatchSendMessage() error { fmt.Println("job BatchSendMessage start") l.svcCtx.Consumer.Consume(func(body []byte) { fmt.Printf("job BatchSendMessage %s \n" + string(body)) }) fmt.Printf("job BatchSendMessage finish \n") return nil }
這樣就大功告成了,啟動message-job.go就ok課
go run message-job.go
之后我們就可以在業務代碼中向dq添加任務,它就可以自動消費了
producer.Delay 向dq中投遞5個延遲任務:
producer := dq.NewProducer([]dq.Beanstalk{ { Endpoint: "localhost:7771", Tube: "tube1", }, { Endpoint: "localhost:7772", Tube: "tube2", }, }) for i := 1000; i < 1005; i++ { _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1) if err != nil { fmt.Println(err) } }
producer.At
可以指定某個時間執行,非常好用,感興趣的朋友自己可以研究下。
在前面說到gateway改造時候,如果眼神好的童鞋,在上面的httpresult.go中已經看到了log的身影:
我們在來看下rpc中怎么處理的
是的,我在每個rpc啟動的main中加入了grpc攔截器 https://www.yuque.com/tal-tech/go-zero/ttzlo1,那讓我們看看grpc攔截器里面做了什么
然后我代碼里面使用github/pkg/errors這個包去處理錯誤的,這個包還是很好用的
所以呢:
我們在 grpc
中打印日志 logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v",err)
;
在api
中打印日志 logx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】 : %+v ",err)
go-zero
中打印日志,使用logx.WithContext
會把trace-id帶入,這樣一個請求下來,比如
user-api --> user-srv --> message-srv
那如果 messsage-srv
出錯,他們三個是同一個 trace-id
,是不是就可以在elk通過輸入這個trace-id一次性搜索出來這條請求報錯堆棧信息呢?當然你也可以接入 jaeger、zipkin、skywalking
等,這個我暫時還沒接入。
感謝各位的閱讀,以上就是“如何接入異步任務及使用log”的內容了,經過本文的學習后,相信大家對如何接入異步任務及使用log這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。