您好,登錄后才能下訂單哦!
這篇文章主要介紹了Redisson如何實現分布式鎖、鎖續約的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Redisson如何實現分布式鎖、鎖續約文章都會有所收獲,下面我們一起來看看吧。
使用當前(2022年12月初)最新的版本:3.18.1;
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.18.1</version> </dependency>
案例
案例采用redis-cluster集群的方式;
public class Main { public static void main(String[] args) throws Exception { // 1.配置Redis-Cluster集群節點的ip和port Config config = new Config(); config.useClusterServers() .addNodeAddress("redis://127.0.0.1:7001") .addNodeAddress("redis://127.0.0.1:7002") .addNodeAddress("redis://127.0.0.1:7003") .addNodeAddress("redis://127.0.0.1:7004"); // 2.創建Redisson的客戶端 RedissonClient redisson = Redisson.create(config); // 3.測試Redisson可重?鎖的加鎖、釋放鎖 testLock(redisson); } private static void testLock(RedissonClient redisson) throws InterruptedException { // 1.獲取key為"anyLock"的鎖對象 final RLock lock = redisson.getLock("test_lock"); boolean locked = true; try { //2.1:加鎖 lock.lock(); // 2.2:加鎖,并設置嘗試獲取鎖超時時間30s、鎖超時?動釋放的時間10s // locked = lock.tryLock(30, 10, TimeUnit.SECONDS); if (locked) System.out.println("加鎖成功!" + new Date()); Thread.sleep(20 * 1000); System.out.println("鎖邏輯執行完畢!" + new Date()); } finally { // 3.釋放鎖 lock.unlock(); } } }
redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群;在分布式鎖的實現上區別在于hash槽的獲取方式。
具體配置方式見Redisson的GitHub
分布式鎖主要需要以下redis命令:
EXISTS key:當 key 存在,返回1;不存在,返回0。
GETSET key value:將給定 key 的值設為 value ,并返回 key 的舊值 (old value);當 key 存在但不是字符串類型時,返回一個錯誤;當key不存在時,返回nil。
GET key:返回 key 所關聯的字符串值,如果 key 不存在那么返回 nil。
DEL key [KEY …]:刪除給定的一個或多個 key(不存在的 key 會被忽略),返回實際刪除的key的個數(integer)。
DEL key1 key2 key3
HSET key field value:給一個key 設置一個{field=value}的組合值,如果key沒有就直接賦值并返回1;如果field已有,那么就更新value的值,并返回0。
HEXISTS key field:當key中存儲著field的時候返回1,如果key或者field有一個不存在返回0。
HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定字段field的值加上增量increment;
如果鍵key不存在,一個保存了哈希對象{field=value}的key將被創建;如果字段field不存在,在進行當前操作前,feild將被創建,且對應的值被置為0;返回值是increment。PEXPIRE key milliseconds:設置存活時間,單位是毫秒。EXPIRE操作單位是秒。
PUBLISH channel message:向channel post一個message內容的消息,返回接收消息的客戶端數。
Redisson源碼中,執行redis命令的是lua腳本,其中主要有如下幾個概念:
redis.call():執行redis命令。
KEYS[n]:指腳本中第n個參數,比如KEYS[1]指腳本中的第一個參數。
ARGV[n]:指腳本中第n個參數的值,比如ARGV[1]指腳本中的第一個參數的值。
返回值中nil與false同一個意思。
在redis執行lua腳本時,相當于一個redis級別的鎖,不能執行其他操作,類似于原子操作,這也是redisson實現的一個關鍵點。
另外,如果lua腳本執行過程中出現了異常或者redis服務器宕機了,會將腳本中已經執行的命令在AOF、RDB日志中刪除;即LUA腳本執行報錯會進行回滾操作。
RLock接口主要繼承了Lock接口,并擴展了部分方法,比如:tryLock(long waitTime, long leaseTime, TimeUnit unit)方法中加入的leaseTime參數,用來設置鎖的過期時間,如果超過leaseTime還沒有解鎖的話,redis就強制解鎖;leaseTime的默認時間是30s。
獲取RLock對象
RLock lock = redissonClient.getLock("test_lock");
RLock對象表示?個鎖對象,我們要某一個key加鎖時,需要先獲取?個鎖對象。
這里并沒有具體請求Redis進行加鎖的邏輯,而只是調用RedissonLock的構造函數,設置一些變量。
進入到Rlock#lock()方法,先看主流程;關于競爭鎖等待時間、鎖超時釋放時間的配置、使用,在流程中穿插著聊。
0)加鎖流程圖
1)加鎖到哪臺機器
lock()方法執行鏈路:
走到這里,已經可以看到加鎖的底層邏輯:LUA腳本。
而lua腳本只是??串字符串,作為evalWriteAsync()?法的?個參數?已;所以下?步進到evalWriteAsync()?法中:
走到這里會調用ConnectionManager#getEntry(String)方法;
在創建RedissonClient時,筆者配置的是Redis-Cluster,而走到這里卻會進入到MasterSlaveConnectionManager
,實際上實例化的ConnectionManager就是RedisCluster模式下的ClusterConnectionManager,而ClusterConnectionManager
繼承自MasterSlaveConnectionManager
,并且ClusterConnectionManager
沒有重寫getEntry(String)方法,所以會進入到MasterSlaveConnectionManager
#getEntry(String)方法。
ConnectionManager#getEntry(String)方法會根據傳入的key名稱找到相應的Redis節點、目標master。
Redis-Cluster集群中的數據分布式是 通過?個?個的hash slot來實現的,Redis-Cluster集群總共16384個hash slot,它們都 會被均勻分布到所有的master節點上;這里ClusterConnectionManager通過key名稱計算出相應的hash slot方式如下:
?先通過key計算出CRC16值,然后 CRC16值對16384進?取模,進?得到hash slot。
@Override public int calcSlot(String key) { if (key == null) { return 0; } int start = key.indexOf('{'); if (start != -1) { int end = key.indexOf('}'); if (end != -1 && start + 1 < end) { key = key.substring(start + 1, end); } } int result = CRC16.crc16(key.getBytes()) % MAX_SLOT; log.debug("slot {} for {}", result, key); return result; }
這?計算出key的hash slot之后,就可以通過hash slot 去看?看哪個master上有這個hash slot,如果某個master上有個這個hash slot,那么這個 key當然就會落到該master節點上,執?加鎖指令也就應該在該master上執?。
下面進入本文重點,可重入鎖的各種加鎖、釋放鎖。
2)Client第一次加鎖
在尋找應該在哪臺Redis機器上加鎖時,在RedissonLock#tryLockInnerAsync()方法中我們看到了一堆LUA腳本:
LUA腳本參數解析:
KEYS[1] 表示的是 getName() ,即鎖key的名稱,比如案例中的 test_lock;
ARGV[1] 表示的是 internalLockLeaseTime 默認值是30s;
ARGV[2] 表示的是 getLockName(threadId) ,唯一標識當前訪問線程,使用鎖對象id+線程id(UUID:ThreadId)方式表示,用于區分不同服務器上的線程。
UUID用來唯?標識?個客戶端,因為會有多個客戶端的多個線程加鎖;
結合起來的UUID:ThreadId 表示:具體哪個客戶端上的哪個線程過來加鎖,通 過這樣的組合?式唯?標識?個線程。
LUA腳本邏輯:
如果鎖名稱不存在;
則向redis中添加一個key為test_lock的HASH結構、添加一個field為線程id,值=1的鍵值對{field:increment},表示此線程的重入次數為1;
設置test_lock的過期時間,防止當前服務器出問題后導致死鎖,然后return nil; end;返回nil,lua腳本執行完畢;
如果鎖存在,檢測當前線程是否持有鎖;
如果是當前線程持有鎖,hincrby將該線程重入的次數++;并重新設置鎖的過期時間;返回nil,lua腳本執行完畢;
如果不是當前線程持有鎖,pttl返回鎖的過期時間,單位ms。
第一次加鎖時,key肯定不存在與master節點上;
會執行下列LUA腳本對應的Redis指令:
hset test_lock UUID:ThreadId 1 pexpire test_lock 30000
此時,Redis中多一個Hash結構的key(test_lock):
test_lock : { UUID:ThreadId:1 }
這里的1使用來做鎖重入的。
pexpire
指令為test_lock這個key設置過期時間為30s,即:30s后這個key會?動過期被刪除,key對應的鎖在那時也就被釋放了。
總體來看,加鎖的邏輯很簡單:
在key對應的hash數據結構中記錄了? 下當前是哪個客戶端的哪個線程過來加鎖了,然后設置了?下key的過期時間為30s。 3)加鎖成功之后的鎖續約
成功加鎖后,lua腳本返回nil,即null。
加鎖成功之后,tryLockInnerAsync()?法返回;再結合Java8的Stream,對加鎖結果進一步處理;
因為加鎖成功后返回的是nil,這是lua腳本的返回形式,體現到java代碼中的返回值為:null。
又由于RLock#lock()方法傳入的leaseTime是-1,所以進入到scheduleExpirationRenewal(long)
方法做鎖續約。
renewExpirationAsync()方法負責做具體的鎖續約:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
這里LUA腳本的邏輯很簡單:
判斷當前key中,是否還被線程UUID:ThreadId持有鎖,持有則設置過期時間為30s(續命)。
鎖續約(看門狗機制)其實就是每次加鎖成功后,會?上開啟?個后臺線程, 每隔10s檢查?下key是否存在,如果存在就為key續期30s。
這里的10s,取自配置的lockWatchdogTimeout
參數,默認為30 * 1000 ms;
所以?個key往往當過期時間慢慢消逝到20s左右時就?會被定時任務重置為了30s,這樣就能保證:只要這個定時任務還在、這個key還在,就?直維持加鎖。
如果當前持有鎖的線程被中斷了,會停止鎖續約,即殺死看門狗;
protected void cancelExpirationRenewal(Long threadId) { ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null) { return; } if (threadId != null) { task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null) { timeout.cancel(); } EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } }
所謂的停止鎖續約,實際就是將當前線程的threadId從看門狗緩存中移除,后續在執行鎖續約時,如果發現看門狗緩存中已經沒有了當前線程threadId,則直接退出鎖續約 并且 不再延時10s開啟一個定時任務。
如果加鎖時指定了leaseTime > 0,則不會開門狗機制,表示強制鎖leaseTime 毫秒后過期。一共有三種加鎖方式可以做到,如下:
RLock#lock(long leaseTime, TimeUnit unit)
RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
4)重入加鎖(相同線程多次加鎖)
再次回到加鎖的LUA腳本:
同一個線程對分布式鎖多次加鎖時,會走以下邏輯:
判斷當前key是否被當前線程持有,如果是則增加鎖重入的次數,并重新設置鎖的過期時間為30s;
對應的Redis命令為:
hexists test_lock UUID:ThreadId hincrby test_lock UUID:ThreadId 1 pexpire test_lock 30000
此時Redis中test_key對應的數據結構從
test_lock : { UUID:ThreadId:1 }
變成:
test_lock : { UUID:ThreadId:2 }
并將key的過期時間重新設置為30s。
鎖重入成功之后,后臺也會開啟?個watchdog后臺線程做鎖續約,每隔10s檢查?下key,如果key存在就將key的過期時間重新設置為30s。
Redisson可重?加鎖的語義,實際是通過Hash結構的key中某個線程(UUID:ThreadId)對應的加鎖次數來表示的。
5)鎖競爭(其他線程加鎖失敗)
再再次回到加鎖的LUA腳本:
如果分布式鎖已經被其他線程持有,LUA腳本會執行以下邏輯:
返回當前key的剩余存活時間,因為不是返回nil,也就代表著加鎖失敗;
對應的Redis的命令為:
pttl test_lock
針對加鎖方式的不同,加鎖失敗的邏輯也不同;可以分兩大類:指定了加鎖失敗的等待時間waitTime和未指定waitTime。
未執行加鎖失敗的等待時間waitTime
:獲取分布式鎖失敗會一直重試,直到獲取鎖成功。比如下列加鎖方法:
Rlock#lock()
:一直嘗試獲取分布式鎖,直到獲取鎖成功。
RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
RLock#lock(long leaseTime, TimeUnit unit)
指定了加鎖失敗的等待時間waitTime
:獲取分布式鎖會超時,超時之后返回加鎖失敗;
Rlock#tryLock(long waitTime, TimeUnit unit)
:指定獲取鎖失敗的等待時間。在等待時間范圍之內進行重試,超時則返回加鎖失敗。
Rlock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
:同樣是指定獲取鎖失敗的等待時間,并且強制指定鎖過期的時間(不開啟看門狗)。在等待時間范圍之內進行重試,超時則返回加鎖失敗。
可以簡單的概述為RLock接口下的tryLock()
方法獲取鎖會失敗,lock()
方法獲取鎖一定會成功。
1> 一直重試直到加鎖成功
以Rlock#lock()
方法為例:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { // lock() 或 lockInterruptibly()為入口走到這里時。leaseTime為-1,表示會開始開門狗;如果leaseTime大于0,則不會開啟開門狗; ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { // 因為Semaphore的可用資源為0,所以這里就等價于Thread.sleep(ttl); entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); } }
首先訂閱解鎖channel(命名格式:
redisson_lock__channel:{keyName}
),其他線程解鎖后,會發布解鎖的消息;這里收到消息會立即嘗試獲取鎖;訂閱解鎖channel的超時時間默認為7.5s。也就說獲取鎖失敗7.5s之內,如果其他線程釋放鎖,當前線程可以立即嘗試獲取到鎖。獲取鎖失敗之后會進??個while死循環中:
每休息鎖的存活時間ttl
之后,就嘗試去獲取鎖,直到成功獲取到鎖才會跳出while死循環。
2> 等待鎖超時返回加鎖失敗
以Rlock#tryLock(long waitTime, TimeUnit unit)
為例:
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 獲取鎖剩余的等待時長 time -= System.currentTimeMillis() - current; if (time <= 0) { // 獲取鎖超時,返回獲取分布式鎖失敗 acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); try { // 訂閱解鎖channel的超時時長為 獲取鎖剩余的等待時長 subscribeFuture.get(time, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { if (!subscribeFuture.completeExceptionally(new RedisTimeoutException( "Unable to acquire subscription lock after " + time + "ms. " + "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) { subscribeFuture.whenComplete((res, ex) -> { if (ex == null) { unsubscribe(res, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } catch (ExecutionException e) { acquireFailed(waitTime, unit, threadId); return false; } try { // 收到解鎖channel的消息之后,走到這里,再次判斷獲取鎖等待時長是否超時 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // while循環中嘗試去獲取鎖 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { // 如果獲取鎖失敗后,鎖存活時長 小于 剩余鎖等待時長,則線程睡眠 鎖存活時長 commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { // 如果獲取鎖失敗后,鎖存活時間 大于等于 剩余鎖等待時長,則線程睡眠 鎖等待時長 commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); } }
加鎖存在超時時間 相比于 一直重試直到加鎖成功,只是多一個時間限制,具體差異體現在:訂閱解鎖channel的超時時長、獲取鎖失敗后線程的睡眠時長、重試獲取鎖次數的限制;
獲取分布式鎖失敗之后,立即判斷當前獲取鎖是否超時,如果超時,則返回加鎖失敗;
否者,訂閱解鎖channel(命名格式:redisson_lock__channel:{keyName}
),其他線程解鎖后,會發布解鎖的消息;
訂閱解鎖channel的超時時間為 獲取鎖剩余的等待時長
。 在這個時間范圍之內,如果其他線程釋放鎖,當前線程收到解鎖channel的消息之后再次判斷獲取鎖是否超時,如果不超時,嘗試獲取鎖。
獲取鎖之后會進??個while死循環中: 如果獲取鎖超時,則返回加鎖失敗;
否者讓線程睡眠: 如果鎖存活時長ttl
小于 剩余鎖等待時長,則線程睡眠 鎖存活時長;
如果鎖存活時間ttl
大于等于 剩余鎖等待時長,則線程睡眠 鎖等待時長;
線程睡眠完之后,判斷獲取鎖是否超時,不超時則嘗試去獲取鎖。
1)Client主動嘗試釋放鎖
進入到Rlock#unlock()方法;
和加鎖的方式?樣,釋放鎖也是通過lua腳本來完成的;
LUA腳本參數解析:
KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock;
KEYS[2] 表示getChanelName() 表示的是發布訂閱過程中使用的Chanel;
ARGV[1] 表示的是LockPubSub.unLockMessage,解鎖消息,實際代表的是數字 0,代表解鎖消息;
ARGV[2] 表示的是internalLockLeaseTime 默認的有效時間 30s;
ARGV[3] 表示的是 getLockName(thread.currentThread().getId()) 代表的是 UUID:ThreadId 用鎖對象id+線程id, 表示當前訪問線程,用于區分不同服務器上的線程。
LUA腳本邏輯:
如果鎖名稱不存在;
可能是因為鎖過期導致鎖不存在,也可能是并發解鎖。
則發布鎖解除的消息,返回1,lua腳本執行完畢;
如果鎖存在,檢測當前線程是否持有鎖;
如果是當前線程持有鎖,定義變量counter,接收執行incrby將該線程重入的次數–的結果;
如果重入次數大于0,表示該線程還有其他任務需要執行;重新設置鎖的過期時間;返回0,lua腳本執行完畢;
否則表示該線程執行結束,del刪除該鎖;并且publish發布該鎖解除的消息;返回1,lua腳本執行完畢;
如果不是當前線程持有鎖 或 其他情況,都返回nil,lua腳本執行完畢。
腳本執行結束之后,如果返回值不是0或1,即當前線程去釋放其他線程的加鎖時,拋出異常。
通過LUA腳本釋放鎖成功之后,會將看門狗殺死;
2)Client主動強制釋放鎖
forceUnlockAsync()
方法被調用的地方很多,大多都是在清理資源時刪除鎖。
@Override public RFuture<Boolean> forceUnlockAsync() { cancelExpirationRenewal(null); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE); }
LUA腳本邏輯:
邏輯比較簡單粗暴:刪除鎖成功則并發布鎖被刪除的消息,返回1結束,否則返回0結束。
3)Client宕機,鎖超時釋放
如果Redisson客戶端剛加鎖成功,并且未指定releaseTime,后臺會啟動一個定時任務watchdog每隔10s檢查key:key如果存在就為它?動續命到30s;在watchdog定時任務存在的情況下,如果不是主動釋放鎖,那么key將會?直的被watchdog這個定時任務維持加鎖。
但是如果客戶端宕機了,定時任務watchdog也就沒了,也就沒有鎖續約機制了,那么過完30s之后,key會?動被刪除、key對應的鎖也自動被釋放了。
4)不啟動鎖續約的超時釋放鎖
如果在加鎖時指定了leaseTime,加鎖成功之后,后臺并不會啟動一個定時任務watchdog做鎖續約;key存活leaseTime 毫秒之后便會自動被刪除、key對應的鎖也就自動被釋放了;無論當前線程的業務邏輯是否執行完畢。
比如使用如下方式加鎖:
RLock#lock(long leaseTime, TimeUnit unit)
RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
關于“Redisson如何實現分布式鎖、鎖續約”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Redisson如何實現分布式鎖、鎖續約”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。