您好,登錄后才能下訂單哦!
這篇文章給大家介紹怎么在Java中自定義線程池,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
public interface IThreadPool<Job extends Runnable> { /** * 關閉線程池 */ public void shutAlldown(); /** * 執行任務 * * @param job 任務 */ public void execute(Job job); /** * 添加工作者 * * @param addNum 添加數 */ public void addWorkers(int addNum); /** * 減少工作者 * * @param reduceNum 減少數目 */ public void reduceWorkers(int reduceNum); }
線程池的核心是維護了1個任務列表和1個工作者列表。
import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> { // 默認線程數 private static int DEAFAULT_SIZE = 5; // 最大線程數 private static int MAX_SIZE = 10; // 任務列表 private LinkedList<Job> tasks = new LinkedList<Job>(); // 工作線程列表 private List<Worker> workers = Collections .synchronizedList(new ArrayList<Worker>()); /** * 默認構造函數 */ public XYThreadPool() { initWokers(DEAFAULT_SIZE); } /** * 執行線程數 * * @param threadNums 線程數 */ public XYThreadPool(int workerNum) { workerNum = workerNum <= 0 ? DEAFAULT_SIZE : workerNum > MAX_SIZE ? MAX_SIZE : workerNum; initWokers(workerNum); } /** * 初始化線程池 * * @param threadNums 線程數 */ public void initWokers(int threadNums) { for (int i = 0; i < threadNums; i++) { Worker worker = new Worker(); worker.start(); workers.add(worker); } // 添加關閉鉤子 Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { shutAlldown(); } }); } @Override public void shutAlldown() { for (Worker worker : workers) { worker.shutdown(); } } @Override public void execute(Job job) { synchronized (tasks) { // 提交任務就是將任務對象加入任務隊列,等待工作線程去處理 tasks.addLast(job); tasks.notifyAll(); } } @Override public void addWorkers(int addNum) { // 新線程數必須大于零,并且線程總數不能大于最大線程數 if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) { initWokers(addNum); } else { System.out.println("addNum too large"); } } @Override public void reduceWorkers(int reduceNum) { if ((workers.size() - reduceNum <= 0)) System.out.println("thread num too small"); else { // 暫停指定數量的工作者 int count = 0; while (count != reduceNum) { for (Worker w : workers) { w.shutdown(); count++; } } } } /** * 工作線程 */ class Worker extends Thread { private volatile boolean flag = true; @Override public void run() { while (flag) { Job job = null; // 加鎖(若只有一個woker可不必加鎖,那就是所謂的單線程的線程池,線程安全) synchronized (tasks) { // 任務隊列為空 while (tasks.isEmpty()) { try { // 阻塞,放棄對象鎖,等待被notify喚醒 tasks.wait(); System.out.println("block when tasks is empty"); } catch (InterruptedException e) { e.printStackTrace(); } } // 不為空取出任務 job = tasks.removeFirst(); System.out.println("get job:" + job + ",do biz"); job.run(); } } } public void shutdown() { flag = false; } } }
(1) 當調用wait()方法時線程會放棄對象鎖,進入等待此對象的等待鎖定池,只有針對此對象調用notify()方法后本線程才進入對象鎖定池準備
(2) Object的方法:void notify(): 喚醒一個正在等待該對象的線程。void notifyAll(): 喚醒所有正在等待該對象的線程。
notifyAll使所有原來在該對象上等待被notify的線程統統退出wait狀態,變成等待該對象上的鎖,一旦該對象被解鎖,它們會去競爭。
notify只是選擇一個wait狀態線程進行通知,并使它獲得該對象上的鎖,但不驚動其它同樣在等待被該對象notify的線程們,當第一個線程運行完畢以后釋放對象上的鎖,此時如果該對象沒有再次使用notify語句,即便該對象已經空閑,其他wait狀態等待的線程由于沒有得到該對象的通知,繼續處在wait狀態,直到這個對象發出一個notify或notifyAll,它們等待的是被notify或notifyAll,而不是鎖。
每調用一次就會創建一個擁有10個線程工作者的線程池。
public class TestService1 { public static void main(String[] args) { // 啟動10個線程 XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10); pool.execute(new Runnable() { @Override public void run() { System.out.println("====1 test===="); } }); } } public class TestService2 { public static void main(String[] args) { // 啟動10個線程 XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10); pool.execute(new Runnable() { @Override public void run() { System.out.println("====2 test===="); } }); } }
在項目中所有的線程調用,一般都共用1個固定工作者數大小的線程池。
import javax.annotation.PostConstruct; import org.springframework.stereotype.Component; import com.xy.pool.XYThreadPool; /** * 統一線程池管理類 */ @Component public class XYThreadManager { private XYThreadPool<Runnable> executorPool; @PostConstruct public void init() { executorPool = new XYThreadPool<Runnable>(10); } public XYThreadPool<Runnable> getExecutorPool() { return executorPool; } } import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service("testService3") public class TestService3 { @Autowired private XYThreadManager threadManager; public void test() { threadManager.getExecutorPool().execute(new Runnable() { @Override public void run() { System.out.println("====3 test===="); } }); } } import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service("testService4") public class TestService4 { @Autowired private XYThreadManager threadManager; public void test() { threadManager.getExecutorPool().execute(new Runnable() { @Override public void run() { System.out.println("====4 test===="); } }); } } import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class TestMain { @SuppressWarnings("resource") public static void main(String[] args) { ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml"); TestService3 t3 = (TestService3) atc.getBean("testService3"); t3.test(); TestService4 t4 = (TestService4) atc.getBean("testService4"); t4.test(); } }
補充:論如何優雅的自定義ThreadPoolExecutor線程池
線程池想必大家也都用過,JDK的Executors 也自帶一些線程池。但是不知道大家有沒有想過,如何才是最優雅的方式去使用過線程池嗎? 生產環境要怎么去配置自己的線程池才是合理的呢?
今天周末,剛好有時間來總結一下自己所認為的'優雅', 如有問題歡迎大家指正。
要使用好線程池,那么一定要遵循幾個規則:
線程池相關參數配置
利用Hook嵌入你的行為
線程池的關閉
線程池大小的設置
這其實是一個面試的考點,很多面試官會問你線程池coreSize 的大小來考察你對于線程池的理解。
首先針對于這個問題,我們必須要明確我們的需求是計算密集型還是IO密集型,只有了解了這一點,我們才能更好的去設置線程池的數量進行限制。
1、計算密集型:
顧名思義就是應用需要非常多的CPU計算資源,在多核CPU時代,我們要讓每一個CPU核心都參與計算,將CPU的性能充分利用起來,這樣才算是沒有浪費服務器配置,如果在非常好的服務器配置上還運行著單線程程序那將是多么重大的浪費。對于計算密集型的應用,完全是靠CPU的核數來工作,所以為了讓它的優勢完全發揮出來,避免過多的線程上下文切換,比較理想方案是:
線程數 = CPU核數+1,也可以設置成CPU核數*2,但還要看JDK的版本以及CPU配置(服務器的CPU有超線程)。
一般設置CPU * 2即可。
2、IO密集型
我們現在做的開發大部分都是WEB應用,涉及到大量的網絡傳輸,不僅如此,與數據庫,與緩存間的交互也涉及到IO,一旦發生IO,線程就會處于等待狀態,當IO結束,數據準備好后,線程才會繼續執行。因此從這里可以發現,對于IO密集型的應用,我們可以多設置一些線程池中線程的數量,這樣就能讓在等待IO的這段時間內,線程可以去做其它事,提高并發處理效率。那么這個線程池的數據量是不是可以隨便設置呢?當然不是的,請一定要記得,線程上下文切換是有代價的。目前總結了一套公式,對于IO密集型應用:
線程數 = CPU核心數/(1-阻塞系數) 這個阻塞系數一般為0.8~0.9之間,也可以取0.8或者0.9。
套用公式,對于雙核CPU來說,它比較理想的線程數就是20,當然這都不是絕對的,需要根據實際情況以及實際業務來調整:final int poolSize = (int)(cpuCore/(1-0.9))
針對于阻塞系數,《Programming Concurrency on the JVM Mastering》即《Java 虛擬機并發編程》中有提到一句話:
對于阻塞系數,我們可以先試著猜測,抑或采用一些細嫩分析工具或java.lang.management API 來確定線程花在系統/IO操作上的時間與CPU密集任務所耗的時間比值。
說到這一點,我們只需要謹記一點,一定不要選擇沒有上限限制的配置項。
這也是為什么不建議使用Executors 中創建線程的方法。
比如,Executors.newCachedThreadPool的設置與無界隊列的設置因為某些不可預期的情況,線程池會出現系統異常,導致線程暴增的情況或者任務隊列不斷膨脹,內存耗盡導致系統崩潰和異常。 我們推薦使用自定義線程池來避免該問題,這也是在使用線程池規范的首要原則! 小心無大錯,千萬別過度自信!
可以看下Executors中四個創建線程池的方法:
//使用無界隊列 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //線程池數量是無限的 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
其他的就不再列舉了,大家可以自行查閱源碼。
第二,合理設置線程數量、和線程空閑回收時間,根據具體的任務執行周期和時間去設定,避免頻繁的回收和創建,雖然我們使用線程池的目的是為了提升系統性能和吞吐量,但是也要考慮下系統的穩定性,不然出現不可預期問題會很麻煩!
第三,根據實際場景,選擇適用于自己的拒絕策略。進行補償,不要亂用JDK支持的自動補償機制!盡量采用自定義的拒絕策略去進行兜底!
第四,線程池拒絕策略,自定義拒絕策略可以實現RejectedExecutionHandler接口。
JDK自帶的拒絕策略如下:
AbortPolicy:直接拋出異常阻止系統正常工作。
CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。
DiscardOldestPolicy:丟棄最老的一個請求,嘗試再次提交當前任務。
DiscardPolicy:丟棄無法處理的任務,不給予任何處理。
利用Hook,留下線程池執行軌跡:
ThreadPoolExecutor提供了protected類型可以被覆蓋的鉤子方法,允許用戶在任務執行之前會執行之后做一些事情。我們可以通過它來實現比如初始化ThreadLocal、收集統計信息、如記錄日志等操作。這類Hook如beforeExecute和afterExecute。另外還有一個Hook可以用來在任務被執行完的時候讓用戶插入邏輯,如rerminated 。
如果hook方法執行失敗,則內部的工作線程的執行將會失敗或被中斷。
我們可以使用beforeExecute和afterExecute來記錄線程之前前和后的一些運行情況,也可以直接把運行完成后的狀態記錄到ELK等日志系統。
內容當線程池不在被引用并且工作線程數為0的時候,線程池將被終止。我們也可以調用shutdown來手動終止線程池。如果我們忘記調用shutdown,為了讓線程資源被釋放,我們還可以使用keepAliveTime和allowCoreThreadTimeOut來達到目的!
當然,穩妥的方式是使用虛擬機Runtime.getRuntime().addShutdownHook方法,手工去調用線程池的關閉方法!
線程池核心代碼:
public class AsyncProcessQueue { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** * Task 包裝類<br> * 此類型的意義是記錄可能會被 Executor 吃掉的異常<br> */ public static class TaskWrapper implements Runnable { private static final Logger _LOGGER = LoggerFactory.getLogger(TaskWrapper.class); private final Runnable gift; public TaskWrapper(final Runnable target) { this.gift = target; } @Override public void run() { // 捕獲異常,避免在 Executor 里面被吞掉了 if (gift != null) { try { gift.run(); } catch (Exception e) { _LOGGER.error("Wrapped target execute exception.", e); } } } } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** * 執行指定的任務 * * @param task * @return */ public static boolean execute(final Runnable task) { return AsyncProcessor.executeTask(new TaskWrapper(task)); } } public class AsyncProcessor { static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class); /** * 默認最大并發數<br> */ private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2; /** * 線程池名稱格式 */ private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d"; /** * 線程工廠名稱 */ private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME) .daemon(true).build(); /** * 默認隊列大小 */ private static final int DEFAULT_SIZE = 500; /** * 默認線程存活時間 */ private static final long DEFAULT_KEEP_ALIVE = 60L; /**NewEntryServiceImpl.java:689 * Executor */ private static ExecutorService executor; /** * 執行隊列 */ private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE); static { // 創建 Executor // 此處默認最大值改為處理器數量的 4 倍 try { executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, executeQueue, FACTORY); // 關閉事件的掛鉤 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { AsyncProcessor.LOGGER.info("AsyncProcessor shutting down."); executor.shutdown(); try { // 等待1秒執行關閉 if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { AsyncProcessor.LOGGER.error("AsyncProcessor shutdown immediately due to wait timeout."); executor.shutdownNow(); } } catch (InterruptedException e) { AsyncProcessor.LOGGER.error("AsyncProcessor shutdown interrupted."); executor.shutdownNow(); } AsyncProcessor.LOGGER.info("AsyncProcessor shutdown complete."); } })); } catch (Exception e) { LOGGER.error("AsyncProcessor init error.", e); throw new ExceptionInInitializerError(e); } } /** * 此類型無法實例化 */ private AsyncProcessor() { } /** * 執行任務,不管是否成功<br> * 其實也就是包裝以后的 {@link Executer} 方法 * * @param task * @return */ public static boolean executeTask(Runnable task) { try { executor.execute(task); } catch (RejectedExecutionException e) { LOGGER.error("Task executing was rejected.", e); return false; } return true; } /** * 提交任務,并可以在稍后獲取其執行情況<br> * 當提交失敗時,會拋出 {@link } * * @param task * @return */ public static <T> Future<T> submitTask(Callable<T> task) { try { return executor.submit(task); } catch (RejectedExecutionException e) { LOGGER.error("Task executing was rejected.", e); throw new UnsupportedOperationException("Unable to submit the task, rejected.", e); } } }
使用方式:
AsyncProcessQueue.execute(new Runnable() { @Override public void run() { //do something } });
關于怎么在Java中自定義線程池就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。