This article shows how to implement a distributed lock based on ZooKeeper: a quick tour of ZooKeeper itself, a simple exclusive-node lock, and an improved fair lock built on sequential nodes.
ZooKeeper is a distributed, open-source coordination service for distributed applications, and a key component of Hadoop and HBase.
Characteristics:
ZooKeeper organizes its data as a tree of nodes. The basic unit is the znode, which resembles a node in a Unix file system: clients can store data in a znode and read data back from it.
Through a client you can operate on znode data, and you can also register watchers to be notified when a znode changes.
Znodes come in four types:
Persistent node (PERSISTENT)
Persistent sequential node (PERSISTENT_SEQUENTIAL)
Ephemeral node (EPHEMERAL)
Ephemeral sequential node (EPHEMERAL_SEQUENTIAL)
Download ZooKeeper from the official site, https://zookeeper.apache.org/releases.html#download, and unpack it locally.
Edit the configuration: in ${ZOOKEEPER_HOME}\conf, make a backup of zoo_sample.cfg, then rename the copy to zoo.cfg.
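For reference, the values shipped in zoo_sample.cfg are enough for a local single-node setup; a minimal zoo.cfg looks roughly like this (dataDir is whatever writable directory you choose):

```
# basic time unit in milliseconds
tickTime=2000
# ticks a follower may take to connect and sync to the leader
initLimit=10
# ticks a follower may lag behind the leader before being dropped
syncLimit=5
# where snapshots are stored -- adjust for your machine
dataDir=/tmp/zookeeper
# port for client connections (matches the 2181 used below)
clientPort=2181
```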
Run zkServer.cmd to start the server.
Then, in a cmd window (or the terminal inside the IDEA editor), connect with the client:
zkCli.cmd -server 127.0.0.1:2181
Type help to see the available commands:
ZooKeeper -server host:port -client-configuration properties-file cmd args
	addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE
	addauth scheme auth
	close
	config [-c] [-w] [-s]
	connect host:port
	create [-s] [-e] [-c] [-t ttl] path [data] [acl]
	delete [-v version] path
	deleteall path [-b batch size]
	delquota [-n|-b|-N|-B] path
	get [-s] [-w] path
	getAcl [-s] path
	getAllChildrenNumber path
	getEphemerals path
	history
	listquota path
	ls [-s] [-w] [-R] path
	printwatches on|off
	quit
	reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
	redo cmdno
	removewatches path [-c|-d|-a] [-l]
	set [-s] [-v version] path data
	setAcl [-s] [-v version] [-R] path acl
	setquota -n|-b|-N|-B val path
	stat [-w] path
	sync path
	version
	whoami
The create command syntax is create [-s] [-e] [-c] [-t ttl] path [data] [acl]: -s creates a sequential node, -e an ephemeral node (with neither flag the node is persistent), and the optional acl argument controls access permissions.
[zk: 127.0.0.1:2181(CONNECTED) 1] create -s /zk-test 0
Created /zk-test0000000000
List the children of the root node:
[zk: 127.0.0.1:2181(CONNECTED) 4] ls /
[zk-test0000000000, zookeeper]
Set (update) a node's data:
set /zk-test 123
Read a node's data:
get /zk-test
PS: run help or consult the official documentation for the full command reference.
Now let's write a Java example that registers a watcher:
package com.example.concurrent.zkSample;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

/**
 * <pre>
 * ZooKeeper watcher example
 * </pre>
 *
 * @author mazq
 * 2021/12/09 16:57
 */
public class ZookeeperSample {
    public static void main(String[] args) {
        ZkClient client = new ZkClient("localhost:2181");
        client.setZkSerializer(new MyZkSerializer());
        client.subscribeDataChanges("/zk-test", new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("znode data changed!");
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("znode data deleted!");
            }
        });
        try {
            // keep the process alive so the watcher has time to fire
            Thread.sleep(1000 * 60 * 2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Typical ZooKeeper application scenarios:
Registry center (e.g. for Dubbo)
Naming service
Master election
Cluster management
Distributed queue
Distributed lock
ZooKeeper is well suited to distributed locks, but which property makes the implementation work? As noted, ZooKeeper resembles a Unix file system, and a file system never allows two entries with the same name in one directory: names are mutually exclusive. ZooKeeper has the same property: under a given znode, child node names cannot repeat. Creating a child node is therefore an atomic, exclusive operation, which is exactly what a lock needs.
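That first-create-wins guarantee (only one client can create a given node; everyone else gets a ZkNodeExistsException) is the entire locking primitive. As a server-free illustration, ConcurrentHashMap.putIfAbsent offers the same semantics within one JVM; the class below is invented for the sketch and is not the ZooKeeper API:

```java
import java.util.concurrent.ConcurrentHashMap;

public class ZnodeNameMutex {
    // stands in for the children of one parent znode: child name -> owning client
    private static final ConcurrentHashMap<String, String> children = new ConcurrentHashMap<>();

    // mimics createEphemeral: only the first caller for a given name succeeds
    static boolean tryCreate(String name, String owner) {
        return children.putIfAbsent(name, owner) == null;
    }

    // mimics deleting the znode, i.e. releasing the lock
    static void delete(String name) {
        children.remove(name);
    }

    public static void main(String[] args) {
        System.out.println(tryCreate("/lock", "client-A")); // true:  node created, lock held
        System.out.println(tryCreate("/lock", "client-B")); // false: name already taken
        delete("/lock");
        System.out.println(tryCreate("/lock", "client-B")); // true:  lock re-acquired
    }
}
```

putIfAbsent is atomic within one process in the same way znode creation is atomic across the cluster, which is why the real lock below can simply map tryLock to createEphemeral.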
Business scenario: creating orders under high concurrency, a classic e-commerce case.
First, a custom ZooKeeper serializer:
package com.example.concurrent.zkSample;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.io.UnsupportedEncodingException;

public class MyZkSerializer implements ZkSerializer {

    private String charset = "UTF-8";

    @Override
    public byte[] serialize(Object o) throws ZkMarshallingError {
        try {
            // use the same charset in both directions
            return String.valueOf(o).getBytes(charset);
        } catch (UnsupportedEncodingException e) {
            throw new ZkMarshallingError(e);
        }
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            return new String(bytes, charset);
        } catch (UnsupportedEncodingException e) {
            throw new ZkMarshallingError(e);
        }
    }
}
Next, the order-number generator. Because SimpleDateFormat is not thread-safe, each thread gets its own instance via ThreadLocal:
package com.example.concurrent.zkSample;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

public class OrderCodeGenerator {

    private static final String DATE_FORMAT = "yyyyMMddHHmmss";

    // atomic counter: a plain i++ would not be thread-safe
    private static AtomicInteger ai = new AtomicInteger(0);

    // SimpleDateFormat is not thread-safe, so give each thread its own instance
    private static ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat(DATE_FORMAT);
        }
    };

    public static DateFormat getDateFormat() {
        return threadLocal.get();
    }

    public static String generatorOrderCode() {
        return getDateFormat().format(new Date()) + ai.getAndIncrement();
    }
}
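As a side note, on Java 8+ an alternative worth knowing (not part of the original code) is java.time.format.DateTimeFormatter, which is immutable and thread-safe, so the ThreadLocal is unnecessary. A sketch of the same generator on that API, with a hypothetical class name:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicInteger;

public class OrderCodeGenerator2 {
    // DateTimeFormatter is immutable and thread-safe: one shared instance is fine
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
    private static final AtomicInteger SEQ = new AtomicInteger(0);

    public static String generatorOrderCode() {
        return FORMAT.format(LocalDateTime.now()) + SEQ.getAndIncrement();
    }
}
```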
Add the zkclient dependency to pom.xml:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
Now implement the ZooKeeper distributed lock. The idea: threads compete to create a node. The one that succeeds holds the lock and runs the business logic; the others block, registering a watcher on the node. When the node is deleted (the lock is released), each waiter unsubscribes its watcher and competes for the lock again.
package com.example.concurrent.zkSample;

import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j
public class ZKDistributeLock implements Lock {

    private String localPath;
    private ZkClient zkClient;

    ZKDistributeLock(String localPath) {
        super();
        this.localPath = localPath;
        zkClient = new ZkClient("localhost:2181");
        zkClient.setZkSerializer(new MyZkSerializer());
    }

    @Override
    public void lock() {
        while (!tryLock()) {
            waitForLock();
        }
    }

    private void waitForLock() {
        // latch to park this thread until the lock node is deleted
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // register a watcher on the lock node
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataChange(String path, Object o) throws Exception {
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                // the lock was released: wake the waiting thread
                countDownLatch.countDown();
            }
        };
        zkClient.subscribeDataChanges(localPath, listener);
        // wait only while the lock node still exists
        if (zkClient.exists(localPath)) {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // unsubscribe the watcher
        zkClient.unsubscribeDataChanges(localPath, listener);
    }

    @Override
    public void unlock() {
        zkClient.delete(localPath);
    }

    @Override
    public boolean tryLock() {
        try {
            // an ephemeral node disappears if the client dies, so the lock cannot leak
            zkClient.createEphemeral(localPath);
        } catch (ZkNodeExistsException e) {
            return false;
        }
        return true;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
The order service API:

package com.example.concurrent.zkSample;

public interface OrderService {
    void createOrder();
}
The order service implementation, guarded by the ZooKeeper distributed lock:

package com.example.concurrent.zkSample;

import java.util.concurrent.locks.Lock;

public class OrderServiceInvoker implements OrderService {

    @Override
    public void createOrder() {
        Lock zkLock = new ZKDistributeLock("/zk-test");
        //Lock zkLock = new ZKDistributeImproveLock("/zk-test");
        String orderCode = null;
        try {
            zkLock.lock();
            orderCode = OrderCodeGenerator.generatorOrderCode();
        } finally {
            zkLock.unlock();
        }
        System.out.println(String.format("thread name : %s , orderCode : %s",
                Thread.currentThread().getName(), orderCode));
    }
}
Because setting up a real multi-machine environment is tedious, we use the JUC coordination tool CyclicBarrier to release many threads at once, simulating the high-concurrency traffic of a distributed deployment:

package com.example.concurrent.zkSample;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ConcurrentDistributeTest {
    public static void main(String[] args) {
        // number of concurrent threads
        int threadSize = 30;
        // the barrier releases all threads at the same moment
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadSize, () -> {
            System.out.println("all threads ready!");
        });
        // simulate a cluster of concurrent clients
        for (int i = 0; i < threadSize; i++) {
            new Thread(() -> {
                OrderService orderService = new OrderServiceInvoker();
                // wait until every thread has reached the barrier
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                // fire the concurrent request
                orderService.createOrder();
            }).start();
        }
    }
}
Running it several times produces no duplicate order numbers, so the distributed lock is doing its job:
thread name : Thread-6 , orderCode : 202112100945110
thread name : Thread-1 , orderCode : 202112100945111
thread name : Thread-13 , orderCode : 202112100945112
thread name : Thread-11 , orderCode : 202112100945113
thread name : Thread-14 , orderCode : 202112100945114
thread name : Thread-0 , orderCode : 202112100945115
thread name : Thread-8 , orderCode : 202112100945116
thread name : Thread-17 , orderCode : 202112100945117
thread name : Thread-10 , orderCode : 202112100945118
thread name : Thread-5 , orderCode : 202112100945119
thread name : Thread-2 , orderCode : 2021121009451110
thread name : Thread-16 , orderCode : 2021121009451111
thread name : Thread-19 , orderCode : 2021121009451112
thread name : Thread-4 , orderCode : 2021121009451113
thread name : Thread-18 , orderCode : 2021121009451114
thread name : Thread-3 , orderCode : 2021121009451115
thread name : Thread-9 , orderCode : 2021121009451116
thread name : Thread-12 , orderCode : 2021121009451117
thread name : Thread-15 , orderCode : 2021121009451118
thread name : Thread-7 , orderCode : 2021121009451219
Now comment out the locking code and rerun with higher concurrency:

package com.example.concurrent.zkSample;

import java.util.concurrent.locks.Lock;

public class OrderServiceInvoker implements OrderService {

    @Override
    public void createOrder() {
        //Lock zkLock = new ZKDistributeLock("/zk-test");
        //Lock zkLock = new ZKDistributeImproveLock("/zk-test");
        String orderCode = null;
        try {
            //zkLock.lock();
            orderCode = OrderCodeGenerator.generatorOrderCode();
        } finally {
            //zkLock.unlock();
        }
        System.out.println(String.format("thread name : %s , orderCode : %s",
                Thread.currentThread().getName(), orderCode));
    }
}
Run it a few times and duplicate order numbers appear, confirming that the distributed lock is what keeps the distributed scenario thread-safe.
The example above is a non-fair lock: the moment the node is deleted, every waiting client is notified and all of them rush to grab the lock at once. This is the classic "thundering herd" problem, which brings:
heavy, wasted server load
a burst of network traffic
in the worst case, overwhelmed, crashing nodes
So the lock needs improving, into a fair-lock design.
Fair lock: threads acquire the lock in the order they requested it. Waiters queue up, and only the thread at the head of the queue may take the lock; the rest block until the holder releases it and the next in line is woken.
Non-fair lock: every thread first races for the lock; losers join the wait queue. When the holder releases it, all waiting threads compete again, regardless of arrival order.
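The same distinction exists inside the JDK: ReentrantLock takes a fairness flag, which makes a convenient single-process analogy for the two ZooKeeper designs (non-fair: everyone races on release; fair: FIFO hand-off):

```java
import java.util.concurrent.locks.ReentrantLock;

public class FairnessDemo {
    public static void main(String[] args) {
        // default is non-fair: on release, all waiters compete, like the first ZK lock
        ReentrantLock unfair = new ReentrantLock();
        // fair: waiters acquire in FIFO order, like the sequential-node ZK lock
        ReentrantLock fair = new ReentrantLock(true);
        System.out.println(unfair.isFair()); // false
        System.out.println(fair.isFair());   // true
    }
}
```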
The improved flow: every client creates an ephemeral sequential child under the lock node; the client whose child has the smallest sequence number holds the lock, and every other client watches only the node immediately before its own, so a release wakes exactly one waiter. The improved code:
package com.example.concurrent.zkSample;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class ZKDistributeImproveLock implements Lock {

    private String localPath;
    private ZkClient zkClient;
    // this client's own sequential node
    private String currentPath;
    // the node immediately before ours, which we watch
    private String beforePath;

    ZKDistributeImproveLock(String localPath) {
        super();
        this.localPath = localPath;
        zkClient = new ZkClient("localhost:2181");
        zkClient.setZkSerializer(new MyZkSerializer());
        // make sure the persistent parent node exists
        if (!zkClient.exists(localPath)) {
            try {
                this.zkClient.createPersistent(localPath);
            } catch (ZkNodeExistsException e) {
                // another client created it first: fine
            }
        }
    }

    @Override
    public void lock() {
        while (!tryLock()) {
            waitForLock();
        }
    }

    private void waitForLock() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // watch only the node just before ours, so one release wakes one waiter
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                // the previous node is gone: wake up and retry
                countDownLatch.countDown();
            }
        };
        zkClient.subscribeDataChanges(beforePath, listener);
        // wait only while the previous node still exists
        if (zkClient.exists(beforePath)) {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    public void unlock() {
        zkClient.delete(this.currentPath);
        // reset so this lock object can be reused
        this.currentPath = null;
    }

    @Override
    public boolean tryLock() {
        if (this.currentPath == null) {
            // create our own ephemeral sequential node
            currentPath = zkClient.createEphemeralSequential(localPath + "/", "123");
        }
        // list and sort all children of the lock node
        List<String> children = zkClient.getChildren(localPath);
        Collections.sort(children);
        if (currentPath.equals(localPath + "/" + children.get(0))) {
            // our node is first: we hold the lock
            return true;
        } else {
            // find our index and remember the node just before us
            int index = children.indexOf(currentPath.substring(localPath.length() + 1));
            beforePath = localPath + "/" + children.get(index - 1);
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

Re-running the test, the order numbers remain unique:
thread name : Thread-13 , orderCode : 202112100936140
thread name : Thread-3 , orderCode : 202112100936141
thread name : Thread-14 , orderCode : 202112100936142
thread name : Thread-16 , orderCode : 202112100936143
thread name : Thread-1 , orderCode : 202112100936144
thread name : Thread-9 , orderCode : 202112100936145
thread name : Thread-4 , orderCode : 202112100936146
thread name : Thread-5 , orderCode : 202112100936147
thread name : Thread-7 , orderCode : 202112100936148
thread name : Thread-2 , orderCode : 202112100936149
thread name : Thread-17 , orderCode : 2021121009361410
thread name : Thread-15 , orderCode : 2021121009361411
thread name : Thread-0 , orderCode : 2021121009361412
thread name : Thread-10 , orderCode : 2021121009361413
thread name : Thread-18 , orderCode : 2021121009361414
thread name : Thread-19 , orderCode : 2021121009361415
thread name : Thread-8 , orderCode : 2021121009361416
thread name : Thread-12 , orderCode : 2021121009361417
thread name : Thread-11 , orderCode : 2021121009361418
thread name : Thread-6 , orderCode : 2021121009361419
Both Redis and ZooKeeper can be used to implement distributed locks; a quick comparison:
A Redis-based distributed lock:
is relatively complex to implement correctly
can deadlock (for example, if a holder crashes and the key never expires)
performs well, being memory-based; Redis prioritizes availability, i.e. AP in CAP terms
A ZooKeeper-based distributed lock:
is relatively simple to implement
is highly reliable, because ZooKeeper prioritizes consistency, i.e. CP in CAP terms
performs reasonably well, on the order of 10,000 to 20,000 concurrent requests; at higher concurrency Redis is still faster
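For reference, the core of a typical Redis lock is the command SET key value NX PX ttl: create the key only if absent, with an expiry so a crashed holder cannot block everyone forever. A server-free simulation of that semantics, with a plain map plus expiry timestamps standing in for Redis (all names below are invented for the sketch):

```java
import java.util.concurrent.ConcurrentHashMap;

public class RedisLikeLock {
    // key -> expiry timestamp in millis, standing in for a Redis key set with PX ttl
    private final ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<>();

    // mimics SET key value NX PX ttl: acquire if absent or already expired
    public boolean tryLock(String key, long ttlMillis, long nowMillis) {
        final boolean[] acquired = {false};
        store.compute(key, (k, expiry) -> {
            if (expiry == null || expiry <= nowMillis) {
                acquired[0] = true;
                return nowMillis + ttlMillis;  // take the lock with a fresh TTL
            }
            return expiry;                     // still held by someone else
        });
        return acquired[0];
    }

    // mimics DEL key, i.e. releasing the lock
    public void unlock(String key) {
        store.remove(key);
    }
}
```

The TTL is exactly where the deadlock trade-off above lives: too short and a slow holder loses the lock mid-work; too long and a crashed holder blocks everyone. ZooKeeper's ephemeral nodes sidestep this by tying the lock to the client session.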
That concludes this walkthrough of implementing a distributed lock on ZooKeeper; pairing the theory with the code above and trying it yourself is the best way to make it stick.