您好,登錄后才能下訂單哦!
這篇文章主要介紹“Redisson分布式鎖之加解鎖源碼分析”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“Redisson分布式鎖之加解鎖源碼分析”文章能幫助大家解決問題。
我們都知道,Java中synchronized和lock都支持可重入,synchronized的鎖關聯一個線程持有者和一個計數器。當一個線程請求成功后,JVM會記下持有鎖的線程,并將計數器計為1。此時其他線程請求該鎖,則必須等待。而該持有鎖的線程如果再次請求這個鎖,就可以再次拿到這個鎖,同時計數器會遞增。當線程退出一個synchronized方法/塊時,計數器會遞減,如果計數器為0則釋放該鎖;在ReentrantLock中,底層的 AQS 對應的state 同步狀態值表示線程獲取該鎖的可重入次數,通過CAS方式進行設置,在默認情況下,state的值為0 表示當前鎖沒有被任何線程持有,原理類似。所以如果想要實現可重入性,可能須有一個計數器來控制重入次數,實際Redisson確實是這么做的。
好的我們通過Redisson客戶端進行設置,并循環3次,模擬鎖重入:000
for(int i = 0; i < 3; i++) { RedissonLockUtil.tryLock("distributed:lock:distribute_key", TimeUnit.SECONDS, 20, 100); }
連接Redis客戶端進行查看:
可以看到,我們設置的分布式鎖是存在一個hash結構中,value看起來是循環的次數3,key就不怎么認識了,那這個key是怎么設置進去的呢,另外為什么要設置成為Hash類型呢?
我們先來看看普通的分布式鎖的上鎖流程:
說明:
客戶端在進行加鎖時,會校驗如果業務上沒有設置持有鎖時長leaseTime,會啟動看門狗來每隔10s進行續命,否則就直接以leaseTime作為持有的時長;
并發場景下,如果客戶端1鎖還未釋放,客戶端2嘗試獲取,加鎖必然失敗,然后會通過發布訂閱模式來訂閱Key的釋放通知,并繼續進入后續的搶鎖流程。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { return true; } else { // 訂閱分布式Key對應的消息,監聽其它鎖持有者釋放,鎖沒有釋放的時候則會等待,直到鎖釋放的時候會執行下面的while循環 CompletableFuture subscribeFuture = this.subscribe(threadId); subscribeFuture.get(time, TimeUnit.MILLISECONDS); try { do { // 嘗試獲取鎖 ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); // 競爭獲取鎖成功,退出循環,不再競爭。 if (ttl == null) { return true; } // 利用信號量機制阻塞當前線程相應時間,之后再重新獲取鎖 if (ttl >= 0L && ttl < time) { ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; } while(time > 0L); } finally { // 競爭鎖成功后,取消訂閱該線程Id事件 this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId); } } } }
RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { // 如果設置了持有鎖的時長,直接進行嘗試加鎖操作 if (leaseTime != -1L) { return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // 未設置加鎖時長,在加鎖成功后,啟動續期任務,初始默認持有鎖時間是30s RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { public void operationComplete(Future<Long> future) throws Exception { if (future.isSuccess()) { Long ttlRemaining = (Long)future.getNow(); if (ttlRemaining == null) { RedissonLock.this.scheduleExpirationRenewal(threadId); } } } }); return ttlRemainingFuture; } }
我們都知道Redis執行Lua腳本具有原子性,所以在嘗試加鎖的下層,Redis主要執行了一段復雜的lua腳本:
-- 不存在該key時 if (redis.call('exists', KEYS[1]) == 0) then -- 新增該鎖并且hash中該線程id對應的count置1 redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 設置過期時間 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 存在該key 并且 hash中線程id的key也存在 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 線程重入次數++ redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
參數說明:
KEYS[1]:對應我們設置的分布式key,即:distributed:lock:distribute_key
ARGV[1]:業務自定義的加鎖時長或者默認的30s;
ARGV[2]: 具體的客戶端初始化連接UUID+線程ID: 9d8f0907-1165-47d2-8983-1e130b07ad0c:1
我們從上面的腳本中可以看出核心邏輯其實不難:
如果分布式鎖Key未被任何端持有,直接根據“客戶端連接ID+線程ID” 進行初始化設置,并設置重入次數為1,并設置Key的過期時間;
否則重入次數+1,并重置過期時間;
接下來看看scheduleExpirationRenewal續命是怎么做的呢?
private void scheduleExpirationRenewal(final long threadId) { if (!expirationRenewalMap.containsKey(this.getEntryName())) { Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { // 執行續命操作 RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId); future.addListener(new FutureListener<Boolean>() { public void operationComplete(Future<Boolean> future) throws Exception { RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName()); ... // 續命成功,繼續 if ((Boolean)future.getNow()) { RedissonLock.this.scheduleExpirationRenewal(threadId); } } }); } }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); } }
Tip小知識點:
續期是用的什么定時任務執行的?
Redisson用netty的HashedWheelTimer做命令重試機制,原因在于一條redis命令的執行不論成功或者失敗耗時都很短,而HashedWheelTimer是單線程的,系統性能開銷小。
而在上面的renewExpirationAsync中續命操作的執行核心Lua腳本要做的事情也非常的簡單,就是給這個Key的過期時間重新設置為指定的30s.
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
釋放鎖主要是除了解鎖本省,另外還要考慮到如果存在續期的情況,要將續期任務刪除:
public RFuture<Void> unlockAsync(long threadId) { // 解鎖 RFuture<Boolean> future = this.unlockInnerAsync(threadId); CompletionStage<Void> f = future.handle((opStatus, e) -> { // 解除續期 this.cancelExpirationRenewal(threadId); ... }); return new CompletableFutureWrapper(f); }
在unlockInnerAsync內部,Redisson釋放鎖其實核心也是執行了如下一段核心Lua腳本:
// 校驗是否存在 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; // 獲取加鎖次數,校驗是否為重入鎖 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); // 如果為重入鎖,重置過期時間,鎖本身不釋放 if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; // 刪除Key else redis.call('del', KEYS[1]); // 通知阻塞的客戶端可以搶鎖啦 redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
其中:
KEYS[1]: 分布式鎖
KEYS[2]: redisson_lock_channel:{分布式鎖} 發布訂閱消息的管道名稱
ARGV[1]: 發布的消息內容
ARGV[2]: 鎖的過期時間
ARGV[3]: 線程ID標識名稱
關于“Redisson分布式鎖之加解鎖源碼分析”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。