您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關如何用Redis實現分布式鎖,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
我們學習 Java 都知道鎖的概念,例如基于 JVM 實現的同步鎖 synchronized,以及 jdk 提供的一套代碼級別的鎖機制 lock,我們在并發編程中會經常用這兩種鎖去保證代碼在多線程環境下運行的正確性。但是這些鎖機制在分布式場景下是不適用的,原因是在分布式業務場景下,我們的代碼都是跑在不同的JVM甚至是不同的機器上,synchronized 和 lock 只能在同一個 JVM 環境下起作用。所以這時候就需要用到分布式鎖了。
例如,現在有個場景就是整點搶消費券(疫情的原因,支付寶最近在8點、12點整點開放搶消費券),消費券有一個固定的量,先到先得,搶完就沒了,線上的服務都是部署多個的,大致架構如下:
所以這個時候我們就得用分布式鎖來保證共享資源的訪問的正確性。
回到頂部
假設不使用分布式鎖,我們看看 synchronized 能不能保證?其實是不能的,我們來演示一下。
下面我寫了一個簡單的 springboot 項目來模擬這個搶消費券的場景,代碼很簡單,大致意思是先從 Redis 獲取剩余消費券數,然后判斷大于0,則減一模擬被某個用戶搶到一個,然后減一后再修改 Redis 的剩余消費券數量,打印扣減成功,剩余還有多少,否則扣減失敗,就沒搶到。整塊代碼被 synchronized 包裹,Redis 設置的庫存數量為50。
//假設庫存編號是00001private String key = "stock:00001"; @Autowiredprivate StringRedisTemplate stringRedisTemplate;/** * 扣減庫存 synchronized同步鎖*/@RequestMapping("/deductStock")public String deductStock(){ synchronized (this){ //獲取當前庫存 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key)); if(stock>0){ int afterStock = stock-1; stringRedisTemplate.opsForValue().set(key,afterStock+"");//修改庫存 System.out.println("扣減庫存成功,剩余庫存"+afterStock); }else { System.out.println("扣減庫存失敗"); } } return "ok"; }
然后啟動兩個springboot項目,端口分別為8080,8081,然后在nginx里配置負載均衡
upstream redislock{ server 127.0.0.1:8080; server 127.0.0.1:8081; } server { listen 80; server_name 127.0.0.1; location / { root html; index index.html index.htm; proxy_pass http://redislock; } }
然后用jmeter壓測工具進行測試
然后我們看一下控制臺輸出,可以看到我們運行的兩個web實例,很多同樣的消費券被不同的線程搶到,證明synchronized在這樣的情況下是不起作用的,所以就需要使用分布式鎖來保證資源的正確性。
回到頂部
在實現分布式鎖之前,我們先考慮如何實現,以及都要實現鎖的哪些功能。
1、分布式特性(部署在多個機器上的實例都能夠訪問這把鎖)
2、排他性(同一時間只能有一個線程持有鎖)
3、超時自動釋放的特性(持有鎖的線程需要給定一定的持有鎖的最大時間,防止線程死掉無法釋放鎖而造成死鎖)
4、...
基于以上列出的分布式鎖需要擁有的基本特性,我們思考一下使用Redis該如何實現?
1、第一個分布式的特性Redis已經支持,多個實例連同一個Redis即可
2、第二個排他性,也就是要實現一個獨占鎖,可以使用Redis的setnx命令實現
3、第三個超時自動釋放特性,Redis可以針對某個key設置過期時間
4、執行完畢釋放分布式鎖
科普時間
Redis Setnx 命令
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在時,為 key 設置指定的值
語法
redis Setnx 命令基本語法如下:
redis 127.0.0.1:6379> SETNX KEY_NAME VALUE
可用版本:>= 1.0.0
返回值:設置成功,返回1, 設置失敗,返回0
@RequestMapping("/stock_redis_lock")public String stock_redis_lock(){ //底層使用setnx命令 Boolean aTrue = stringRedisTemplate.opsForValue().setIfAbsent(lock_key, "true"); stringRedisTemplate.expire(lock_key,10, TimeUnit.SECONDS);//設置過期時間10秒 if (!aTrue) {//設置失敗則表示沒有拿到分布式鎖 return "error";//這里可以給用戶一個友好的提示 } //獲取當前庫存 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key)); if(stock>0){ int afterStock = stock-1; stringRedisTemplate.opsForValue().set(key,afterStock+""); System.out.println("扣減庫存成功,剩余庫存"+afterStock); }else { System.out.println("扣減庫存失敗"); } stringRedisTemplate.delete(lock_key);//執行完畢釋放分布式鎖 return "ok"; }
仍然設置庫存數量為50,我們再用jmeter測試一下,把jmeter的測試地址改為127.0.0.1/stock_redis_lock,同樣的設置再來測一次。
測試了5次沒有出現臟數據,把發送時間改為0,測了5次也沒問題,然后又把線程數改為600,時間為0 ,循環4次,測了幾次也是正常的。
上面實現分布式鎖的代碼已經是一個較為成熟的分布式鎖的實現了,對大多數軟件公司來說都已經滿足需求了。但是上面代碼還是有優化的空間,例如:
1)上面的代碼我們是沒有考慮異常情況的,實際情況下代碼沒有這么簡單,可能還會有別的很多復雜的操作,都有可能會出現異常,所以我們釋放鎖的代碼需要放在finally塊里來保證即使是代碼拋異常了釋放鎖的代碼他依然會被執行。
2)還有,你有沒有注意到,上面我們的分布式鎖的代碼的獲取和設置過期時間的代碼是兩步操作第4行和第5行,即非原子操作,就有可能剛執行了第4行還沒來得及執行第5行這臺機器掛了,那么這個鎖就沒有設置超時時間,其他線程就一直無法獲取,除非人工干預,所以這是一步優化的地方,Redis也提供了原子操作,那就是SET key value EX seconds NX
科普時間
SET key value [EX seconds] [PX milliseconds] [NX|XX] 將字符串值 value 關聯到 key
可選參數
從 Redis 2.6.12 版本開始, SET 命令的行為可以通過一系列參數來修改:
EX second :設置鍵的過期時間為 second 秒。SET key value EX second 效果等同于 SETEX key second value
PX millisecond :設置鍵的過期時間為 millisecond 毫秒。SET key value PX millisecond 效果等同于 PSETEX key millisecond value
NX :只在鍵不存在時,才對鍵進行設置操作。SET key value NX 效果等同于 SETNX key value
XX :只在鍵已經存在時,才對鍵進行設置操作
SpringBoot的StringRedisTemplate也有對應的方法實現,如下代碼:
//假設庫存編號是00001private String key = "stock:00001";private String lock_key = "lock_key:00001"; @Autowiredprivate StringRedisTemplate stringRedisTemplate; @RequestMapping("/stock_redis_lock")public String stock_redis_lock() { String uuid = UUID.randomUUID().toString(); try { //原子的設置key及超時時間 Boolean aTrue = stringRedisTemplate.opsForValue().setIfAbsent(lock_key, "true", 30, TimeUnit.SECONDS); if (!aTrue) { return "error"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key)); if (stock > 0) { int afterStock = stock - 1; stringRedisTemplate.opsForValue().set(key, afterStock + ""); System.out.println("扣減庫存成功,剩余庫存" + afterStock); } else { System.out.println("扣減庫存失敗"); } } catch (NumberFormatException e) { e.printStackTrace(); } finally { //避免死鎖 if (uuid.equals(stringRedisTemplate.opsForValue().get(lock_key))) { stringRedisTemplate.delete(lock_key); } } return "ok"; }
這樣實現是否就完美了呢?嗯,對于并發量要求不高或者非大并發的場景的話這樣實現已經可以了。但是對于搶購 ,秒殺這樣的場景,當流量很大,這時候服務器網卡、磁盤IO、CPU負載都可能會達到極限,那么服務器對于一個請求的的響應時間勢必變得比正常情況下慢很多,那么假設就剛才設置的鎖的超時時間為10秒,如果某一個線程拿到鎖之后因為某些原因沒能在10秒內執行完畢鎖就失效了,這時候其他線程就會搶占到分布式鎖去執行業務邏輯,然后之前的線程執行完了,會去執行 finally 里的釋放鎖的代碼就會把正在占有分布式鎖的線程的鎖給釋放掉,實際上剛剛正在占有鎖的線程還沒執行完,那么其他線程就又有機會獲得鎖了...這樣整個分布式鎖就失效了,將會產生意想不到的后果。如下圖模擬了這個場景。
所以這個問題總結一下,就是因為鎖的過期時間設置的不合適或因為某些原因導致代碼執行時間大于鎖過期時間而導致并發問題以及鎖被別的線程釋放,以至于分布式鎖混亂。在簡單的說就是兩個問題,1)自己的鎖被別人釋放 2)鎖超時無法續時間。
第一個問題很好解決,在設置分布式鎖時,我們在當前線程中生產一個唯一串將value設置為這個唯一值,然后在finally塊里判斷當前鎖的value和自己設置的一樣時再去執行delete,如下:
String uuid = UUID.randomUUID().toString();try { //原子的設置key及超時時間,鎖唯一值 Boolean aTrue = stringRedisTemplate.opsForValue().setIfAbsent(lock_key,uuid,30,TimeUnit.SECONDS); //...} finally { //是自己設置的鎖再執行delete if(uuid.equals(stringRedisTemplate.opsForValue().get(lock_key))){ stringRedisTemplate.delete(lock_key);//避免死鎖 } }
問題一解決了(設想一下上述代碼還有什么問題,一會兒講),那鎖的超時時間就很關鍵了,不能太大也不能太小,這就需要評估業務代碼的執行時間,比如設置個10秒,20秒。即使是你的鎖設置了合適的超時時間,也避免不了可能會發生上述分析的因為某些原因代碼沒在正常評估的時間內執行完畢,所以這時候的解決方案就是給鎖續超時時間。大致思路就是,業務線程單獨起一個分線程,定時去監聽業務線程設置的分布式鎖是否還存在,存在就說明業務線程還沒執行完,那么就延長鎖的超時時間,若鎖已不存在則業務線程執行完畢,然后就結束自己。
“鎖續命”的這套邏輯屬實有點復雜啊,要考慮的問題太多了,稍不注意就會有bug。不要看上面實現分布式鎖的代碼沒有幾行,就認為實現起來很簡單,如果說自己去實現的時候沒有實際高并發的經驗,肯定也會踩很多坑,例如,
1)鎖的設置和過期時間的設置是非原子操作的,就可能會導致死鎖。
2)還有上面遺留的一個,在finally塊里判斷鎖是否是自己設置的,是的話再刪除鎖,這兩步操作也不是原子的,假設剛判斷完為true服務就掛了,那么刪除鎖的代碼不會執行,就會造成死鎖,即使是設置了過期時間,在沒過期這段時間也會死鎖。所以這里也是一個注意的點,要保證原子操作的話,Redis提供了執行Lua腳本的功能來保證操作的原子性,具體怎么使用不再展開。
所以,“鎖續命”的這套邏輯實現起來還是有點復雜的,好在市面上已經有現成的開源框架幫我們實現了,那就是Redisson。
回到頂部
實現原理:
1、首先Redisson會嘗試進行加鎖,加鎖的原理也是使用類似Redis的setnx命令原子的加鎖,加鎖成功的話其內部會開啟一個子線程
2、子線程主要負責監聽,其實就是一個定時器,定時監聽主線程是否還持有鎖,持有則將鎖的時間延時,否則結束線程
3、如果加鎖失敗則自旋不斷嘗試加鎖
4、執行完代碼主線程主動釋放鎖
那我們看一下使用后Redisson后的代碼是什么樣的。
1、首先在pom.xml文件添加Redisson的maven坐標
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.12.5</version></dependency>
2、我們要拿到Redisson的這個對象,如下配置Bean
@SpringBootApplicationpublic class RedisLockApplication { public static void main(String[] args) { SpringApplication.run(RedisLockApplication.class, args); } @Bean public Redisson redisson(){ Config config = new Config(); config.useSingleServer().setAddress("redis://localhost:6379") .setDatabase(0); return (Redisson) Redisson.create(config); } }
3、然后我們獲取Redisson的實例,使用其API進行加鎖釋放鎖操作
//假設庫存編號是00001private String key = "stock:00001";private String lock_key = "lock_key:00001"; @Autowiredprivate StringRedisTemplate stringRedisTemplate;/** * 使用Redisson實現分布式鎖 * @return */@RequestMapping("/stock_redisson_lock")public String stock_redisson_lock() { RLock redissonLock = redisson.getLock(lock_key); try { redissonLock.lock(); int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key)); if (stock > 0) { int afterStock = stock - 1; stringRedisTemplate.opsForValue().set(key, afterStock + ""); System.out.println("扣減庫存成功,剩余庫存" + afterStock); } else { System.out.println("扣減庫存失敗"); } } catch (NumberFormatException e) { e.printStackTrace(); } finally { redissonLock.unlock(); } return "ok"; }
看這個Redisson的分布式鎖提供的API是不是非常的簡單?就像Java并發變成里AQS那套Lock機制一樣,如下獲取一把RedissonLock
RLock redissonLock = redisson.getLock(lock_key);
默認返回的是RedissonLock的對象,該對象實現了RLock接口,而RLock接口繼承了JDK并發編程報包里的Lock接口
在使用Redisson加鎖時,它也提供了很多API,如下
現在我們選擇使用的是最簡單的無參lock方法,簡單的點進去跟一下看看他的源碼,我們找到最終的執行加鎖的代碼如下:
我們可以看到其底層使用了Lua腳本來保證原子性,使用Redis的hash結構實現的加鎖,以及可重入鎖。
比我們自己實現分布式鎖看起來還要簡單,但是我們自己寫的鎖功能他都有,我們沒有的他也有。比如,他實現的分布式鎖是支持可重入的,也支持可等待,即嘗試等待一定時間,沒拿到鎖就返回false。上述代碼中的redissonLock.lock();是一直等待,內部自旋嘗試加鎖。
Distributed Java locks and synchronizers
Lock
FairLock
MultiLock
RedLock
ReadWriteLock
Semaphore
PermitExpirableSemaphore
CountDownLatch
redisson.org
Redisson提供了豐富的API,內部運用了大量的Lua腳本保證原子操作,篇幅原因redisson實現鎖的代碼暫不分析了。
注意:在上述示例代碼中,為了方便演示,查詢redis庫存、修改庫存并非原子操作,實際這兩部操作也得保證原子行,可以用redis自帶的Lua腳本功能去實現
關于“如何用Redis實現分布式鎖”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。