您好,登錄后才能下訂單哦!
本篇內容主要講解“Java根據某個key加鎖怎么實現”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java根據某個key加鎖怎么實現”吧!
日常開發中,有時候需要根據某個 key 加鎖,確保多線程情況下,對該 key 的加鎖和解鎖之間的代碼串行執行。
大家可以借助每個 key 對應一個 ReentrantLock ,讓同一個 key 的線程使用該 lock 加鎖;每個 key 對應一個 Semaphore ,讓同一個 key 的線程使用 Semaphore 控制同時執行的線程數。
接口定義
public interface LockByKey<T> { /** * 加鎖 */ void lock(T key); /** * 解鎖 */ void unlock(T key); }
每個 key 對應一個 ReentrantLock ,讓同一個 key 的線程使用該 lock 加鎖。
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; public class DefaultLockByKeyImpl<T> implements LockByKey<T> { private final Map<T, ReentrantLock> lockMap = new ConcurrentHashMap<>(); /** * 加鎖 */ @Override public void lock(T key) { // 如果key為空,直接返回 if (key == null) { throw new IllegalArgumentException("key 不能為空"); } // 獲取或創建一個ReentrantLock對象 ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock()); // 獲取鎖 lock.lock(); } /** * 解鎖 */ @Override public void unlock(T key) { // 如果key為空,直接返回 if (key == null) { throw new IllegalArgumentException("key 不能為空"); } // 從Map中獲取鎖對象 ReentrantLock lock = lockMap.get(key); // 獲取不到報錯 if (lock == null) { throw new IllegalArgumentException("key " + key + "尚未加鎖"); } // 其他線程非法持有不允許釋放 if (!lock.isHeldByCurrentThread()) { throw new IllegalStateException("當前線程尚未持有,key:" + key + "的鎖,不允許釋放"); } lock.unlock(); } }
注意事項:
(1)參數合法性校驗
(2)解鎖時需要判斷該鎖是否為當前線程持有
import com.google.common.collect.Lists; import org.junit.Test; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class DefaultLockByKeyImplTest { private final LockByKey<String> lockByKey = new DefaultLockByKeyImpl<>(); private final CountDownLatch countDownLatch = new CountDownLatch(7); private final ExecutorService executorService = Executors.newFixedThreadPool(10); @Test public void test() throws InterruptedException { List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d"); Set<String> executingKeySet = new HashSet<>(); for (int i = 0; i < keys.size(); i++) { String key = keys.get(i); int finalI = i; executorService.submit(() -> { lockByKey.lock(key); if (executingKeySet.contains(key)) { throw new RuntimeException("存在正在執行的 key:" + key); } executingKeySet.add(key); try { System.out.println("index:" + finalI + "對 [" + key + "] 加鎖 ->" + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { System.out.println("index:" + finalI + "釋放 [" + key + "] ->" + Thread.currentThread().getName()); lockByKey.unlock(key); executingKeySet.remove(key); countDownLatch.countDown(); } }); } countDownLatch.await(); } }
如果同一個 key 沒釋放能夠再次進入,會拋出異常。
也可以通過日志來觀察執行情況:
index:0對 [a] 加鎖 ->pool-1-thread-1 index:6對 [d] 加鎖 ->pool-1-thread-7 index:4對 [c] 加鎖 ->pool-1-thread-5 index:3對 [b] 加鎖 ->pool-1-thread-4 index:6釋放 [d] ->pool-1-thread-7 index:4釋放 [c] ->pool-1-thread-5 index:0釋放 [a] ->pool-1-thread-1 index:3釋放 [b] ->pool-1-thread-4 index:1對 [a] 加鎖 ->pool-1-thread-2 index:5對 [b] 加鎖 ->pool-1-thread-6 index:1釋放 [a] ->pool-1-thread-2 index:5釋放 [b] ->pool-1-thread-6 index:2對 [a] 加鎖 ->pool-1-thread-3 index:2釋放 [a] ->pool-1-thread-3
每個 key 對應一個 Semaphore ,讓同一個 key 的線程使用 Semaphore 控制同時執行的線程數。
import lombok.SneakyThrows; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; public class SimultaneousEntriesLockByKey<T> implements LockByKey<T> { private final Map<T, Semaphore> semaphores = new ConcurrentHashMap<>(); /** * 最大線程 */ private int allowed_threads; public SimultaneousEntriesLockByKey(int allowed_threads) { this.allowed_threads = allowed_threads; } /** * 加鎖 */ @Override public void lock(T key) { Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(allowed_threads) : v); semaphore.acquireUninterruptibly(); } /** * 解鎖 */ @Override public void unlock(T key) { // 如果key為空,直接返回 if (key == null) { throw new IllegalArgumentException("key 不能為空"); } // 從Map中獲取鎖對象 Semaphore semaphore = semaphores.get(key); if (semaphore == null) { throw new IllegalArgumentException("key " + key + "尚未加鎖"); } semaphore.release(); if (semaphore.availablePermits() >= allowed_threads) { semaphores.remove(key, semaphore); } }
import com.google.common.collect.Lists; import org.junit.Test; import java.time.LocalDateTime; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class SimultaneousEntriesLockByKeyTest { private final int maxThreadEachKey = 2; private final LockByKey<String> lockByKey = new SimultaneousEntriesLockByKey<>(maxThreadEachKey); private final CountDownLatch countDownLatch = new CountDownLatch(7); private final ExecutorService executorService = Executors.newFixedThreadPool(10); @Test public void test() throws InterruptedException { List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d"); Map<String, Integer> executingKeyCount = Collections.synchronizedMap(new HashMap<>()); for (int i = 0; i < keys.size(); i++) { String key = keys.get(i); int finalI = i; executorService.submit(() -> { lockByKey.lock(key); executingKeyCount.compute(key, (k, v) -> { if (v != null && v + 1 > maxThreadEachKey) { throw new RuntimeException("超過限制了"); } return v == null ? 1 : v + 1; }); try { System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "對 [" + key + "] 加鎖 ->" + Thread.currentThread().getName() + "count:" + executingKeyCount.get(key)); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "釋放 [" + key + "] ->" + Thread.currentThread().getName() + "count:" + (executingKeyCount.get(key) - 1)); lockByKey.unlock(key); executingKeyCount.compute(key, (k, v) -> v - 1); countDownLatch.countDown(); } }); } countDownLatch.await(); } }
輸出:
time:2023-03-15T20:49:57.044195 ,index:6對 [d] 加鎖 ->pool-1-thread-7count:1
time:2023-03-15T20:49:57.058942 ,index:5對 [b] 加鎖 ->pool-1-thread-6count:2
time:2023-03-15T20:49:57.069789 ,index:1對 [a] 加鎖 ->pool-1-thread-2count:2
time:2023-03-15T20:49:57.042402 ,index:4對 [c] 加鎖 ->pool-1-thread-5count:1
time:2023-03-15T20:49:57.046866 ,index:0對 [a] 加鎖 ->pool-1-thread-1count:2
time:2023-03-15T20:49:57.042991 ,index:3對 [b] 加鎖 ->pool-1-thread-4count:2
time:2023-03-15T20:49:58.089557 ,index:0釋放 [a] ->pool-1-thread-1count:1
time:2023-03-15T20:49:58.082679 ,index:6釋放 [d] ->pool-1-thread-7count:0
time:2023-03-15T20:49:58.084579 ,index:4釋放 [c] ->pool-1-thread-5count:0
time:2023-03-15T20:49:58.083462 ,index:5釋放 [b] ->pool-1-thread-6count:1
time:2023-03-15T20:49:58.089576 ,index:3釋放 [b] ->pool-1-thread-4count:1
time:2023-03-15T20:49:58.085359 ,index:1釋放 [a] ->pool-1-thread-2count:1
time:2023-03-15T20:49:58.096912 ,index:2對 [a] 加鎖 ->pool-1-thread-3count:1
time:2023-03-15T20:49:59.099935 ,index:2釋放 [a] ->pool-1-thread-3count:0
到此,相信大家對“Java根據某個key加鎖怎么實現”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。