# Java线程池总结

# 为什么使用线程池

# 手动创建线程的危害

  • 第一点,反复创建线程系统开销比较大,每个线程创建和销毁都需要时间,如果任务比较简单,那么就有可能导致创建和销毁线程消耗的资源比线程执行任务本身消耗的资源还要大。
  • 第二点,过多的线程会占用过多的内存等资源,还会带来过多的上下文切换,同时还会导致系统不稳定。

# 使用线程池的好处

  • 第一点,线程池可以解决线程生命周期的系统开销问题,同时还可以加快响应速度。
  • 第二点,线程池可以统筹内存和 CPU 的使用,避免资源使用不当。
  • 第三点,线程池可以统一管理资源。比如线程池可以统一管理任务队列和线程,可以统一开始或结束任务。

# Executor

随着当今处理器中可用的核心数量的增加, 随着对实现更高吞吐量的需求的不断增长,多线程 API 变得非常流行。 Java 提供了自己的多线程框架,称为Executor 框架。

Executor 框架包含一组用于有效管理工作线程的组件。Executor API 通过 Executors 将任务的执行与要执行的实际任务解耦。 这是 生产者-消费者 (opens new window) 模式的一种实现。

java.util.concurrent.Executors 提供了用于创建工作线程的线程池的工厂方法。

为了使用 Executor 框架,我们需要创建一个线程池并提交任务给它以供执行。Executor 框架的工作是调度和执行已提交的任务并从线程池中拿到返回的结果。

所有的危害因素都会导致系统的吞吐量下降。线程池通过保持线程一直存活并重用这些线程来克服这个问题。当提交到线程池中的任务多于正在执行的线程时,那些多余的任务将被放到队列中。 一旦执行任务的线程有空闲的了,它们会从队列中取下一个任务来执行。对于 JDK 提供的现成的 executors 此任务队列基本是无界的。

# Executor线程池类型

我们可以直接使用Executor给我们提供的线程池,已针对不同场景进行设置了。

# SingleThreadExecutor

此线程池 executor 只有一个线程。它用于以顺序方式的形式执行任务。如果此线程在执行任务时因异常而挂掉,则会创建一个新线程来替换此线程,后续任务将在新线程中执行。

ExecutorService executorService = Executors.newSingleThreadExecutor()
1

# FixedThreadPool(n)

顾名思义,它是一个拥有固定数量线程的线程池。提交给 executor 的任务由固定的 n 个线程执行,如果有更多的任务,它们存储在 LinkedBlockingQueue 里。这个数字 n 通常跟底层处理器支持的线程总数有关。

ExecutorService executorService = Executors.newFixedThreadPool(4);
1

# CachedThreadPool

该线程池主要用于执行大量短期并行任务的场景。与固定线程池不同,此线程池的线程数不受限制。如果所有的线程都在忙于执行任务并且又有新的任务到来了,这个线程池将创建一个新的线程并将其提交到 executor。只要其中一个线程变为空闲,它就会执行新的任务。 如果一个线程有 60 秒的时间都是空闲的,它们将被结束生命周期并从缓存中删除。

但是,如果管理得不合理,或者任务不是很短的,则线程池将包含大量的活动线程。这可能导致资源紊乱并因此导致性能下降。

ExecutorService executorService = Executors.newCachedThreadPool();
1

# ScheduledExecutor

当我们有一个需要定期运行的任务或者我们希望延迟某个任务时,就会使用此类型的 executor。

ScheduledExecutorService scheduledExecService = Executors.newScheduledThreadPool(1);
1

可以使用 scheduleAtFixedRatescheduleWithFixedDelayScheduledExecutor 中定期的执行任务。

scheduledExecService.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduledExecService.scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit)
1
2

这两种方法的主要区别在于它们对连续执行定期任务之间的延迟的应答。

scheduleAtFixedRate:无论前一个任务何时结束,都以固定间隔执行任务。

scheduleWithFixedDelay:只有在当前任务完成后才会启动延迟倒计时。

# Future 对象

可以使用 executor 返回的 java.util.concurrent.Future 对象访问提交给 executor 的任务的结果。 Future 可以被认为是 executor 对调用者的响应。

Future<String> result = executorService.submit(callableTask);
1

如上所述,提交给 executor 的任务是异步的,即程序不会等待当前任务执行完成,而是直接进入下一步。相反,每当任务执行完成时,executor 在此 Future对象中设置它。

调用者可以继续执行主程序,当需要提交任务的结果时,他可以在这个 Future对象上调用.get() 方法来获取。如果任务完成,结果将立即返回给调用者,否则调用者将被阻塞,直到 executor 完成此操作的执行并计算出结果。

