BlockingQueue
是 Java 并發編程中用于在生產者和消費者線程之間傳遞數據的一種阻塞隊列。它可以用于實現線程池,以便在有限的線程資源下高效地處理任務。下面是一個簡單的線程池實現,使用 BlockingQueue
作為任務隊列:
import java.util.concurrent.*;
public class BlockingQueueThreadPool {
private final int corePoolSize;
private final BlockingQueue<Runnable> taskQueue;
private final ThreadPoolExecutor threadPoolExecutor;
public BlockingQueueThreadPool(int corePoolSize, int maxPoolSize, int queueCapacity) {
this.corePoolSize = corePoolSize;
this.taskQueue = new LinkedBlockingQueue<>(queueCapacity);
this.threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L,
TimeUnit.SECONDS,
taskQueue
);
}
public void submit(Runnable task) throws InterruptedException {
if (threadPoolExecutor.getCorePoolSize()< corePoolSize) {
synchronized (this) {
if (threadPoolExecutor.getCorePoolSize()< corePoolSize) {
threadPoolExecutor.setCorePoolSize(corePoolSize);
}
}
}
threadPoolExecutor.execute(task);
}
public void shutdown() {
threadPoolExecutor.shutdown();
}
public static void main(String[] args) throws InterruptedException {
BlockingQueueThreadPool threadPool = new BlockingQueueThreadPool(2, 4, 10);
for (int i = 0; i < 20; i++) {
final int taskId = i;
threadPool.submit(() -> {
System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
}
在這個實現中,我們創建了一個名為 BlockingQueueThreadPool
的類,它包含一個核心線程池大小、一個最大線程池大小和一個任務隊列容量。我們使用 ThreadPoolExecutor
來管理線程池,并將 LinkedBlockingQueue
作為任務隊列。
submit()
方法用于向線程池提交任務。在提交任務之前,我們會檢查當前核心線程池的大小是否小于預期的核心線程池大小。如果是,則將核心線程池大小設置為預期值。這樣可以確保在任務提交時,線程池中始終有足夠的線程來處理任務。
shutdown()
方法用于關閉線程池。在關閉線程池之前,所有已提交的任務都將被執行完畢。