您好,登錄后才能下訂單哦!
ZooKeeper構建配置服務
* 配置服務是分布式應用所需要的基本服務之一,它使集群中的機器可以共享配置信息中那些公共的部分。
* 簡單的說,ZooKeeper可以作為一個具有高可用性的配置存儲器,允許分布式應用的參與者檢索和更新配置文件。
* 使用ZooKeeper中的觀察機制,可以建立一個活躍的配置服務,使那些感興趣的客戶端能夠獲得配置信息修改的通知。
在每個znode上存儲一個鍵值對,ActiveKeyValueStore 提供了從zookeeper服務上寫和讀取鍵值方法。
public class ActiveKeyValueStore extends ConnectionWatcher{ private static final Charset CHARSET =Charset.forName("GBk"); private static final int MAX_RETRIES = 5; private static final long RETRY_PERIOD_SECONDS = 60; public void write(String path, String value) throws Exception{ int retries = 0; while(true){ try { Stat stat = zk.exists(path, false); if(stat == null){ zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }else{ zk.setData(path, value.getBytes(CHARSET), -1); } } catch (KeeperException.SessionExpiredException e) { throw e; }catch(KeeperException e){ if(retries++ == MAX_RETRIES){ throw e; } TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS); } } } public String read(String path, Watcher watcher) throws Exception{ byte[] data = zk.getData(path, watcher, null); return new String(data, CHARSET); } }
與zookeeper服務創建連接
public class ConnectionWatcher implements Watcher{ private static final int SESSION_TIMEOUT = 5000; protected ZooKeeper zk; private CountDownLatch connectedSignal = new CountDownLatch(1); public void connect(String hosts) throws Exception{ //創建zookeeper實例的時候會啟動一個線程連接到zookeeper服務。 zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); connectedSignal.await(); } //當客戶端已經與zookeeper建立連接后,Watcher的process方法會被調用。 public void process(WatchedEvent event) { if(event.getState() == KeeperState.SyncConnected){ connectedSignal.countDown(); } } public void close() throws Exception{ zk.close(); } }
ResilientConfigUpdater類提供了管理更新配置信息方法。
public class ResilientConfigUpdater { public static final String PATH = "/config"; private ActiveKeyValueStore store; private Random random = new Random(); public ResilientConfigUpdater(String hosts) throws Exception{ store = new ActiveKeyValueStore(); store.connect(hosts); } public void run() throws Exception{ while(true){ String value = random.nextInt(100)+""; store.write(PATH, value); System.out.printf("Set %s to %s\n", PATH, value); TimeUnit.SECONDS.sleep(random.nextInt(10)); } } public static void main(String[] args) throws Exception { while(true){ try{ ResilientConfigUpdater updater = new ResilientConfigUpdater("192.168.44.231"); updater.run(); }catch(KeeperException.SessionExpiredException e){ //start a new session }catch(KeeperException e){ e.printStackTrace(); break; } } } }
ConfigWatcher類提供了配置信息變更觀察器,在信息修改后會觸發顯示方法被調用。
public class ConfigWatcher implements Watcher{ private ActiveKeyValueStore store; public ConfigWatcher(String hosts) throws Exception{ store = new ActiveKeyValueStore(); store.connect(hosts); } public void displayConfig() throws Exception{ String value = store.read(ConfigUpdater.PATH, this); System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value); } public void process(WatchedEvent event) { // TODO Auto-generated method stub if(event.getType() == EventType.NodeDataChanged){ try { displayConfig(); } catch (Exception e) { System.out.println("Interrupted. Exiting."); Thread.currentThread().interrupt(); } } } public static void main(String[] args) throws Exception { ConfigWatcher watcher = new ConfigWatcher("192.168.44.231"); watcher.displayConfig(); Thread.sleep(Long.MAX_VALUE); } }
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。