您好,登錄后才能下訂單哦!
本篇內容主要講解“redis中的分布式鎖有哪些特點”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“redis中的分布式鎖有哪些特點”吧!
1.獨占性
不論在任何情況下都只能有一個線程持有鎖。
2.高可用
redis集群環境不能因為某一個節點宕機而出現獲取鎖或釋放鎖失敗。
3.防死鎖
必須有超時控制機制或者撤銷操作。
4.不亂搶
自己加鎖,自己釋放。不能釋放別人加的鎖。
5.重入性
同一線程可以多次加鎖。
一般情況下都是使用setnx+lua腳本實現。
直接貼代碼
package com.fandf.test.redis;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* redis 單機鎖
*
* @author fandongfeng
* @date 2023/3/29 06:52
*/
@Slf4j
@Service
public class RedisLock {
@Resource
RedisTemplate<String, Object> redisTemplate;
private static final String SELL_LOCK = "kill:";
/**
* 模擬秒殺
*
* @return 是否成功
*/
public String kill() {
String productId = "123";
String key = SELL_LOCK + productId;
//鎖value,解鎖時 用來判斷當前鎖是否是自己加的
String value = IdUtil.fastSimpleUUID();
//加鎖 十秒鐘過期 防死鎖
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS);
if (!flag) {
return "加鎖失敗";
}
try {
String productKey = "good123";
//獲取商品庫存
Integer stock = (Integer) redisTemplate.opsForValue().get(productKey);
if (stock == null) {
//模擬錄入數據, 實際應該加載時從數據庫讀取
redisTemplate.opsForValue().set(productKey, 100);
stock = 100;
}
if (stock <= 0) {
return "賣完了,下次早點來吧";
}
//扣減庫存, 模擬隨機賣出數量
int randomInt = RandomUtil.randomInt(1, 10);
redisTemplate.opsForValue().decrement(productKey, randomInt);
// 修改db,可以丟到隊列里慢慢處理
return "成功賣出" + randomInt + "個,庫存剩余" + redisTemplate.opsForValue().get(productKey) + "個";
} finally {
// //這種方法會存在刪除別人加的鎖的可能
// redisTemplate.delete(key);
// if(value.equals(redisTemplate.opsForValue().get(key))){
// //因為if條件的判斷和 delete不是原子性的,
// //if條件判斷成功后,恰好鎖到期自己解鎖
// //此時別的線程如果持有鎖了,就會把別人的鎖刪除掉
// redisTemplate.delete(key);
// }
//使用lua腳本保證判斷和刪除的原子性
String luaScript =
"if (redis.call('get',KEYS[1]) == ARGV[1]) then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
redisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), Collections.singletonList(key), value);
}
}
}
進行單元測試,模擬一百個線程同時進行秒殺
package com.fandf.test.redis;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
/**
* @Description:
* @author: fandongfeng
* @date: 2023-3-24 16:45
*/
@SpringBootTest
class SignServiceTest {
@Resource
RedisLock redisLock;
@RepeatedTest(100)
@Execution(CONCURRENT)
public void redisLock() {
String result = redisLock.kill();
if("加鎖失敗".equals(result)) {
}else {
System.out.println(result);
}
}
}
只有三個線程搶到了鎖
成功賣出5個,庫存剩余95個
成功賣出8個,庫存剩余87個
成功賣出7個,庫存剩余80個
總的來說有兩個:
1.無法重入。
2.我們為了防止死鎖,加鎖時都會加上過期時間,這個時間大部分情況下都是根據經驗對現有業務評估得出來的,但是萬一程序阻塞或者異常,導致執行了很長時間,鎖過期就會自動釋放了。此時如果別的線程拿到鎖,執行邏輯,就有可能出現問題。
那么這兩個問題有沒有辦法解決呢?有,接下來我們就來講講Redisson
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。
集成很簡單,只需兩步
pom引入依賴
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
application.yml增加redis配置
spring:
application:
name: test
redis:
host: 127.0.0.1
port: 6379
使用也很簡單,只需要注入RedissonClient即可
package com.fandf.test.redis;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author fandongfeng
*/
@Component
@Slf4j
public class RedissonTest {
@Resource
RedissonClient redissonClient;
public void test() {
RLock rLock = redissonClient.getLock("anyKey");
//rLock.lock(10, TimeUnit.SECONDS);
rLock.lock();
try {
// do something
} catch (Exception e) {
log.error("業務異常", e);
} finally {
rLock.unlock();
}
}
}
可能不了解redisson的小伙伴會不禁發出疑問。
what?加鎖時不需要加過期時間嗎?這樣會不會導致死鎖啊。解鎖不需要判斷是不是自己持有嗎?
哈哈,別著急,我們接下來一步步揭開redisson的面紗。
我們來一步步跟著lock()方法看下源碼(本地redisson版本為3.20.0)
//RedissonLock.class
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
查看lock(-1, null, false);方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//獲取當前線程id
long threadId = Thread.currentThread().getId();
//加鎖代碼塊, 返回鎖的失效時間
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
我們看下它是怎么上鎖的,也就是tryAcquire方法
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//真假加鎖方法 tryAcquireAsync
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//waitTime和leaseTime都是-1,所以走這里
//過期時間internalLockLeaseTime初始化的時候賦值commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();
//跟進去源碼發現默認值是30秒, private long lockWatchdogTimeout = 30 * 1000;
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
//加鎖成功,開啟子線程進行續約
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime > 0) {
//如果指定了過期時間,則不續約
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//沒指定過期時間,或者小于0,在這里實現鎖自動續約
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
上面代碼里面包含加鎖和鎖續約的邏輯,我們先來看看加鎖的代碼
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
這里就看的很明白了吧,redisson使用了lua腳本來保證了命令的原子性。
redis.call('hexists', KEYS[1], ARGV[2]) 查看 key value 是否存在。
Redis Hexists 命令用于查看哈希表的指定字段是否存在。
如果哈希表含有給定字段,返回 1 。 如果哈希表不含有給定字段,或 key 不存在,返回 0 。
127.0.0.1:6379> hexists 123 uuid
(integer) 0
127.0.0.1:6379> hincrby 123 uuid 1
(integer) 1
127.0.0.1:6379> hincrby 123 uuid 1
(integer) 2
127.0.0.1:6379> hincrby 123 uuid 1
(integer) 3
127.0.0.1:6379> hexists 123 uuid
(integer) 1
127.0.0.1:6379> hgetall 123
1) "uuid"
2) "3"
127.0.0.1:6379>
當key不存在,或者已經含有給定字段(也就是已經加過鎖了,這里是為了實現重入性),直接對字段的值+1
這個字段的值,也就是ARGV[2], 取得是getLockName(threadId)方法,我們再看看這個字段的值是什么
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getServiceManager().getId();
this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
//commandExecutor.getServiceManager() 的id默認值
private final String id = UUID.randomUUID().toString();
這里就明白了,字段名稱是 uuid + : + threadId
接下來我們看看鎖續約的代碼scheduleExpirationRenewal(threadId);
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//判斷該實例是否加過鎖
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
//重入次數+1
oldEntry.addThreadId(threadId);
} else {
//第一次加鎖
entry.addThreadId(threadId);
try {
//鎖續約核心代碼
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
//如果線程異常終止,則關閉鎖續約線程
cancelExpirationRenewal(threadId);
}
}
}
}
我們看看renewExpiration()方法
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//新建一個線程執行
Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//設置鎖過期時間為30秒
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
//檢查鎖是還否存在
if (res) {
// reschedule itself 10后調用自己
renewExpiration();
} else {
//關閉續約
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
//注意上行代碼internalLockLeaseTime / 3,
//internalLockLeaseTime默認30s,那么也就是10s檢查一次
ee.setTimeout(task);
}
//設置鎖過期時間為internalLockLeaseTime 也就是30s lua腳本保證原子性
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
OK,分析到這里我們已經知道了,lock(),方法會默認加30秒過期時間,并且開啟一個新線程,每隔10秒檢查一下,鎖是否釋放,如果沒釋放,就將鎖過期時間設置為30秒,如果鎖已經釋放,那么就將這個新線程也關掉。
我們寫個測試類看看
package com.fandf.test.redis;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* @Description:
* @author: fandongfeng
* @date: 2023-3-2416:45
*/
@SpringBootTest
class RedissonTest {
@Resource
private RedissonClient redisson;
@Test
public void watchDog() throws InterruptedException {
RLock lock = redisson.getLock("123");
lock.lock();
Thread.sleep(1000000);
}
}
查看鎖的過期時間,及是否續約
127.0.0.1:6379> keys *
1) "123"
127.0.0.1:6379> ttl 123
(integer) 30
127.0.0.1:6379> ttl 123
(integer) 26
127.0.0.1:6379> ttl 123
(integer) 24
127.0.0.1:6379> ttl 123
(integer) 22
127.0.0.1:6379> ttl 123
(integer) 21
127.0.0.1:6379> ttl 123
(integer) 20
127.0.0.1:6379> ttl 123
(integer) 30
127.0.0.1:6379> ttl 123
(integer) 28
127.0.0.1:6379>
我們再改改代碼,看看是否可重入和字段名稱是否和我們預期一致
package com.fandf.test.redis;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* @Description:
* @author: fandongfeng
* @date: 2023-3-24 16:45
*/
@SpringBootTest
class RedissonTest {
@Resource
private RedissonClient redisson;
@Test
public void watchDog() throws InterruptedException {
RLock lock = redisson.getLock("123");
lock.lock();
lock.lock();
lock.lock();
//加了三次鎖,此時重入次數為3
Thread.sleep(3000);
//解鎖一次,此時重入次數變為3
lock.unlock();
Thread.sleep(1000000);
}
}
127.0.0.1:6379> keys *
1) "123"
127.0.0.1:6379>
127.0.0.1:6379> ttl 123
(integer) 24
127.0.0.1:6379> hgetall 123
1) "df7f4c71-b57b-455f-acee-936ad8475e01:12"
2) "3"
127.0.0.1:6379>
127.0.0.1:6379> hgetall 123
1) "df7f4c71-b57b-455f-acee-936ad8475e01:12"
2) "2"
127.0.0.1:6379>
我們加鎖了三次,重入次數是3,字段值也是 uuid+:+threadId,和我們預期結果是一致的。
redisson是基于Redlock算法實現的,那么什么是Redlock算法呢?
假設當前集群有5個節點,那么運行redlock算法的客戶端會一次執行下面步驟
1.客戶端記錄當前系統時間,以毫秒為單位
2.依次嘗試從5個redis實例中,使用相同key獲取鎖
當redis請求獲取鎖時,客戶端會設置一個網絡連接和響應超時時間,避免因為網絡故障等原因導致阻塞。3.客戶端使用當前時間減去開始獲取鎖時間(步驟1的時間),得到獲取鎖消耗的時間
只有當半數以上redis節點加鎖成功,并且加鎖消耗的時間要小于鎖失效時間,才算鎖獲取成功4.如果獲取到了鎖,key的真正有效時間等于鎖失效時間 減去 獲取鎖消耗的時間
5.如果獲取鎖失敗,所有的redis實例都會進行解鎖
防止因為服務端響應消息丟失,但是實際數據又添加成功導致數據不一致問題
這里有下面幾個點需要注意:
1.我們都知道單機的redis是cp的,但是集群情況下redis是ap的,所以運行Redisson的節點必須是主節點,不能有從節點,防止主節點加鎖成功未同步從節點就宕機,而客戶端卻收到加鎖成功,導致數據不一致問題。
2.為了提高redis節點宕機的容錯率,可以使用公式2N(n指宕機數量)+1,假設宕機一臺,Redisson還要繼續運行,那么至少要部署2*1+1=3臺主節點。
到此,相信大家對“redis中的分布式鎖有哪些特點”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。