您好,登錄后才能下訂單哦!
本篇內容介紹了“Java ScheduledThreadPoolExecutor的坑如何解決”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
這個坑就是如果ScheduledThreadPoolExecutor
中執行的任務出錯拋出異常后,不僅不會打印異常堆棧信息,同時還會取消后面的調度, 直接看例子。
@Test public void testException() throws InterruptedException { // 創建1個線程的調度任務線程池 ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); // 創建一個任務 Runnable runnable = new Runnable() { volatile int num = 0; @Override public void run() { num ++; // 模擬執行報錯 if(num > 5) { throw new RuntimeException("執行錯誤"); } log.info("exec num: [{}].....", num); } }; // 每隔1秒鐘執行一次任務 scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS); Thread.sleep(10000); }
運行結果:
只執行了5次后,就不打印,不執行了,因為報錯了
任務報錯,也沒有打印一次堆棧,更導致調度任務取消,后果十分嚴重。
解決方法也非常簡單,只要通過try catch捕獲異常即可。
運行結果:
看到不僅打印了異常堆棧,而且也會進行周期性的調度。
更好的建議可以在自己的項目中封裝一個包裝類,要求所有的調度都提交通過我們統一的包裝類, 如下代碼:
@Slf4j public class RunnableWrapper implements Runnable { // 實際要執行的線程任務 private Runnable task; // 線程任務被創建出來的時間 private long createTime; // 線程任務被線程池運行的開始時間 private long startTime; // 線程任務被線程池運行的結束時間 private long endTime; // 線程信息 private String taskInfo; private boolean showWaitLog; /** * 執行間隔時間多久,打印日志 */ private long durMs = 1000L; // 當這個任務被創建出來的時候,就會設置他的創建時間 // 但是接下來有可能這個任務提交到線程池后,會進入線程池的隊列排隊 public RunnableWrapper(Runnable task, String taskInfo) { this.task = task; this.taskInfo = taskInfo; this.createTime = System.currentTimeMillis(); } public void setShowWaitLog(boolean showWaitLog) { this.showWaitLog = showWaitLog; } public void setDurMs(long durMs) { this.durMs = durMs; } // 當任務在線程池排隊的時候,這個run方法是不會被運行的 // 但是當任務結束了排隊,得到線程池運行機會的時候,這個方法會被調用 // 此時就可以設置線程任務的開始運行時間 @Override public void run() { this.startTime = System.currentTimeMillis(); // 此處可以通過調用監控系統的API,實現監控指標上報 // 用線程任務的startTime-createTime,其實就是任務排隊時間 // 這邊打印日志輸出,也可以輸出到監控系統中 if(showWaitLog) { log.info("任務信息: [{}], 任務排隊時間: [{}]ms", taskInfo, startTime - createTime); } // 接著可以調用包裝的實際任務的run方法 try { task.run(); } catch (Exception e) { log.error("run task error", e); throw e; } // 任務運行完畢以后,會設置任務運行結束的時間 this.endTime = System.currentTimeMillis(); // 此處可以通過調用監控系統的API,實現監控指標上報 // 用線程任務的endTime - startTime,其實就是任務運行時間 // 這邊打印任務執行時間,也可以輸出到監控系統中 if(endTime - startTime > durMs) { log.info("任務信息: [{}], 任務執行時間: [{}]ms", taskInfo, endTime - startTime); } } }
使用:
我們還可以在包裝類里面封裝各種監控行為,如本例打印日志執行時間等。
那大家有沒有想過為什么任務出錯會導致異常無法打印,甚至調度都取消了呢?讓我們從源碼出發,一探究竟。
1.下面是調度任務的入口方法。
// ScheduledThreadPoolExecutor#scheduleAtFixedRate public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // 將執行任務和參數包裝成ScheduledFutureTask對象 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 延遲執行 delayedExecute(t); return t; }
這個方法主要做了兩個事情:
將執行任務和參數包裝成ScheduledFutureTask對象
調用delayedExecute
方法延遲執行任務
2.延遲或周期性任務的主要執行方法, 主要是將任務丟到隊列中,后續由工作線程獲取執行。
// ScheduledThreadPoolExecutor#delayedExecute private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { // 將任務丟到阻塞隊列中 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 開啟工作線程,去執行任務,或者從隊列中獲取任務執行 ensurePrestart(); } }
3.現在任務已經在隊列中了,我們看下任務執行的內容是什么,還記得前面的包裝對象ScheduledFutureTask
類,它的實現類是ScheduledFutureTask
,繼承了Runnable類。
// ScheduledFutureTask#run方法 public void run() { // 是不是周期性任務 boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); // 不是周期性任務的話, 直接調用一次下面的run else if (!periodic) ScheduledFutureTask.super.run(); // 如果是周期性任務,則調用runAndReset方法,如果返回true,繼續執行 else if (ScheduledFutureTask.super.runAndReset()) { // 設置下次調度時間 setNextRunTime(); // 重新執行調度任務 reExecutePeriodic(outerTask); } }
這里的關鍵就是看ScheduledFutureTask.super.runAndReset()
方法是否返回true,如果是true的話繼續調度。
4.runAndReset方法也很簡單,關鍵就是看報異常如何處理。
// FutureTask#runAndReset protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; // 是否繼續下次調度,默認false boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { // 執行任務 c.call(); // 執行成功的話,設置為true ran = true; // 異常處理,關鍵點 } catch (Throwable ex) { // 不會修改ran的值,最終是false,同時也不打印異常堆棧 setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 返回結果 return ran && s == NEW; }
關鍵點ran變量,最終返回是不是下次繼續調度執行
如果拋出異常的話,可以看到不會修改ran為true。
“Java ScheduledThreadPoolExecutor的坑如何解決”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。