线程池是一种管理线程的机制,通过预先创建一定数量的线程,可以在需要时重复使用这些线程,从而避免频繁创建和销毁线程带来的性能开销。
使用线程池的优点包括:
Java 提供了 java.util.concurrent 包中的 Executors 工具类来创建各种类型的线程池。常见的方法包括:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
可以通过 execute 方法或 submit 方法将任务提交到线程池。
fixedThreadPool.execute(new Runnable() { @Override public void run() { System.out.println("Task executed"); } });
Future> future = fixedThreadPool.submit(new Callable() { @Override public String call() throws Exception { return "Task completed"; } }); try { String result = future.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
Callable 是一个类似于 Runnable 的接口,但它可以返回一个结果或抛出异常。Future 表示一个异步计算的结果,可以用来获取 Callable 的返回值或检查任务是否完成。
Callable callableTask = new Callable() { @Override public String call() throws Exception { return "Task result"; } }; Future future = fixedThreadPool.submit(callableTask); try { String result = future.get(); // 阻塞等待任务完成并获取结果 System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
可以使用 shutdown 或 shutdownNow 方法来关闭线程池。
停止接受新任务,并让已提交的任务执行完毕。
fixedThreadPool.shutdown();
尝试停止所有正在执行的任务,并返回等待执行的任务列表。
List notExecutedTasks = fixedThreadPool.shutdownNow();
try { if (!fixedThreadPool.awaitTermination(60, TimeUnit.SECONDS)) { fixedThreadPool.shutdownNow(); } } catch (InterruptedException e) { fixedThreadPool.shutdownNow(); }
ThreadPoolExecutor 是 Java 线程池的核心实现类,它的构造函数包含以下参数:
5, // corePoolSize 10, // maximumPoolSize 60, // keepAliveTime TimeUnit.SECONDS, // unit new LinkedBlockingQueue(), // workQueue Executors.defaultThreadFactory(), // threadFactory new ThreadPoolExecutor.AbortPolicy() // handler );
当线程池无法接受新任务时,会使用拒绝策略来处理这些任务。ThreadPoolExecutor 提供了以下几种拒绝策略:
默认策略,抛出 RejectedExecutionException
new ThreadPoolExecutor.AbortPolicy();
由调用线程执行任务
new ThreadPoolExecutor.CallerRunsPolicy();
直接丢弃任务,不抛出异常。
new ThreadPoolExecutor.DiscardPolicy();
丢弃队列中最旧的任务,然后尝试重新提交任务。
new ThreadPoolExecutor.DiscardOldestPolicy();
ForkJoinPool 是 Java 7 引入的一种特殊的线程池,设计用于处理可以递归拆分成更小任务的并行计算。它基于工作窃取算法,适合处理大规模并行任务。
ForkJoinPool 与 ThreadPoolExecutor 的主要区别在于:
import java.util.concurrent.RecursiveTask; import java.util.concurrent.ForkJoinPool; class SumTask extends RecursiveTask { private final int[] array; private final int start, end; public SumTask(int[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Integer compute() { if (end - start <= 10) { int sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } else { int mid = (start + end) / 2; SumTask leftTask = new SumTask(array, start, mid); SumTask rightTask = new SumTask(array, mid, end); leftTask.fork(); return rightTask.compute() + leftTask.join(); } } } public class ForkJoinExample { public static void main(String[] args) { int[] array = new int[100]; for (int i = 0; i < array.length; i++) { array[i] = i + 1; } ForkJoinPool pool = new ForkJoinPool(); SumTask task = new SumTask(array, 0, array.length); int result = pool.invoke(task); System.out.println("Sum: " + result); } }
在 ThreadPoolExecutor 中,当线程池和队列都满时,可以自定义拒绝策略来处理新提交的任务。可以通过实现 RejectedExecutionHandler 接口来自定义拒绝策略。
import java.util.concurrent.*; public class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 自定义拒绝策略,例如记录日志或将任务放入另一个队列 System.out.println("Task " + r.toString() + " rejected from " + executor.toString()); } public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new CustomRejectedExecutionHandler() ); for (int i = 0; i < 10; i++) { executor.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " is executing task"); }); } executor.shutdown(); } }
可以通过继承 ThreadPoolExecutor 类来实现一个自定义的线程池,并重写其方法以添加自定义行为。
import java.util.concurrent.*; public class CustomThreadPoolExecutor extends ThreadPoolExecutor { public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); System.out.println("Before executing task: " + r.toString()); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); System.out.println("After executing task: " + r.toString()); } @Override protected void terminated() { super.terminated(); System.out.println("Thread pool terminated"); } public static void main(String[] args) { CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor( 2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2) ); for (int i = 0; i < 10; i++) { executor.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " is executing task"); }); } executor.shutdown(); } }
可以通过定期调整 ThreadPoolExecutor 的核心参数来实现动态调整线程池的功能。例如,可以使用 ScheduledExecutorService 定期检查任务队列的长度,并根据需要调整核心线程数和最大线程数。
import java.util.concurrent.*; public class DynamicThreadPool { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2) ); ScheduledExecutorService adjuster = Executors.newScheduledThreadPool(1); adjuster.scheduleAtFixedRate(() -> { int queueSize = executor.getQueue().size(); if (queueSize > 2) { executor.setCorePoolSize(Math.min(executor.getCorePoolSize() + 1, 10)); executor.setMaximumPoolSize(Math.min(executor.getMaximumPoolSize() + 1, 20)); } else if (queueSize == 0) { executor.setCorePoolSize(Math.max(executor.getCorePoolSize() - 1, 2)); executor.setMaximumPoolSize(Math.max(executor.getMaximumPoolSize() - 1, 4)); } System.out.println("Adjusted Pool Size: " + executor.getCorePoolSize() + ", " + executor.getMaximumPoolSize()); }, 0, 1, TimeUnit.SECONDS); for (int i = 0; i < 10; i++) { executor.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " is executing task"); }); } executor.shutdown(); try { executor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } adjuster.shutdown(); } }
可以使用 PriorityBlockingQueue 来实现一个具有优先级的线程池。任务需要实现 Comparable 接口,以定义任务的优先级。
import java.util.concurrent.*; public class PriorityThreadPool { static class PriorityTask implements Runnable, Comparable { private final int priority; private final String name; public PriorityTask(int priority, String name) { this.priority = priority; this.name = name; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " is executing task: " + name); } @Override public int compareTo(PriorityTask o) { return Integer.compare(o.priority, this.priority); } @Override public String toString() { return name + "(priority=" + priority + ")"; } } public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 4, 60, TimeUnit.SECONDS, new PriorityBlockingQueue<>() ); for (int i = 0; i < 10; i++) { int priority = i % 3; executor.execute(new PriorityTask(priority, "Task-" + i)); } executor.shutdown(); } }
可以使用 ScheduledThreadPoolExecutor 来实现一个支持超时任务的线程池。通过 schedule 方法,可以提交一个带有超时功能的任务。
import java.util.concurrent.*; public class TimeoutThreadPool { public static void main(String[] args) { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(4); for (int i = 0; i < 10; i++) { ScheduledFuture> future = executor.schedule(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " is executing task"); }, 1, TimeUnit.SECONDS); executor.schedule(() -> { if (!future.isDone()) { future.cancel(true); System.out.println("Task timed out and was cancelled"); } }, 3, TimeUnit.SECONDS); } executor.shutdown(); } }