您好,登錄后才能下訂單哦!
這篇文章主要介紹“GO怎么實現Redis的AOF持久化”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“GO怎么實現Redis的AOF持久化”文章能幫助大家解決問題。
GO實現Redis的AOF持久化
將用戶發來的指令以RESP協議的形式存儲在本地的AOF文件,重啟Redis后執行此文件恢復數據
本文涉及以下文件:redis.conf:配置文件
aof:實現aof
redis.conf
appendonly yes
appendfilename appendonly.aof
aof/aof.go
type CmdLine = [][]byte const ( aofQueueSize = 1 << 16 ) type payload struct { cmdLine CmdLine dbIndex int } type AofHandler struct { db databaseface.Database aofChan chan *payload aofFile *os.File aofFilename string currentDB int } func NewAOFHandler(db databaseface.Database) (*AofHandler, error) { handler := &AofHandler{} handler.aofFilename = config.Properties.AppendFilename handler.db = db handler.LoadAof() aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } handler.aofFile = aofFile handler.aofChan = make(chan *payload, aofQueueSize) go func() { handler.handleAof() }() return handler, nil } func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) { if config.Properties.AppendOnly && handler.aofChan != nil { handler.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } } } func (handler *AofHandler) handleAof() { handler.currentDB = 0 for p := range handler.aofChan { if p.dbIndex != handler.currentDB { // select db data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) continue } handler.currentDB = p.dbIndex } data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) } } } func (handler *AofHandler) LoadAof() { file, err := os.Open(handler.aofFilename) if err != nil { logger.Warn(err) return } defer file.Close() ch := parser.ParseStream(file) fakeConn := &connection.Connection{} for p := range ch { if p.Err != nil { if p.Err == io.EOF { break } logger.Error("parse error: " + p.Err.Error()) continue } if p.Data == nil { logger.Error("empty payload") continue } r, ok := p.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } ret := handler.db.Exec(fakeConn, r.Args) if reply.IsErrorReply(ret) { logger.Error("exec err", err) } } }
AofHandler:1.從管道中接收數據 2.寫入AOF文件
AddAof:用戶的指令包裝成payload放入管道
handleAof:將管道中的payload寫入磁盤
LoadAof:重啟Redis后加載aof文件
database/database.go
type Database struct { dbSet []*DB aofHandler *aof.AofHandler } func NewDatabase() *Database { mdb := &Database{} if config.Properties.Databases == 0 { config.Properties.Databases = 16 } mdb.dbSet = make([]*DB, config.Properties.Databases) for i := range mdb.dbSet { singleDB := makeDB() singleDB.index = i mdb.dbSet[i] = singleDB } if config.Properties.AppendOnly { aofHandler, err := aof.NewAOFHandler(mdb) if err != nil { panic(err) } mdb.aofHandler = aofHandler for _, db := range mdb.dbSet { singleDB := db singleDB.addAof = func(line CmdLine) { mdb.aofHandler.AddAof(singleDB.index, line) } } } return mdb }
將AOF加入到database里
使用singleDB的原因:因為在循環中獲取返回變量的地址都完全相同,因此當我們想要訪問數組中元素所在的地址時,不應該直接獲取 range 返回的變量地址 db,而應該使用 singleDB := db
database/db.go
type DB struct { index int data dict.Dict addAof func(CmdLine) } func makeDB() *DB { db := &DB{ data: dict.MakeSyncDict(), addAof: func(line CmdLine) {}, } return db }
由于分數據庫db引用不到aof,所以添加一個addAof匿名函數,在NewDatabase中用這個匿名函數調用AddAof
database/keys.go
func execDel(db *DB, args [][]byte) resp.Reply { ...... if deleted > 0 { db.addAof(utils.ToCmdLine2("del", args...)) } return reply.MakeIntReply(int64(deleted)) } func execFlushDB(db *DB, args [][]byte) resp.Reply { db.Flush() db.addAof(utils.ToCmdLine2("flushdb", args...)) return &reply.OkReply{} } func execRename(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("rename", args...)) return &reply.OkReply{} } func execRenameNx(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("renamenx", args...)) return reply.MakeIntReply(1) }
database/string.go
func execSet(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("set", args...)) return &reply.OkReply{} } func execSetNX(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("setnx", args...)) return reply.MakeIntReply(int64(result)) } func execGetSet(db *DB, args [][]byte) resp.Reply { key := string(args[0]) value := args[1] entity, exists := db.GetEntity(key) db.PutEntity(key, &database.DataEntity{Data: value}) db.addAof(utils.ToCmdLine2("getset", args...)) ...... }
添加addAof方法
測試命令
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n
關于“GO怎么實現Redis的AOF持久化”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。