您好,登錄后才能下訂單哦!
本篇內容主要講解“ZooKeeper同步框架怎么實現”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“ZooKeeper同步框架怎么實現”吧!
首先,定義一個同步接口,它有一個execute方法,主要負責同步任務的實現。
Path參數是任務節點(用戶),只有相同的節點才會同步工作。想象一下,去銀行取錢,如果每個人都有一個專屬的柜臺,那效率是明顯的。
SynchronousProcessor參數用來處理具體的業務。
Synchronous.java
package org.bigmouth.nvwa.zookeeper.concurrent; /** * 同步,支持分布式 * * @author Allen Hu * 2015-4-17 */ public interface Synchronous { /** * 同步執行,根據path標識來區分同步工作。不同的path將不會同步進行。 * * @param處理結果類型 * @param path 任務節點 * e.g. "/project/synchronous/0000001" * @param processor 業務處理器 * @return 處理結果 */T execute(String path, SynchronousProcessorprocessor); }
MutexLockSynchronous.java
Synchronous的實現類,基于普通排它鎖的方式實現。
package org.bigmouth.nvwa.zookeeper.concurrent; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.zookeeper.common.PathUtils; import org.bigmouth.nvwa.zookeeper.ZkClientHolder; /** * 基于普通排他鎖的方式實現同步 * * @author Allen Hu * 2015-4-17 */ public class MutexLockSynchronous implements Synchronous { private final ZkClientHolder zkClientHolder; public MutexLockSynchronous(ZkClientHolder zkClientHolder) { this.zkClientHolder = zkClientHolder; } @Override publicT execute(String path, SynchronousProcessorprocessor) { PathUtils.validatePath(path); InterProcessLock lock = new InterProcessMutex(zkClientHolder.get(), path); try { lock.acquire(); if (null != processor) return processor.process(); } catch (Exception e) { if (null != processor) processor.exceptionCaught(e); } finally { try { lock.release(); } catch (Exception e) { } } return null; } }
SynchronousProcessor.java
任務處理器接口,實現它來完成具體的業務工作
package org.bigmouth.nvwa.zookeeper.concurrent; /** * 同步業務處理器 * * @author Allen Hu * 2015-4-17 */ public interface SynchronousProcessor{ /** * 處理具體的業務 * * @return */ T process(); /** * 異常捕獲 * * @param throwable */ void exceptionCaught(Throwable throwable); }
ZkClientHolder.java
當然少不了這個了,繼承的父類可以不需要了解,就是定義了兩個抽象方法:doInit和doDestroy方法。
package org.bigmouth.nvwa.zookeeper; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.bigmouth.nvwa.utils.BaseLifeCycleSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; /** * ZooKeeper client holder * * @author Allen Hu * 2015-4-16 */ public class ZkClientHolder extends BaseLifeCycleSupport { private static final Logger LOGGER = LoggerFactory.getLogger(ZkClientHolder.class); public static final int MAX_RETRIES = 3; public static final int BASE_SLEEP_TIMEMS = 3000; private CuratorFramework zkClient; private final String connectString; private final int sessionTimeout; public ZkClientHolder(String connectString, int sessionTimeout) { Preconditions.checkArgument(StringUtils.isNotBlank(connectString), "connectString cannot be blank"); Preconditions.checkArgument(sessionTimeout >= 10000, "sessionTimeout must be greater than 10000"); this.connectString = connectString; this.sessionTimeout = sessionTimeout; } public CuratorFramework get() { return zkClient; } @Override protected void doInit() { zkClient = CuratorFrameworkFactory.builder() .sessionTimeoutMs(sessionTimeout) .connectString(connectString) .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIMEMS, MAX_RETRIES)) .build(); zkClient.start(); if (LOGGER.isInfoEnabled()) { LOGGER.info("Connected to ZooKepper server: {}", connectString); } } @Override protected void doDestroy() { if (null != zkClient) zkClient.close(); } }
最后來個測試類,模擬多個用戶多線程處理任務的過程,我們達到了相同用戶間同步的目的。
package org.bigmouth.nvwa.zookeeper.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.utils.ZKPaths; import org.bigmouth.nvwa.zookeeper.ZkClientHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Allen Hu * 2015-4-17 */ public class ConcurrentTest { private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentTest.class); private ZkClientHolder zkClientHolder = new ZkClientHolder("172.16.3.24:2181", 60000); private Synchronous synchronous; public ConcurrentTest() { zkClientHolder.init(); synchronous = new MutexLockSynchronous(zkClientHolder); } public class Service implements Runnable { private final String id; private final long sleepInMillis; public Service(String id, long sleepInMillis) { this.id = id; this.sleepInMillis = sleepInMillis; } @Override public void run() { synchronous.execute(ZKPaths.makePath("/nvwa/zookeeper/concurrent", id), new SynchronousProcessor() { @Override public String process() { LOGGER.info(id + " star...!"); try { Thread.sleep(sleepInMillis); } catch (InterruptedException e) { e.printStackTrace(); } LOGGER.info(id + " has execution!"); return id; } @Override public void exceptionCaught(Throwable throwable) { throwable.printStackTrace(); } }); } } static ExecutorService executor = Executors.newCachedThreadPool(); public static void main(String[] args) { ConcurrentTest ct = new ConcurrentTest(); executor.submit(ct.new Service("1", 5000)); // 1號 處理5秒 executor.submit(ct.new Service("1", 2000)); // 1號 處理2秒 executor.submit(ct.new Service("2", 5000)); // 2號 處理5秒 executor.submit(ct.new Service("3", 10000)); // 3號 處理10秒 executor.submit(ct.new Service("3", 500)); // 3號 處理0.5秒 } }
輸出結果,1、2、3任務并行,而相同的任務串行。如:第二個1號等第一個1號執行完才開始。
到此,相信大家對“ZooKeeper同步框架怎么實現”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。