您好,登錄后才能下訂單哦!
分布式鎖
分布式鎖是控制分布式系統之間同步訪問共享資源的一種方式。在分布式系統中,常常需要協調他們的動作。如果不同的系統或是同一個系統的不同主機之間共享了一個或一組資源,那么訪問這些資源的時候,往往需要互斥來防止彼此干擾來保證一致性,在這種情況下,便需要使用到分布式鎖。
在分布式系統中,常常需要協調他們的動作。如果不同的系統或是同一個系統的不同主機之間共享了一個或一組資源,那么訪問這些資源的時候,往往需要互斥來防止彼此干擾來保證一致性,這個時候,便需要使用到分布式鎖。
針對共享資源要求串行化處理,才能保證安全且合理的操作。
用一張圖來體驗一下:
此時,使用Java提供的Synchronized、ReentrantLock、ReentrantReadWriteLock...,僅能在單個JVM進程內對多線程對共享資源保證線程安全,在分布式系統環境下統統都不好使,心情是不是拔涼呀。這個問題得請教 分布式鎖
家族來支持一下,聽說他們家族內有很多成員,每個成員都有這個分布式鎖功能,接下來就開始探索一下。
聽聽 Martin 大佬們給出的說法:
Martin kleppmann 是英國劍橋大學的分布式系統的研究員,曾經跟 Redis 之父 Antirez 進行過關于 RedLock (Redis里分布式鎖的實現算法)是否安全的激烈討論。
都能單獨寫篇文章了
效率:
使用分布式鎖可以避免多個客戶端重復相同的工作,這些工作會浪費資源。比如用戶支付完成后,可能會收到多次短信或郵件提醒。
比如業務場景二,重復獲取access_token。
對共享資源的操作是冪等性操作,無論你操作多少次都不會出現不同結果。
本質上就是為了避免對共享資源重復操作,從而提高效率。
正確性:
使用分布式鎖同樣可以避免鎖失效的發生,一旦發生會引起正確性的破壞,可能會導致數據不一致,數據缺失或者其他嚴重的問題。
比如業務場景一,商品庫存超賣問題。
對共享資源的操作是非冪等性操作,多個客戶端操作共享資源會導致數據不一致。
以下是分布式鎖的一些特點,分布式鎖家族成員并不一定都滿足這個要求,實現機制不大一樣。
互斥性: 分布式鎖要保證在多個客戶端之間的互斥。
可重入性:同一客戶端的相同線程,允許重復多次加鎖。
鎖超時:和本地鎖一樣支持鎖超時,防止死鎖。
非阻塞: 能與 ReentrantLock 一樣支持 trylock() 非阻塞方式獲得鎖。
支持公平鎖和非公平鎖:公平鎖是指按照請求加鎖的順序獲得鎖,非公平鎖真好相反請求加鎖是無序的。
分布式鎖家族實現者一覽:
思維導圖做了一個簡單分類,不一定特別準確,幾乎包含了分布式鎖各個組件實現者。
下面讓他們分別來做下自我介紹:
1、數據庫
排它鎖(悲觀鎖):基于 select * from table where xx=yy for update
SQL語句來實現,有很多缺陷,一般不推薦使用,后文介紹。
樂觀鎖:表中添加一個時間戳或者版本號的字段來實現,update xx set version = new... where id = y and version = old
當更新不成功,客戶端重試,重新讀取最新的版本號或時間戳,再次嘗試更新,類似 CAS
機制,推薦使用。
2、Redis
特點:CAP模型屬于AP | 無一致性算法 | 性能好
開發常用,如果你的項目中正好使用了redis,不想引入額外的分布式鎖組件,推薦使用。
業界也提供了多個現成好用的框架予以支持分布式鎖,比如Redisson、spring-integration-redis、redis自帶的setnx命令,推薦直接使用。
另外,可基于redis命令和redis lua支持的原子特性,自行實現分布式鎖。
3、Zookeeper
特點:CAP模型屬于CP | ZAB一致性算法實現 | 穩定性好
開發常用,如果你的項目中正好使用了zk集群,推薦使用。
業界有Apache Curator框架提供了現成的分布式鎖功能,現成的,推薦直接使用。
另外,可基于Zookeeper自身的特性和原生Zookeeper API自行實現分布式鎖。
4、其他
Chubby,Google開發的粗粒度分布鎖的服務,但是并沒有開源,開放出了論文和一些相關文檔可以進一步了解,出門百度一下獲取文檔,不做過多討論。
Tair,是阿里開源的一個分布式KV存儲方案,沒有用過,不做過多討論。
Etcd,CAP模型中屬于CP,Raft一致性算法實現,沒有用過,不做過多討論。
Hazelcast,是基于內存的數據網格開源項目,提供彈性可擴展的分布式內存計算,并且被公認是提高應用程序性能和擴展性最好的方案,聽上去很牛逼,但是沒用過,不做過多討論。
當然了,上面推薦的常用分布式鎖Zookeeper和Redis,使用時還需要根據具體的業務場景,做下權衡,實現功能上都能達到你要的效果,原理上有很大的不同。
以「悲觀的心態」操作資源,無法獲得鎖成功,就一直阻塞著等待。
1、有一張資源鎖表
CREATE TABLE `resource_lock` (
`id` int(4) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`resource_name` varchar(64) NOT NULL DEFAULT '' COMMENT '鎖定的資源名',
`owner` varchar(64) NOT NULL DEFAULT '' COMMENT '鎖擁有者',
`desc` varchar(1024) NOT NULL DEFAULT '備注信息',
`update_time` timestamp NOT NULL DEFAULT '' COMMENT '保存數據時間,自動生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_resource_name` (`resource_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='鎖定中的資源';
resource_name 鎖資源名稱必須有唯一索引。
2、使用姿勢
必須添加事務
,查詢和更新操作保證原子性,在一個事務里完成。
偽代碼實現:
@Transaction
public void lock(String name) {
ResourceLock rlock = exeSql("select * from resource_lock where resource_name = name for update");
if (rlock == null) {
exeSql("insert into resource_lock(reosurce_name,owner,count) values (name, 'ip',0)");
}
}
使用 for update
鎖定的資源。
如果執行成功,會立即返回,執行插入數據庫,后續再執行一些其他業務邏輯,直到事務提交,執行結束;
如果執行失敗,就會一直阻塞著。
你也可以在數據庫客戶端工具上測試出來這個效果,當在一個終端執行了 for update,不提交事務。
在另外的終端上執行相同條件的 for update,會一直卡著,轉圈圈...
雖然也能實現分布式鎖的效果,但是會存在性能瓶頸。
3、悲觀鎖優缺點
優點:簡單易用,好理解,保障數據強一致性。
缺點一大堆,羅列一下:
1)在 RR 事務級別,select 的 for update 操作是基于間隙鎖(gap lock)
實現的,是一種悲觀鎖的實現方式,所以存在阻塞問題
。
2)高并發情況下,大量請求進來,會導致大部分請求進行排隊,影響數據庫穩定性,也會耗費
服務的CPU等資源
。
當獲得鎖的客戶端等待時間過長時,會提示:
[40001][1205] Lock wait timeout exceeded; try restarting transaction
高并發情況下,也會造成占用過多的應用線程,導致業務無法正常響應。
3)如果優先獲得鎖的線程因為某些原因,一直沒有釋放掉鎖,可能會導致死鎖
的發生。
4)鎖的長時間不釋放,會一直占用數據庫連接,可能會將數據庫連接池撐爆
,影響其他服務。
5) MySql數據庫會做查詢優化,即便使用了索引,優化時發現全表掃效率更高,則可能會將行鎖升級為表鎖,此時可能就更悲劇了。
6)不支持可重入特性,并且超時等待時間是全局的,不能隨便改動。
樂觀鎖,以「樂觀的心態」來操作共享資源,無法獲得鎖成功,沒關系過一會重試一下看看唄,再不行就直接退出,嘗試一定次數還是不行?也可以以后再說,不用一直阻塞等著。
1、有一張資源表
為表添加一個字段,版本號或者時間戳都可以。通過版本號或者時間戳,來保證多線程同時間操作共享資源的有序性和正確性。
CREATE TABLE `resource` (
`id` int(4) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`resource_name` varchar(64) NOT NULL DEFAULT '' COMMENT '資源名',
`share` varchar(64) NOT NULL DEFAULT '' COMMENT '狀態',
`version` int(4) NOT NULL DEFAULT '' COMMENT '版本號',
`desc` varchar(1024) NOT NULL DEFAULT '備注信息',
`update_time` timestamp NOT NULL DEFAULT '' COMMENT '保存數據時間,自動生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_resource_name` (`resource_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='資源';
2、使用姿勢
偽代碼實現:
Resrouce resource = exeSql("select * from resource where resource_name = xxx");
boolean succ = exeSql("update resource set version= 'newVersion' ... where resource_name = xxx and version = 'oldVersion'");
if (!succ) {
// 發起重試
}
實際代碼中可以寫個while循環不斷重試,版本號不一致,更新失敗,重新獲取新的版本號,直到更新成功。
3、樂觀鎖優缺點
優點:簡單易用,保障數據一致性。
缺點:
1)加行鎖的性能上有一定的開銷
2)高并發場景下,線程內的自旋操作
會耗費一定的CPU資源。
另外,比如在更新數據狀態的一些場景下,不考慮冪等性的情況下,可以直接利用 行鎖
來保證數據一致性,示例:update table set state = 1 where id = xxx and state = 0;
樂觀鎖就類似 CAS
Compare And Swap 更新機制,推薦閱讀 <<一文徹底搞懂CAS>>
基于Redis實現的分布式鎖,性能上是最好的,實現上也是最復雜的。
前文中提到的 RedLock 是 Redis 之父 Antirez 提出來的分布式鎖的一種 「健壯」 的實現算法,但爭議也較多,一般不推薦使用。
Redis 2.6.12 之前的版本中采用 setnx + expire 方式實現分布式鎖,示例代碼如下所示:
public static boolean lock(Jedis jedis, String lockKey, String requestId, int expireTime) {
Long result = jedis.setnx(lockKey, requestId);
//設置鎖
if (result == 1) {
//獲取鎖成功
//若在這里程序突然崩潰,則無法設置過期時間,將發生死鎖
//通過過期時間刪除鎖
jedis.expire(lockKey, expireTime);
return true;
}
return false;
}
如果 lockKey 存在,則返回失敗,否則返回成功。設置成功之后,為了能在完成同步代碼之后成功釋放鎖,方法中使用 expire() 方法給 lockKey 設置一個過期時間,確認 key 值刪除,避免出現鎖無法釋放,導致下一個線程無法獲取到鎖,即死鎖問題。
但是 setnx + expire 兩個命令放在程序里執行,不是原子操作,容易出事。
如果程序設置鎖之后,此時,在設置過期時間之前,程序崩潰了,如果 lockKey 沒有設置上過期時間,將會出現死鎖問題
。
解決以上問題 ,有兩個辦法:
1)方式一:lua腳本
我們也可以通過 Lua 腳本來實現鎖的設置和過期時間的原子性,再通過 jedis.eval() 方法運行該腳本:
// 加鎖腳本,KEYS[1] 要加鎖的key,ARGV[1]是UUID隨機值,ARGV[2]是過期時間
private static final String SCRIPT_LOCK = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then redis.call('pexpire', KEYS[1], ARGV[2]) return 1 else return 0 end";
// 解鎖腳本,KEYS[1]要解鎖的key,ARGV[1]是UUID隨機值
private static final String SCRIPT_UNLOCK = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
2)方式二:set原生命令
在 Redis 2.6.12 版本后 SETNX 增加了過期時間參數:
SET lockKey anystring NX PX max-lock-time
程序實現代碼如下:
public static boolean lock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if ("OK".equals(result)) {
return true;
}
return false;
}
雖然 SETNX 方式能夠保證設置鎖和過期時間的原子性,但是如果我們設置的過期時間比較短,而執行業務時間比較長,就會存在鎖代碼塊失效的問題,失效后其他客戶端也能獲取到同樣的鎖,執行同樣的業務,此時可能就會出現一些問題。
我們需要將過期時間設置得足夠長,來保證以上問題不會出現,但是設置多長時間合理,也需要依具體業務來權衡。如果其他客戶端必須要阻塞拿到鎖,需要設計循環超時等待機制等問題,感覺還挺麻煩的是吧。
除了使用Jedis客戶端之外,完全可以直接用Spring官方提供的企業集成模式
框架,里面提供了很多分布式鎖的方式,Spring提供了一個統一的分布式鎖抽象,具體實現目前支持:
早期,分布式鎖的相關代碼存在于Spring Cloud的子項目Spring Cloud Cluster中,后來被遷到Spring Integration中。
Spring Integration 項目地址 :https://github.com/spring-projects/spring-integration
Spring強大之處在于此,對Lock
分布式鎖做了全局抽象。
抽象結構如下所示:
LockRegistry
作為頂層抽象接口:
/**
* Strategy for maintaining a registry of shared locks
*
* @author Oleg Zhurakousky
* @author Gary Russell
* @since 2.1.1
*/
@FunctionalInterface
public interface LockRegistry {
/**
* Obtains the lock associated with the parameter object.
* @param lockKey The object with which the lock is associated.
* @return The associated lock.
*/
Lock obtain(Object lockKey);
}
定義的 obtain()
方法獲得具體的 Lock
實現類,分別在對應的 XxxLockRegitry 實現類來創建。
RedisLockRegistry 里obtain()方法實現類為 RedisLock
,RedisLock內部,在Springboot2.x(Spring5)版本中是通過SET + PEXIPRE 命令結合lua腳本實現的,在Springboot1.x(Spring4)版本中,是通過SETNX命令實現的。
ZookeeperLockRegistry 里obtain()方法實現類為 ZkLock
,ZkLock內部基于 Apache Curator 框架實現的。
JdbcLockRegistry 里obtain()方法實現類為 JdbcLock
,JdbcLock內部基于一張INT_LOCK
數據庫鎖表實現的,通過JdbcTemplate來操作。
客戶端使用方法:
private final String registryKey = "sb2";
RedisLockRegistry lockRegistry = new RedisLockRegistry(getConnectionFactory(), this.registryKey);
Lock lock = lockRegistry.obtain("foo");
lock.lock();
try {
// doSth...
}
finally {
lock.unlock();
}
}
下面以目前最新版本的實現,說明加鎖和解鎖的具體過程。
RedisLockRegistry$RedisLock類lock()加鎖流程:
加鎖步驟:
1)lockKey為registryKey:path,本例中為sb2:foo,客戶端C1優先申請加鎖。
2)執行lua腳本,get lockKey不存在,則set lockKey成功,值為clientid(UUID),過期時間默認60秒。
3)客戶端C1同一個線程重復加鎖,pexpire lockKey,重置過期時間為60秒。
4)客戶端C2申請加鎖,執行lua腳本,get lockKey已存在,并且跟已加鎖的clientid不同,加鎖失敗
5)客戶端C2掛起,每隔100ms再次嘗試加鎖。
RedisLock#lock()加鎖源碼實現:
大家可以對照上面的流程圖配合你理解。
@Override
public void lock() {
this.localLock.lock();
while (true) {
try {
while (!obtainLock()) {
Thread.sleep(100); //NOSONAR
}
break;
}
catch (InterruptedException e) {
/*
* This method must be uninterruptible so catch and ignore
* interrupts and only break out of the while loop when
* we get the lock.
*/
}
catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
}
}
// 基于Spring封裝的RedisTemplate來操作的
private boolean obtainLock() {
Boolean success =
RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
String.valueOf(RedisLockRegistry.this.expireAfter));
boolean result = Boolean.TRUE.equals(success);
if (result) {
this.lockedAt = System.currentTimeMillis();
}
return result;
}
執行的lua腳本代碼:
private static final String OBTAIN_LOCK_SCRIPT =
"local lockClientId = redis.call('GET', KEYS[1])\n" +
"if lockClientId == ARGV[1] then\n" +
" redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
" return true\n" +
"elseif not lockClientId then\n" +
" redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
" return true\n" +
"end\n" +
"return false";
RedisLockRegistry$RedisLock類unlock()解鎖流程:
RedisLock#unlock()源碼實現:
@Override
public void unlock() {
if (!this.localLock.isHeldByCurrentThread()) {
throw new IllegalStateException("You do not own lock at " + this.lockKey);
}
if (this.localLock.getHoldCount() > 1) {
this.localLock.unlock();
return;
}
try {
if (!isAcquiredInThisProcess()) {
throw new IllegalStateException("Lock was released in the store due to expiration. " +
"The integrity of data protected by this lock may have been compromised.");
}
if (Thread.currentThread().isInterrupted()) {
RedisLockRegistry.this.executor.execute(this::removeLockKey);
}
else {
removeLockKey();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Released lock; " + this);
}
}
catch (Exception e) {
ReflectionUtils.rethrowRuntimeException(e);
}
finally {
this.localLock.unlock();
}
}
// 刪除緩存Key
private void removeLockKey() {
if (this.unlinkAvailable) {
try {
RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
}
catch (Exception ex) {
LOGGER.warn("The UNLINK command has failed (not supported on the Redis server?); " +
"falling back to the regular DELETE command", ex);
this.unlinkAvailable = false;
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
}
}
else {
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
}
}
unlock()解鎖方法里發現,并不是直接就調用Redis的DEL命令刪除Key,這也是在Springboot2.x版本中做的一個優化,Redis4.0版本以上提供了UNLINK命令。
換句話說,最新版本分布式鎖實現,要求是Redis4.0以上版本才能使用。
看下Redis官網給出的一段解釋:
This command is very similar to DEL: it removes the specified keys.
Just like DEL a key is ignored if it does not exist. However the
command performs the actual memory reclaiming in a different thread,
so it is not blocking, while DEL is. This is where the command name
comes from: the command just unlinks the keys from the keyspace. The
actual removal will happen later asynchronously.
DEL始終在阻止模式下釋放值部分。但如果該值太大,如對于大型LIST或HASH的分配太多,它會長時間阻止Redis,為了解決這個問題,Redis實現了UNLINK命令,即「非阻塞」刪除。如果值很小,則DEL一般與UNLINK效率上差不多。
本質上,這種加鎖方式還是使用的SETNX實現的,而且Spring只是做了一層薄薄的封裝,支持可重入加鎖,超時等待,可中斷加鎖。
但是有個問題,鎖的過期時間不能靈活設置,客戶端初始化時,創建RedisLockRegistry時允許設置,但是是全局的。
/**
* Constructs a lock registry with the supplied lock expiration.
* @param connectionFactory The connection factory.
* @param registryKey The key prefix for locks.
* @param expireAfter The expiration in milliseconds.
*/
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
Assert.notNull(registryKey, "'registryKey' cannot be null");
this.redisTemplate = new StringRedisTemplate(connectionFactory);
this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
this.registryKey = registryKey;
this.expireAfter = expireAfter;
}
expireAfter參數是全局的,同樣會存在問題,可能是鎖過期時間到了,但是業務還沒有處理完,這把鎖又被另外的客戶端獲得,進而會導致一些其他問題。
經過對源碼的分析,其實我們也可以借鑒RedisLockRegistry實現的基礎上,自行封裝實現分布式鎖,比如:
1、允許支持按照不同的Key設置過期時間,而不是全局的?
2、當業務沒有處理完成,當前客戶端啟動個定時任務探測,自動延長過期時間?
自己實現?嫌麻煩?別急別急!業界已經有現成的實現方案了,那就是 Redisson
框架!
從Redis主從
架構上來考慮,依然存在問題。因為 Redis 集群數據同步到各個節點時是異步的,如果在 Master 節點獲取到鎖后,在沒有同步到其它節點時,Master 節點崩潰了,此時新的 Master 節點依然可以獲取鎖,所以多個應用服務可以同時獲取到鎖。
基于以上的考慮,Redis之父Antirez提出了一個RedLock算法
。
RedLock算法實現過程分析:
假設Redis部署模式是Redis Cluster,總共有5個master節點,通過以下步驟獲取一把鎖:
1)獲取當前時間戳,單位是毫秒
2)輪流嘗試在每個master節點上創建鎖,過期時間設置較短,一般就幾十毫秒
3)嘗試在大多數節點上建立一個鎖,比如5個節點就要求是3個節點(n / 2 +1)
4)客戶端計算建立好鎖的時間,如果建立鎖的時間小于超時時間,就算建立成功了
5)要是鎖建立失敗了,那么就依次刪除這個鎖
6)只要有客戶端創建成功了分布式鎖,其他客戶端就得不斷輪詢去嘗試獲取鎖
以上過程前文也提到了,進一步分析RedLock算法的實現依然可能存在問題,也是Martain和Antirez兩位大佬爭論的焦點。
問題1:節點崩潰重啟
節點崩潰重啟,會出現多個客戶端持有鎖。
假設一共有5個Redis節點:A、B、 C、 D、 E。設想發生了如下的事件序列:
1)客戶端C1成功對Redis集群中A、B、C三個節點加鎖成功(但D和E沒有鎖住)。
2)節點C Duang的一下,崩潰重啟了,但客戶端C1在節點C加鎖未持久化完,丟了。
3)節點C重啟后,客戶端C2成功對Redis集群中C、D、 E嘗試加鎖成功了。
這樣,悲劇了吧!客戶端C1和C2同時獲得了同一把分布式鎖。
為了應對節點重啟引發的鎖失效問題,Antirez提出了延遲重啟
的概念,即一個節點崩潰后,先不立即重啟它,而是等待一段時間再重啟,等待的時間大于鎖的有效時間。
采用這種方式,這個節點在重啟前所參與的鎖都會過期,它在重啟后就不會對現有的鎖造成影響。
這其實也是通過人為補償措施,降低不一致發生的概率。
問題2:時鐘跳躍
假設一共有5個Redis節點:A、B、 C、 D、 E。設想發生了如下的事件序列:
1)客戶端C1成功對Redis集群中A、B、 C三個節點成功加鎖。但因網絡問題,與D和E通信失敗。
2)節點C上的時鐘發生了向前跳躍,導致它上面維護的鎖快速過期。
3)客戶端C2對Redis集群中節點C、 D、 E成功加了同一把鎖。
此時,又悲劇了吧!客戶端C1和C2同時都持有著同一把分布式鎖。
為了應對時鐘跳躍
引發的鎖失效問題,Antirez提出了應該禁止人為修改系統時間,使用一個不會進行「跳躍式」調整系統時鐘的ntpd程序。這也是通過人為補償措施,降低不一致發生的概率。
但是...,RedLock算法并沒有解決,操作共享資源超時,導致鎖失效的問題。
存在這么大爭議的算法實現,還是不推薦使用的。
一般情況下,本文鎖介紹的框架提供的分布式鎖實現已經能滿足大部分需求了。
上述,我們對spring-integration-redis實現原理進行了深入分析,還對RedLock存在爭議的問題做了分析。
除此以外,我們還提到了spring-integration中集成了 Jdbc、Zookeeper、Gemfire實現的分布式鎖,Gemfire和Jdbc大家感興趣可以自行去看下。
為啥還要提供個Jdbc分布式鎖實現?
猜測一下,當你的應用并發量也不高,比如是個后臺業務,而且還沒依賴Zookeeper、Redis等額外的組件,只依賴了數據庫。
但你還想用分布式鎖搞點事兒,那好辦,直接用spring-integration-jdbc即可,內部也是基于數據庫行鎖來實現的,需要你提前建好鎖表
,創建表的SQL長這樣:
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CLIENT_ID CHAR(36),
CREATED_DATE DATETIME(6) NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
) ENGINE=InnoDB;
具體實現邏輯也非常簡單,大家自己去看吧。
集成的Zookeeper實現的分布式鎖,因為是基于Curator框架實現的,不在本節展開,后續會有分析。
Redisson 是 Redis 的 Java 實現的客戶端,其 API 提供了比較全面的 Redis 命令的支持。
Jedis 簡單使用阻塞的 I/O 和 Redis 交互,Redission 通過 Netty 支持非阻塞 I/O。
Redisson 封裝了鎖的實現,讓我們像操作我們的本地 Lock 一樣去使用,除此之外還有對集合、對象、常用緩存框架等做了友好的封裝,易于使用。
截止目前,Github上 Star 數量為 11.8k,說明該開源項目值得關注和使用。
Redisson分布式鎖Github:
https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers
Redisson 可以便捷的支持多種Redis部署架構:
1) Redis 單機
2) Master-Slave + Sentinel 哨兵
3) Redis-Cluster集群
// Master-Slave配置
Config config = new Config();
MasterSlaveServersConfig serverConfig = config.useMasterSlaveServers()
.setMasterAddress("")
.addSlaveAddress("")
.setReadMode(ReadMode.SLAVE)
.setMasterConnectionPoolSize(maxActiveSize)
.setMasterConnectionMinimumIdleSize(maxIdleSize)
.setSlaveConnectionPoolSize(maxActiveSize)
.setSlaveConnectionMinimumIdleSize(maxIdleSize)
.setConnectTimeout(CONNECTION_TIMEOUT_MS) // 默認10秒
.setTimeout(socketTimeout)
;
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("myLock");
// 獲得鎖
lock.lock();
// 等待10秒未獲得鎖,自動釋放
lock.lock(10, TimeUnit.SECONDS);
// 等待鎖定時間不超過100秒
// 10秒后自動釋放鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
使用上非常簡單,RedissonClient客戶端提供了眾多的接口實現,支持可重入鎖、、公平鎖、讀寫鎖、鎖超時、RedLock等都提供了完整實現。
lock()加鎖流程:
為了兼容老的版本,Redisson里都是通過lua腳本執行Redis命令的,同時保證了原子性操作。
加鎖執行的lua腳本:
Redis里的Hash散列結構存儲的。
參數解釋:
KEY[1]:要加鎖的Key名稱,比如示例中的myLock。
ARGV[1]:針對加鎖的Key設置的過期時間
ARGV[2]:Hash結構中Key名稱,lockName為UUID:線程ID
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
1)客戶端C1申請加鎖,key為myLock。
2)如果key不存在,通過hset設置值,通過pexpire設置過期時間。同時開啟Watchdog任務,默認每隔10秒中判斷一下,如果key還在,重置過期時間到30秒。
開啟WatchDog源碼:
3)客戶端C1相同線程再次加鎖,如果key存在,判斷Redis里Hash中的lockName跟當前線程lockName相同,則將Hash中的lockName的值加1,代表支持可重入加鎖。
4)客戶單C2申請加鎖,如果key存在,判斷Redis里Hash中的lockName跟當前線程lockName不同,則執行pttl返回剩余過期時間。
5)客戶端C2線程內不斷嘗試pttl時間,此處是基于Semaphore信號量實現的,有許可立即返回,否則等到pttl時間還是沒有得到許可,繼續重試。
重試源碼:
Redisson這樣的實現就解決了,當業務處理時間比過期時間長的問題。
同時,Redisson 還自己擴展 Lock 接口,叫做 RLock 接口,擴展了很多的鎖接口,比如給 Key 設定過期時間,非阻塞+超時時間等。
void lock(long leaseTime, TimeUnit unit);
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
redisson里的WatchDog(看門狗)邏輯保證了沒有死鎖發生。
如果客戶端宕機了,WatchDog任務也就跟著停掉了。此時,不會對Key重置過期時間了,等掛掉的客戶端持有的Key過期時間到了,鎖自動釋放,其他客戶端嘗試獲得這把鎖。
可以進一步看官網的關于WatchDog描述:
If Redisson instance which acquired lock crashes then such lock could hang forever in acquired state. To avoid this Redisson maintains lock watchdog, it prolongs lock expiration while lock holder Redisson instance is alive. By default lock watchdog timeout is 30 seconds and can be changed through Config.lockWatchdogTimeout setting.
unlock()解鎖過程也是同樣的,通過lua腳本執行一大坨指令的。
解鎖lua腳本:
根據剛剛對加鎖過程的分析,大家可以自行看下腳本分析下。
Zookeeper 是一種提供「分布式服務協調」的中心化服務,是以 Paxos 算法為基礎實現的。Zookeeper數據節點和文件目錄類似,同時具有Watch機制,基于這兩個特性,得以實現分布式鎖功能。
數據節點:
順序臨時節點:Zookeeper 提供一個多層級的節點命名空間(節點稱為 Znode),每個節點都用一個以斜杠(/)分隔的路徑來表示,而且每個節點都有父節點(根節點除外),非常類似于文件系統。
節點類型可以分為持久節點(PERSISTENT )、臨時節點(EPHEMERAL),每個節點還能被標記為有序性(SEQUENTIAL),一旦節點被標記為有序性,那么整個節點就具有順序自增的特點。
一般我們可以組合這幾類節點來創建我們所需要的節點,例如,創建一個持久節點作為父節點,在父節點下面創建臨時節點,并標記該臨時節點為有序性。
Watch 機制:
Zookeeper 還提供了另外一個重要的特性,Watcher(事件監聽器)。
ZooKeeper 允許用戶在指定節點上注冊一些 Watcher,并且在一些特定事件觸發的時候,ZooKeeper 服務端會將事件通知給用戶。
圖解Zookeeper實現分布式鎖:
首先,我們需要建立一個父節點,節點類型為持久節點(PERSISTENT)如圖中的 /locks/lock_name1
節點 ,每當需要訪問共享資源時,就會在父節點下建立相應的順序子節點,節點類型為臨時節點(EPHEMERAL),且標記為有序性(SEQUENTIAL),并且以臨時節點名稱 + 父節點名稱 + 順序號組成特定的名字,如圖中的 /0000000001 /0000000002 /0000000003
作為臨時有序節點。
在建立子節點后,對父節點下面的所有以臨時節點名稱 name 開頭的子節點進行排序,判斷剛剛建立的子節點順序號是否是最小的節點,如果是最小節點,則獲得鎖。
如果不是最小節點,則阻塞等待鎖,并且獲得該節點的上一順序節點,為其注冊監聽事件,等待節點對應的操作獲得鎖。當調用完共享資源后,刪除該節點,關閉 zk,進而可以觸發監聽事件,釋放該鎖。
// 加鎖
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
try
{
// do some work inside of the critical section here
}
finally
{
lock.release();
}
}
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
// ... 其他代碼略
InterProcessMutex 是 Curator 實現的可重入鎖,可重入鎖源碼過程分析:
加鎖流程:
1)可重入鎖記錄在 ConcurrentMap<Thread, LockData> threadData 這個 Map 里面。
2)如果 threadData.get(currentThread) 是有值的那么就證明是可重入鎖,然后記錄就會加 1。
3)資源目錄下創建一個節點:比如這里創建一個 /0000000002 這個節點,這個節點需要設置為 EPHEMERAL_SEQUENTIAL 也就是臨時節點并且有序。
4)獲取當前目錄下所有子節點,判斷自己的節點是否是最小的節點。
5)如果是最小的節點,則獲取到鎖。如果不是最小的節點,則證明前面已經有人獲取到鎖了,那么需要獲取自己節點的前一個節點。
6)節點 /0000000002 的前一個節點是 /0000000001,我們獲取到這個節點之后,再上面注冊 Watcher,Watcher 調用的是 object.notifyAll(),用來解除阻塞。
7)object.wait(timeout) 或 object.wait() 進行阻塞等待
解鎖流程:
1)如果可重入鎖次數減1后,加鎖次數不為 0 直接返回,減1后加鎖次數為0,繼續。
2)刪除當前節點。
3)刪除 threadDataMap 里面的可重入鎖的數據。
上面介紹的諸如Apache Curator、Redisson、Spring框架集成的分布式鎖,既然是框架實現,會考慮用戶需求,盡量設計和實現通用的分布式鎖接口。
基本都涵蓋了如下的方式實現:
當然,Redisson和Curator都是自己定義的分布式鎖接口實現的,易于擴展。
Curator里自定義了InterProcessLock接口,Redisson里自定義RLock接口,繼承了 java.util.concurrent.locks.Lock接口。
對于Redis實現的分布式鎖:
大部分需求下,不會遇到「極端復雜場景」,基于Redis實現分布式鎖很常用,性能也高。
它獲取鎖的方式簡單粗暴,獲取不到鎖直接不斷嘗試獲取鎖,比較消耗性能。
另外來說的話,redis的設計定位決定了它的數據并不是強一致性的,沒有一致性算法,在某些極端情況下,可能會出現問題,鎖的模型不夠健壯。
即便有了Redlock算法的實現,但存在爭議,某些復雜場景下,也無法保證其實現完全沒有問題,并且也是比較消耗性能的。
對于Zookeeper實現的分布式鎖:
Zookeeper優點:
天生設計定位是分布式協調,強一致性。鎖的模型健壯、簡單易用、適合做分布式鎖。
如果獲取不到鎖,只需要添加一個監聽器就可以了,不用一直輪詢,性能消耗較小。
如果客戶端宕機,也沒關系,臨時節點會自動刪除,觸發監聽器通知下一個節點。
Zookeeper缺點:
若有大量的客戶端頻繁的申請加鎖、釋放鎖,對于ZK集群的壓力會比較大。
另外,本文對spring-integration集成redis做了詳細分析,推薦可以直接使用,更推薦直接使用 Redisson
,實現了非常多的分布式鎖各種機制,有單獨開放Springboot集成的jar包,使用上也是非常方便的。
文章開頭部分提到的幾個業務場景,經過對分布式鎖家族的介紹和原理分析,可以自行選擇技術方案了。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。