您好,登錄后才能下訂單哦!
本篇文章為大家展示了Zookeeper中怎么實現一個分布式鎖,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
zk有四種節點,一個最容易想到的策略就是創建節點,誰創建成功了,就表示誰持有了這個鎖
這個思路與redis的setnx
有點相似,因為zk的節點創建,也只會有一個會話會創建成功,其他的則會拋已存在的異常
借助臨時節點,會話丟掉之后節點刪除,這樣可以避免持有鎖的實例異常而沒有主動釋放導致所有實例都無法持有鎖的問題
如果采用這種方案,如果我想實現阻塞獲取鎖的邏輯,那么其中一個方案就需要寫一個while(true)來不斷重試
while(true) { if (tryLock(xxx)) return true; else Thread.sleep(1000); }
另外一個策略則是借助事件監聽,當節點存在時,注冊一個節點刪除的觸發器,這樣就不需要我自己重試判斷了;充分借助zk的特性來實現異步回調
public void lock() { if (tryLock(path, new Watcher() { @Override public void process(WatchedEvent event) { synchronized (path){ path.notify(); } } })) { return true; } synchronized (path) { path.wait(); } }
那么上面這個實現有什么問題呢?
每次節點的變更,那么所有的都會監聽到變動,好處是非公平鎖的支持;缺點就是剩下這些喚醒的實例中也只會有一個搶占到鎖,無意義的喚醒浪費性能
接下來這種方案更加常見,晚上大部分的教程也是這種case,主要思路就是創建臨時順序節點
只有序號最小的節點,才表示搶占鎖成功;如果不是最小的節點,那么就監聽它前面一個節點的刪除事件,前面節點刪除了,一種可能是他放棄搶鎖,一種是他釋放自己持有的鎖,不論哪種情況,對我而言,我都需要撈一下所有的節點,要么拿鎖成功;要么換一個前置節點
接下來我們來一步步看下,基于臨時順序節點,可以怎么實現分布式鎖
對于zk,我們依然采用apache的提供的包 zookeeper
來操作;后續提供Curator
的分布式鎖實例
核心依賴
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency>
版本說明:
zk版本: 3.6.2
SpringBoot: 2.2.1.RELEASE
第一步,都是實例創建
public class ZkLock implements Watcher { private ZooKeeper zooKeeper; // 創建一個持久的節點,作為分布式鎖的根目錄 private String root; public ZkLock(String root) throws IOException { try { this.root = root; zooKeeper = new ZooKeeper("127.0.0.1:2181", 500_000, this); Stat stat = zooKeeper.exists(root, false); if (stat == null) { // 不存在則創建 createNode(root, true); } } catch (Exception e) { e.printStackTrace(); } } // 簡單的封裝節點創建,這里只考慮持久 + 臨時順序 private String createNode(String path, boolean persistent) throws Exception { return zooKeeper.create(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, persistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL_SEQUENTIAL); } }
在我們的這個設計中,我們需要持有當前節點和監聽前一個節點的變更,所以我們在ZkLock實例中,添加兩個成員
/** * 當前節點 */ private String current; /** * 前一個節點 */ private String pre;
接下來就是嘗試獲取鎖的邏輯
current不存在,在表示沒有創建過,就創建一個臨時順序節點,并賦值current
current存在,則表示之前已經創建過了,目前處于等待鎖釋放過程
接下來根據當前節點順序是否最小,來表明是否持有鎖成功
當順序不是最小時,找前面那個節點,并賦值 pre;
監聽pre的變化
/** * 嘗試獲取鎖,創建順序臨時節點,若數據最小,則表示搶占鎖成功;否則失敗 * * @return */ public boolean tryLock() { try { String path = root + "/"; if (current == null) { // 創建臨時順序節點 current = createNode(path, false); } List<String> list = zooKeeper.getChildren(root, false); Collections.sort(list); if (current.equalsIgnoreCase(path + list.get(0))) { // 獲取鎖成功 return true; } else { // 獲取鎖失敗,找到前一個節點 int index = Collections.binarySearch(list, current.substring(path.length())); // 查詢當前節點前面的那個 pre = path + list.get(index - 1); } } catch (Exception e) { e.printStackTrace(); } return false; }
請注意上面的實現,這里并沒有去監聽前一個節點的變更,在設計tryLock
,因為是立馬返回成功or失敗,所以使用這個接口的,不需要注冊監聽
我們的監聽邏輯,放在 lock()
同步阻塞里面
嘗試搶占鎖,成功則直接返回
拿鎖失敗,則監聽前一個節點的刪除事件
public boolean lock() { if (tryLock()) { return true; } try { // 監聽前一個節點的刪除事件 Stat state = zooKeeper.exists(pre, true); if (state != null) { synchronized (pre) { // 阻塞等待前面的節點釋放 pre.wait(); // 這里不直接返回true,因為前面的一個節點刪除,可能并不是因為它持有鎖并釋放鎖,如果是因為這個會話中斷導致臨時節點刪除,這個時候需要做的是換一下監聽的 preNode return lock(); } } else { // 不存在,則再次嘗試拿鎖 return lock(); } } catch (Exception e) { e.printStackTrace(); } return false; }
注意:
當節點不存在時,或者事件觸發回調之后,重新調用lock()
,表明我胡漢三又來競爭鎖了?
為啥不是直接返回 true? 而是需要重新競爭呢?
因為前面節點的刪除,有可能是因為前面節點的會話中斷導致的;但是鎖還在另外的實例手中,這個時候我應該做的是重新排隊
最后別忘了釋放鎖
public void unlock() { try { zooKeeper.delete(current, -1); current = null; zooKeeper.close(); } catch (Exception e) { e.printStackTrace(); } }
到此,我們的分布式鎖就完成了,接下來我們復盤下實現過程
所有知識點來自前一篇的zk基礎使用(創建節點,刪除節點,獲取所有自己點,監聽事件)
搶鎖過程 =》 創建序號最小的節點
若節點不是最小的,那么就監聽前面的節點刪除事件
這個實現,支持了鎖的重入(why? 因為鎖未釋放時,我們保存了current,當前節點存在時則直接判斷是不是最小的;而不是重新創建)
最后寫一個測試case,來看下
@SpringBootApplication public class Application { private void tryLock(long time) { ZkLock zkLock = null; try { zkLock = new ZkLock("/lock"); System.out.println("嘗試獲取鎖: " + Thread.currentThread() + " at: " + LocalDateTime.now()); boolean ans = zkLock.lock(); System.out.println("執行業務邏輯:" + Thread.currentThread() + " at:" + LocalDateTime.now()); Thread.sleep(time); } catch (Exception e) { e.printStackTrace(); } finally { if (zkLock != null) { zkLock.unlock(); } } } public Application() throws IOException, InterruptedException { new Thread(() -> tryLock(10_000)).start(); Thread.sleep(1000); // 獲取鎖到執行鎖會有10s的間隔,因為上面的線程搶占到鎖,并持有了10s new Thread(() -> tryLock(1_000)).start(); System.out.println("---------over------------"); Scanner scanner = new Scanner(System.in); String ans = scanner.next(); System.out.println("---> over --->" + ans); } public static void main(String[] args) { SpringApplication.run(Application.class); } }
輸出結果如下
上述內容就是Zookeeper中怎么實現一個分布式鎖,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。