您好,登錄后才能下訂單哦!
這篇文章主要介紹“Mongodb代理程序如何實現”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“Mongodb代理程序如何實現”文章能幫助大家解決問題。
根據一貫的風格,我們先來梳理下項目目錄結構,結構如下:
|__ bin/ # 用于存放編譯后生成的二進制文件
|__ config/ # 用于存放配置文件
|__ connection/ # 存放連接相關的文件
| |__ proxy.go # 代理組件
| |__ pool.go # 連接池組件
| |__ repl_set.go # 復制集組件
| |__ conn.go # 連接對象組件
|__ internal/ # 存放 mongo 內部協議相關文件
| |__ auth.go # 握手鑒權組件
| |__ protocol.go # 協議解析組件
| |__ request.go # 請求重寫組件
| |__ response.go # 響應重寫組件
|__ statistics/ # 存放指標統計上報組件
|__ test/ # 存放各種語言驅動測試代碼的文件夾
|__ utils/ # 工具函數文件夾
|__ glide.yaml # 依賴包配置文件
|__ main.go # 入口文件
proxy 實現
最簡單的 proxy 實現套路就像下面這樣:
// main.go
func main() {
// 傳入配置參數,實例化一個代理對象
p := NewProxy(conf)
// 卡住,循環接受客戶端請求
p.LoopAccept()
}
接著來實現 NewProxy、LoopAccept 方法:
// connection/proxy.go
type Proxy struct {
sync.RWMutex
listener net.Listener
writePool, readPool *pool
}
func NewProxy(conf config.UserConf) *Proxy {
// 開始監聽本地端口
listener, err := net.Listen("tcp", ":"+conf.GetString("port"))
if err != nil {
log.Fatalln(err)
}
p := &Proxy{
listener: listener,
}
// 實例化連接池
p.readPool, p.writePool, err = newPool(p)
if err != nil {
panic(err)
}
return p
}
func (p *Proxy) LoopAccept() {
for {
client, err := p.listener.Accept()
go func(c net.Conn) {
defer c.Close()
// 一個連接在多次 messageHandler 中共用一個 Reader 對象
cr := bufio.NewReader(c)
// 因為一個連接可能會進行多次讀或寫操作
for {
// 將客戶端請求代理給服務端,服務端響應代理回客戶端
// 同時中間對請求或響應進行重寫操作
err := p.messageHandler(cr, c)
if err != nil {
// 只要出現錯誤,就執行到上面的 defer c.Close() 來關閉連接
return
}
}
}(client)
}
}
接著來實現核心邏輯 messageHandler:
// connection/proxy.go
func (p *Proxy) messageHandler(cr *bufio.Reader, c net.Conn) error {
// 對請求報文進行解析操作
req, err := internal.Decode(clientReader)
if err != nil {
return errors.New("decode error")
}
// 將客戶端請求發送給數據庫服務器
res, err := p.clientToServer(req)
if err != nil {
return errors.New("request error")
}
// 將數據庫服務器響應返回給客戶端
return res.WriteTo(c)
}
func (p *Proxy) clientToServer(req *internal.Message) (*internal.Message, error) {
var server net.Conn
// 如果是讀操作,就從讀池中取出連接
if req.IsReadOp() {
host := req.GetHost()
// 某些讀操作需要發送到指定的讀庫上,所以需要傳 host,來獲取指定讀庫連接
server = p.readPool.Acquire(host)
// 反之,寫操作從寫池中取出連接
} else {
// 由于寫庫只有一個,所以不用傳 host 參數了
server = p.writePool.Acquire()
}
// 將客戶端請求發送給數據庫服務器
err := req.WriteTo(server)
if err != nil {
return nil, err
}
// 獲取解析數據庫服務器響應
res, err := internal.Decode(bufio.NewReader(server))
return res, err
}
大致邏輯就是,客戶端通過代理把請求發給服務端,服務端響應也通過代理響應回客戶端。
------------ request ----------- request ------------
| | --------> | | --------> | |
| client | | proxy | | repl_set |
| | <-------- | | <-------- | |
------------ response ----------- response ------------
吶——,當然還有非常多的細節,由于篇幅原因不得不省略...
pool 實現
由 proxy 的代碼邏輯來看,我們取讀或寫庫連接是通過讀或寫池的 Acquire 方法來取的:
// connection/pool.go
type pool struct {
sync.RWMutex
connCh chan net.Conn
newConn func(string) (net.Conn, error)
freeConn func(net.Conn) error
}
func (p *pool) Acquire(opts ...interface{}) (net.Conn, error) {
host := ""
if len(opts) > 0 {
host, _ = (opts[0]).(string)
}
chLen := len(p.connCh)
// 從 channel 中遍歷剩余數量的 conn
for i := 0; i < chLen; i++ {
select {
case conn, ok := <- ch:
if ok {
if len(host) > 0 {
if conn.RemoteAddr().String() == host {
return conn, nil
}
// 沒有找到對應 host 的 conn,則把 conn 重新放回 channel
// 你可以簡單理解為只是執行了 p.connCh <- conn 操作
p.freeConn(conn)
} else {
return conn, nil
}
}
// 避免數量不足而導致 channel 阻塞等待
default:
}
}
// 若還沒有從 channel 中取到 conn,則立馬 new 一個
conn, err := p.newConn(host)
if err != nil {
return nil, err
}
return conn, nil
}
池的實現大致就是實現了一個循環隊列,連接從池中取,取出的連接在使用完后,可以放回池中。
關于“Mongodb代理程序如何實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。