Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

线程池 #26

Open
cnwutianhao opened this issue Mar 8, 2024 · 0 comments
Open

线程池 #26

cnwutianhao opened this issue Mar 8, 2024 · 0 comments
Labels

Comments

@cnwutianhao
Copy link
Owner

在编程中经常会使用线程来异步处理任务,但是每个线程的创建和销毁都需要一定的开销。如果每次执行一个任务都需要开一个新线程去执行,则这些线程的创建和销毁将消耗大量的资源;并且线程都是“各自为政”的,很难对其进行控制,更何况有一堆的线程在执行。这时就需要线程池来对线程进行管理。在 Java 中提供了 Executor 框架用于把任务的提交和执行解耦,任务的提交交给 Runnable 或者 Callable,而 Executor 框架用来处理任务。Executor 框架中最核心的成员就是 ThreadPoolExecutor,它是线程池的核心实现类。

一、ThreadPoolExecutor

可以通过 ThreadPoolExecutor 来创建一个线程池,ThreadPoolExecutor 类一共有4个构造方法。其中,拥有最多参数的构造方法如下所示:

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {
    ...
}

这些参数的作用如下所示:

  • corePoolSize:核心线程数。默认情况下线程池是空的,只有任务提交时才会创建线程。如果当前运行的线程数少于 corePoolSize,则创建新线程来处理任务;如果等于或者多于 corePoolSize,则不再创建。如果调用线程池的 prestartAllcoreThread 方法,线程池会提前创建并启动所有的核心线程来等待任务。

  • maximumPoolSize:线程池允许创建的最大线程数。如果任务队列满了并且线程数小于 maximumPoolSize 时,则线程池仍旧会创建新的线程来处理任务。

  • keepAliveTime:非核心线程闲置的超时时间。超过这个时间则回收。如果任务很多,并且每个任务的执行事件很短,则可以调大 keepAliveTime 来提高线程的利用率。另外,如果设置 allowCoreThreadTimeOut 属性为 true 时,keepAliveTime 也会应用到核心线程上。

  • TimeUnit:keepAliveTime 参数的时间单位。可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、秒(SECONDS)、毫秒(MILLISECONDS)等。

  • workQueue:任务队列。如果当前线程数大于 corePoolSize,则将任务添加到此任务队列中。该任务队列是 BlockingQueue 类型的,也就是阻塞队列。

  • ThreadFactory:线程工厂。可以用线程工厂给每个创建出来的线程设置名字。一般情况下无须设置该参数。

  • RejectedExecutionHandler:饱和策略。这是当任务队列和线程池都满了时所采取的应对策略,默认是 AbordPolicy,表示无法处理新任务,并抛出 RejectedExecutionException 异常。此外还有3种策略,它们分别如下:

    • CallerRunsPolicy:用调用者所在的线程来处理任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
    • DiscardPolicy:不能执行的任务,并将该任务删除。
    • DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。

二、线程池的处理流程和原理

当提交一个新的任务到线程池时,线程池的处理流程如图1所示:

图1 线程池的处理流程

从图1可以得知线程的处理流程主要分为3个步骤,如下所示:

  1. 提交任务后,线程池先判断线程数是否达到了核心线程数(corePoolSize)。如果未达到核心线程数,则创建核心线程处理任务;否则,就执行下一步操作。

  2. 接着线程池判断任务队列是否满了。如果没满,则将任务添加到任务队列中;否则,就执行下一步操作。

  3. 接着因为任务队列满了,线程池就判断线程数是否达到了最大线程数。如果未达到,则创建非核心线程处理任务;否则,就执行饱和策略,默认会抛出 RejectedExecutionException 异常。

上面介绍了线程池的处理流程,但还不是很直观。下面结合图2,我们就能更好地了解线程池的原理了:

图2 线程池执行示意图

从图2中可以看到,如果我们执行 ThreadPoolExecutor 的 execute 方法,会遇到各种情况:

  1. 如果线程池中的线程数未达到核心线程数,则创建核心线程处理任务。

  2. 如果线程数大于或者等于核心线程数,则将任务加入任务队列,线程池中的空闲线程会不断地从任务队列中取出任务进行处理。

  3. 如果任务队列满了,并且线程数没有达到最大线程数,则创建非核心线程去处理任务。

  4. 如果线程数超过了最大线程数,则执行饱和策略。

三、线程池的种类

通过直接或者间接地配置 ThreadPoolExecutor 的参数可以创建不同类型的 ThreadPoolExecutor,其中有 4 种线程池比较常用,它们分别是 FixedThreadPool、CachedThreadPool、SingleThreadExecutor 和 ScheduledThreadPool。