如果调用者不能无限期地等待任务执行的结果,那么这个等待时间也可以设置为定时地。可以通过 Future.get(long timeout,TimeUnit unit) 方法实现,如果在规定的时间范围内没有返回结果,则抛出 TimeoutException。调用者可以处理此异常并继续执行该程序。

如果在执行任务时出现异常,则对 get 方法的调用将抛出一个ExecutionException

对于 Future.get()方法返回的结果,一个重要的事情是,只有提交的任务实现了java.util.concurrent.Callable接口时才返回 Future。如果任务实现了Runnable接口,那么一旦任务完成,对 .get() 方法的调用将返回 null

另一个关注点是 Future.cancel(boolean mayInterruptIfRunning) 方法。此方法用于取消已提交任务的执行。如果任务已在执行,则 executor 将尝试在mayInterruptIfRunning 标志为 true 时中断任务执行。

# 创建并执行一个Executor

我们现在将创建一个任务并尝试在 fixed pool executor 中执行它:

public class Task implements Callable<String> {

    private String message;

    public Task(String message) {
        this.message = message;
    }

    @Override
    public String call() throws Exception {
        return "Hello " + message + "!";
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

Task 类实现 Callable 接口并有一个 String 类型作为返回值的方法。 这个方法也可以抛出 Exception。这种向 executor 抛出异常的能力以及 executor 将此异常返回给调用者的能力非常重要,因为它有助于调用者知道任务执行的状态。

现在让我们来执行一下这个任务:

public class ExecutorExample {  
    public static void main(String[] args) {

        Task task = new Task("World");

        ExecutorService executorService = Executors.newFixedThreadPool(4);
        Future<String> result = executorService.submit(task);

        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("Error occured while executing the submitted task");
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

我们创建了一个具有4个线程数的 FixedThreadPool executors,因为这个 demo是在四核处理器上开发的。如果正在执行的任务执行大量 I/O 操作或花费较长时间等待外部资源,则线程数可能超过处理器的核心数。

我们实例化了 Task 类,并将它提交给 executors 执行。 结果由 Future 对象返回,然后我们在屏幕上打印。

让我们运行 ExecutorExample 并查看其输出:

Hello World!
1

正如所料,任务追加了问候语 Hello 并通过 Future object 返回结果。

最后,我们调用 executorService 对象上的 shutdown 来终止所有线程并将资源返回给 OS。

.shutdown() 方法等待 executor 完成当前提交的任务。 但是,如果要求是立即关闭 executor 而不等待,那么我们可以使用 .shutdownNow() 方法。

任何待执行的任务都将结果返回到 java.util.List 对象中。

我们也可以通过实现 Runnable 接口来创建同样的任务:

public class Task implements Runnable{

    private String message;

    public Task(String message) {
        this.message = message;
    }

    public void run() {
        System.out.println("Hello " + message + "!");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

当我们实现 Runnable 时,这里有一些重要的变化。

  1. 无法从 run() 方法得到任务执行的结果。 因此,我们直接在这里打印。
  2. run() 方法不可抛出任何已受检的异常。

# ThreadPoolExecutor

除了使用Executor给我们提供的线程池,我们还可以自定义线程池。

Executors创建线程池本质上也是使用ThreadPollExecutor,只是某些参数是固定的,无法设置。

private ExecutorService threadPool = new ThreadPoolExecutor(
    4,                  // int corePoolSize,    核心线程数,一旦创建将不会再释放。
    4,                  // int maximumPoolSize, 最大线程数,允许创建的最大线程数。
    60L,                // long keepAliveTime,  非核心线程空闲时,允许保存的最大时间,过期以后线程将被释放销毁。
    TimeUnit.SECONDS,   // TimeUnit unit,       时间单位,TimeUnit.SECONDS等  
    new LinkedBlockingQueue<Runnable>(1024),    // BlockingQueue<Runnable> workQueue,   任务队列,存储暂时无法执行的线程任务,等待空闲线程来执行。此处只可以是BlockingQueue<Runnable>,所以不能使用ConcurrentLinkedQueue<Runnable>
    Executors.defaultThreadFactory(),           // ThreadFactory threadFactory, 线程工厂,用于创建线程。
    new ThreadPoolExecutor.CallerRunsPolicy() // RejectedExecutionHandler handler 当线程边界和队列容量达到最大时,用于处理阻塞时的程序。
);
1
2
3
4
5
6
7
8
9

《阿里巴巴Java开发手册》强制要求:线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式。

Executors 返回的线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadPool:

允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

  • CachedThreadPool 和 ScheduledThreadPool:

允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

线程池

上次更新时间: 2024年2月14日星期三上午10点24分