您好,登錄后才能下訂單哦!
本篇內容介紹了“Java中對于并發問題的處理方法是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
首先我們一起回顧一些并發的場景
最基本的,我們要弄清楚什么的并發嘞?我簡單粗暴的理解就是:一段代碼,在同一時間段內,被多個線程同時處理的情況就是并發現象。下面簡單畫了個圖:
那么只要是并發現象就需要我們進行并發處理嗎?那肯定不是滴。我們就拿大家都能理解的訂單業務來舉例,比如說下面兩種簡單的場景:
對于C端業務來講,基本上是由一串隨機的序列號組成,可以為UUID、數字串、年月日商戶(加密)+隨機唯一序列號等等方式。這樣的目的也是為了保障商戶訂單量的安全,防止他人去進行惡意分析。
對于B端業務來講,基本上都是由商戶+年月日+順序遞增序列號的方式組成。這樣方便客戶方進行訂單的匯總以及后期的追溯業務。
以上兩種場景的區別基本上就是隨機唯一序列號和順序遞增序列號的區別。偽代碼如下:
public void addOrder() { // 1.獲取當前年月日以及商戶標識 String currentDate = "yyyyMMddHHmmss"; String businessman = "商戶標識"; // 2.獲取獲取序列號 long index = getIndex(); // 3.拼接訂單號 String orderNum = businessman + currentDate + index; // 4.生成訂單 save(訂單對象); }
那么對于C端的隨機唯一序列號來講,我認為肯定是沒必要進行并發控制的,只要寫一個生成隨機唯一序列號的算法就好了,這樣生成出來的訂單號必然是唯一的。
public String getIndex() { // 根據算法生成唯一序列號 return buildIndexUtils.build(); }
但對于B端的順序遞增序列號來講,就需要進行并發控制了。因為既然要保證順序遞增,我在生成當前序列號的同時就必然需要之前上一個單子的序列號是什么,因此我就必然需要一個地方去存儲這個序列號。偽代碼如下:
public String getIndex() { // 1.獲取當前商戶、當前單據已生成的最大序列號 Integer index = dao.getIndex(商戶, 單據) + 1; // 2.序列號 + 1 index = index++; // 3.修改當前商戶、當前單據已生成的最大序列號 dao.update(商戶, 單據, index); // 4.返回序列號 return index + ""; }
此時如果事務為可重復讀,Thread1開啟事務并獲取并修改序列號,此時在Thread1未提交事務之前Thread2開啟事務并獲取序列號。此時兩個線程獲取到的序列號必然是一致的,這樣就會出現訂單號重復的問題。
如果更換隔離級別呢?是否能夠解決這個問題?
讀已提交?同樣如果在Thread1提交事務之前Thread2就執行完第一步獲取最大序列號呢?一樣有問題。
讀未提交?一樣的呀,在兩個Thread都執行完第一步,但沒有執行update的情況。
串行化?那就和加同步鎖沒啥區別的,而且是阻塞式的。一堆請求占用數據庫連接阻塞在這里,如果出現資源耗盡的情況就比較嚴重了。
不用事務?這個如果遇到2中的場景也一樣的。
那么加鎖呢?
單機環境下我們可以選擇Synchronized或Lock來進行處理。眾所周知,JDK1.6之后就對Synchronized進行了改進,不再是單純的阻塞,而是先進行自旋處理,在一定程度上也達到了自旋節省資源的效果。但是Synchronized或Lock還是要根據實際情況來進行處理的。如果我們為了省事而使用Synchronized對事務代碼進行加鎖的話,首先我們要保證避免長事務的出現,否則響應超時了,而事務還沒有釋放,那就比較嚴重了,異常情況堪比鎖表。
分布式環境下我們可以依賴Redis或Zookeeper來實現分布式鎖。這里需要注意的是,如果要依賴Redis實現的話,盡可能保證Redis采用單實例或分片集群的方式進行部署。主從的部署方式在某種極端情況下出現節點宕機時會導致誤判的情況。畢竟Redis是AP性質的。
還可以通過數據庫來實現,比如通過select for update來實現行鎖、通過version字段實現樂觀鎖、添加唯一約束的方式。首先select for update實現行鎖和上面的串行化事務差別不大,都是數據庫連接的阻塞,不建議使用。而樂觀鎖和唯一約束的方案更適用于作為一個保底方案,否則人家并發請求的時候只有一個請求能成功,其他的都失敗。這樣的用戶體驗也不好。
最后我們能得出一個結論。是否進行并發控制要依據該并發操作是否會造成數據安全問題來決定的。好了,下面向大家分享一些在學習工作中對于并發問題的處理思路
由于請求重試導致的并發安全問題
在與第三方系統交互或者微服務內部跨模塊交互時,我們通常會采用HTTP或RPC等方式,并設置最大請求時間以及重試次數。因為我們絕對不允許因為下游服務的異常問題而拖累當前服務的正常運行。而通常情況下,最大請求時間也是根據兩個服務之間的實際業務以及下游接口進行多次測試而設定的,一般來說不會隨便的出現請求超時的情況。但是一旦下游業務的接口因為某種原因(比如網絡卡頓或者出現效率問題)導致請求超時的情況,就很有可能因為上游服務的重試而導致下游服務數據重復的問題。
這種情況從本質上來說也就是個重復消費的問題。我們只需要雙方配合做好冪等就好了。
1.首先,如果涉及到前端,比如說點擊前端的按鈕觸發業務并且調用下游服務的業務。這個時候既要考慮前端重復提交也要考慮后端的重復發送以及重復消費問題。前端最常用的方式就是做一個進度條或進行防抖處理,避免一個用戶頻繁點擊按鈕。
那么如果是多個用戶同時提交同一條數據呢?這個情況主要是在B端業務中出現,比如說多個用戶均具有這條數據的修改權限,此時也并發點擊按鈕提交了這條數據。一般來說,這種情況出現的概率還是極少數的,也不會有多少并發量。因此我們直接采用數據庫的樂觀鎖進行保底控制就好了,只允許一個人操作成功,其他人操作失敗并提示該數據已被修改。
/** * @param id 數據ID * @param status 數據的狀態 */ public void update(Long id, Integer status) { // 1.根據ID查詢數據 PO po = dao.select(id); // 2.判斷數據的狀態是否符合修改要求(這一步主要是應對兩個線程都進入Controller層,其中線程1剛好提交事務后,線程2開始事務的情況) if(!status.equals(po.getStatus())) { throw new TJCException("數據已被修改,請刷新后重試"); } // 3.修改數據(啟用樂觀鎖機制,主要應對線程1提交事務之前線程2開啟事務的情況) int i = dao.update("update table set xxx = ?, version = version + 1 where id = ? and version > ?"); if(i == 0) { throw new TJCException("數據已被修改,請刷新后重試"); } // 繼續執行下面業務 }
2.上游服務請求下游服務時,在請求頭或消息中添加消息唯一ID。下游服務第一次接收到這個消息后首先將消息保存在緩存中并根據測試結果設置合理的有效期(有效期盡可能比正常請求時間長個一兩分鐘就好)。這樣就可以攔截上述所說的重試導致的重復消費問題。
// 上游服務發送消息 public void request() { String messageId = "xxxx"; rpc.request(messageId, message); } // 下游服務消費消息 public void consume(String messageId, String message) { // 將messageId存儲在redis中, 單機環境也可以直接找個map去存或者存在Guava中 Boolean flag = stringRedisTemplate.opsForValue() .setIfAbsent(messageId, "1", 60, TimeUnit.SECONDS); if(!flag) { log.error("重復消息攔截"); return; } // 繼續執行下面業務 ..... // 事務完成后(提交/回滾),刪除標識 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCompletion(int status) { stringRedisTemplate.delete(messageId); } }); }
在這里是否有小伙伴會有這樣的一個疑問,如果重復發送的消息中messageId不一致或者上游服務接口本身就被調用了多次怎么辦?
(1)首先,我覺得在上游服務接口本身就被調用了多次的情況下,第一點中的第2步驟(判斷數據狀態)這種方式就可以把它攔截掉。
(2)其次,如果出現重復發送的消息中messageId不一致的情況,我認為這就屬于程序員問題了,可以不放在這里進行考慮。如果硬要考慮的話,貌似也沒什么更好的辦法,那就加鎖吧。
順序遞增訂單號問題
在開頭我們通過引用這個生成訂單號的例子分析了一些什么情況下需要進行并發處理問題,并且上面是采用加鎖方式處理的。那么是否還有其他的方式比加鎖更好一些呢?比較加鎖影響吞吐量呀,哈哈。非必要情況下,我是不會進行加鎖處理的,除非在定制開發的過程中,用戶的要求是能用就行,那就可以偷懶了哈哈,節省時間去摸魚!!!!
下面給大家分享一些我常用的一種方式:Redis+Lua。我們都知道操作內存肯定是比操作數據庫要更快一些的,那么我們可以干脆將各個單據的序列號添加到Redis中。并且訂單號是根據年月日來進行重置的,所以我們可以將序列號的過期時間設置為24小時。
偽代碼如下:
// 序列號的key可以設置為(模塊名:orderIndex:訂單類型:yyyyMMdd) String dateFormat = getCurrentDateFormat("yyyyMMdd"); // key String key = 模塊名 + ":" + orderIndex + ":" + 訂單類型 + ":" + dateFormat; String script = "if (redis.call('exists', KEYS[1]) == 0) then redis.call('setex', KEYS[1], ARGV[1], ARGV[2]) return 1 else return redis.call('incr', KEYS[1]) end"; DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText(script); long count = stringRedisTemplate.execute(defaultRedisScript, Arrays.asList(key), (3600 * 24) + "", "1");
我們都清楚,Redis多指令執行是沒辦法保證原子性的。所以我們要借助Lua腳本將多個Redis執行以腳本的方式執行來保證多指令執行的原子性,再配合Redis基于內存以及單線程執行指令的優勢,可以代替鎖來賦予功能更大的吞吐量。
計數統計問題
在工作中我還做過這樣一個需求。首先通過消息隊列接收、主動拉取數據源的方式獲取用戶在實際業務中產生的源數據并根據設置的規則比對校驗生成符合條件的數據保存在數據庫中。并且對通過各個維度對生成的數據進行計數統計并推送下游單據。
比如說其中有一個統計維度為“在各個班的工作時間內,根據次數統計符合條件的數據并匯總推送下游單據”。那么要做這項業務,首先我們要對各個班的數據進行分別計數,當前班開始工作時同步開啟計數,結束工作時停止計數,當計數器達到設置的標準后,將這些數據進行統計處理后推送下游單據。
根據上面的業務,通常來說有兩種方式解決:
將班、計數量、數據ID等數據存儲在數據庫中,并對獲取數據、處理數據、計數、推送下游單據等操作統一加鎖進行處理,保證數據計數的準確性。
依然是通過Redis+Lua的方式進行處理。
最后通過實際的業務分析決定采用Redis+Lua的方式進行處理。只不過這次的Lua要寫相對復雜的業務了。
偽代碼如下:
/** * @param indexStdId 標準ID * @param currentTeamClassId 班ID * @param dataId 數據ID * @param count 計數要求 */ public List<Long> countMonitor(Long indexStdId, Long currentTeamClassId, Long dataId, Integer count) { StringBuilder countMonitorLua = new StringBuilder(); countMonitorLua.append("if (redis.call('hget', KEYS[1], KEYS[2]) == ARGV[2]) "); countMonitorLua.append("then "); countMonitorLua.append(" if (redis.call('hget', KEYS[1], KEYS[3]) == ARGV[3]) "); countMonitorLua.append(" then "); countMonitorLua.append(" redis.call('hset', KEYS[1], KEYS[3], 0) "); countMonitorLua.append(" redis.call('lpush', KEYS[4], ARGV[1]) "); countMonitorLua.append(" local list = redis.call('lrange', KEYS[4], 0, -1) "); countMonitorLua.append(" redis.call('del', KEYS[4]) "); countMonitorLua.append(" return list "); countMonitorLua.append(" else "); countMonitorLua.append(" redis.call('lpush', KEYS[4], ARGV[1]) "); countMonitorLua.append(" redis.call('hincrby', KEYS[1], KEYS[3], 1) "); countMonitorLua.append(" return {} "); countMonitorLua.append(" end "); countMonitorLua.append("else "); countMonitorLua.append(" redis.call('del', KEYS[4]) "); countMonitorLua.append(" redis.call('lpush', KEYS[4], ARGV[1]) "); countMonitorLua.append(" redis.call('hset', KEYS[1], KEYS[3], 1) "); countMonitorLua.append(" redis.call('hset', KEYS[1], KEYS[2], ARGV[2]) "); countMonitorLua.append(" if (redis.call('hget', KEYS[1], KEYS[3]) == ARGV[4]) "); countMonitorLua.append(" then "); countMonitorLua.append(" redis.call('hset', KEYS[1], KEYS[3], 0) "); countMonitorLua.append(" local list2 = redis.call('lrange', KEYS[4], 0, -1) "); countMonitorLua.append(" redis.call('del', KEYS[4]) "); countMonitorLua.append(" return list2 "); countMonitorLua.append(" else "); countMonitorLua.append(" return {} "); countMonitorLua.append(" end "); countMonitorLua.append("end "); DefaultRedisScript<List> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(List.class); defaultRedisScript.setScriptText(countMonitorLua.toString()); List<String> keys = new ArrayList<>(); keys.add(COUNTMONITOR_HASH.replace("${indexStd}", indexStdId.toString())); keys.add(COUNTMONITOR_HASH_CURRENTTEAMCLASSID); keys.add(COUNTMONITOR_HASH_COUNT); keys.add(COUNTMONITOR_LIST.replace("${indexStd}", indexStdId.toString())); List dataIdList = stringRedisTemplate.execute(defaultRedisScript, keys, gapDataId.toString(), currentTeamClassId.toString(), (count - 1) + "", count + ""); List<Long> collect = null; if(!gapDataIdList.isEmpty()) { collect = (List<Long>) gapDataIdList.stream().map(o -> Long.valueOf(o.toString())).collect(Collectors.toList()); } return collect; }
以上代碼是根據我實際的業務代碼改編成的偽代碼,這個段代碼沒必要看懂哈,首先是偽代碼,其實這個業務比較復雜,我也沒寫注釋。更多的還是分享一下優化的處理思路:
首先計數量是由客戶定的,可以設置的很小也可以設置的很大。由于這一點考慮,我將計數分成的兩部分,一個是String類型的key做計數器,一個是List類型的key用來記錄正在被計數的數據ID。這個List有可能是一個大key。所以我們不會去頻繁的讀取它的數量進行判斷,而是通過讀取這個String類型的計數器來校驗計數。當計數符合條件后就將List取出來。這樣做的好處是節省了頻繁讀取大key的耗時(實際上Redis讀取大Key是非常耗時的,我們在實際開發中要時刻注意這一點)。
“Java中對于并發問題的處理方法是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。