下面分别介绍这4种线程池:

  1. FixedThreadPool

    FixedThreadPool 是可重用固定线程数的线程池。在 Executors 类中提供了创建 FixedThreadPool 的方法,它的创建源码如下所示:

     public static ExecutorService newFixedThreadPool(int nThreads) {
         return new ThreadPoolExecutor(nThreads, nThreads,
                                       0L, TimeUnit.MILLISECONDS,
                                       new LinkedBlockingQueue<Runnable>());
     }

    FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都设置为创建 FixedThreadPool 指定的参数 nThreads,也就意味着 FixedThreadPool 只有核心线程,并且数量是固定的,没有非核心线程。keepAliveTime 设置为 0L 意味着多余的线程会被立即终止。因为不会产生多余的线程,所以 keepAliveTime 是无效的参数。另外,任务队列采用了无界的阻塞队列 LinkedBlockingQueue。

    FixedThreadPool 的 execute 方法的执行示意图如图3所示:

    图3 FixedThreadPool 的执行示意图

    从图3中可以看出,当执行 execute 方法时,如果当前运行的线程未达到 corePoolSize(核心线程数)时就创建核心线程来处理任务,如果达到了核心线程数则将任务添加到 LinkedBlockingQueue 中。FixedThreadPool 就是一个有固定数量核心线程的线程池,并且这些核心线程不会被回收。当线程数超过 corePoolSize 时,就将任务存储在任务队列中;当线程池有空闲线程时,则从任务队列中去取任务执行。

  2. CachedThreadPool

    CachedThreadPool 是一个根据需要创建线程的线程池,它的创建源码如下所示:

     public static ExecutorService newCachedThreadPool() {
         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                       60L, TimeUnit.SECONDS,
                                       new SynchronousQueue<Runnable>());
     }

    CachedThreadPool 的 corePoolSize 为 0,maximumPoolSize 设置为 Integer.MAX_VALUE,这意味着 CachedThreadPool 没有核心线程,非核心线程是无界的。keepAliveTime 设置为 60L,则空闲线程等待新任务的最长时间为 60 秒。在此用了阻塞队列 SynchronousQueue,它是一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。

    CachedThreadPool 的 execute 方法的执行示意图如图4所示:

    图4 CachedThreadPool 的执行示意图

    当执行 execute 方法时,首先会执行 SynchronousQueue 的 offer 方法来提交任务,并且查询线程池中是否有空闲的线程执行 SynchronousQueue 的 poll 方法来移除任务。如果有则配对成功,将任务交给这个空闲的线程处理;如果没有则配对失败,创建新的线程去处理任务。当线程池中的线程空闲时,它会执行 SynchronousQueue 的 poll 方法,等待 SynchronousQueue 中新提交的任务。如果超过 60 秒没有新任务提交到 SynchronousQueue,则这个空闲线程将终止。因为 maximumPoolSize 是无界的,所以如果提交的任务大于线程池中线程处理任务的速度就会不断地创建新线程。另外,每次提交任务都会立即有线程去处理。所以,CachedThreadPool 比较适于大量的需要立即处理并且耗时较少的任务。

  3. SingleThreadExecutor

    SingleThreadExecutor 是使用单个工作线程的线程池,它的创建源码如下所示:

     public static ExecutorService newSingleThreadExecutor() {
         return new FinalizableDelegatedExecutorService
             (new ThreadPoolExecutor(1, 1,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>()));
     }

    corePoolSize 和 maximumPoolSize 都为 1,意味着 SingleThreadExecutor 只有一个核心线程。keepAliveTime 设置为 0L 意味着多余的线程会被立即终止。因为不会产生多余的线程,所以 keepAliveTime 是无效的参数。另外,任务队列采用了无界的阻塞队列 LinkedBlockingQueue。

    SingleThreadExecutor的execute方法的执行示意图如图5所示:

    图5 SingleThreadExecutor 的执行示意图

  4. ScheduledThreadPool

    ScheduledThreadPool 是一个能实现定时和周期性任务的线程池,它的创建源码如下所示:

     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
         return new ScheduledThreadPoolExecutor(corePoolSize);
     }

    这里创建了 ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,它主要用于给定延时之后的运行任务或者定期处理任务。ScheduledThreadPoolExecutor 的构造方法如下所示:

     public ScheduledThreadPoolExecutor(int corePoolSize) {
         super(corePoolSize, Integer.MAX_VALUE,
               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
               new DelayedWorkQueue());
     }

    从上面的代码可以看出,ScheduledThreadPoolExecutor 的构造方法最终调用的是 ThreadPoolExecutor 的构造方法。corePoolSize 是传进来的固定数值,maximumPoolSize 的值是 Integer.MAX_VALUE。因为采用的 DelayedWorkQueue 是无界的,所以 maximumPoolSize 这个参数是无效的。

    ScheduledThreadPoolExecutor 的 execute 方法的执行示意图如图6所示:

    图6 ScheduledThreadPoolExecutor 的执行示意图

    当执行 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate 或者 scheduleWithFixedDelay 方法时,会向 DelayedWorkQueue 添加一个实现 RunnableScheduledFuture 接口的 ScheduledFutureTask(任务的包装类),并会检查运行的线程是否达到 corePoolSize。如果没有则新建线程并启动它,但并不是立即去执行任务,而是去 DelayedWorkQueue 中取 ScheduledFutureTask,然后去执行任务。如果运行的线程达到了 corePoolSize 时,则将任务添加到 DelayedWorkQueue 中。DelayedWorkQueue 会将任务进行排序,先要执行的任务放在队列的前面。其跟此前介绍的线程池不同的是,当执行完任务后,会将 ScheduledFutureTask 中的 time 变量改为下次要执行的时间并放回到 DelayedWorkQueue 中。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant