您好,登錄后才能下訂單哦!
這篇文章主要介紹了怎么在SpringBoot中使用Redis實現分布式鎖的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇怎么在SpringBoot中使用Redis實現分布式鎖文章都會有所收獲,下面我們一起來看看吧。
在聊分布式鎖之前,有必要先解釋一下,為什么需要分布式鎖
。
與分布式鎖相對就的是單機鎖,我們在寫多線程程序時,避免同時操作一個共享變量產生數據問題,通常會使用一把鎖來互斥以保證共享變量的正確性,其使用范圍是在同一個進程中。如果換做是多個進程,需要同時操作一個共享資源,如何互斥呢?現在的業務應用通常是微服務架構,這也意味著一個應用會部署多個進程,多個進程如果需要修改MySQL中的同一行記錄,為了避免操作亂序導致臟數據,此時就需要引入分布式鎖了。
想要實現分布式鎖,必須借助一個外部系統,所有進程都去這個系統上申請加鎖。而這個外部系統,必須要實現互斥能力,即兩個請求同時進來,只會給一個進程加鎖成功,另一個失敗。這個外部系統可以是數據庫,也可以是Redis或Zookeeper,但為了追求性能,我們通常會選擇使用Redis或Zookeeper來做。
Redis本身可以被多個客戶端共享訪問,正好就是一個共享存儲系統,可以用來保存分布式鎖。而且 Redis 的讀寫性能高,可以應對高并發的鎖操作場景。本文主要探討如何基于Redis實現分布式鎖以及實現過程中可能面臨的問題。
作為分布式鎖實現過程中的共享存儲系統,Redis可以使用鍵值對來保存鎖變量,在接收和處理不同客戶端發送的加鎖和釋放鎖的操作請求。那么,鍵值對的鍵和值具體是怎么定的呢?我們要賦予鎖變量一個變量名,把這個變量名作為鍵值對的鍵,而鎖變量的值,則是鍵值對的值,這樣一來,Redis就能保存鎖變量了,客戶端也就可以通過Redis的命令操作來實現鎖操作。
想要實現分布式鎖,必須要求Redis有互斥的能力。可以使用SETNX命令,其含義是SET IF NOT EXIST,即如果key不存在,才會設置它的值,否則什么也不做。兩個客戶端進程可以執行這個命令,達到互斥,就可以實現一個分布式鎖。
以下展示了Redis使用key/value對保存鎖變量,以及兩個客戶端同時請求加鎖的操作過程。
加鎖操作完成后,加鎖成功的客戶端,就可以去操作共享資源,例如,修改MySQL的某一行數據。操作完成后,還要及時釋放鎖,給后來者讓出操作共享資源的機會。如何釋放鎖呢?直接使用DEL命令刪除這個key即可。這個邏輯非常簡單,整體的流程寫成偽代碼就是下面這樣。
// 加鎖 SETNX lock_key 1 // 業務邏輯 DO THINGS // 釋放鎖 DEL lock_key
但是,以上實現存在一個很大的問題,當客戶端1拿到鎖后,如果發生下面的場景,就會造成死鎖。
程序處理業務邏輯異常,沒及時釋放鎖進程掛了,沒機會釋放鎖
以上情況會導致已經獲得鎖的客戶端一直占用鎖,其他客戶端永遠無法獲取到鎖。
為了解決以上死鎖問題,最容易想到的方案是在申請鎖時,在Redis中實現時,給鎖設置一個過期時間,假設操作共享資源的時間不會超過10s,那么加鎖時,給這個key設置10s過期即可。
但以上操作還是有問題,加鎖、設置過期時間是2條命令,有可能只執行了第一條,第二條卻執行失敗
,例如:
1.SETNX執行成功,執行EXPIRE時由于網絡問題,執行失敗
2.SETNX執行成功,Redis異常宕機,EXPIRE沒有機會執行
3.SETNX執行成功,客戶端異常崩潰,EXPIRE沒有機會執行
總之這兩條命令如果不能保證是原子操作,就有潛在的風險導致過期時間設置失敗,依舊有可能發生死鎖問題
。幸好在Redis 2.6.12之后,Redis擴展了SET命令的參數,可以在SET的同時指定EXPIRE時間,這條操作是原子的,例如以下命令是設置鎖的過期時間為10秒。
SET lock_key 1 EX 10 NX
至此,解決了死鎖問題,但還是有其他問題。想像下面這個這樣一種場景:
客戶端1加鎖成功,開始操作共享資源
客戶端1操作共享資源耗時太久,超過了鎖的過期時間,鎖失效(鎖被自動釋放)
客戶端2加鎖成功,開始操作共享資源
客戶端1操作共享資源完成,在finally塊中手動釋放鎖,但此時它釋放的是客戶端2的鎖。
這里存在兩個嚴重的問題:
鎖過期
釋放了別人的鎖
第1個問題是評估操作共享資源的時間不準確導致的,如果只是一味增大過期時間,只能緩解問題降低出現問題的概率,依舊無法徹底解決問題。原因在于客戶端在拿到鎖之后,在操作共享資源時,遇到的場景是很復雜的,既然是預估的時間,也只能是大致的計算,不可能覆蓋所有導致耗時變長的場景
。
第2個問題是釋放了別人的鎖,原因在于釋放鎖的操作是無腦操作,并沒有檢查這把鎖的歸屬,這樣解鎖不嚴謹。如何解決呢?
解決辦法是,客戶端在加鎖時,設置一個只有自己知道的唯一標識進去,例如可以是自己的線程ID
,如果是redis實現,就是SET key unique_value EX 10 NX。之后在釋放鎖時,要先判斷這把鎖是否歸自己持有,只有是自己的才能釋放它。
//釋放鎖 比較unique_value是否相等,避免誤釋放 if redis.get("key") == unique_value then return redis.del("key")
這里釋放鎖使用的是GET + DEL兩條命令,這時又會遇到原子性
問題了。
客戶端1執行GET,判斷鎖是自己的
客戶端2執行了SET命令,強制獲取到鎖(雖然發生概念很低,但要嚴謹考慮鎖的安全性)
客戶端1執行DEL,卻釋放了客戶端2的鎖
由此可見,以上GET + DEL兩個命令還是必須原子的執行才行。怎樣原子執行兩條命令呢?答案是Lua腳本,可以把以上邏輯寫成Lua腳本,讓Redis執行。因為Redis處理每個請求是單線程執行的,在執行一個Lua腳本時其它請求必須等待,直到這個Lua腳本處理完成
,這樣一來GET+DEL之間就不會有其他命令執行了。
以下是使用Lua腳本(unlock.script)實現的釋放鎖操作的偽代碼,其中,KEYS[1]表示lock_key,ARGV[1]是當前客戶端的唯一標識,這兩個值都是我們在執行 Lua腳本時作為參數傳入的。
//Lua腳本語言,釋放鎖 比較unique_value是否相等,避免誤釋放 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
最后我們執行以下命令,即可
redis-cli --eval unlock.script lock_key , unique_value
這樣一路優先下來,整個加鎖、解鎖流程就更嚴謹了,先小結一下,基于Redis實現的分布式鎖,一個嚴謹的流程如下:
加鎖時要設置過期時間SET lock_key unique_value EX expire_time NX
操作共享資源
釋放鎖:Lua腳本,先GET判斷鎖是否歸屬自己,再DEL釋放鎖
有了這個嚴謹的鎖模型,我們還需要重新思考之前的那個問題,鎖的過期時間不好評估怎么辦。
前面提到過,過期時間如果評估得不好,這個鎖就會有提前過期的風險,一種妥協的解決方案是,盡量冗余過期時間,降低鎖提前過期的概率,但這個方案并不能完美解決問題。是否可以設置這樣的方案,加鎖時,先設置一個預估的過期時間,然后開啟一個守護線程,定時去檢測這個鎖的失效時間,如果鎖快要過期了,操作共享資源還未完成,那么就自動對鎖進行續期,重新設置過期時間
。
這是一種比較好的方案,已經有一個庫把這些工作都封裝好了,它就是Redisson。Redisson是一個Java語言實現的Redis SDK客戶端,在使用分布式鎖時,它就采用了自動續期的方案來避免鎖過期,這個守護線程我們一般叫它看門狗線程。這個SDK提供的API非常友好,它可以像操作本地鎖一樣操作分布式鎖。客戶端一旦加鎖成功,就會啟動一個watch dog看門狗線程,它是一個后臺線程,會每隔一段時間(這段時間的長度與設置的鎖的過期時間有關)檢查一下,如果檢查時客戶端還持有鎖key(也就是說還在操作共享資源),那么就會延長鎖key的生存時間。
那如果客戶端在加鎖成功后就宕機了呢?宕機了那么看門狗任務就不存在了,也就無法為鎖續期了,鎖到期自動失效。
上面討論的情況,都是鎖在單個Redis 實例中可能產生的問題,并沒有涉及到Redis的部署架構細節。
Redis發展到現在,幾種常見的部署架構有:
單機模式;
主從模式;
哨兵(sentinel)模式;
集群模式;
我們使用Redis時,一般會采用主從集群+哨兵的模式部署,哨兵的作用就是監測redis節點的運行狀態。普通的主從模式,當master崩潰時,需要手動切換讓slave成為master,使用主從+哨兵結合的好處在于,當master異常宕機時,哨兵可以實現故障自動切換,把slave提升為新的master,繼續提供服務,以此保證可用性
。那么當主從發生切換時,分布式鎖依舊安全嗎?
想像這樣的場景:
客戶端1在master上執行SET命令,加鎖成功
此時,master異常宕機,SET命令還未同步到slave上(主從復制是異步的)
哨兵將slave提升為新的master,但這個鎖在新的master上丟失了,導致客戶端2來加鎖成功了,兩個客戶端共同操作共享資源
可見,當引入Redis副本后,分布式鎖還是可能受到影響。即使Redis通過sentinel保證高可用,如果這個master節點由于某些原因發生了主從切換,那么就會出現鎖丟失的情況。
集群模式+Redlock實現高可靠的分布式鎖
為了避免Redis實例故障而導致的鎖無法工作的問題,Redis的開發者 Antirez提出了分布式鎖算法Redlock。Redlock算法的基本思路,是讓客戶端和多個獨立的Redis實例依次請求加鎖,如果客戶端能夠和半數以上的實例成功地完成加鎖操作,那么我們就認為,客戶端成功地獲得分布式鎖了,否則加鎖失敗
。這樣一來,即使有單個Redis實例發生故障,因為鎖變量在其它實例上也有保存,所以,客戶端仍然可以正常地進行鎖操作,鎖變量并不會丟失。
來具體看下Redlock算法的執行步驟。Redlock算法的實現要求Redis采用集群部署模式,無哨兵節點,需要有N個獨立的Redis實例(官方推薦至少5個實例)。接下來,我們可以分成3步來完成加鎖操作。
第一步是,客戶端獲取當前時間。
第二步是,客戶端按順序依次向N個Redis實例執行加鎖操作。
這里的加鎖操作和在單實例上執行的加鎖操作一樣,使用SET命令,帶上NX、EX/PX選項,以及帶上客戶端的唯一標識。當然,如果某個Redis實例發生故障了,為了保證在這種情況下,Redlock算法能夠繼續運行,我們需要給加鎖操作設置一個超時時間。如果客戶端在和一個Redis實例請求加鎖時,一直到超時都沒有成功,那么此時,客戶端會和下一個Redis實例繼續請求加鎖。加鎖操作的超時時間需要遠遠地小于鎖的有效時間,一般也就是設置為幾十毫秒。
第三步是,一旦客戶端完成了和所有Redis實例的加鎖操作,客戶端就要計算整個加鎖過程的總耗時。
客戶端只有在滿足兩個條件時,才能認為是加鎖成功,條件一是客戶端從超過半數(大于等于 N/2+1)的Redis實例上成功獲取到了鎖;條件二是客戶端獲取鎖的總耗時沒有超過鎖的有效時間。
為什么大多數實例加鎖成功才能算成功呢?多個Redis實例一起來用,其實就組成了一個分布式系統。在分布式系統中總會出現異常節點,所以在談論分布式系統時,需要考慮異常節點達到多少個,也依舊不影響整個系統的正確運行。這是一個分布式系統的容錯問題,這個問題的結論是:如果只存在故障節點,只要大多數節點正常,那么整個系統依舊可以提供正確服務。
在滿足了這兩個條件后,我們需要重新計算這把鎖的有效時間,計算的結果是鎖的最初有效時間減去客戶端為獲取鎖的總耗時。如果鎖的有效時間已經來不及完成共享數據的操作了,我們可以釋放鎖,以免出現還沒完成共享資源操作,鎖就過期了的情況
。
當然,如果客戶端在和所有實例執行完加鎖操作后,沒能同時滿足這兩個條件,那么,客戶端就要向所有Redis節點發起釋放鎖的操作
。為什么釋放鎖,要操作所有的節點呢,不能只操作那些加鎖成功的節點嗎?因為在某一個Redis節點加鎖時,可能因為網絡原因導致加鎖失敗,例如一個客戶端在一個Redis實例上加鎖成功,但在讀取響應結果時由于網絡問題導致讀取失敗,那這把鎖其實已經在Redis上加鎖成功了。所以釋放鎖時,不管之前有沒有加鎖成功,需要釋放所有節點上的鎖以保證清理節點上的殘留的鎖
。
在Redlock算法中,釋放鎖的操作和在單實例上釋放鎖的操作一樣,只要執行釋放鎖的 Lua腳本就可以了。這樣一來,只要N個Redis實例中的半數以上實例能正常工作,就能保證分布式鎖的正常工作了。所以,在實際的業務應用中,如果你想要提升分布式鎖的可靠性,就可以通過Redlock算法來實現。
<!-- springboot整合redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
package com.example.redisdemo.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @description: Redis配置類 * @author Keson * @date 21:20 2022/11/14 * @Param * @return * @version 1.0 */ @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { // 設置序列化 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); redisTemplate.setConnectionFactory(lettuceConnectionFactory); RedisSerializer<?> stringSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringSerializer);// key序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化 redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化 redisTemplate.afterPropertiesSet(); return redisTemplate; } }
package com.example.redisdemo.service; import com.example.redisdemo.entity.CustomerBalance; import java.util.concurrent.Callable; /** * @author Keson * @version 1.0 * @description: TODO * @date 2022/11/14 15:12 */ public interface RedisService { <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception; }
package com.example.redisdemo.service.impl; import com.example.redisdemo.entity.CustomerBalance; import com.example.redisdemo.service.RedisService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.connection.ReturnType; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.types.Expiration; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; /** * @author Keson * @version 1.0 * @description: TODO Redis實現分布式鎖 * @date 2022/11/14 15:13 */ @Service @Slf4j public class RedisServiceImpl implements RedisService { //設置默認過期時間 private final static int DEFAULT_LOCK_EXPIRY_TIME = 20; //自定義lock key前綴 private final static String LOCK_PREFIX = "LOCK:CUSTOMER_BALANCE"; @Autowired private RedisTemplate redisTemplate; @Override public <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception{ //自定義lock key String lockKey = getLockKey(customerBalance.getCustomerNumber(), customerBalance.getSubAccountNumber(), customerBalance.getCurrencyCode()); //將UUID當做value,確保唯一性 String lockReference = UUID.randomUUID().toString(); try { if (!lock(lockKey, lockReference, DEFAULT_LOCK_EXPIRY_TIME, TimeUnit.SECONDS)) { throw new Exception("lock加鎖失敗"); } return callable.call(); } finally { unlock(lockKey, lockReference); } } //定義lock key String getLockKey(String customerNumber, String subAccountNumber, String currencyCode) { return String.format("%s:%s:%s:%s", LOCK_PREFIX, customerNumber, subAccountNumber, currencyCode); } //redis加鎖 private boolean lock(String key, String value, long timeout, TimeUnit timeUnit) { Boolean locked; try { //SET_IF_ABSENT --> NX: Only set the key if it does not already exist. //SET_IF_PRESENT --> XX: Only set the key if it already exist. locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8), Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT)); } catch (Exception e) { log.error("Lock failed for redis key: {}, value: {}", key, value); locked = false; } return locked != null && locked; } //redis解鎖 private boolean unlock(String key, String value) { try { //使用lua腳本保證刪除的原子性,確保解鎖 String script = "if redis.call('get', KEYS[1]) == ARGV[1] " + "then return redis.call('del', KEYS[1]) " + "else return 0 end"; Boolean unlockState = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1, key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8))); return unlockState == null || !unlockState; } catch (Exception e) { log.error("unLock failed for redis key: {}, value: {}", key, value); return false; } } }
@Override public int updateById(CustomerBalance customerBalance) throws Exception { return redisService.callWithLock(customerBalance, ()-> customerBalanceMapper.updateById(customerBalance)); }
關于“怎么在SpringBoot中使用Redis實現分布式鎖”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“怎么在SpringBoot中使用Redis實現分布式鎖”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。