您好,登錄后才能下訂單哦!
本篇文章為大家展示了 Jedis 中怎麼實現分布式鎖,內容簡明扼要並且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收穫。
package com.xxx.arch.seq.client.redis;

import java.io.Closeable;
import java.util.*;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.*;

import com.xxx.arch.seq.constant.Constants;

/**
 * Process-wide Jedis configuration holder that hides whether Redis is reached through a
 * single-node pool, a sentinel-managed pool or a cluster client.
 *
 * <p>The mode is derived from the shape of {@link Constants#ARCH_SEQ_REDIS_NODES}:
 * <ul>
 *   <li>contains {@code ">>"}: sentinel mode, entries are {@code host:port} separated by {@code ">>"};</li>
 *   <li>otherwise contains {@code ","}: cluster mode, entries are {@code host:port} separated by {@code ","};</li>
 *   <li>otherwise: a single {@code host:port} node.</li>
 * </ul>
 *
 * @author zhangyang
 * @createDate 2019-01-22
 * @since 2.x
 */
public class JedisConfig {

    /** Single node reached through a {@link JedisPool}. */
    private static final int MODE_SINGLE = 1;
    /** Sentinel-managed master reached through a {@link JedisSentinelPool}. */
    private static final int MODE_SENTINEL = 2;
    /** Redis Cluster reached through a {@link JedisCluster}. */
    private static final int MODE_CLUSTER = 3;

    private static final String SENTINEL_SEPARATOR = ">>";
    private static final String CLUSTER_SEPARATOR = ",";

    private static volatile JedisConfig redisConfig;

    // Current mode: 1 single node, 2 sentinel, 3 cluster.
    private final int singleton;
    // Exactly one of the three clients below is initialised, depending on the mode.
    private JedisPool jedisPool;
    private JedisSentinelPool sentinelPool;
    private JedisCluster jedisCluster;

    private JedisConfig() {
        String hostConf = Constants.ARCH_SEQ_REDIS_NODES;
        // An empty value would otherwise surface later as an ArrayIndexOutOfBoundsException.
        if (hostConf == null || hostConf.trim().isEmpty()) {
            throw new RuntimeException("get redis configuration error");
        }
        // An unresolved placeholder means the configuration variable was never substituted.
        if ("${arch.seq.redis.host}".equals(hostConf)) {
            throw new RuntimeException("please check occ var \"arch.seq.redis.host\"");
        }

        String password = Constants.ARCH_SEQ_REDIS_PASSWORD;
        if (hostConf.contains(SENTINEL_SEPARATOR)) {
            singleton = MODE_SENTINEL;
            initJedisSentinel(hostConf, Constants.ARCH_SEQ_REDIS_SENTINEL_MASTER, password);
        } else if (hostConf.contains(CLUSTER_SEPARATOR)) {
            singleton = MODE_CLUSTER;
            initJedisCluster(hostConf, password);
        } else {
            singleton = MODE_SINGLE;
            initJedisPool(hostConf, password);
        }
    }

    /** Builds the single-node connection pool from one {@code host:port} entry. */
    private void initJedisPool(String hostConf, String password) {
        HostAndPort node = parseHostAndPort(hostConf);
        this.jedisPool = new JedisPool(new JedisPoolConfig(), node.getHost(), node.getPort(), 0, password);
    }

    /** Builds the cluster client from a comma-separated list of {@code host:port} entries. */
    private void initJedisCluster(String hostConf, String password) {
        Set<HostAndPort> nodes = new HashSet<>();
        for (String entry : hostConf.split(CLUSTER_SEPARATOR)) {
            nodes.add(parseHostAndPort(entry));
        }
        this.jedisCluster = new JedisCluster(nodes, 0, 0, 4, password, new GenericObjectPoolConfig());
    }

    /** Builds the sentinel pool from a {@code ">>"}-separated list of sentinel {@code host:port} entries. */
    private void initJedisSentinel(String hostConf, String masterName, String password) {
        Set<String> sentinels = new HashSet<>();
        for (String entry : hostConf.split(SENTINEL_SEPARATOR)) {
            sentinels.add(parseHostAndPort(entry).toString());
        }
        this.sentinelPool = new JedisSentinelPool(masterName, sentinels, password);
        // Borrow one connection and hand it straight back: keeps start-up failing fast on a
        // bad master or password without holding a pooled connection for the process lifetime.
        sentinelPool.getResource().close();
    }

