您好,登錄后才能下訂單哦!
本篇內容主要講解“如何深入理解Redis事務”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“如何深入理解Redis事務”吧!
Redis可以看成NoSQL類型的數據庫系統, Redis也提供了事務, 但是和傳統的關系型數據庫的事務既有相似性, 也存在區別.因為Redis的架構基于操作系統的多路復用的IO接口,主處理流程是一個單線程,因此對于一個完整的命令, 其處理都是原子性的, 但是如果需要將多個命令作為一個不可分割的處理序列, 就需要使用事務.
Redis事務有如下一些特點:
事務中的命令序列執行的時候是原子性的,也就是說,其不會被其他客戶端的命令中斷. 這和傳統的數據庫的事務的屬性是類似的.
盡管Redis事務中的命令序列是原子執行的, 但是事務中的命令序列執行可以部分成功,這種情況下,Redis事務不會執行回滾操作. 這和傳統關系型數據庫的事務是有區別的.
盡管Redis有RDB和AOF兩種數據持久化機制, 但是其設計目標是高效率的cache系統. Redis事務只保證將其命令序列中的操作結果提交到內存中,不保證持久化到磁盤文件. 更進一步的, Redis事務和RDB持久化機制沒有任何關系, 因為RDB機制是對內存數據結構的全量的快照.由于AOF機制是一種增量持久化,所以事務中的命令序列會提交到AOF的緩存中.但是AOF機制將其緩存寫入磁盤文件是由其配置的實現策略決定的,和Redis事務沒有關系.
Redis事務API
從宏觀上來講, Redis事務開始后, 會緩存后續的操作命令及其操作數據,當事務提交時,原子性的執行緩存的命令序列.
從版本2.2開始,Redis提供了一種樂觀的鎖機制, 配合這種機制,Redis事務提交時, 變成了事務的條件執行. 具體的說,如果樂觀鎖失敗了,事務提交時, 丟棄事務中的命令序列,如果樂觀鎖成功了, 事務提交時,才會執行其命令序列.當然,也可以不使用樂觀鎖機制, 在事務提交時, 無條件執行事務的命令序列.
Redis事務涉及到MULTI, EXEC, DISCARD, WATCH和UNWATCH這五個命令:
事務開始的命令是MULTI, 該命令返回OK提示信息. Redis不支持事務嵌套,執行多次MULTI命令和執行一次是相同的效果.嵌套執行MULTI命令時,Redis只是返回錯誤提示信息.
EXEC是事務的提交命令,事務中的命令序列將被執行(或者不被執行,比如樂觀鎖失敗等).該命令將返回響應數組,其內容對應事務中的命令執行結果.
WATCH命令是開始執行樂觀鎖,該命令的參數是key(可以有多個), Redis將執行WATCH命令的客戶端對象和key進行關聯,如果其他客戶端修改了這些key,則執行WATCH命令的客戶端將被設置樂觀鎖失敗的標志.該命令必須在事務開始前執行,即在執行MULTI命令前執行WATCH命令,否則執行無效,并返回錯誤提示信息.
UNWATCH命令將取消當前客戶端對象的樂觀鎖key,該客戶端對象的事務提交將變成無條件執行.
DISCARD命令將結束事務,并且會丟棄全部的命令序列.
需要注意的是,EXEC命令和DISCARD命令結束事務時,會調用UNWATCH命令,取消該客戶端對象上所有的樂觀鎖key.
無條件提交
如果不使用樂觀鎖, 則事務為無條件提交.下面是一個事務執行的例子:
multi +OK incr key1 +QUEUED set key2 val2 +QUEUED exec *2 :1 +OK
當客戶端開始事務后, 后續發送的命令將被Redis緩存起來,Redis向客戶端返回響應提示字符串QUEUED.當執行EXEC提交事務時,緩存的命令依次被執行,返回命令序列的執行結果.
事務的錯誤處理
事務提交命令EXEC有可能會失敗, 有三種類型的失敗場景:
在事務提交之前,客戶端執行的命令緩存失敗.比如命令的語法錯誤(命令參數個數錯誤, 不支持的命令等等).如果發生這種類型的錯誤,Redis將向客戶端返回包含錯誤提示信息的響應.
事務提交時,之前緩存的命令有可能執行失敗.
由于樂觀鎖失敗,事務提交時,將丟棄之前緩存的所有命令序列.
當發生第一種失敗的情況下,客戶端在執行事務提交命令EXEC時,將丟棄事務中所有的命令序列.下面是一個例子:
multi +OK incr num1 num2 -ERR wrong number of arguments for 'incr' command set key1 val1 +QUEUED exec -EXECABORT Transaction discarded because of previous errors.
命令incr num1 num2并沒有緩存成功, 因為incr命令只允許有一個參數,是個語法錯誤的命令.Redis無法成功緩存該命令,向客戶端發送錯誤提示響應.接下來的set key1 val1命令緩存成功.最后執行事務提交的時候,因為發生過命令緩存失敗,所以事務中的所有命令序列被丟棄.
如果事務中的所有命令序列都緩存成功,在提交事務的時候,緩存的命令中仍可能執行失敗.但Redis不會對事務做任何回滾補救操作.下面是一個這樣的例子:
multi +OK set key1 val1 +QUEUED lpop key1 +QUEUED incr num1 +QUEUED exec *3 +OK -WRONGTYPE Operation against a key holding the wrong kind of value :1
所有的命令序列都緩存成功,但是在提交事務的時候,命令set key1 val1和incr num1執行成功了,Redis保存了其執行結果,但是命令lpop key1執行失敗了.
樂觀鎖機制
Redis事務和樂觀鎖一起使用時,事務將成為有條件提交.
關于樂觀鎖,需要注意的是:
WATCH命令必須在MULTI命令之前執行. WATCH命令可以執行多次.
WATCH命令可以指定樂觀鎖的多個key,如果在事務過程中,任何一個key被其他客戶端改變,則當前客戶端的樂觀鎖失敗,事務提交時,將丟棄所有命令序列.
多個客戶端的WATCH命令可以指定相同的key.
WATCH命令指定樂觀鎖后,可以接著執行MULTI命令進入事務上下文,也可以在WATCH命令和MULTI命令之間執行其他命令. 具體使用方式取決于場景需求,不在事務中的命令將立即被執行.
如果WATCH命令指定的樂觀鎖的key,被當前客戶端改變,在事務提交時,樂觀鎖不會失敗.
如果WATCH命令指定的樂觀鎖的key具有超時屬性,并且該key在WATCH命令執行后, 在事務提交命令EXEC執行前超時, 則樂觀鎖不會失敗.如果該key被其他客戶端對象修改,則樂觀鎖失敗.
一個執行樂觀鎖機制的事務例子:
rpush list v1 v2 v3 :3 watch list +OK multi +OK lpop list +QUEUED exec *1 $2 v1
下面是另一個例子,樂觀鎖被當前客戶端改變, 事務提交成功:
watch num +OK multi +OK incr num +QUEUED exec *1 :2
Redis事務和樂觀鎖配合使用時, 可以構造實現單個Redis命令不能完成的更復雜的邏輯.
Redis事務的源碼實現機制
首先,事務開始的MULTI命令執行的函數為multiCommand, 其實現為(multi.c):
void multiCommand(redisClient *c) { if (c->flags & REDIS_MULTI) { addReplyError(c,"MULTI calls can not be nested"); return; } c->flags |= REDIS_MULTI; addReply(c,shared.ok); }
該命令只是在當前客戶端對象上加上REDIS_MULTI標志, 表示該客戶端進入了事務上下文.
客戶端進入事務上下文后,后續執行的命令將被緩存. 函數processCommand是Redis處理客戶端命令的入口函數, 其實現為(redis.c):
int processCommand(redisClient *c) { /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in * a regular command proc. */ if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= REDIS_CLOSE_AFTER_REPLY; return REDIS_ERR; } /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ c->ccmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { flagTransaction(c); addReplyErrorFormat(c,"unknown command '%s'", (char*)c->argv[0]->ptr); return REDIS_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { flagTransaction(c); addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name); return REDIS_OK; } /* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { flagTransaction(c); addReply(c,shared.noautherr); return REDIS_OK; } /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile * keys in the dataset). If there are not the only thing we can do * is returning an error. */ if (server.maxmemory) { int retval = freeMemoryIfNeeded(); /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) return REDIS_ERR; /* It was impossible to free enough memory, and the command the client * is trying to execute is denied during OOM conditions? Error. */ if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) { flagTransaction(c); addReply(c, shared.oomerr); return REDIS_OK; } } /* Don't accept write commands if there are problems persisting on disk * and if this is a master instance. */ if (((server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == REDIS_ERR) || server.aof_last_write_status == REDIS_ERR) && server.masterhost == NULL && (c->cmd->flags & REDIS_CMD_WRITE || c->cmd->proc == pingCommand)) { flagTransaction(c); if (server.aof_last_write_status == REDIS_OK) addReply(c, shared.bgsaveerr); else addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", strerror(server.aof_last_write_errno))); return REDIS_OK; } /* Don't accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */ if (server.masterhost == NULL && server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag && c->cmd->flags & REDIS_CMD_WRITE && server.repl_good_slaves_count < server.repl_min_slaves_to_write) { flagTransaction(c); addReply(c, shared.noreplicaserr); return REDIS_OK; } /* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */ if (server.masterhost && server.repl_slave_ro && !(c->flags & REDIS_MASTER) && c->cmd->flags & REDIS_CMD_WRITE) { addReply(c, shared.roslaveerr); return REDIS_OK; } /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ if (c->flags & REDIS_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); return REDIS_OK; } /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and * we are a slave with a broken link with master. */ if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED && server.repl_serve_stale_data == 0 && !(c->cmd->flags & REDIS_CMD_STALE)) { flagTransaction(c); addReply(c, shared.masterdownerr); return REDIS_OK; } /* Loading DB? Return an error if the command has not the * REDIS_CMD_LOADING flag. */ if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) { addReply(c, shared.loadingerr); return REDIS_OK; } /* Lua script too slow? Only allow a limited number of commands. */ if (server.lua_timedout && c->cmd->proc != authCommand && c->cmd->proc != replconfCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && !(c->cmd->proc == scriptCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) { flagTransaction(c); addReply(c, shared.slowscripterr); return REDIS_OK; } /* Exec the command */ if (c->flags & REDIS_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { queueMultiCommand(c); addReply(c,shared.queued); } else { call(c,REDIS_CALL_FULL); if (listLength(server.ready_keys)) handleClientsBlockedOnLists(); } return REDIS_OK; }
Line145:151當客戶端處于事務上下文時, 如果接收的是非事務命令(MULTI, EXEC, WATCH, DISCARD), 則調用queueMultiCommand將命令緩存起來,然后向客戶端發送成功響應.
在函數processCommand中, 在緩存命令之前, 如果檢查到客戶端發送的命令不存在,或者命令參數個數不正確等情況, 會調用函數flagTransaction標命令緩存失敗.也就是說,函數processCommand中, 所有調用函數flagTransaction的條件分支,都是返回失敗響應.
緩存命令的函數queueMultiCommand的實現為(multi.c):
/* Add a new command into the MULTI commands queue */ void queueMultiCommand(redisClient *c) { multiCmd *mc; int j; c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.count+1)); mc = c->mstate.commands+c->mstate.count; mc->ccmd = c->cmd; mc->argc = c->argc; mc->argv = zmalloc(sizeof(robj*)*c->argc); memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); for (j = 0; j < c->argc; j++) incrRefCount(mc->argv[j]); c->mstate.count++; }
在事務上下文中, 使用multiCmd結構來緩存命令, 該結構定義為(redis.h):
/* Client MULTI/EXEC state */ typedef struct multiCmd { robj **argv; int argc; struct redisCommand *cmd; } multiCmd;
其中argv字段指向命令的參數內存地址,argc為命令參數個數, cmd為命令描述結構, 包括名字和函數指針等.
命令參數的內存空間已經使用動態分配記錄于客戶端對象的argv字段了, multiCmd結構的argv字段指向客戶端對象redisClient的argv即可.
無法緩存命令時, 調用函數flagTransaction,該函數的實現為(multi.c):
/* Flag the transacation as DIRTY_EXEC so that EXEC will fail. * Should be called every time there is an error while queueing a command. */ void flagTransaction(redisClient *c) { if (c->flags & REDIS_MULTI) c->flags |= REDIS_DIRTY_EXEC; }
該函數在客戶端對象中設置REDIS_DIRTY_EXEC標志, 如果設置了這個標志, 事務提交時, 命令序列將被丟棄.
最后,在事務提交時, 函數processCommand中將調用call(c,REDIS_CALL_FULL);, 其實現為(redis.c):
/* Call() is the core of Redis execution of a command */ void call(redisClient *c, int flags) { long long dirty, start, duration; int cclient_old_flags = c->flags; /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ if (listLength(server.monitors) && !server.loading && !(c->cmd->flags & (REDIS_CMD_SKIP_MONITOR|REDIS_CMD_ADMIN))) { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); } /* Call the command. */ c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL); redisOpArrayInit(&server.also_propagate); dirty = server.dirty; start = ustime(); c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; /* When EVAL is called loading the AOF we don't want commands called * from Lua to go into the slowlog or to populate statistics. */ if (server.loading && c->flags & REDIS_LUA_CLIENT) flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS); /* If the caller is Lua, we want to force the EVAL caller to propagate * the script if the command flag or client flag are forcing the * propagation. */ if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) { if (c->flags & REDIS_FORCE_REPL) server.lua_caller->flags |= REDIS_FORCE_REPL; if (c->flags & REDIS_FORCE_AOF) server.lua_caller->flags |= REDIS_FORCE_AOF; } /* Log the command into the Slow log if needed, and populate the * per-command statistics that we show in INFO commandstats. */ if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand) { char *latency_event = (c->cmd->flags & REDIS_CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c->argv,c->argc,duration); } if (flags & REDIS_CALL_STATS) { c->cmd->microseconds += duration; c->cmd->calls++; } /* Propagate the command into the AOF and replication link */ if (flags & REDIS_CALL_PROPAGATE) { int flags = REDIS_PROPAGATE_NONE; if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL; if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF; if (dirty) flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF); if (flags != REDIS_PROPAGATE_NONE) propagate(c->cmd,c->db->id,c->argv,c->argc,flags); } /* Restore the old FORCE_AOF/REPL flags, since call can be executed * recursively. */ c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL); c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL); /* Handle the alsoPropagate() API to handle commands that want to propagate * multiple separated commands. */ if (server.also_propagate.numops) { int j; redisOp *rop; for (j = 0; j < server.also_propagate.numops; j++) { rop = &server.also_propagate.ops[j]; propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target); } redisOpArrayFree(&server.also_propagate); } server.stat_numcommands++; }
在函數call中通過執行c->cmd->proc(c);調用具體的命令函數.事務提交命令EXEC對應的執行函數為execCommand, 其實現為(multi.c):
void execCommand(redisClient *c) { int j; robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd; int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */ if (!(c->flags & REDIS_MULTI)) { addReplyError(c,"EXEC without MULTI"); return; } /* Check if we need to abort the EXEC because: * 1) Some WATCHed key was touched. * 2) There was a previous error while queueing commands. * A failed EXEC in the first case returns a multi bulk nil object * (technically it is not an error but a special behavior), while * in the second an EXECABORT error is returned. */ if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) { addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr : shared.nullmultibulk); discardTransaction(c); goto handle_monitor; } /* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; orig_argc = c->argc; orig_cmd = c->cmd; addReplyMultiBulkLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; c->ccmd = c->mstate.commands[j].cmd; /* Propagate a MULTI request once we encounter the first write op. * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) { execCommandPropagateMulti(c); must_propagate = 1; } call(c,REDIS_CALL_FULL); /* Commands may alter argc/argv, restore mstate. */ c->mstate.commands[j].argc = c->argc; c->mstate.commands[j].argv = c->argv; c->mstate.commands[j].cmd = c->cmd; } c->argv = orig_argv; c->argc = orig_argc; c->cmd = orig_cmd; discardTransaction(c); /* Make sure the EXEC command will be propagated as well if MULTI * was already propagated. */ if (must_propagate) server.dirty++; handle_monitor: /* Send EXEC to clients waiting data from MONITOR. We do it here * since the natural order of commands execution is actually: * MUTLI, EXEC, ... commands inside transaction ... * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command * table, and we do it here with correct ordering. */ if (listLength(server.monitors) && !server.loading) replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); }
LINE8:11檢查EXEC命令和MULTI命令是否配對使用, 單獨執行EXEC命令是沒有意義的.
LINE19:24檢查客戶端對象是否具有REDIS_DIRTY_CAS或者REDIS_DIRTY_EXEC標志, 如果存在,則調用函數discardTransaction丟棄命令序列, 向客戶端返回失敗響應.
如果沒有檢查到任何錯誤,則先執行unwatchAllKeys(c);取消該客戶端上所有的樂觀鎖key.
LINE32:52依次執行緩存的命令序列,這里有兩點需要注意的是:
事務可能需要同步到AOF緩存或者replica備份節點中.如果事務中的命令序列都是讀操作, 則沒有必要向AOF和replica進行同步.如果事務的命令序列中包含寫命令,則MULTI, EXEC和相關的寫命令會向AOF和replica進行同步.根據LINE41:44的條件判斷,執行execCommandPropagateMulti(c);保證MULTI命令同步, LINE59檢查EXEC命令是否需要同步, 即MULTI命令和EXEC命令必須保證配對同步.EXEC命令的同步執行在函數的call中LINE62propagate(c->cmd,c->db->id,c->argv,c->argc,flags);, 具體的寫入命令由各自的執行函數負責同步.
這里執行命令序列時, 通過執行call(c,REDIS_CALL_FULL);所以call函數是遞歸調用.
所以,綜上所述, Redis事務其本質就是,以不可中斷的方式依次執行緩存的命令序列,將結果保存到內存cache中.
事務提交時, 丟棄命令序列會調用函數discardTransaction, 其實現為(multi.c):
void discardTransaction(redisClient *c) { freeClientMultiState(c); initClientMultiState(c); c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC); unwatchAllKeys(c); }
該函數調用freeClientMultiState釋放multiCmd對象內存.調用initClientMultiState復位客戶端對象的緩存命令管理結構.調用unwatchAllKeys取消該客戶端的樂觀鎖.
WATCH命令執行樂觀鎖, 其對應的執行函數為watchCommand, 其實現為(multi.c):
void watchCommand(redisClient *c) { int j; if (c->flags & REDIS_MULTI) { addReplyError(c,"WATCH inside MULTI is not allowed"); return; } for (j = 1; j < c->argc; j++) watchForKey(c,c->argv[j]); addReply(c,shared.ok); }
進而調用函數watchForKey, 其實現為(multi.c):
/* Watch for the specified key */ void watchForKey(redisClient *c, robj *key) { list *clients = NULL; listIter li; listNode *ln; watchedKey *wk; /* Check if we are already watching for this key */ listRewind(c->watched_keys,&li); while((ln = listNext(&li))) { wk = listNodeValue(ln); if (wk->db == c->db && equalStringObjects(key,wk->key)) return; /* Key already watched */ } /* This key is not already watched in this DB. Let's add it */ clients = dictFetchValue(c->db->watched_keys,key); if (!clients) { clients = listCreate(); dictAdd(c->db->watched_keys,key,clients); incrRefCount(key); } listAddNodeTail(clients,c); /* Add the new key to the list of keys watched by this client */ wk = zmalloc(sizeof(*wk)); wk->keykey = key; wk->db = c->db; incrRefCount(key); listAddNodeTail(c->watched_keys,wk); }
關于樂觀鎖的key, 既保存于其客戶端對象的watched_keys鏈表中, 也保存于全局數據庫對象的watched_keys哈希表中.
LINE10:14檢查客戶端對象的鏈表中是否已經存在該key, 如果已經存在, 則直接返回.LINE16在全局數據庫中返回該key對應的客戶端對象鏈表, 如果鏈表不存在, 說明其他客戶端沒有使用該key作為樂觀鎖, 如果鏈表存在, 說明其他客戶端已經使用該key作為樂觀鎖. LINE22將當前客戶端對象記錄于該key對應的鏈表中. LINE28將該key記錄于當前客戶端的key鏈表中.
當前客戶端執行樂觀鎖以后, 其他客戶端的寫入命令可能修改該key值.所有具有寫操作屬性的命令都會執行函數signalModifiedKey, 其實現為(db.c):
void signalModifiedKey(redisDb *db, robj *key) { touchWatchedKey(db,key); }
函數touchWatchedKey的實現為(multi.c):
/* "Touch" a key, so that if this key is being WATCHed by some client the * next EXEC will fail. */ void touchWatchedKey(redisDb *db, robj *key) { list *clients; listIter li; listNode *ln; if (dictSize(db->watched_keys) == 0) return; clients = dictFetchValue(db->watched_keys, key); if (!clients) return; /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ /* Check if we are already watching for this key */ listRewind(clients,&li); while((ln = listNext(&li))) { redisClient *c = listNodeValue(ln); c->flags |= REDIS_DIRTY_CAS; } }
語句if (dictSize(db->watched_keys) == 0) return;檢查全局數據庫中的哈希表watched_keys是否為空, 如果為空,說明沒有任何客戶端執行WATCH命令, 直接返回.如果該哈希表不為空, 取回該key對應的客戶端鏈表結構,并把該鏈表中的每個客戶端對象設置REDIS_DIRTY_CAS標志. 前面在EXEC的執行命令中,進行過條件判斷, 如果客戶端對象具有這個標志, 則丟棄事務中的命令序列.
在執行EXEC, DISCARD, UNWATCH命令以及在客戶端結束連接的時候,都會取消樂觀鎖, 最終都會執行函數unwatchAllKeys, 其實現為(multi.c):
/* Unwatch all the keys watched by this client. To clean the EXEC dirty * flag is up to the caller. */ void unwatchAllKeys(redisClient *c) { listIter li; listNode *ln; if (listLength(c->watched_keys) == 0) return; listRewind(c->watched_keys,&li); while((ln = listNext(&li))) { list *clients; watchedKey *wk; /* Lookup the watched key -> clients list and remove the client * from the list */ wk = listNodeValue(ln); clients = dictFetchValue(wk->db->watched_keys, wk->key); redisAssertWithInfo(c,NULL,clients != NULL); listDelNode(clients,listSearchKey(clients,c)); /* Kill the entry at all if this was the only client */ if (listLength(clients) == 0) dictDelete(wk->db->watched_keys, wk->key); /* Remove this watched key from the client->watched list */ listDelNode(c->watched_keys,ln); decrRefCount(wk->key); zfree(wk); } }
語句if (listLength(c->watched_keys) == 0) return;判斷如果當前客戶端對象的watched_keys鏈表為空,說明當前客戶端沒有執行WATCH命令,直接返回.如果該鏈表非空, 則依次遍歷該鏈表中的key, 并從該鏈表中刪除key, 同時,獲得全局數據庫中的哈希表watched_keys中該key對應的客戶端鏈表, 刪除當前客戶端對象.
到此,相信大家對“如何深入理解Redis事務”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。