您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“ZooKeeper三分布式鎖實現及完整運行的代碼”,內容詳細,步驟清晰,細節處理妥當,希望這篇“ZooKeeper三分布式鎖實現及完整運行的代碼”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
首先我們先介紹一個簡單的zookeeper實現分布式鎖的思路:
用zookeeper中一個臨時節點代表鎖,比如在/exlusive_lock下創建臨時子節點/exlusive_lock/lock。
所有客戶端爭相創建此節點,但只有一個客戶端創建成功。
創建成功代表獲取鎖成功,此客戶端執行業務邏輯
未創建成功的客戶端,監聽/exlusive_lock變更
獲取鎖的客戶端執行完成后,刪除/exlusive_lock/lock,表示鎖被釋放
鎖被釋放后,其他監聽/exlusive_lock變更的客戶端得到通知,再次爭相創建臨時子節點/exlusive_lock/lock。此時相當于回到了第2步。
我們的程序按照上述邏輯直至搶占到鎖,執行完業務邏輯。
上述是較為簡單的分布式鎖實現方式。能夠應付一般使用場景,但存在著如下兩個問題:
1、鎖的獲取順序和最初客戶端爭搶順序不一致,這不是一個公平鎖。每次鎖獲取都是當次最先搶到鎖的客戶端。
2、羊群效應,所有沒有搶到鎖的客戶端都會監聽/exlusive_lock變更。當并發客戶端很多的情況下,所有的客戶端都會接到通知去爭搶鎖,此時就出現了羊群效應。
為了解決上面的問題,我們重新設計。
我們在2.0版本中,讓每個客戶端在/exlusive_lock下創建的臨時節點為有序節點,這樣每個客戶端都在/exlusive_lock下有自己對應的鎖節點,而序號排在最前面的節點,代表對應的客戶端獲取鎖成功。排在后面的客戶端監聽自己前面一個節點,那么在他前序客戶端執行完成后,他將得到通知,獲得鎖成功。邏輯修改如下:
每個客戶端往/exlusive_lock下創建有序臨時節點/exlusive_lock/lock_。創建成功后/exlusive_lock下面會有每個客戶端對應的節點,如/exlusive_lock/lock_000000001
客戶端取得/exlusive_lock下子節點,并進行排序,判斷排在最前面的是否為自己。如果自己的鎖節點在第一位,代表獲取鎖成功,此客戶端執行業務邏輯
如果自己的鎖節點不在第一位,則監聽自己前一位的鎖節點。例如,自己鎖節點lock_000000002,那么則監聽lock_000000001.
當前一位鎖節點(lock_000000001)對應的客戶端執行完成,釋放了鎖,將會觸發監聽客戶端(lock_000000002)的邏輯。
監聽客戶端重新執行第2步邏輯,判斷自己是否獲得了鎖。
如此修改后,每個客戶端只關心自己前序鎖是否釋放,所以每次只會有一個客戶端得到通知。而且,所有客戶端的執行順序和最初鎖創建的順序是一致的。解決了1.0版本的兩個問題。
接下來我們看看代碼如何實現。
此類是分布式鎖類,實現了2個分布式鎖的相關方法:
1、獲取鎖
2、釋放鎖
主要程序邏輯圍繞著這兩個方法的實現,特別是獲取鎖的邏輯。我們先看一下該類的成員變量:
private ZooKeeper zkClient; private static final String LOCK_ROOT_PATH = "/Locks"; private static final String LOCK_NODE_NAME = "Lock_"; private String lockPath;
定義了zkClient,用來操作zookeeper。
鎖的根路徑,及自增節點的前綴。此處生產環境應該由客戶端傳入。
當前鎖的路徑。
public LockSample() throws IOException { zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState()== Event.KeeperState.Disconnected){ System.out.println("失去連接"); } } }); }
創建zkClient,同時創建了狀態監聽。此監聽可以去掉,這里只是打印出失去連接狀態。
暴露出來的獲取鎖的方法為acquireLock(),邏輯很簡單:
public void acquireLock() throws InterruptedException, KeeperException { //創建鎖節點 createLock(); //嘗試獲取鎖 attemptLock(); }
首先創建鎖節點,然后嘗試去取鎖。真正的邏輯都在這兩個方法中。
先判斷鎖的根節點/Locks是否存在,不存在的話創建。然后在/Locks下創建有序臨時節點,并設置當前的鎖路徑變量lockPath。
代碼如下:
private void createLock() throws KeeperException, InterruptedException { //如果根節點不存在,則創建根節點 Stat stat = zkClient.exists(LOCK_ROOT_PATH, false); if (stat == null) { zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 創建EPHEMERAL_SEQUENTIAL類型節點 String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + " 鎖創建: " + lockPath); this.lockPath=lockPath; }
這是最核心的方法,客戶端嘗試去獲取鎖,是對2.0版本邏輯的實現,這里就不再重復邏輯,直接看代碼:
private void attemptLock() throws KeeperException, InterruptedException { // 獲取Lock所有子節點,按照節點序號排序 List<String> lockPaths = null; lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false); Collections.sort(lockPaths); int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1)); // 如果lockPath是序號最小的節點,則獲取鎖 if (index == 0) { System.out.println(Thread.currentThread().getName() + " 鎖獲得, lockPath: " + lockPath); return ; } else { // lockPath不是序號最小的節點,監聽前一個節點 String preLockPath = lockPaths.get(index - 1); Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher); // 假如前一個節點不存在了,比如說執行完畢,或者執行節點掉線,重新獲取鎖 if (stat == null) { attemptLock(); } else { // 阻塞當前進程,直到preLockPath釋放鎖,被watcher觀察到,notifyAll后,重新acquireLock System.out.println(" 等待前鎖釋放,prelocakPath:"+preLockPath); synchronized (watcher) { watcher.wait(); } attemptLock(); } } }
注意這一行代碼
Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
我們在獲取前一個節點的時候,同時設置了監聽watcher。如果前鎖存在,則阻塞主線程。
watcher定義代碼如下:
private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath() + " 前鎖釋放"); synchronized (this) { notifyAll(); } } };
watcher只是notifyAll,讓主線程繼續執行,以便再次調用attemptLock(),去嘗試獲取lock。如果沒有異常情況的話,此時當前客戶端應該能夠成功獲取鎖。
釋放鎖原語實現很簡單,參照releaseLock()方法。代碼如下:
public void releaseLock() throws KeeperException, InterruptedException { zkClient.delete(lockPath, -1); zkClient.close(); System.out.println(" 鎖釋放:" + lockPath); }
關于分布式鎖的代碼到此就講解完了,我們再看下客戶端如何使用它。
我們創建一個TicketSeller類,作為客戶端來使用分布式鎖。
不帶鎖的業務邏輯方法,代碼如下:
private void sell(){ System.out.println("售票開始"); // 線程隨機休眠數毫秒,模擬現實中的費時操作 int sleepMillis = (int) (Math.random() * 2000); try { //代表復雜邏輯執行了一段時間 Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("售票結束"); }
僅是為了演示,sleep了一段時間。
此方法中,加鎖后執行業務邏輯,代碼如下:
public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException { LockSample lock = new LockSample(); lock.acquireLock(); sell(); lock.releaseLock(); }
接下來我們寫一個main函數做測試:
public static void main(String[] args) throws KeeperException, InterruptedException, IOException { TicketSeller ticketSeller = new TicketSeller(); for(int i=0;i<1000;i++){ ticketSeller.sellTicketWithLock(); } }
main函數中我們循環調用ticketSeller.sellTicketWithLock(),執行加鎖后的賣票邏輯。
1、先啟動一個java程序運行,可以看到日志輸出如下:
main 鎖創建: /Locks/Lock_0000000391 main 鎖獲得, lockPath: /Locks/Lock_0000000391 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000391 main 鎖創建: /Locks/Lock_0000000392 main 鎖獲得, lockPath: /Locks/Lock_0000000392 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000392 main 鎖創建: /Locks/Lock_0000000393 main 鎖獲得, lockPath: /Locks/Lock_0000000393 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000393
可見每次執行都是按照鎖的順序執行,而且由于只有一個進程,并沒有鎖的爭搶發生。
2、我們再啟動一個同樣的程序,鎖的爭搶此時發生了,可以看到雙方的日志輸出如下:
程序1:
main 鎖獲得, lockPath: /Locks/Lock_0000000471 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000471 main 鎖創建: /Locks/Lock_0000000473 等待前鎖釋放,prelocakPath:Lock_0000000472 /Locks/Lock_0000000472 前鎖釋放 main 鎖獲得, lockPath: /Locks/Lock_0000000473 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000473
可以看到Lock_0000000471執行完成后,該進程獲取的鎖為Lock_0000000473,這說明Lock_0000000472被另外一個進程創建了。此時Lock_0000000473在等待前鎖釋放。Lock_0000000472釋放后,Lock_0000000473才獲得鎖,然后才執行業務邏輯。
我們再看程序2的日志:
main 鎖獲得, lockPath: /Locks/Lock_0000000472 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000472 main 鎖創建: /Locks/Lock_0000000474 等待前鎖釋放,prelocakPath:Lock_0000000473 /Locks/Lock_0000000473 前鎖釋放 main 鎖獲得, lockPath: /Locks/Lock_0000000474 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000474
可以看到,確實是進程2獲取了Lock_0000000472。
zookeeper實現分布式鎖就先講到這。注意代碼只做演示用,并不適合生產環境使用。
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; public class LockSample { //ZooKeeper配置信息 private ZooKeeper zkClient; private static final String LOCK_ROOT_PATH = "/Locks"; private static final String LOCK_NODE_NAME = "Lock_"; private String lockPath; // 監控lockPath的前一個節點的watcher private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath() + " 前鎖釋放"); synchronized (this) { notifyAll(); } } }; public LockSample() throws IOException { zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState()== Event.KeeperState.Disconnected){ System.out.println("失去連接"); } } }); } //獲取鎖的原語實現. public void acquireLock() throws InterruptedException, KeeperException { //創建鎖節點 createLock(); //嘗試獲取鎖 attemptLock(); } //創建鎖的原語實現。在lock節點下創建該線程的鎖節點 private void createLock() throws KeeperException, InterruptedException { //如果根節點不存在,則創建根節點 Stat stat = zkClient.exists(LOCK_ROOT_PATH, false); if (stat == null) { zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 創建EPHEMERAL_SEQUENTIAL類型節點 String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + " 鎖創建: " + lockPath); this.lockPath=lockPath; } private void attemptLock() throws KeeperException, InterruptedException { // 獲取Lock所有子節點,按照節點序號排序 List<String> lockPaths = null; lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false); Collections.sort(lockPaths); int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1)); // 如果lockPath是序號最小的節點,則獲取鎖 if (index == 0) { System.out.println(Thread.currentThread().getName() + " 鎖獲得, lockPath: " + lockPath); return ; } else { // lockPath不是序號最小的節點,監控前一個節點 String preLockPath = lockPaths.get(index - 1); Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher); // 假如前一個節點不存在了,比如說執行完畢,或者執行節點掉線,重新獲取鎖 if (stat == null) { attemptLock(); } else { // 阻塞當前進程,直到preLockPath釋放鎖,被watcher觀察到,notifyAll后,重新acquireLock System.out.println(" 等待前鎖釋放,prelocakPath:"+preLockPath); synchronized (watcher) { watcher.wait(); } attemptLock(); } } } //釋放鎖的原語實現 public void releaseLock() throws KeeperException, InterruptedException { zkClient.delete(lockPath, -1); zkClient.close(); System.out.println(" 鎖釋放:" + lockPath); } }
import org.apache.zookeeper.KeeperException; import java.io.IOException; public class TicketSeller { private void sell(){ System.out.println("售票開始"); // 線程隨機休眠數毫秒,模擬現實中的費時操作 int sleepMillis = (int) (Math.random() * 2000); try { //代表復雜邏輯執行了一段時間 Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("售票結束"); } public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException { LockSample lock = new LockSample(); lock.acquireLock(); sell(); lock.releaseLock(); } public static void main(String[] args) throws KeeperException, InterruptedException, IOException { TicketSeller ticketSeller = new TicketSeller(); for(int i=0;i<1000;i++){ ticketSeller.sellTicketWithLock(); } } }
讀到這里,這篇“ZooKeeper三分布式鎖實現及完整運行的代碼”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。