    /**
     * Parses one {@code host:port} entry.
     *
     * @throws IllegalArgumentException if the entry has no port part or the port is not a number
     */
    private static HostAndPort parseHostAndPort(String address) {
        String[] parts = address.trim().split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "invalid redis address, expected host:port but got \"" + address + "\"");
        }
        return new HostAndPort(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }

    /**
     * Returns the lazily created singleton. The first call connects to Redis.
     *
     * @return the shared configuration instance
     */
    public static JedisConfig getInstance() {
        JedisConfig instance = redisConfig;
        if (instance == null) {
            synchronized (JedisConfig.class) {
                instance = redisConfig;
                if (instance == null) {
                    instance = new JedisConfig();
                    redisConfig = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Returns a connection wrapper for the configured mode. In single and sentinel mode a
     * connection is borrowed from the pool and must be closed by the caller; in cluster mode
     * the shared cluster client is wrapped and closing the wrapper is a no-op.
     *
     * @return a new connection wrapper
     */
    public JedisConn getConn() {
        switch (singleton) {
            case MODE_SINGLE:
                return new JedisConn(jedisPool.getResource());
            case MODE_SENTINEL:
                return new JedisConn(sentinelPool.getResource());
            case MODE_CLUSTER:
                return new JedisConn(jedisCluster);
            default:
                // Unreachable: the constructor always assigns one of the three modes.
                throw new IllegalStateException("unknown redis mode: " + singleton);
        }
    }

    /**
     * Redis connection wrapper that works for both a plain {@link Jedis} connection and a
     * {@link JedisCluster}, exposing the basic commands plus a simple distributed lock.
     */
    public static class JedisConn implements Closeable {

        private static final String LOCK_SUCCESS = "OK";
        // NX: only set the key when it does not exist yet.
        private static final String SET_IF_NOT_EXIST = "NX";
        // PX: expiry given in milliseconds (EX would be seconds).
        private static final String SET_WITH_EXPIRE_TIME = "PX";
        private static final Long RELEASE_SUCCESS = 1L;
        // Deletes the key only when it still holds the caller's request id.
        private static final String UNLOCK_SCRIPT =
                "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

        private final JedisCommands invoke;

        public JedisConn(JedisCommands invoke) {
            this.invoke = invoke;
        }

        /**
         * Sets a value only if the key does not exist.
         *
         * @param key   the key
         * @param value the value
         * @return 1 on success, 0 if the key already existed
         */
        public Long setnx(String key, String value) {
            return invoke.setnx(key, value);
        }

        /**
         * Reads a value.
         *
         * @param key the key
         * @return the stored value as returned by Redis
         */
        public String get(String key) {
            return invoke.get(key);
        }

        /**
         * Writes a value.
         *
         * @param key   the key
         * @param value the value
         * @return the Redis status reply
         */
        public String set(String key, String value) {
            return invoke.set(key, value);
        }

        /**
         * Writes a value and returns the previous one.
         *
         * @param key   the key
         * @param value the new value
         * @return the value stored before the update
         */
        public String getSet(String key, String value) {
            return invoke.getSet(key, value);
        }

        /**
         * Deletes a key.
         *
         * @param key the key
         */
        public void del(String key) {
            invoke.del(key);
        }

        /**
         * Increments a counter by one.
         *
         * @param key the key
         * @return the value after the increment
         */
        public Long incr(String key) {
            return invoke.incr(key);
        }

        /**
         * Increments a counter by the given amount.
         *
         * @param key   the key
         * @param total the amount to add
         * @return the value after the increment
         */
        public Long incr(String key, long total) {
            return invoke.incrBy(key, total);
        }

        /**
         * Sets the time to live of a key.
         *
         * @param key        the key
         * @param expireTime time to live in milliseconds
         * @return the reply of the underlying {@code PEXPIRE} call
         */
        public Long expire(String key, long expireTime) {
            return invoke.pexpire(key, expireTime);
        }

        /**
         * Tries to acquire the distributed lock with a single {@code SET key value NX PX ttl}.
         *
         * @param lockKey    the lock key
         * @param requestId  identifier of the requester, stored as the lock value
         * @param expireTime lock time to live in milliseconds
         * @return {@code true} if the lock was acquired
         */
        public boolean tryLock(String lockKey, String requestId, long expireTime) {
            String result = invoke.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
            return LOCK_SUCCESS.equals(result);
        }

        /**
         * Releases the distributed lock, but only if it is still held by {@code requestId}.
         *
         * @param lockKey   the lock key
         * @param requestId identifier of the requester that acquired the lock
         * @return {@code true} if the key was deleted
         */
        public boolean unLock(String lockKey, String requestId) {
            Object result = evalScript(UNLOCK_SCRIPT,
                    Collections.singletonList(lockKey), Collections.singletonList(requestId));
            return RELEASE_SUCCESS.equals(result);
        }

        /** Runs a Lua script on whichever client type this wrapper holds. */
        private Object evalScript(String script, List<String> keys, List<String> args) {
            if (invoke instanceof Jedis) {
                return ((Jedis) invoke).eval(script, keys, args);
            }
            return ((JedisCluster) invoke).eval(script, keys, args);
        }

        /**
         * Closes the underlying connection when it is a plain {@link Jedis}; the shared
         * cluster client is left open.
         */
        @Override
        public void close() {
            if (invoke instanceof Jedis) {
                ((Jedis) invoke).close();
            }
        }
    }
}
package com.xxx.arch.seq.core;

import com.xxx.arch.seq.client.redis.JedisConfig;
import com.xxx.arch.seq.task.ContinuationOfLifeTask;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.*;

/**
 * Redis-based distributed lock with automatic lease renewal.
 *
 * <p>Every acquired lock registers a {@link ContinuationOfLifeTask} in a delay queue. A single
 * background thread takes tasks as they become due and extends the lock's time to live until
 * the lock is released or its renewal budget is used up.
 */
@Slf4j
public final class DistributedLock {

    // Delay queue of pending renewal tasks, ordered by the time each becomes due.
    private static final DelayQueue<ContinuationOfLifeTask> QUEUE = new DelayQueue<>();
    // Renewal tasks indexed by lockKey + requestId so that a release can cancel them.
    private static final Map<String, ContinuationOfLifeTask> CACHE = new ConcurrentHashMap<>();
    // Background thread that extends lock lifetimes. It is a daemon thread so that the
    // endless renewal loop does not keep the JVM alive on shutdown.
    private static final ExecutorService CONTINUATION_OF_LIFE_EXECUTOR =
            Executors.newSingleThreadExecutor(runnable -> {
                Thread worker = new Thread(runnable, "distributed-lock-renewal");
                worker.setDaemon(true);
                return worker;
            });
    private static final long TIMEOUT = 1000;
    // Upper bound on the number of locks being renewed at once.
    private static final int SIZE = 5000;

    static {
        CONTINUATION_OF_LIFE_EXECUTOR.execute(DistributedLock::renewLoop);
    }

    private DistributedLock() {
    }

    /** Core loop of the renewal thread: waits for the next due task and processes it. */
    private static void renewLoop() {
        while (true) {
            ContinuationOfLifeTask task;
            try {
                task = QUEUE.take();
            } catch (InterruptedException e) {
                // Nothing in this class interrupts the worker; treat it as a stop request.
                Thread.currentThread().interrupt();
                log.warn("distributed lock renewal thread interrupted, renewal stopped");
                return;
            }
            try {
                process(task);
            } catch (RuntimeException e) {
                // One bad task must not kill the loop that serves every other lock.
                log.error("renewal of lock task {} failed", task.getId(), e);
                CACHE.remove(task.getId());
            }
        }
    }

    /** Renews one due task and either re-queues it or drops its cache entry. */
    private static void process(ContinuationOfLifeTask task) {
        long nowTime = System.currentTimeMillis();
        // Released, expired or exhausted tasks are simply forgotten.
        if (!task.isActive() || task.isDiscarded(nowTime)) {
            CACHE.remove(task.getId());
            return;
        }
        if (!task.isExecute(nowTime)) {
            if (nowTime >= task.getEndTime()) {
                CACHE.remove(task.getId());
            } else {
                // Taken before it was due (e.g. the clock stepped back): wait again rather
                // than silently dropping the renewal.
                QUEUE.add(task);
            }
            return;
        }
        task.execute();
        // Keep renewing only while the lock is still held and has renewals left.
        if (task.isActive() && task.checkCount()) {
            QUEUE.add(task);
        } else {
            CACHE.remove(task.getId());
        }
    }

    /**
     * Acquires the distributed lock and schedules its automatic renewal.
     *
     * <p>The time to live written to Redis is the one normalised by
     * {@link ContinuationOfLifeTask#build}, so the renewal schedule and the real expiry of the
     * key always agree.
     *
     * @param lockKey    globally unique key of the lock
     * @param requestId  unique id of this request, e.g. a UUID
     * @param expireTime maximum time the lock is held before it needs renewal, in milliseconds
     * @param flagCount  number of times the lock may be renewed
     * @return {@code true} if the lock was acquired
     * @throws IllegalArgumentException if {@code lockKey} or {@code requestId} is blank
     */
    public static boolean getDistributeLock(String lockKey, String requestId, long expireTime, int flagCount) {
        if (QUEUE.size() >= SIZE) {
            return false;
        }
        // Built before touching Redis so that invalid arguments cannot leave a lock behind.
        ContinuationOfLifeTask task = ContinuationOfLifeTask.build(lockKey, requestId, expireTime, flagCount);
        JedisConfig.JedisConn conn = null;
        try {
            conn = JedisConfig.getInstance().getConn();
            if (!conn.tryLock(lockKey, requestId, task.getExpireTime())) {
                return false;
            }
            // Cache first so that a release can always find and cancel the task.
            CACHE.put(task.getId(), task);
            if (!QUEUE.offer(task, TIMEOUT, TimeUnit.MILLISECONDS)) {
                // Could not schedule the renewal: give the lock back and report failure.
                CACHE.remove(task.getId());
                task.setActive(false);
                conn.unLock(lockKey, requestId);
                return false;
            }
            return true;
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }

    /**
     * Acquires the distributed lock with the default budget of three renewals.
     *
     * @param lockKey    globally unique key of the lock
     * @param requestId  unique id of this request, e.g. a UUID
     * @param expireTime maximum time the lock is held before it needs renewal, in milliseconds
     * @return {@code true} if the lock was acquired
     */
    public static boolean getDefaultDistributeLock(String lockKey, String requestId, long expireTime) {
        return getDistributeLock(lockKey, requestId, expireTime, 3);
    }

    /**
     * Acquires a long-lived lock: a 10 second lease renewed up to {@code 6 * 60 * 24} times
     * (described by the original author as a 24 hour default). Callers must release it explicitly.
     *
     * @param lockKey   globally unique key of the lock
     * @param requestId unique id of this request, e.g. a UUID
     * @return {@code true} if the lock was acquired
     */
    public static boolean getPermanentDistributedLock(String lockKey, String requestId) {
        return getDistributeLock(lockKey, requestId, 10000, 6 * 60 * 24);
    }

    /**
     * Releases the distributed lock and cancels its renewal task.
     *
     * @param lockKey   globally unique key of the lock
     * @param requestId unique id of the request that acquired the lock
     * @return {@code true} if the lock was still held by {@code requestId} and has been deleted
     */
    public static boolean releaseDistributeLock(String lockKey, String requestId) {
        ContinuationOfLifeTask task = CACHE.remove(lockKey + requestId);
        if (task != null) {
            // Deactivate first so that a renewal already in flight becomes a no-op.
            task.setActive(false);
            QUEUE.remove(task);
        }
        JedisConfig.JedisConn conn = null;
        try {
            conn = JedisConfig.getInstance().getConn();
            return conn.unLock(lockKey, requestId);
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}
package com.xxx.arch.seq.task;

import com.xxx.arch.seq.client.redis.JedisConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Lease-renewal task for one distributed lock.
 *
 * <p>The task becomes due at {@code endTime}, shortly before the lock would expire at
 * {@code timeout}. Executing it pushes the Redis expiry of the lock key forward by
 * {@code expireTime} milliseconds and moves both timestamps accordingly.
 *
 * <p>NOTE(review): the renewal extends whatever value is stored under {@code lockKey}; it does
 * not verify that the key still belongs to this task's request id.
 */
@Slf4j
public class ContinuationOfLifeTask implements Delayed {

    /** Value of {@code flagCount} meaning "renew without limit". */
    private static final int UNLIMITED = -1;
    /** Renewal budget used when the caller passes a value below {@link #UNLIMITED}. */
    private static final int DEFAULT_FLAG_COUNT = 3;
    /** Shortest lock duration accepted by {@link #build}. */
    private static final long MIN_EXPIRE_MILLIS = 1000L;
    /** Minimum lead before expiry at which the first renewal is scheduled. */
    private static final long MIN_INITIAL_LEAD_MILLIS = 500L;
    /** Minimum lead before expiry at which subsequent renewals are scheduled. */
    private static final long MIN_RENEWAL_LEAD_MILLIS = 100L;
    /** Attempts made per renewal before giving up until the task is taken again. */
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_PAUSE_MILLIS = 100L;

    private String id;
    // Point in time at which the lock needs to be renewed.
    private long endTime;
    // Whether the lock is still held; cleared when the lock is released.
    private volatile boolean active;
    // Key of the lock in Redis.
    private String lockKey;
    // Point in time at which the lock expires if it is not renewed.
    private long timeout;
    // Duration, in milliseconds, granted by each renewal.
    private long expireTime;
    // Maximum number of renewals; -1 means unlimited.
    private int flagCount;
    // Renewals performed so far; never exceeds flagCount unless flagCount is unlimited.
    private int count;

    private ContinuationOfLifeTask(String id, String lockKey, long expireTime, long endTime, long timeout,
                                   int flagCount) {
        this.id = id;
        this.lockKey = lockKey;
        this.expireTime = expireTime;
        this.endTime = endTime;
        this.timeout = timeout;
        this.flagCount = flagCount;
        this.active = true;
        this.count = 0;
    }

    /**
     * Renews the lock once if the task is active, due and has renewals left. A failed attempt
     * is retried up to three times with a short pause; on success the expiry and the next due
     * time are moved forward and the renewal counter is incremented.
     */
    public void execute() {
        if (!active || !hasRenewalsLeft()) {
            return;
        }
        int retryCount = 0;
        while (System.currentTimeMillis() >= endTime && retryCount < MAX_RETRIES) {
            // A fresh connection per attempt, closed exactly once even when borrowing fails.
            try (JedisConfig.JedisConn conn = JedisConfig.getInstance().getConn()) {
                conn.expire(lockKey, expireTime);
                timeout = System.currentTimeMillis() + expireTime;
                // Schedule the next renewal a tenth of the lease, but at least 100 ms, early.
                endTime = timeout - Math.max(expireTime / 10, MIN_RENEWAL_LEAD_MILLIS);
                count++;
                if (log.isDebugEnabled()) {
                    log.debug("【續命】鎖關鍵字:{},續期:{}毫秒,計數:{}", lockKey, expireTime, count);
                }
                break;
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                retryCount++;
                try {
                    Thread.sleep(RETRY_PAUSE_MILLIS);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt for the owning thread and stop retrying.
                    Thread.currentThread().interrupt();
                    log.error(ie.getMessage(), ie);
                    return;
                }
            }
        }
    }

    /** Whether another renewal is allowed by the renewal budget. */
    private boolean hasRenewalsLeft() {
        return flagCount == UNLIMITED || count < flagCount;
    }

    /**
     * Whether the task may run now: it must be due, the lock must not have expired yet and the
     * renewal budget must not be exceeded.
     *
     * @param nowTime current time in epoch milliseconds
     * @return {@code true} if the task may be executed
     */
    public boolean isExecute(long nowTime) {
        return nowTime >= endTime && nowTime <= timeout && (flagCount == UNLIMITED || flagCount >= count);
    }

    /**
     * Whether the task should be dropped: the lock has already expired or no renewals are left.
     *
     * @param nowTime current time in epoch milliseconds
     * @return {@code true} if the task should be discarded
     */
    public boolean isDiscarded(long nowTime) {
        return nowTime > timeout || !hasRenewalsLeft();
    }

    /**
     * @return {@code true} while the renewal budget is not used up
     */
    public boolean checkCount() {
        return hasRenewalsLeft();
    }

    /**
     * Creates a renewal task for a freshly acquired lock.
     *
     * @param lockKey    key of the lock
     * @param requestId  id of the request holding the lock
     * @param expireTime lock duration in milliseconds; values below 1000 are raised to 1000
     * @param flagCount  maximum number of renewals, -1 for unlimited; values below -1 become 3
     * @return the new task, due a tenth of the duration (at least 500 ms) before expiry
     * @throws IllegalArgumentException if {@code lockKey} or {@code requestId} is blank
     */
    public static final ContinuationOfLifeTask build(String lockKey, String requestId, long expireTime,
                                                     int flagCount) {
        if (StringUtils.isAnyBlank(lockKey, requestId)) {
            throw new IllegalArgumentException("lockKey Can't be blank !");
        }
        long effectiveExpire = Math.max(expireTime, MIN_EXPIRE_MILLIS);
        int effectiveFlagCount = flagCount < UNLIMITED ? DEFAULT_FLAG_COUNT : flagCount;
        long timeout = System.currentTimeMillis() + effectiveExpire;
        long endTime = timeout - Math.max(effectiveExpire / 10, MIN_INITIAL_LEAD_MILLIS);
        return new ContinuationOfLifeTask(lockKey + requestId, lockKey, effectiveExpire, endTime, timeout,
                effectiveFlagCount);
    }

    public long getEndTime() {
        return endTime;
    }

    public ContinuationOfLifeTask setEndTime(long endTime) {
        this.endTime = endTime;
        return this;
    }

    public boolean isActive() {
        return active;
    }

    public ContinuationOfLifeTask setActive(boolean active) {
        this.active = active;
        return this;
    }

    public String getLockKey() {
        return lockKey;
    }

    public ContinuationOfLifeTask setLockKey(String lockKey) {
        this.lockKey = lockKey;
        return this;
    }

    public long getExpireTime() {
        return expireTime;
    }

    public ContinuationOfLifeTask setExpireTime(long expireTime) {
        this.expireTime = expireTime;
        return this;
    }

    public int getFlagCount() {
        return flagCount;
    }

    public ContinuationOfLifeTask setFlagCount(int flagCount) {
        this.flagCount = flagCount;
        return this;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    /** Remaining time until the task is due; zero or negative once it may be taken. */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /** Orders tasks by due time, earliest first. */
    @Override
    public int compareTo(Delayed o) {
        if (o instanceof ContinuationOfLifeTask) {
            // Compare due times directly so the result does not depend on two clock reads.
            return Long.compare(this.endTime, ((ContinuationOfLifeTask) o).endTime);
        }
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}
package com.xxx.arch.seq.constant; import com.ctrip.framework.apollo.Config; import com.ctrip.framework.apollo.ConfigService; import org.apache.commons.lang3.StringUtils; public class Constants { //apollo公共的ZK配置集群NameSpace public static final String ZK_NAME_SPACE = "33.zk"; public static final String REDIS_SEQUEN_NAME_SPACE = "33.sequence-redis"; // public static final String REDIS_SEQUEN_NAME_SPACE = "33.sequence"; public static final String ARCH_SEQ_ZOOKEEPER_CONNECT_STRING = getConfig(ZK_NAME_SPACE,"zk.address", ""); public static final String ARCH_SEQ_REDIS_NODES = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.nodes", ""); public static final String ARCH_SEQ_REDIS_SENTINEL_MASTER = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.sentinel.master", ""); public static final String ARCH_SEQ_REDIS_PASSWORD = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.common.key", ""); public static String getConfig(String nameSpace,String key,String defultValue){ if(StringUtils.isBlank(nameSpace)){ return ""; } Config config = ConfigService.getConfig(nameSpace); return config.getProperty(key,defultValue); } }
上述內容就是 Jedis 中怎麼實現分布式鎖,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發佈的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。