您好,登錄后才能下訂單哦!
這篇文章主要介紹了SpringBoot多線程@Async怎么用,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
導入:可以將大批量的數據insert操作采用多線程的方式并行執行
第三方服務的接口調用:由于存在個別第三方服務調用比較耗時的場景,此時就可以與自身服務的邏輯并行執行
簡而言之:接口中部份業務邏輯可以通過并行的方式來優化接口性能
@Configuration @EnableAsync public class TaskPoolConfig { @Bean("taskExecutor") // bean 的名稱,默認為首字母小寫的方法名 public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心線程數(CPU核心數+1) executor.setCorePoolSize(10); //最大線程數(2*CPU核心數+1) executor.setMaxPoolSize(20); //緩沖隊列數 executor.setQueueCapacity(200); //允許線程空閑時間(單位:默認為秒) executor.setKeepAliveSeconds(60); //線程池名前綴 executor.setThreadNamePrefix("sub-thread-"); // 增加 TaskDecorator 屬性的配置 executor.setTaskDecorator(new ContextDecorator()); // 線程池對拒絕任務的處理策略:CallerRunsPolicy:不在新線程中執行任務,而是由調用者所在的線程來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
1.實現TaskDecorator接口
/** * 子線程裝飾器 * * @author Da Shuai * @date 2021-06-10 18:28:17 */ public class SubThreadTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { RequestAttributes context = RequestContextHolder.currentRequestAttributes(); return () -> { try { RequestContextHolder.setRequestAttributes(context); runnable.run(); } finally { RequestContextHolder.resetRequestAttributes(); } }; } }
2.之前的線程池配置加如下代碼使其生效
// 增加 TaskDecorator 屬性的配置 executor.setTaskDecorator(new ContextDecorator());
思路:
實例化CountDownLatch對象,同時傳入x(線程數量:這個數量必須等于子線程數量)進行構造
每個子線程執行完畢后會調用countDown()方法
子線程邏輯后方調用await()方法
這樣線程計數器為0之前,主線程就一直處于pending狀態
主線程邏輯
new CountDownLatch(X) latch.await()
@Override @Transactional public void importExcel(File file) { CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { VoteDO voteDO = new VoteDO(); voteDO.setTitle(i + ""); asyncManager.asyncSaveVote(voteDO); } //System.out.println(1/0); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } }
子線程邏輯
latch.countDown()
@Override @Async public void asyncSaveVote(VoteDO voteDO, CountDownLatch latch) { log.info("當前線程為 {},休眠10s開始", Thread.currentThread().getName()); try { Thread.sleep(10000L); } catch (InterruptedException e) { e.printStackTrace(); } log.info("當前線程為 {},休眠10s結束", Thread.currentThread().getName()); log.info("當前線程為 {},保存開始", Thread.currentThread().getName()); voteDO.setDesc(Thread.currentThread().getName()); voteDao.insert(voteDO); latch.countDown(); log.info("當前線程為 {},保存結束", Thread.currentThread().getName()); }
日志
2021-06-11 16:31:08.653 INFO 27516 --- [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:31:08.653 INFO 27516 --- [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求地址:http://localhost:8080/api/import
2021-06-11 16:31:08.653 INFO 27516 --- [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求方式:POST
2021-06-11 16:31:08.655 INFO 27516 --- [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求類方法:com.zhdj.controller.ImportController.importExcel
2021-06-11 16:31:08.655 INFO 27516 --- [nio-8080-exec-1] com.zhdj.config.LogAspect : 請求類方法參數:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@42c3f403]
2021-06-11 16:31:08.655 INFO 27516 --- [nio-8080-exec-1] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:31:08.676 INFO 27516 --- [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2021-06-11 16:31:08.894 INFO 27516 --- [nio-8080-exec-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2021-06-11 16:31:08.921 INFO 27516 --- [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,休眠10s開始
2021-06-11 16:31:08.921 INFO 27516 --- [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,休眠10s開始
2021-06-11 16:31:08.921 INFO 27516 --- [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,休眠10s開始
2021-06-11 16:31:18.921 INFO 27516 --- [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,休眠10s結束
2021-06-11 16:31:18.921 INFO 27516 --- [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,休眠10s結束
2021-06-11 16:31:18.921 INFO 27516 --- [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,保存開始
2021-06-11 16:31:18.921 INFO 27516 --- [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,休眠10s結束
2021-06-11 16:31:18.921 INFO 27516 --- [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,保存開始
2021-06-11 16:31:18.921 INFO 27516 --- [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,保存開始
2021-06-11 16:31:19.080 DEBUG 27516 --- [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.080 DEBUG 27516 --- [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.080 DEBUG 27516 --- [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:31:19.156 DEBUG 27516 --- [ sub-thread-1] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-1(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.156 DEBUG 27516 --- [ sub-thread-3] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-3(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.156 DEBUG 27516 --- [ sub-thread-2] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-2(String), 2021-06-11T16:31:19.032(LocalDateTime), 2021-06-11T16:31:19.037(LocalDateTime)
2021-06-11 16:31:19.172 DEBUG 27516 --- [ sub-thread-3] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.178 DEBUG 27516 --- [ sub-thread-2] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.187 DEBUG 27516 --- [ sub-thread-1] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:31:19.224 INFO 27516 --- [ sub-thread-3] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-3,保存結束
2021-06-11 16:31:19.224 INFO 27516 --- [ sub-thread-1] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-1,保存結束
2021-06-11 16:31:19.224 INFO 27516 --- [ sub-thread-2] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-2,保存結束
2021-06-11 16:31:19.226 INFO 27516 --- [nio-8080-exec-1] com.zhdj.config.LogAspect : --------------返回內容----------------
2021-06-11 16:31:19.328 INFO 27516 --- [nio-8080-exec-1] com.zhdj.config.LogAspect : Response內容:null
2021-06-11 16:31:19.328 INFO 27516 --- [nio-8080-exec-1] com.zhdj.config.LogAspect : --------------返回內容----------------
思路:
1.子線程邏輯返回Future對象
2.主線程邏輯循環判斷每個子線程返回的Future對象isDone()是否為true
主線程邏輯
循環判斷future.isDone()是否為true
@Override @Transactional public void importExcel(File file) { List<Future> futureList = new ArrayList<>(); for (int i = 0; i < 3; i++) { VoteDO voteDO = new VoteDO(); voteDO.setTitle(i + ""); Future future = asyncManager.asyncSaveVote(voteDO); futureList.add(future); } //檢查所有子線程是否均執行完畢 while (true) { boolean isAllDone = true; for (Future future : futureList) { if (null == future || !future.isDone()) { isAllDone = false; } } if (isAllDone) { log.info("所有子線程執行完畢"); break; } } }
子線程邏輯
返回Future對象
@Override public Future asyncSaveVote(VoteDO voteDO) { log.info("當前線程為 {},休眠10s開始", Thread.currentThread().getName()); try { Thread.sleep(10000L); } catch (InterruptedException e) { e.printStackTrace(); } log.info("當前線程為 {},休眠10s結束", Thread.currentThread().getName()); log.info("當前線程為 {},保存開始", Thread.currentThread().getName()); voteDO.setDesc(Thread.currentThread().getName()); voteDao.insert(voteDO); log.info("當前線程為 {},保存結束", Thread.currentThread().getName()); //返回需要用AsyncResult類 return new AsyncResult<>(true); }
日志
2021-06-11 16:42:28.974 INFO 20492 --- [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:42:28.974 INFO 20492 --- [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求地址:http://localhost:8080/api/import
2021-06-11 16:42:28.974 INFO 20492 --- [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求方式:POST
2021-06-11 16:42:28.975 INFO 20492 --- [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求類方法:com.zhdj.controller.ImportController.importExcel
2021-06-11 16:42:28.975 INFO 20492 --- [nio-8080-exec-2] com.zhdj.config.LogAspect : 請求類方法參數:[org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile@7e23bacc]
2021-06-11 16:42:28.975 INFO 20492 --- [nio-8080-exec-2] com.zhdj.config.LogAspect : ===============請求內容===============
2021-06-11 16:42:28.979 INFO 20492 --- [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,休眠10s開始
2021-06-11 16:42:28.979 INFO 20492 --- [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,休眠10s開始
2021-06-11 16:42:28.979 INFO 20492 --- [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,休眠10s開始
2021-06-11 16:42:38.980 INFO 20492 --- [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,休眠10s結束
2021-06-11 16:42:38.980 INFO 20492 --- [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,休眠10s結束
2021-06-11 16:42:38.980 INFO 20492 --- [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,休眠10s結束
2021-06-11 16:42:38.980 INFO 20492 --- [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,保存開始
2021-06-11 16:42:38.980 INFO 20492 --- [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,保存開始
2021-06-11 16:42:38.980 INFO 20492 --- [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,保存開始
2021-06-11 16:42:38.981 DEBUG 20492 --- [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.981 DEBUG 20492 --- [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.981 DEBUG 20492 --- [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Preparing: INSERT INTO vote ( title, `desc`, gmt_create, gmt_modified ) VALUES ( ?, ?, ?, ? )
2021-06-11 16:42:38.982 DEBUG 20492 --- [ sub-thread-5] com.zhdj.dao.VoteDao.insert : ==> Parameters: 1(String), sub-thread-5(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.982 DEBUG 20492 --- [ sub-thread-4] com.zhdj.dao.VoteDao.insert : ==> Parameters: 0(String), sub-thread-4(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.982 DEBUG 20492 --- [ sub-thread-6] com.zhdj.dao.VoteDao.insert : ==> Parameters: 2(String), sub-thread-6(String), 2021-06-11T16:42:38.980(LocalDateTime), 2021-06-11T16:42:38.981(LocalDateTime)
2021-06-11 16:42:38.988 DEBUG 20492 --- [ sub-thread-5] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:38.989 INFO 20492 --- [ sub-thread-5] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-5,保存結束
2021-06-11 16:42:38.993 DEBUG 20492 --- [ sub-thread-6] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:38.993 INFO 20492 --- [ sub-thread-6] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-6,保存結束
2021-06-11 16:42:39.004 DEBUG 20492 --- [ sub-thread-4] com.zhdj.dao.VoteDao.insert : <== Updates: 1
2021-06-11 16:42:39.005 INFO 20492 --- [ sub-thread-4] com.zhdj.AsyncManagerImpl : 當前線程為 sub-thread-4,保存結束
2021-06-11 16:42:39.005 INFO 20492 --- [nio-8080-exec-2] com.zhdj.service.impl.VoteServiceImpl : 所有子線程執行完畢
2021-06-11 16:42:39.005 INFO 20492 --- [nio-8080-exec-2] com.zhdj.config.LogAspect : --------------返回內容----------------
2021-06-11 16:42:39.005 INFO 20492 --- [nio-8080-exec-2] com.zhdj.config.LogAspect : Response內容:null
2021-06-11 16:42:39.005 INFO 20492 --- [nio-8080-exec-2] com.zhdj.config.LogAspect : --------------返回內容----------------
暫時無解決方案,這是弊端
在項目中,當訪問其他人的接口較慢或者做耗時任務時,不想程序一直卡在耗時任務上,想程序能夠并行執行,我們可以使用多線程來并行的處理任務,也可以使用spring提供的異步處理方式@Async。
調用之后,不返回任何數據。
調用之后,返回數據,通過Future來獲取返回數據
使用@EnableAsync啟用異步注解
@Configuration @EnableAsync @Slf4j public class AsyncConfig{ }
在異步處理的方法dealNoReturnTask上添加注解@Async
@Component @Slf4j public class AsyncTask { @Async public void dealNoReturnTask(){ log.info("Thread {} deal No Return Task start", Thread.currentThread().getName()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis()); } }
Test測試類:
@SpringBootTest(classes = SpringbootApplication.class) @RunWith(SpringJUnit4ClassRunner.class) @Slf4j public class AsyncTest { @Autowired private AsyncTask asyncTask; @Test public void testDealNoReturnTask(){ asyncTask.dealNoReturnTask(); try { log.info("begin to deal other Task!"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }
日志打印結果為:
begin to deal other Task!
AsyncExecutorThread-1 deal No Return Task start
AsyncExecutorThread-1 deal No Return Task end at 1499751227034
從日志中我們可以看出,方法dealNoReturnTask()是異步執行完成的。
dealNoReturnTask()設置sleep 3s是為了模擬耗時任務
testDealNoReturnTask()設置sleep 10s是為了確認異步是否執行完成
異步調用返回數據,Future表示在未來某個點獲取執行結果,返回數據類型可以自定義
@Async public Future<String> dealHaveReturnTask() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } JSONObject jsonObject = new JSONObject(); jsonObject.put("thread", Thread.currentThread().getName()); jsonObject.put("time", System.currentTimeMillis()); return new AsyncResult<String>(jsonObject.toJSONString()); }
測試類用isCancelled判斷異步任務是否取消,isDone判斷任務是否執行結束
@Test public void testDealHaveReturnTask() throws Exception { Future<String> future = asyncTask.dealHaveReturnTask(); log.info("begin to deal other Task!"); while (true) { if(future.isCancelled()){ log.info("deal async task is Cancelled"); break; } if (future.isDone() ) { log.info("deal async task is Done"); log.info("return result is " + future.get()); break; } log.info("wait async task to end ..."); Thread.sleep(1000); } }
日志打印如下,我們可以看出任務一直在等待異步任務執行完畢,用future.get()來獲取異步任務的返回結果
begin to deal other Task!
wait async task to end ...
wait async task to end ...
wait async task to end ...
wait async task to end ...
deal async task is Done
return result is {"thread":"AsyncExecutorThread-1","time":1499752617330}
我們可以實現AsyncConfigurer接口,也可以繼承AsyncConfigurerSupport類來實現
在方法getAsyncExecutor()中創建線程池的時候,必須使用 executor.initialize(),
不然在調用時會報線程池未初始化的異常。
如果使用threadPoolTaskExecutor()來定義bean,則不需要初始化
@Configuration @EnableAsync @Slf4j public class AsyncConfig implements AsyncConfigurer { // @Bean // public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // executor.setCorePoolSize(10); // executor.setMaxPoolSize(100); // executor.setQueueCapacity(100); // return executor; // } @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(100); executor.setQueueCapacity(100); executor.setThreadNamePrefix("AsyncExecutorThread-"); executor.initialize(); //如果不初始化,導致找到不到執行器 return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncExceptionHandler(); } }
異步異常處理類:
@Slf4j public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params)); if (ex instanceof AsyncException) { AsyncException asyncException = (AsyncException) ex; log.info("asyncException:{}",asyncException.getErrorMessage()); } log.info("Exception :"); ex.printStackTrace(); } }
異步處理異常類:
@Data @AllArgsConstructor public class AsyncException extends Exception { private int code; private String errorMessage; }
在無返回值的異步調用中,異步處理拋出異常,AsyncExceptionHandler的handleUncaughtException()會捕獲指定異常,原有任務還會繼續運行,直到結束。
在有返回值的異步調用中,異步處理拋出異常,會直接拋出異常,異步任務結束,原有處理結束執行。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“SpringBoot多線程@Async怎么用”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。