您好,登錄后才能下訂單哦!
這篇文章主要介紹“Golang如何實現 pipeline 模式的 redis 客戶端”,在日常操作中,相信很多人在Golang如何實現 pipeline 模式的 redis 客戶端問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Golang如何實現 pipeline 模式的 redis 客戶端”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
本文的完整代碼在github.com/hdt3213/godis/redis/client
通常 TCP 客戶端的通信模式都是阻塞式的: 客戶端發送請求 -> 等待服務端響應 -> 發送下一個請求。因為需要等待網絡傳輸數據,完成一次請求循環需要等待較多時間。
我們能否不等待服務端響應直接發送下一條請求呢?答案是肯定的。
TCP 作為全雙工協議可以同時進行上行和下行通信,不必擔心客戶端和服務端同時發包會導致沖突。
p.s. 打電話的時候兩個人同時講話就會沖突聽不清,只能輪流講。這種通信方式稱為半雙工。廣播只能由電臺發送到收音機不能反向傳輸,這種方式稱為單工。
我們為每一個 tcp 連接分配了一個 goroutine 可以保證先收到的請求先先回復。另一個方面,tcp 協議會保證數據流的有序性,同一個 tcp 連接上先發送的請求服務端先接收,先回復的響應客戶端先收到。因此我們不必擔心混淆響應所對應的請求。
這種在服務端未響應時客戶端繼續向服務端發送請求的模式稱為 Pipeline 模式。因為減少等待網絡傳輸的時間,Pipeline 模式可以極大的提高吞吐量,減少所需使用的 tcp 鏈接數。
pipeline 模式的 redis 客戶端需要有兩個后臺協程程負責 tcp 通信,調用方通過 channel 向后臺協程發送指令,并阻塞等待直到收到響應,這是一個典型的異步編程模式。
我們先來定義 client 的結構:
type Client struct { conn net.Conn // 與服務端的 tcp 連接 pendingReqs chan *Request // 等待發送的請求 waitingReqs chan *Request // 等待服務器響應的請求 ticker *time.Ticker // 用于觸發心跳包的計時器 addr string ctx context.Context cancelFunc context.CancelFunc writing *sync.WaitGroup // 有請求正在處理不能立即停止,用于實現 graceful shutdown } type Request struct { id uint64 // 請求id args [][]byte // 上行參數 reply redis.Reply // 收到的返回值 heartbeat bool // 標記是否是心跳請求 waiting *wait.Wait // 調用協程發送請求后通過 waitgroup 等待請求異步處理完成 err error }
調用者將請求發送給后臺協程,并通過 wait group 等待異步處理完成:
func (client *Client) Send(args [][]byte) redis.Reply { request := &request{ args: args, heartbeat: false, waiting: &wait.Wait{}, } request.waiting.Add(1) client.working.Add(1) defer client.working.Done() client.pendingReqs <- request // 請求入隊 timeout := request.waiting.WaitWithTimeout(maxWait) // 等待響應或者超時 if timeout { return reply.MakeErrReply("server time out") } if request.err != nil { return reply.MakeErrReply("request failed") } return request.reply }
client 的核心部分是后臺的讀寫協程。先從寫協程開始:
// 寫協程入口 func (client *Client) handleWrite() { for req := range client.pendingReqs { client.doRequest(req) } } // 發送請求 func (client *Client) doRequest(req *request) { if req == nil || len(req.args) == 0 { return } // 序列化請求 re := reply.MakeMultiBulkReply(req.args) bytes := re.ToBytes() _, err := client.conn.Write(bytes) i := 0 // 失敗重試 for err != nil && i < 3 { err = client.handleConnectionError(err) if err == nil { _, err = client.conn.Write(bytes) } i++ } if err == nil { // 發送成功等待服務器響應 client.waitingReqs <- req } else { req.err = err req.waiting.Done() } }
讀協程是我們熟悉的協議解析器模板, 不熟悉的朋友可以到解析Redis Cluster原理了解更多。
// 收到服務端的響應 func (client *Client) finishRequest(reply redis.Reply) { defer func() { if err := recover(); err != nil { debug.PrintStack() logger.Error(err) } }() request := <-client.waitingReqs if request == nil { return } request.reply = reply if request.waiting != nil { request.waiting.Done() } } // 讀協程是個 RESP 協議解析器 func (client *Client) handleRead() error { ch := parser.ParseStream(client.conn) for payload := range ch { if payload.Err != nil { client.finishRequest(reply.MakeErrReply(payload.Err.Error())) continue } client.finishRequest(payload.Data) } return nil }
最后編寫 client 的構造器和啟動異步協程的代碼:
func MakeClient(addr string) (*Client, error) { conn, err := net.Dial("tcp", addr) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) return &Client{ addr: addr, conn: conn, sendingReqs: make(chan *Request, chanSize), waitingReqs: make(chan *Request, chanSize), ctx: ctx, cancelFunc: cancel, writing: &sync.WaitGroup{}, }, nil } func (client *Client) Start() { client.ticker = time.NewTicker(10 * time.Second) go client.handleWrite() go func() { err := client.handleRead() logger.Warn(err) }() go client.heartbeat() }
關閉 client 的時候記得等待請求完成:
func (client *Client) Close() { // 先阻止新請求進入隊列 close(client.sendingReqs) // 等待處理中的請求完成 client.writing.Wait() // 釋放資源 _ = client.conn.Close() // 關閉與服務端的連接,連接關閉后讀協程會退出 client.cancelFunc() // 使用 context 關閉讀協程 close(client.waitingReqs) // 關閉隊列 }
測試一下:
func TestClient(t *testing.T) { client, err := MakeClient("localhost:6379") if err != nil { t.Error(err) } client.Start() result = client.Send([][]byte{ []byte("SET"), []byte("a"), []byte("a"), }) if statusRet, ok := result.(*reply.StatusReply); ok { if statusRet.Status != "OK" { t.Error("`set` failed, result: " + statusRet.Status) } } result = client.Send([][]byte{ []byte("GET"), []byte("a"), }) if bulkRet, ok := result.(*reply.BulkReply); ok { if string(bulkRet.Arg) != "a" { t.Error("`get` failed, result: " + string(bulkRet.Arg)) } } }
Keep working, we will find a way out.This is Finley, welcome to join us.
到此,關于“Golang如何實現 pipeline 模式的 redis 客戶端”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。