您好,登錄后才能下訂單哦!
背景
某些場景下,有可能一個方法不能被并發執行,有可能一個方法的特定參數不能被并發執行。比如不能將一個消息發送多次,創建緩存最好只創建一次等等。為了實現上面的目標我們就需要采用同步機制來完成,但同步的邏輯如何實現呢,是否會影響到原有邏輯呢?
嵌入式
這里講的嵌入式是說獲取鎖以及釋放鎖的邏輯與業務代碼耦合在一起,又分分布式與單機兩種不同場景的不同實現。
單機版本
下面方法,每個productId不允許并發訪問,所以這里可以直接用synchronized來鎖定不同的參數。
@Service public class ProductAppService { public void invoke(Integer productId) { synchronized (productId) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("productId:" + productId+" time:"+new Date()); } } }
測試腳本:三個相同的參數0,兩個不同的參數1和2,通過一個多線程的例子來模似。如果有并發請求的測試工具可能效果會更好。
private void testLock(){ ExecutorService executorService= Executors.newFixedThreadPool(5); executorService.submit(new Runnable() { @Override public void run() { productAppService.invoke2(0); } }); executorService.submit(new Runnable() { @Override public void run() { productAppService.invoke2(0); } }); executorService.submit(new Runnable() { @Override public void run() { productAppService.invoke2(0); } }); executorService.submit(new Runnable() { @Override public void run() { productAppService.invoke2(1); } }); executorService.submit(new Runnable() { @Override public void run() { productAppService.invoke2(2); } }); executorService.shutdown(); }
測試結果如下,0,1,2三個請求未被阻塞,后面的兩個0被阻塞。
分布式版本
分布式的除了鎖機制不同之外其它的測試方法相同,這里只貼出鎖的部分:
public void invoke2(Integer productId) { RLock lock=this.redissonService.getRedisson().getLock(productId.toString()); try { boolean locked=lock.tryLock(3000,500, TimeUnit.MILLISECONDS); if(locked){ Thread.sleep(1000); System.out.print("productId:" + productId+" time:"+new Date()); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
嵌入式的缺點
比較明顯的就是鎖的邏輯與業務邏輯混合在一起,增加了程序復雜度而且也不利于鎖機制的更替。
注解式
能否將鎖的邏輯隱藏起來,通過在特定方法上增加注解來實現呢?就像Spring Cache的應用。當然是可以的,這里我們只需要解決如下三個問題:
定義注解
鎖一般有如下幾個屬性:
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface RequestLockable { String[] key() default ""; long maximumWaiteTime() default 2000; long expirationTime() default 1000; TimeUnit timeUnit() default TimeUnit.MILLISECONDS; }
實現注解
由于我們的目標是注解式鎖,這里通過AOP的方式來實現,具體依賴AspectJ,創建一個攔截器:
public abstract class AbstractRequestLockInterceptor { protected abstract Lock getLock(String key); protected abstract boolean tryLock(long waitTime, long leaseTime, TimeUnit unit,Lock lock) throws InterruptedException; /** * 包的表達式目前還有待優化 TODO */ @Pointcut("execution(* com.chanjet.csp..*(..)) && @annotation(com.chanjet.csp.product.core.annotation.RequestLockable)") public void pointcut(){} @Around("pointcut()") public Object doAround(ProceedingJoinPoint point) throws Throwable{ Signature signature = point.getSignature(); MethodSignature methodSignature = (MethodSignature) signature; Method method = methodSignature.getMethod(); String targetName = point.getTarget().getClass().getName(); String methodName = point.getSignature().getName(); Object[] arguments = point.getArgs(); if (method != null && method.isAnnotationPresent(RequestLockable.class)) { RequestLockable requestLockable = method.getAnnotation(RequestLockable.class); String requestLockKey = getLockKey(method,targetName, methodName, requestLockable.key(), arguments); Lock lock=this.getLock(requestLockKey); boolean isLock = this.tryLock(requestLockable.maximumWaiteTime(),requestLockable.expirationTime(), requestLockable.timeUnit(),lock); if(isLock) { try { return point.proceed(); } finally { lock.unlock(); } } else { throw new RuntimeException("獲取鎖資源失敗"); } } return point.proceed(); } private String getLockKey(Method method,String targetName, String methodName, String[] keys, Object[] arguments) { StringBuilder sb = new StringBuilder(); sb.append("lock.").append(targetName).append(".").append(methodName); if(keys != null) { String keyStr = Joiner.on(".").skipNulls().join(keys); if(!StringUtils.isBlank(keyStr)) { LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer(); String[] parameters =discoverer.getParameterNames(method); ExpressionParser parser = new SpelExpressionParser(); Expression expression = parser.parseExpression(keyStr); EvaluationContext context = new StandardEvaluationContext(); int length = parameters.length; if (length > 0) { for (int i = 0; i < length; i++) { context.setVariable(parameters[i], arguments[i]); } } String keysValue = expression.getValue(context, String.class); sb.append("#").append(keysValue); } } return sb.toString(); } }
注意如下幾點:
為什么會存在抽象方法?那是為下面的將注解機制與具體的鎖實現解耦服務的,目的是希望注解式鎖能夠得到復用也便于擴展。鎖的key生成規則是什么?前綴一般是方法所在類的完全限定名,方法名稱以及spel表達式來構成,避免重復。SPEL表達式如何支持?
LocalVariableTableParameterNameDiscoverer它在Spring MVC解析Controller的參數時有用到,可以從一個Method對象中獲取參數名稱列表。
SpelExpressionParser是標準的spel解析器,利用上面得來的參數名稱列表以及參數值列表來獲取真實表達式。
問題
基于aspectj的攔截器,@Pointcut中的參數目前未找到動態配置的方法,如果有解決方案的可以告訴我。
將注解機制與具體的鎖實現解耦
注解式鎖理論上應該與具體的鎖實現細節分離,客戶端可以任意指定鎖,可以是單機下的ReentrantLock也可以是基于redis的分布式鎖,當然也可以是基于zookeeper的鎖,基于此目的上面我們創建的AbstractRequestLockInterceptor這個攔截器是個抽象類。看下基于redis的分布式鎖的子類實現:
@Aspect public class RedisRequestLockInterceptor extends AbstractRequestLockInterceptor { @Autowired private RedissonService redissonService; private RedissonClient getRedissonClient(){ return this.redissonService.getRedisson(); } @Override protected Lock getLock(String key) { return this.getRedissonClient().getLock(key); } @Override protected boolean tryLock(long waitTime, long leaseTime, TimeUnit unit,Lock lock) throws InterruptedException { return ((RLock)lock).tryLock(waitTime,leaseTime,unit); } }
注解式鎖的應用
只需要在需要同步的方法上增加@RequestLockable,然后根據需要指定或者不指定key,也可以根據實際場景配置鎖等待時間以及鎖的生命周期。
@RequestLockable(key = {"#productId"}) public void invoke3(Integer productId) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("productId:" + productId+" time:"+new Date()); }
當然為了攔截器生效,我們需要在配置文件中配置上攔截器。
<bean class="com.product.api.interceptor.RedisRequestLockInterceptor"></bean> <aop:aspectj-autoproxy proxy-target-class="true"/>
注解式鎖的優點:鎖的邏輯與業務代碼完全分離,降低了復雜度。靈活的spel表達式可以靈活的構建鎖的key。支持多種鎖,可以隨意切換而不影響業務代碼。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。