您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關distributeTemplate分布式配置以及上下文的同步維護是怎樣的,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
我們實現分布式 也是按照 這樣一步一步來的,首先是公用的資源對象,第一個必須的公用資源 對象是配置,這配置交給用戶外部填寫 手動配置 并能維持同步 更新 所以需要一個配置對象 維護在內存里面,需要事件驅動監聽配置文件的變化情況
ok下面來 看看代碼是怎么做的 ,首先 配置 有多重配置方式 ini conf prop xml 各種方式本質上 我們分布式就是要各臺主機 自己所在的節點 都能知道我在網絡上情況,或者所可以找到像zookeeper 只要知道或者找到了 才能進行以后的通訊同步
我們為了能夠以后支持多重配置 ,所以先把配置定義為一個接口
public interface IConfig { public static final String DEFAULTFILE="distribute.conf"; /** * * 從文件讀取配置初始化 * @param file * 添加(修改)人:zhuyuping */ public void readConfigFormFile(String file); /** * 從用戶提供的json字符串中初始化 * @param json * 添加(修改)人:zhuyuping */ public void readConfigFormString(String json); /** * 獲得系統內存中維護的內存表 * * 添加(修改)人:zhuyuping */ public Table<InetSocketAddress, String, Object> getConfig(); /** * 攝入context上下文 * @param context * 添加(修改)人:zhuyuping */ public void setContext(Context context); /** * 獲得上下文context * @return * 添加(修改)人:zhuyuping */ public Context getContext(); public BiMap<String, InetSocketAddress> getAlias(); }
主要 是3個方法 ,一個是讀配置文件到內存對象中,擰一個就是內存對象寫到文件中,對于分布式還需要一個同步
有時候 我們可能需要歷史版本 能夠容錯還原 所以可能需要一個版本號 記錄當前版本,然后保持有可能多個節點上配置文件更改后,發送請求時候能夠保持順序的更新,這后面我會處理 加入一個分布式先進先出的隊列的,這里暫時未加 后面 加了會補上的,還有上下文context 對象,有人會說 你為什么需要上下文對象去保存變量啊,好多框架 都有上下文這個對象,也許是攔截器模式 也許是門面模式 等等其他模式,但是 這里我都不是,其實上下文只是為了本地節點的各個功能代碼段之間的一個橋梁,有句話叫做 兩情若是久長時 又豈在朝朝暮暮 我們有鵲橋,他就是上下文最重要的本質 就是維護本地節點上下文貫穿 如果上下文 里面保存著門面 那么他就有門面的功能 便于用戶隨時獲取門面對象 進行操作,ok我們來看看 context是怎么定義的
/** * * * * @author zhuyuping * @version 1.0 * @created 2014-7-9 * @function:上下文接口 他只是存儲用戶上面類所有過程中的變量不是config配置而且分布式中不會同步的只會在單點上有效 切記 、、后期如果想支持xml 配置或者其他配置 可以添加策略模式 */ public interface Context { public final static String defaultConfig="distribute.conf";//默認配置名 public void putValue(String key,Object value); public Object getValue(String key); public void setCurrHost(String host,int port); public InetSocketAddress getCurrHost(); /** * 獲得默認配置文件 * @return * 添加(修改)人:zhuyuping */ public String getDefaultFc(); // /** // * 設置默認屬性文件的名稱 // * @param pfile // * 添加(修改)人:zhuyuping // */ // public void setDefaultFc(String pfile); // /** // * 注入template 門面 便于后面直接通過上下文來使用 如果要整合spring ApplicationContextAware // * @param distributedTemplate // * 添加(修改)人:zhuyuping // */ // public void setTemplate(DistributedOperations distributedTemplate); // // public DistributedOperations getTemplate(); }
這里其實 就是一個map 保存屬性key value 而常用的就取出作為一個方法了
這個context 因為后面我們給用戶一個繼承的 這樣便于 用戶實現自己的上下文 或交給其他框架上下文 以及整合所以我們實現了一個抽象的 默認實現
/** * * * * @author zhuyuping * @version 1.0 * @created 2014-7-9 下午5:58:37 * @function:抽象的上下文 主要是 管理context的資源 還有就是提供用戶自定義 整合spring使用該類 //這里后期需整合策略 實現 */ public abstract class AbstractContext implements Context{ //?也可以使用LocalThread 也可以 private Map<String,Object> context=Maps.newConcurrentMap(); private InetSocketAddress currHost;//當前主機 比如192.168.0.1 8888 private String dfConfig;//默認讀取的配置文件 當用戶 提供就修改 沒有提供就默認 public AbstractContext(String dfConfig) { super(); this.dfConfig = dfConfig; //當前classes 下的文件 //currentPort this.currHost=new InetSocketAddress(ConfigFactory.load(dfConfig).getString("client.currentHost"),ConfigFactory.load(dfConfig).getInt("client.currentPort")); } @Override public InetSocketAddress getCurrHost() { return currHost; } @Override public void setCurrHost(String host,int port) { this.currHost=new InetSocketAddress(host, port); } @Override public String getDefaultFc() { return dfConfig!=null?dfConfig:defaultConfig; } public AbstractContext() { super(); this.dfConfig=defaultConfig; this.currHost=new InetSocketAddress(ConfigFactory.load(defaultConfig).getString("client.currentHost"),ConfigFactory.load(defaultConfig).getInt("client.currentPort")); } @Override public void putValue(String key, Object value) { context.put(key, value); } @Override public Object getValue(String key) { return context.get(key); } }
ok 很簡單的 維護者
然后回到剛才的配置 ,首先 我們配置文件 需要從文件讀取 到配置對象中 ,這是為了用戶更改 或者初始化時候 吧配置文件初始化到配置內存對象中 然后這個內存對象將會在同步 配置文件 更改 網絡通訊時候用到,在對于全局的所有節點的淪陷時候 單純的context只維護本節點橋梁信息的 已經不夠用了 因為他不會同步的,這就是為什么需要他的原因,我這里采用的配置文件時conf 也就是configLoad方式,后面我會逐步添加更多的支持方式 無非是xml 等讀取問題,這并不重要 思想才是重要的
/** * * * * @author zhuyuping * @version 1.0 * @created 2014-7-9 下午4:07:00 如果后期需要支持xml 其他 只需要使用策略模式 * @function:基類config 主要維持 配置基本信息 以及配置信息的同步 備份 同時內存中維持一張內存表table */ public abstract class AbstractConfig implements IConfig{ /** * 當前的配置表 行 為主機別名 其中一定有一列為版本號 AotmicLong 以及配置的相關字段 值為相關的對象 */ protected Table<InetSocketAddress, String, Object> config=HashBasedTable.create();//table信息 table 信息 這里無需要用到 哪個ConcurrentHashMap<K, V> 因為這個只會加載讀取 加載 //不會修改,因為這個table當用戶真的更新config后 會同步并同時刷到更新到文件中的 ,而且每次用戶提供查詢的配置 //是不會更新到文件里面的 protected AtomicLong version=new AtomicLong(0);//初始化版本為0;//判斷當前的版本 是佛在config 里面是否存在 protected BiMap<String,InetSocketAddress> alias=HashBiMap.create(); protected Context context; public BiMap<String, InetSocketAddress> getAlias() { return alias; } /** * context 需要提供當前主機 以及 * @param context */ public AbstractConfig(Context context) { super(); this.context = context; wrapConfig(ConfigFactory.load(context.getDefaultFc())); } @Override public void setContext(Context context) { this.context=context; } @Override public Context getContext() { return context; } @Override public void readConfigFormFile(String file) { Config config=TypeSafeConfigLoadUtils.loadFromFile(new File(file)); wrapConfig(config); } @Override public void readConfigFormString(String json) { Config config=TypeSafeConfigLoadUtils.loadFromString(json); wrapConfig(config); } /** * 對config進行初始化 table * @param config * 添加(修改)人:zhuyuping */ protected abstract void wrapConfig(Config config); /** * 把table 從內存中讀取從新寫入到配置文件中 * @param config * 添加(修改)人:zhuyuping */ protected abstract String wrapTable(Table<String, String, Object> config); /** * 只保留最近的5個版本 可以回滾 更新***的 * * 添加(修改)人:zhuyuping */ public void updateVersion(Long version){ } /** * 版本數更新 * 更新完后 需要 * 添加(修改)人:zhuyuping */ public void addVersion(){ Long v=version.getAndIncrement(); //TODO 需要通知所有節點 我要修改版本了 如果同時有幾個人也這樣 那么接受該節點下次更新的版本號, //在回調函數中 更新配置 隨后同步 只保留5個版本 } @Override public Table<InetSocketAddress, String, Object> getConfig() { return config; } /** * * 提交對文件配置做出的修改 * 添加(修改)人:zhuyuping */ protected abstract void commit(); /** * * 提交對配置的修改 如果一個人在一個節點上 更改了配置 需要核對版本 并從新更新本地的配置文件 * 添加(修改)人:zhuyuping */ protected abstract void sync(); }
這里 我為了好維護 就直接使用guava的table ,其實你可以用List< map> 實現,這里 重要是獲取所有主機列表的方法
然后就是配置文件讀取后 寫入context 對象 當然 上面說的讀取配置到內存對象 ,內存對象寫入到配置文件是基礎
然后看看怎么寫入的 我為了以后支持xml 所以這讀取方式 寫入方式 交給后面的子類實現類去實現 這樣實現xml只要實現這2個方法即可
/** * * * * @author zhuyuping * @version 1.0 * @created 2014-7-9 下午10:38:05 * @function:默認的配置 允許用戶實現自定義的配置規則 只需要繼承 AbstractConfig */ public class SimpleDistributeConfig extends AbstractConfig{ public SimpleDistributeConfig(Context context) { super(context); } @Override protected void wrapConfig(Config configObj) { //得到所有的節點 List<? extends ConfigObject> nodes=configObj.getObjectList("server.hosts"); int i=0; for (ConfigObject node : nodes) { i++; //如果后期添加其他mysql 等支持 這里就需要添加判斷 //Integer localport=node.containsKey("localPort")?Integer.parseInt(node.get("localPort").render()):LOCALPORT;//Integer.parseInt(node.get("localPort").render()); Integer remotePort=Integer.parseInt(node.get("remotePort").render()); String remoteIp=node.get("remoteHost").unwrapped().toString();//遠程主機的ip //開始初始化配置table String name=node.containsKey("name")?node.get("name").unwrapped().toString():remoteIp;//主機別名 InetSocketAddress remoteHost=new InetSocketAddress(remoteIp, remotePort); super.alias.put(name, remoteHost); super.config.put(remoteHost,"version", super.version.incrementAndGet()); super.config.put(remoteHost, "remoteHost", remoteHost); //super.config.put(remoteHost, "localPort", localport); super.config.put(remoteHost, "remotePort", remotePort); super.config.put(remoteHost, "name", name); if(node.containsKey("file")){ HashMap fcs=(HashMap) node.get("file").unwrapped(); String syncPath=fcs.get("syncPath").toString();//文件同步的路徑 // System.out.println("SimpleDistributeConfig.wrapConfig() "+syncPath); super.config.put(remoteHost, "file", syncPath);//以后配置多的時候 分裝成一個bean存入 } } String chost=configObj.getString("client.currentHost"); int port=configObj.getInt("client.currentPort"); super.context.setCurrHost(chost, port); //config.root().containsKey(key) //config.getString(path); } @Override protected String wrapTable(Table<String, String, Object> table) { StringBuilder sb=new StringBuilder("server{"); sb.append("\n\t").append("hosts=["); Set<String> rows=table.rowKeySet(); int size=rows.size(); int i=0; for (String row : rows) { i++; Map<String,Object> map=table.row(row); if(!map.containsKey("remoteHost")) continue; sb.append("\t {"); Object remoteHost=map.get("remoteHost"); Object remotePort=map.get("remotePort"); //Object localPort=map.get("localPort"); Object name=map.get("name"); sb.append("name=").append(name).append("\n\t"); //sb.append("localPort=").append(localPort).append("\n\t"); sb.append("remoteHost=").append(remoteHost).append("\n\t"); sb.append("remotePort=").append(remotePort).append("\n\t"); if(map.containsKey("file")){ sb.append("file{").append("\n\t").append("syncPath=").append(map.get("syncPath")).append("}"); } sb.append("\t }"); if(i!=size){ sb.append(","); } } sb.append("]").append("\n\t").append("}"); //繼續 保存client sb.append("\n\t").append("client{").append("\n\t").append("currentHost=").append(context.getCurrHost().getHostString()).append("\n\t"); sb.append("\n\t").append("currentPort=").append(context.getCurrHost().getPort()).append("\n\t"); sb.append("}"); return sb.toString(); } @Override protected void commit() { } @Override protected void sync() { } public static void main(String[] args) { } }
ok 基本的簡單的 配置 以及 上下文 這些用來同步的 用來做橋梁的都已經做好了 下面就是怎么監聽配置文件的更改,有人說怎么監聽一個配置文件更改啊,其實 文件有個屬性 ***更改的時間,只要監聽這個就可以 初級版本沒有加上更改,這個后面可以隨時加上 而且為了以后好更改 只寫了相關方式沒有 加上
import java.io.File; public interface FileListener { public void fileChanged(File file); }
然后用定時器 timeer 或者線程池定時線程去輪訓他 ,
這里 注意弱應用 ,其實 對于這些時間,我建議用LinkTransferQueue,他是FIFO 先進先出的無阻曬的隊列 然后隊列 加上若引用,保證時間的一些順序,
然后 通知了接口 在接口里面我們在做相應的更改配置 同步配置 的相關操作,這一章基本思想就是
同步 資源 配置 上下文為橋梁 事件驅動為引擎
還有支出的是 timer excutors.newSchulerXXXX 還有很多方式實現輪訓的方式 這種方式 也可以實現心跳線
同時 告訴大家的事 Apache、 camel| 里面的事件驅動文件部分 核心代碼就是上面 這一小部分代碼沒有貼完整 ,但是代碼已經托管到 http://git.oschina.net/zhuyuping/distributeTemplate
關于distributeTemplate分布式配置以及上下文的同步維護是怎樣的就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。