您好,登錄后才能下訂單哦!
這篇文章給大家介紹springboot @Async 注解如何實現方法異步,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
處理大批量數據的時候,效率很慢。所以考慮一下使用多線程。
剛開始自己手寫的一套,用了線程池啟動固定的線程數進行跑批。但是后來老大考慮到自己手寫的風險不好控制,所以使用spring的方法。
這里沒有詳細介紹,只有簡單的demo,只會用,不懂原理:
package com.xxx.xxx.xxx; import java.util.concurrent.ThreadPoolExecutor; import org.springframework.boot.web.support.SpringBootServletInitializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * 類功能說明:服務生產者啟動類 * <p> * <strong></strong> * </p> * * @version * @author * @since 1.8 */ @Configuration @EnableAsync public class Application extends SpringBootServletInitializer { @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 設置核心線程數 executor.setCorePoolSize(5); // 設置最大線程數 executor.setMaxPoolSize(60); // 設置隊列容量 executor.setQueueCapacity(20); // 設置線程活躍時間(秒) executor.setKeepAliveSeconds(60); // 設置默認線程名稱 executor.setThreadNamePrefix("what-"); // 設置拒絕策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任務結束后再關閉線程池 executor.setWaitForTasksToCompleteOnShutdown(true); return executor; } }
springboot的App類,很簡單,就能使用很多東西。
package com.xxx.xxx.service.impl; import java.util.concurrent.Future; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import com.xxx.xxx.service.XXXAsyncService ; @Service public class XXXAsyncServiceImpl implements XXXAsyncService { @Async public Future<Long> rtn1() throws Exception { //do something //有返回值的時候,可以返回string,long之類的。 return new AsyncResult<>(1); } @Async public void rtn2() throws Exception { //do something //這個可以沒有返回值. } }
package com.xxx.xxx.controller; import java.util.concurrent.Future; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import com.xxx.xxx.service.XXXAsyncService; @RestController @RequestMapping(value="/xxx") public class XXXAsyncController { @Autowired private XXXAsyncService xxxAsyncService; /** * 這里調用異步方法 */ @RequestMapping(value = "/xxx") public void dodo() throws Exception { int threads = 10;//十個線程 List<Future<Long>> list = new ArrayList<>(); for(int i = 0;i < threads; i++){ //這里循環調用異步方法。 //如果存在大量數據,可以在這里把數據切片,然后循環調用,分批處理數據。效率杠杠的。 list .add(xxxAsyncService.rtn1()); } long count = 0; for(Future<Long> l : tsfCountList) { //異步調用需要返回值的時候,這里可以把返回值都放入到list集合中,然后可以統一處理。 這里的get()是阻塞的,因為需要所以異步方法返回,在繼續執行。 count += l.get(); } System.out.println("調用次數:" + count); } }
這些代碼全是手寫,記錄下來,以后用的時候,省的忘了,查起來麻煩。。
@Configuration @EnableAsync public class SpringAsyncConfig { ... }
默認情況下,@EnableAsync檢測Spring的@Async注釋和EJB 3.1 javax. EJB .異步;此選項還可用于檢測其他用戶定義的注釋類型。(也可以在SpringBoot的啟動類上直接加@EnableAsync注解)
在 Spring 中,用 @Async 注解指定的方法,該方法被調用時會以異步的方式執行。而如果沒有在 @Async 注解中指定線程池,就會使用默認的線程池。默認的線程池為 SimpleAsyncTaskExecutor 。
該線程池不會復用線程,每有一個新任務被提交,該線程池就會創建一個新的線程實例用于執行任務。下面為相關的代碼:
protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); }
而如果想要指定線程池,可以通過在 @Async 注解中的 value 參數中指定所要使用的線程池的 Bean Name 。另一種方法是是一個實現了 AsyncConfigurer 接口或是繼承其默認適配器類 AsyncConfigurerSupport 的配置類,這樣 @Async 注解的方法就會使用指定的自定義的線程池。
使用@Async注解的話采用的是springBoot默認的線程池,不過一般我們會自定義線程池(因為比較靈活),配置方式有:
使用 xml 文件配置的方式
使用Java代碼結合@Configuration進行配置(推薦使用)
package com.deppon.ptos.load.config; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * @Description: 異步線程管理 * @Author: LYH * @CreateDate: 2019/6/27 8:54 * @Version: 1.0 * @JDK: 1.8 */ @Configuration @EnableAsync @Slf4j public class ExecutorConfig { @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心線程數 executor.setCorePoolSize(corePoolSize); //配置最大線程數 executor.setMaxPoolSize(maxPoolSize); //配置隊列大小 executor.setQueueCapacity(queueCapacity); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix(namePrefix); // rejection-policy:當pool已經達到max size的時候,如何處理新任務 // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執行初始化 executor.initialize(); return executor; } }
package com.deppon.ptos.load.config; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; /** * @Description: 打印異步線程的執行情況 使用Callbale Future 來返回線程的信息 * @Author: 633805 LYH * @CreateDate: 2019/6/27 8:59 * @Version: 1.0 * @JDK: 1.8 */ @Component @Slf4j public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if (null == threadPoolExecutor) { return; } log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } }
使用:
@Async("asyncServiceExecutor")
到這一步,異步就算開啟了。
下面主要說一說錯誤的
如下方式會使@Async失效
異步方法使用static修飾
異步類沒有使用@Component注解(或其他注解)導致spring無法掃描到異步類
異步方法不能與被調用的異步方法在同一個類中
類中需要使用@Autowired或@Resource等注解自動注入,不能自己手動new對象
如果使用SpringBoot框架必須在啟動類中增加@EnableAsync注解
關于springboot @Async 注解如何實現方法異步就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。