您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關java中怎么實現異步處理,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
1.DeferredResult 加線程池 (DeferredResult 提供了超時、錯誤處理,功能非常完善,再加上多線程處理請求效果很不錯)
2.新開個定時任務線程池 定時輪詢當前任務列表 超時就停止(需要自己維護任務列表)Hystrix就是這種方案
3.JDK9 可以采用CompletableFuture orTimeout、completeOnTimeout 方法處理 前者拋出異常后者返回默認值
總結,其實線程池統一設置超時這個需求本身就是偽需求,線程執行任務時間本身就是參差不齊的,而且這個控制權應該交給Runable或Callable內部業務處理,不同的業務處理超時、異常、報警等各不相同。CompletableFuture、ListenableFuture 、DeferredResult 的功能相當豐富,建議在多線程處理的場景多使用這些api。
具體實現:
DeferredResult 先建個工具類。調用方使用execute方法,傳入new的DeferredResultDTO(DeferredResultDTO只有msgId,也可以自定義一些成員變量方便后期業務擴展使用)
然后在其他線程業務處理完設置結果,調用setResult方法,傳入msgId相同的DeferredResultDTO和result對象
/** * DeferredResult 工具類 * * @author tiancong * @date 2020/10/14 19:23 */ @UtilityClass @Slf4j public class DeferredResultUtil { private Map<DeferredResultDTO, DeferredResult<ResultVO<Object>>> taskMap = new ConcurrentHashMap<>(16); public DeferredResult<ResultVO<Object>> execute(DeferredResultDTO dto) { return execute(dto, 5000L); } public DeferredResult<ResultVO<Object>> execute(DeferredResultDTO dto, Long time) { if (taskMap.containsKey(dto)) { throw new BusinessException(String.format("msgId=%s 已經存在,請勿重發消息", dto.getMsgId())); } DeferredResult<ResultVO<Object>> deferredResult = new DeferredResult<>(time); deferredResult.onError((e) -> { taskMap.remove(dto); log.info("處理失敗 ", e); deferredResult.setResult(ResultVoUtil.fail("處理失敗")); }); deferredResult.onTimeout(() -> { taskMap.remove(dto); if (dto.getType().equals(DeferredResultTypeEnum.CLOTHES_DETECTION)) { ExamController.getCURRENT_STUDENT().remove(dto.getMsgId()); } deferredResult.setResult(ResultVoUtil.fail("請求超時,請聯系工作人員!")); }); taskMap.putIfAbsent(dto, deferredResult); return deferredResult; } public void setResult(DeferredResultDTO dto, ResultVO<Object> resultVO) { if (taskMap.containsKey(dto)) { DeferredResult<ResultVO<Object>> deferredResult = taskMap.get(dto); deferredResult.setResult(resultVO); taskMap.remove(dto); } else { log.error("ERROR 未找到該消息msgId:{}", dto.getMsgId()); } } }
2. 新開個定時任務線程池 定時輪詢當前任務列表
/** * @author tiancong * @date 2021/4/10 11:06 */ @Slf4j public class T { private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 2, r -> { Thread thread = new Thread(r); thread.setName("failAfter-%d"); thread.setDaemon(true); return thread; }); private static int timeCount; public static void main(String[] args) throws InterruptedException { ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor(); executorService.setCorePoolSize(4); executorService.setQueueCapacity(10); executorService.setMaxPoolSize(100); executorService.initialize(); // executorService.setAwaitTerminationSeconds(5); // executorService.getThreadPoolExecutor().awaitTermination(3, TimeUnit.SECONDS); executorService.setWaitForTasksToCompleteOnShutdown(true); Random random = new Random(); long start = System.currentTimeMillis(); List<ListenableFuture<Boolean>> asyncResultList = new ArrayList<>(); for (int i = 0; i < 100; i++) { ListenableFuture<Boolean> asyncResult = executorService.submitListenable(() -> { int r = random.nextInt(10); log.info("{} 開始睡{}s", Thread.currentThread().getName(), r); TimeUnit.SECONDS.sleep(r); log.info("{} 干完了 {}s", Thread.currentThread().getName(), r); //throw new RuntimeException("出現異常"); return true; }); asyncResult.addCallback(data -> { try { // 休息3毫秒模擬獲取到執行結果后的操作 TimeUnit.MILLISECONDS.sleep(3); log.info("{} 收到結果:{}", Thread.currentThread().getName(), data); } catch (Exception e) { e.printStackTrace(); } }, ex -> log.info("**異常信息**", ex)); asyncResultList.add(asyncResult); } System.out.println(String.format("總結耗時:%s ms", System.currentTimeMillis() - start)); // 守護進程 定時輪詢 終止超時的任務 scheduler.scheduleAtFixedRate(() -> { // 模擬守護進程 終止超過6s的任務 timeCount++; if (timeCount > 6) { for (ListenableFuture<Boolean> future : asyncResultList) { if (!future.isDone()) { log.error("future 因超時終止任務,{}", future); future.cancel(true); } } } }, 0, 1000, TimeUnit.MILLISECONDS); } }
額外補充:
CompletableFuture實現了CompletionStage接口,里面很多豐富的異步編程接口。
applyToEither方法是哪個先完成,就apply哪一個結果(但是兩個任務都會最終走完)
/** * @author tiancong * @date 2021/4/10 11:06 */ @Slf4j public class T { public static void main(String[] args) throws InterruptedException { // CompletableFuture<String> responseFuture = within( // createTaskSupplier("5"), 3000, TimeUnit.MILLISECONDS); // responseFuture // .thenAccept(T::send) // .exceptionally(throwable -> { // log.error("Unrecoverable error", throwable); // return null; // }); // // 注意 exceptionally是new 的CompletableFuture CompletableFuture<Object> timeoutCompletableFuture = timeoutAfter(1000, TimeUnit.MILLISECONDS).exceptionally(xxx -> "超時"); // 異步任務超時、異常處理 List<Object> collect = Stream.of("1", "2", "3", "4", "5", "6", "7") // .map(x -> within( // createTaskSupplier(x), 3000, TimeUnit.MILLISECONDS) // .thenAccept(T::send) // .exceptionally(throwable -> { // log.error("Unrecoverable error", throwable); // return null; // })) .map(x -> CompletableFuture.anyOf(createTaskSupplier(x) , timeoutCompletableFuture)) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); // .map(x -> CompletableFuture.anyOf(createTaskSupplier(x) // , oneSecondTimeout).join()) // .collect(Collectors.toList()); System.out.println("-------結束------"); System.out.println(collect.toString()); } private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 2, r -> { Thread thread = new Thread(r); thread.setName("failAfter-%d"); thread.setDaemon(true); return thread; }); private static String send(String s) { log.info("最終結果是{}", s); return s; } private static CompletableFuture<String> createTaskSupplier(String x) { return CompletableFuture.supplyAsync(getStringSupplier(x)) .exceptionally(Throwable::getMessage); } private static Supplier<String> getStringSupplier(String text) { return () -> { System.out.println("開始 " + text); if ("1".equals(text)) { throw new RuntimeException("運行時錯誤"); } try { if ("5".equals(text)) { TimeUnit.SECONDS.sleep(5); } } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("結束 " + text); return text + "號"; }; } private static <T> CompletableFuture<T> within(CompletableFuture<T> future, long timeout, TimeUnit unit) { final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit); // 哪個先完成 就apply哪一個結果 這是一個關鍵的API return future.applyToEither(timeoutFuture, Function.identity()); } private static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) { CompletableFuture<T> result = new CompletableFuture<T>(); // timeout 時間后 拋出TimeoutException 類似于sentinel / watcher scheduler.schedule(() -> result.completeExceptionally(new TimeoutException("超時:" + timeout)), timeout, unit); // return CompletableFuture.supplyAsync(()-> (T)"另一個分支任務"); return result; } }
以上就是java中怎么實現異步處理,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。