您好,登錄后才能下訂單哦!
olang的”database/sql”是操作數據庫時常用的包,這個包定義了一些sql操作的接口,具體的實現還需要不同數據庫的實現,mysql比較優秀的一個驅動是:github.com/go-sql-driver/mysql
,在接口、驅動的設計上”database/sql”的實現非常優秀,對于類似設計有很多值得我們借鑒的地方,比如beego框架cache的實現模式就是借鑒了這個包的實現;”database/sql”除了定義接口外還有一個重要的功能:連接池,我們在實現其他網絡通信時也可以借鑒其實現。
連接池的作用這里就不再多說了,我們先從一個簡單的示例看下”database/sql”怎么用:
package main import( "fmt" "database/sql" _ "github.com/go-sql-driver/mysql" ) func main(){ db, err := sql.Open("mysql", "username:password@tcp(host)/db_name?charset=utf8&allowOldPasswords=1") if err != nil { fmt.Println(err) return } defer db.Close() rows,err := db.Query("select * from test") for rows.Next(){ //row.Scan(...) } rows.Close() }
用法很簡單,首先Open打開一個數據庫,然后調用Query、Exec執行數據庫操作,github.com/go-sql-driver/mysql
具體實現了database/sql/driver
的接口,所以最終具體的數據庫操作都是調用github.com/go-sql-driver/mysql
實現的方法,同一個數據庫只需要調用一次Open即可,下面根據具體的操作分析下”database/sql”都干了哪些事。
import _ "github.com/go-sql-driver/mysql"
前面的”_”作用時不需要把該包都導進來,只執行包的init()
方法,mysql驅動正是通過這種方式注冊到”database/sql”中的:
//github.com/go-sql-driver/mysql/driver.go func init() { sql.Register("mysql", &MySQLDriver{}) } type MySQLDriver struct{} func (d MySQLDriver) Open(dsn string) (driver.Conn, error) { ... }
init()
通過Register()
方法將mysql驅動添加到sql.drivers
(類型:make(map[string]driver.Driver))中,MySQLDriver實現了driver.Driver
接口:
//database/sql/sql.go func Register(name string, driver driver.Driver) { driversMu.Lock() defer driversMu.Unlock() if driver == nil { panic("sql: Register driver is nil") } if _, dup := drivers[name]; dup { panic("sql: Register called twice for driver " + name) } drivers[name] = driver } //database/sql/driver/driver.go type Driver interface { // Open returns a new connection to the database. // The name is a string in a driver-specific format. // // Open may return a cached connection (one previously // closed), but doing so is unnecessary; the sql package // maintains a pool of idle connections for efficient re-use. // // The returned connection is only used by one goroutine at a // time. Open(name string) (Conn, error) }
假如我們同時用到多種數據庫,就可以通過調用sql.Register
將不同數據庫的實現注冊到sql.drivers
中去,用的時候再根據注冊的name將對應的driver取出。
先看下連接池整體處理流程:
db, err := sql.Open("mysql", "username:password@tcp(host)/db_name?charset=utf8&allowOldPasswords=1")
sql.Open()
是取出對應的db,這時mysql還沒有建立連接,只是初始化了一個sql.DB
結構,這是非常重要的一個結構,所有相關的數據都保存在此結構中;Open同時啟動了一個connectionOpener
協程,后面再具體分析其作用。
type DB struct { driver driver.Driver //數據庫實現驅動 dsn string //數據庫連接、配置參數信息,比如username、host、password等 numClosed uint64 mu sync.Mutex //鎖,操作DB各成員時用到 freeConn []*driverConn //空閑連接 connRequests []chan connRequest //阻塞請求隊列,等連接數達到最大限制時,后續請求將插入此隊列等待可用連接 numOpen int //已建立連接或等待建立連接數 openerCh chan struct{} //用于connectionOpener closed bool dep map[finalCloser]depSet lastPut map[*driverConn]string // stacktrace of last conn's put; debug only maxIdle int //最大空閑連接數 maxOpen int //數據庫最大連接數 maxLifetime time.Duration //連接最長存活期,超過這個時間連接將不再被復用 cleanerCh chan struct{} }
maxIdle
(默認值2)、maxOpen
(默認值0,無限制)、maxLifetime(默認值0,永不過期)
可以分別通過SetMaxIdleConns
、SetMaxOpenConns
、SetConnMaxLifetime
設定。
上面說了Open
時是沒有建立數據庫連接的,只有等用的時候才會實際建立連接,獲取可用連接的操作有兩種策略:cachedOrNewConn(有可用空閑連接則優先使用,沒有則創建)、alwaysNewConn(不管有沒有空閑連接都重新創建),下面以一個query的例子看下具體的操作:
rows, err := db.Query("select * from test")
database/sql/sql.go
:
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) { var rows *Rows var err error //maxBadConnRetries = 2 for i := 0; i < maxBadConnRetries; i++ { rows, err = db.query(query, args, cachedOrNewConn) if err != driver.ErrBadConn { break } } if err == driver.ErrBadConn { return db.query(query, args, alwaysNewConn) } return rows, err } func (db *DB) query(query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) { ci, err := db.conn(strategy) if err != nil { return nil, err } //到這已經獲取到了可用連接,下面進行具體的數據庫操作 return db.queryConn(ci, ci.releaseConn, query, args) }
數據庫連接由db.query()
獲取:
func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) { db.mu.Lock() if db.closed { db.mu.Unlock() return nil, errDBClosed } lifetime := db.maxLifetime //從freeConn取一個空閑連接 numFree := len(db.freeConn) if strategy == cachedOrNewConn && numFree > 0 { conn := db.freeConn[0] copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] conn.inUse = true db.mu.Unlock() if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } return conn, nil } //如果沒有空閑連接,而且當前建立的連接數已經達到最大限制則將請求加入connRequests隊列, //并阻塞在這里,直到其它協程將占用的連接釋放或connectionOpenner創建 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { // Make the connRequest channel. It's buffered so that the // connectionOpener doesn't block while waiting for the req to be read. req := make(chan connRequest, 1) db.connRequests = append(db.connRequests, req) db.mu.Unlock() ret, ok := <-req //阻塞 if !ok { return nil, errDBClosed } if ret.err == nil && ret.conn.expired(lifetime) { //連接過期了 ret.conn.Close() return nil, driver.ErrBadConn } return ret.conn, ret.err } db.numOpen++ //上面說了numOpen是已經建立或即將建立連接數,這里還沒有建立連接,只是樂觀的認為后面會成功,失敗的時候再將此值減1 db.mu.Unlock() ci, err := db.driver.Open(db.dsn) //調用driver的Open方法建立連接 if err != nil { //創建連接失敗 db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() //通知connectionOpener協程嘗試重新建立連接,否則在db.connRequests中等待的請求將一直阻塞,知道下次有連接建立 db.mu.Unlock() return nil, err } db.mu.Lock() dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, } db.addDepLocked(dc, dc) dc.inUse = true db.mu.Unlock() return dc, nil }
總結一下上面獲取連接的過程:
* step1:首先檢查下freeConn里是否有空閑連接,如果有且未超時則直接復用,返回連接,如果沒有或連接已經過期則進入下一步;
* step2:檢查當前已經建立及準備建立的連接數是否已經達到最大值,如果達到最大值也就意味著無法再創建新的連接了,當前請求需要在這等著連接釋放,這時當前協程將創建一個channel:chan connRequest
,并將其插入db.connRequests
隊列,然后阻塞在接收chan connRequest
上,等到有連接可用時這里將拿到釋放的連接,檢查可用后返回;如果還未達到最大值則進入下一步;
* step3:創建一個連接,首先將numOpen加1,然后再創建連接,如果等到創建完連接再把numOpen加1會導致多個協程同時創建連接時一部分會浪費,所以提前將numOpen占住,創建失敗再將其減掉;如果創建連接成功則返回連接,失敗則進入下一步
* step4:創建連接失敗時有一個善后操作,當然并不僅僅是將最初占用的numOpen數減掉,更重要的一個操作是通知connectionOpener協程根據db.connRequests
等待的長度創建連接,這個操作的原因是:
numOpen在連接成功創建前就加了1,這時候如果numOpen已經達到最大值再有獲取conn的請求將阻塞在step2,這些請求會等著先前進來的請求釋放連接,假設先前進來的這些請求創建連接全部失敗,那么如果它們直接返回了那些等待的請求將一直阻塞在那,因為不可能有連接釋放(極限值,如果部分創建成功則會有部分釋放),直到新請求進來重新成功創建連接,顯然這樣是有問題的,所以maybeOpenNewConnections
將通知connectionOpener根據db.connRequests
長度及可創建的最大連接數重新創建連接,然后將新創建的連接發給阻塞的請求。
注意:如果maxOpen=0
將不會有請求阻塞等待連接,所有請求只要從freeConn中取不到連接就會新創建。
另外Query
、Exec
有個重試機制,首先優先使用空閑連接,如果2次取到的連接都無效則嘗試新創建連接。
獲取到可用連接后將調用具體數據庫的driver處理sql。
數據庫連接在被使用完成后需要歸還給連接池以供其它請求復用,釋放連接的操作是:putConn()
:
func (db *DB) putConn(dc *driverConn, err error) { ... //如果連接已經無效,則不再放入連接池 if err == driver.ErrBadConn { db.maybeOpenNewConnections() dc.Close() //這里最終將numOpen數減掉 return } ... //正常歸還 added := db.putConnDBLocked(dc, nil) ... } func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } //有等待連接的請求則將連接發給它們,否則放入freeConn if c := len(db.connRequests); c > 0 { req := db.connRequests[0] // This copy is O(n) but in practice faster than a linked list. // TODO: consider compacting it down less often and // moving the base instead? copy(db.connRequests, db.connRequests[1:]) db.connRequests = db.connRequests[:c-1] if err == nil { dc.inUse = true } req <- connRequest{ conn: dc, err: err, } return true } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true } return false }
釋放的過程:
* step1:首先檢查下當前歸還的連接在使用過程中是否發現已經無效,如果無效則不再放入連接池,然后檢查下等待連接的請求數新建連接,類似獲取連接時的異常處理,如果連接有效則進入下一步;
* step2:檢查下當前是否有等待連接阻塞的請求,有的話將當前連接發給最早的那個請求,沒有的話則再判斷空閑連接數是否達到上限,沒有則放入freeConn空閑連接池,達到上限則將連接關閉釋放。
* step3:(只執行一次)啟動connectionCleaner協程定時檢查feeConn中是否有過期連接,有則剔除。
有個地方需要注意的是,Query
、Exec
操作用法有些差異:
a.Exec
(update、insert、delete等無結果集返回的操作)調用完后會自動釋放連接;
b.Query
(返回sql.Rows)則不會釋放連接,調用完后仍然占有連接,它將連接的所屬權轉移給了sql.Rows
,所以需要手動調用close歸還連接,即使不用Rows也得調用rows.Close(),否則可能導致后續使用出錯,如下的用法是錯誤的:
//錯誤 db.SetMaxOpenConns(1) db.Query("select * from test") row,err := db.Query("select * from test") //此操作將一直阻塞 //正確 db.SetMaxOpenConns(1) r,_ := db.Query("select * from test") r.Close() //將連接的所屬權歸還,釋放連接 row,err := db.Query("select * from test") //other op row.Close()
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。