JUC源码详细分析之Part4——《线程池》

本系列笔记笔者将会详细的讲解JUC中核心类的源码,其中基于的JDK版本为1.8.0_291。由于JUC内容很多,因此笔者将笔记拆为四部分:

另外,本系列笔记部分内容参考自《深入浅出 Java Concurrency》

5. 线程池

在讲解线程池前先简单说下线程,我们都知道线程是由操作系统创建的,代表了任务调度的最小颗粒度。Java下使用Thread类代表线程,一个Thread对象就代表一个操作系统所创建线程的引用。Java下创建一个Thread对象与操作系统创建一个线程并不相等,当我们执行new Tnread()时只是创建了一个Thread对象,此时OS并未创建真实线程,但是当我们执行thread.start()时,操作系统会创建并启动一个线程。但是操作系统创建和启动线程属于系统调用,往往系统调用的开销比较大,如果我们频繁的创建线程和销毁线程肯定会降低系统的吞吐量,此时就需要将线程的生命周期与任务的执行解耦,引入线程池来管理线程,避免重复创建/销毁线程时的系统调用开销。

5.1 线程池体系简介

JUC下线程池体系如下:

image-20220715144644253

  1. Executor接口,最顶层接口,只有一个方法void execute(Runable command),提供了任务提交的基本方法
  2. ExecutorService接口,继承了 Executor 接口,最主要的就是获取异步任务执行结果和线程池销毁等方法
  3. ScheduledExecutorService 增加了定时任务,周期执行任务的功能
  4. AbstractExecutorService 默认实现了ExecutorServicesubmit()方法并且提供 newTaskFor() 方法将RunnableCallable转换为RunnableFuture,以便提交给 Executor 执行
  5. ThreadPoolExecutor 默认的线程池实现类
  6. ScheduledThreadPoolExecutor 默认的带有定时/周期执行任务的线程池实现类

线程池的创建是比较复杂的,为解决这一情况,JDK提供了Executors类,用于快速的创建线程池。

Executors类下常用API如下:

  • Executors.newCachedThreadPool() 无界线程池,可以进行自动线程回收
  • Executors.newFixedThreadPool(int) 固定大小的线程池
  • Executors.newSingleThreadExecutor() 单个线程的线程池
  • Executors.newScheduledThreadPool(int) 执行定时任务的线程池
  • Executors.newWorkStealingPool(int) 支持并行执行的线程池

但是阿里巴巴Java开发手册禁止使用使用Executors创建线程池,其描述如下:

【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:
1)FixedThreadPoolSingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPoolScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

第一个不显示创建线程的问题我们已经说过了;第二个不能用Executors去创建线程池,其与线程池创建时的参数有关,我们下面看一下ThreadPoolExecutor 创建的参数。

5.2 ThreadPoolExecutor参数和工作原理

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
  • corePoolSize:核心线程池的数量。为保证线程可以被复用,我们往往在线程执行完成后不销毁而是暂存起来。但暂存线程也是有内存开销的,因此不能暂存过多,其中corePoolSize是核心线程池的数量,也就是线程池暂存而不会销毁的线程数量。
  • BlockingQueue:任务队列。当核心线程都被用尽的时候,再提交任务的任务会先扔进队列里。这里的队列是阻塞队列。
  • maximumPoolSize:最大线程池数量。当队列也堆积满了,任务实在太多处理不过来的时候,会选择临时创建一些线程来执行新提交的任务,但这些线程也不能过多,总的最大线程数量不能超过maximumPoolSize。
  • keepAliveTime:线程的活跃时间。临时创建的线程执行完任务后也并非立马就销毁,有时如果任务比较多,销毁再建很费时,因此当这些临时线程闲置了keepAliveTime时间还没接到活后就被销毁了。
  • ThreadFactory:线程的构造工厂
  • RejectedExecutionHandler:拒绝策略。当任务多到新开的临时线程都大于maximumPoolSize后,还有任务提交过来,此时就得拒绝任务了,这里可以选择一个拒绝策略来拒绝。

因此整个线程池的工作模式如下:

img

我们可以以一个例子来形象的说明这一工作模式:

你现在有一家服装制造公司,公司里有一批员工,这批员工常年在公司工作,无论公司有活没活他们都在公司上班,这批员工总数大小就是corePoolSize。我们不妨叫他们核心员工。

当公司有活且核心员工还有人闲着的时候,我们就直接选择一个核心员工来干这个活。

但如果公司生意比较好,每个人都很忙,再有新活到来的时候,我们往往会选择将订单压一压,先存起来,等手里这批活干完再干存起来的这批。存活的容器就是BlockingQueue

但如果你公司生意真的非常好,存的活太多都存不下了,这时你就想着先招一批临时工应应急。但临时工又不能超太多,多了公司也负担不起,因此临时工+正式工数量不能超过maximumPoolSize就好。如果订单已经存满了,再来新的订单就让那些临时工干吧。

临时工干完活后最好不要轻易辞退,万一辞了又有活来还得再招人,挺麻烦的。但临时工又不能一直这样闲养着,要是一段时间他们都没活了就让他们滚蛋吧,这个时间间隔就是keepAliveTime

假设你的厂今年生意异常的好,临时工都招满了,但还是有新订单过来,你一寻思,再来订单一没地方搁二没人接,为了不得罪人家,索性直接早点拒绝吧,因此在这种情况下,你选择了拒绝新来的任务,而这就是RejectedExecutionHandler

不同的是,在上例中我们将工人划分为了核心员工和临时工,在线程池中,线程没有核心与临时的区分,只要线程池数小于等于corePoolSize,都不会被释放,而如果大于corePoolSize,每个线程都有可能被释放,一视同仁。

关于这几个参数更详细的解释可以见ThreadPoolExecutor类的源码注释:

  • 核心和最大池大小:

    ThreadPoolExecutor将根据核心线程池大小(参见getCorePoolSize)和最大线程池大小(参见getMaximumPoolSize)的设定自动调整线程池大小(参见getPoolSize)。当通过execute(Runnable)方法提交一个新任务时,如果运行的线程数少于核心线程池大小,将创建一个新线程来处理该请求,即使其他工作线程处于空闲状态。如果运行的线程数超过核心线程池大小但小于最大线程池大小,只有当队列已满时才会创建新线程。通过将核心线程池大小和最大线程池大小设置为相同的值,可以创建一个固定大小的线程池。通过将最大线程池大小设置为类似Integer.MAX_VALUE这样的近乎无限的值,可以允许线程池容纳任意数量的并发任务。通常情况下,核心线程池大小和最大线程池大小只在构造时设置一次,但也可以使用setCorePoolSizesetMaximumPoolSize动态地进行更改。

  • 创建新线程:

    新线程是使用ThreadFactory创建的。如果没有指定其他线程工厂,将使用Executors.defaultThreadFactory,它会创建位于相同ThreadGroup中,具有相同NORM_PRIORITY优先级和非守护状态的线程。通过提供不同的ThreadFactory,可以修改线程的名称、线程组、优先级、守护状态等。如果ThreadFactorynewThread中返回null时无法创建线程,执行器将继续运行,但可能无法执行任何任务。线程应该具有"modifyThread" RuntimePermission。如果使用线程池的工作线程或其他线程不具备此权限,服务可能会降级:配置更改可能无法及时生效,并且关闭池可能会保持在可以终止但未完成的状态。

  • 保活时间:

    如果线程池当前的线程数超过核心线程池大小,那么多余的线程将在空闲时间超过keepAliveTime时终止(参见getKeepAliveTime(TimeUnit))。这提供了在池不活跃时减少资源消耗的方法。如果池后来变得更活跃,将会创建新线程。可以使用setKeepAliveTime(long, TimeUnit)方法动态地更改此参数。使用Long.MAX_VALUE TimeUnit.NANOSECONDS的值可以有效地禁用空闲线程在关闭之前终止。默认情况下,保持活动策略仅适用于运行线程数超过核心线程池大小的情况。但是,方法allowCoreThreadTimeOut(boolean)可以用于将此超时策略应用于核心线程,只要keepAliveTime值不为零即可。

  • 排队:

    可以使用任何BlockingQueue来传输和保存提交的任务。使用这个队列会影响池的大小: 如果运行的线程数少于核心线程池大小,Executor总是更喜欢添加新线程而不是排队。 如果运行的线程数等于或超过核心线程池大小,Executor总是更喜欢将请求排队而不是添加新线程。 如果无法将请求排队,将创建一个新线程,除非这会超过最大线程池大小,否则任务将被拒绝。 有三种常见的排队策略:

    1. 直接移交。对于工作队列来说,一个很好的默认选择是SynchronousQueue,它会将任务直接移交给线程,而不会保存它们。如果没有线程立即可用于运行任务,尝试排队任务将失败,因此将创建一个新线程。这种策略避免了处理可能具有内部依赖关系的请求集时出现死锁。直接移交通常需要无界的maximumPoolSize来避免拒绝新提交的任务。这反过来可能导致线程无限增长,当命令平均到达速度比它们被处理得更快时,这种情况会出现。
    2. 无界队列。使用无界队列(例如没有预定义容量的LinkedBlockingQueue)将导致新任务在所有corePoolSize线程都忙碌时等待在队列中。因此,最多只会创建corePoolSize个线程。(maximumPoolSize的值因此没有任何影响。)这种策略可能适用于每个任务完全独立于其他任务的情况,因此任务之间不会影响彼此的执行;例如,在一个网页服务器中。虽然这种排队方式可以平滑处理短暂的请求爆发,但是当命令的到达速度平均比它们能被处理的速度更快时,会导致无界的工作队列增长的可能性。
    3. 有界队列。使用有界队列(例如,ArrayBlockingQueue)可以帮助在使用有限的maximumPoolSizes时防止资源耗尽,但调整和控制可能会更加困难。队列大小和最大池大小可以相互权衡:使用大队列和小池可以最小化CPU使用率、操作系统资源和上下文切换开销,但可能会导致人为地降低吞吐量。如果任务经常被阻塞(例如,如果它们受到I/O限制)。使用小队列通常需要更大的池大小,这使得CPU更忙碌,但可能会遇到不可接受的调度开销,从而降低吞吐量。
  • 拒绝策略:

    Executor 关闭时,以及 Executor 对最大线程和工作队列容量都使用有限的界限,并且已经饱和时,在方法execute(Runnable)中提交的新任务将被拒绝。无论哪种情况, execute方法都会调用其RejectedExecutionHandlerRejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)方法。提供了四个预定义的处理程序策略:

    1. 在默认的ThreadPoolExecutor.AbortPolicy中,处理程序在拒绝时抛出运行时RejectedExecutionException
    2. ThreadPoolExecutor.CallerRunsPolicy中,调用execute本身的线程运行任务。这提供了一种简单的反馈控制机制,可以减慢提交新任务的速度。
    3. ThreadPoolExecutor.DiscardPolicy中,无法执行的任务被简单地丢弃。
    4. ThreadPoolExecutor.DiscardOldestPolicy中,如果 executor 没有关闭,则丢弃工作队列头部的任务,然后重试执行(可能再次失败,导致重复此操作。)

    可以自定义其他类型的RejectedExecutionHandler类。但这样做需要小心谨慎,尤其是当策略设计为仅在特定容量或排队策略下工作时。

  • 挂钩方法:
    此类提供protectedbeforeExecute(Thread, Runnable)afterExecute(Runnable, Throwable)方法,这些方法在执行每个任务之前和之后调用。这些可用于操纵执行环境;例如,重新初始化 ThreadLocals、收集统计信息或添加日志条目。此外,可以重写方法terminated以执行任何特殊处理,一旦 Executor 完全终止需要完成。
    如果钩子或回调方法抛出异常,内部工作线程可能会依次失败并突然终止。

5.3 ThreadPoolExecutor源码

ThreadPoolExecutor

ThreadPoolExecutor的主要属性如上图所示,其中WorkerThreadPoolExecutor的内部类,一个Worker对象会绑定一个Thread对象,而ThreadPoolExecutor又会持有Worker对象集合 workers,这也就代表了ThreadPoolExecutor对象持有一个线程集合。

另外poolSize用于记录当前线程池正存在的线程数量,runState是线程池的生命周期,线程池的生命周期可以分为运行(RUNNING)、关闭(SHUTDOWN)、、停止(STOP)、整理(TIDYING)和结束(TERMINATED)五个状态,他们分为等于-1 << Integer.SIZE-30 << Integer.SIZE-31 << Integer.SIZE-32 << Integer.SIZE-33 << Integer.SIZE-3,因此RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

现在就让我们分析下当调用ThreadPoolExecutor.submit(Runnable task)时发生了什么

5.3.1 submit()

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    //将任务封装为RunnableFuture对象,其实是FutureTask对象,执行结果将放入这个对象里
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //执行任务
    execute(ftask);
    return ftask;
}
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //获得当前ctl值,ctl是一个原子属性,一个int变量分高低位代表线程池状态和当前线程池内的线程数量
    int c = ctl.get();
    //如果当前线程池数量小于corePoolSize 就创建一个线程来执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //否则(代表当前线程数大于corePoolSize)如果任务入队列成功
    if (isRunning(c) && workQueue.offer(command)) {
        //入队成功依然要检查线程数量,因为有可能入队期间线程池关闭了
        //如果线程池关闭了就从队列里删除任务并拒绝
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //否则判断当前线程数量是否等于0,如果等于0则创建一个新线程允许任务
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //否则(代表线程数大于corePoolSize且入队列失败),则启用一个新的线程来执行任务
    else if (!addWorker(command, false))
        //如果这也失败了,代表队列也满了,线程也满了,那就启用拒绝策略
        reject(command);
}

这里需要解释的是ctl属性,ctl是一个AtomicInteger,我们在这里可以简单将其理解为一个int类型,我们知道一个int有32位,其中ctl的高3位用于存储当前线程池的状态(由于线程池有5个状态,所以至少要3个bit才能表示),剩下的低29位用于记录当前线程池的线程数量。因此ThreadPoolExecutor最大支持的线程数量不是Integer.MAX,而是2^29-1,大概是5亿多个。

其中ctl.get()调用的是AtomicInteger.get()获得当前原子类型对应的int值,runStateOf()是取int的高3位得到当先线程池的运行状态,isRunning()是判断运行状态是否等于RUNNING,而workerCountOf()是取int的低29位,获得当前线程池的线程数量。

可以看到上面的逻辑与我们说的线程池的工作原理基本一致。

//创建新的工作线程,如果创建成功返回true,否则返回false,其中core代表是否用core线程运行任务
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        //取高3位得到运行状态
        int rs = runStateOf(c);
        
        //a && !(b && c && !d) = a && (!b || !c || d) = a && !b || a && !c || a && d
        //因此这句话是说,如果当前线程池状态不是RUNNING且也不是SHUTDOWN直接返回false(其他状态下不执行任何任务,包括已有的)
        //或者当前线程池状态不是RUNNING且传入的任务不为空,直接返回false(非RUNNING状态不接收新任务,SHUTDOWN会执行完已有的)
        //或者当前线程池状态不是RUNNING且任务队列为空,直接返回false(没有待运行任务,且状态不是RUNNING就不必创建新的线程)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //获得线程数量
            int wc = workerCountOf(c);
            //如果大于2^29-1
            //或者用核心线程的情况下但已有数量大于核心线程(核心线程都跑满了,无法再建一个为它服务)
            //或者不用工作线程,但线程总数大于maximumPoolSize了
            //都返回false,不建新的线程
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //走到这代表可以建线程,因为线程池状态既是ok的,线程数量也是允许的
            //通过CAS操作将线程数量+1,如果成功跳出双层for循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //否则如果线程池状态改了,继续双层for
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    
    //走到这代表CAS操作已经成功,计数上的线程数量已经+1了
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建一个worker,worker会绑定一个线程
        w = new Worker(firstTask);
        //拿到绑定的线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //获得当前允许状态
                int rs = runStateOf(ctl.get());
                //在获得 mainLock 的锁定期间,再次检查线程池的运行状态,并确保线程池没有被关闭
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //将新创建的worker加入到workers集合
                    workers.add(w);
                    int s = workers.size();
                    //统计操作,如果当前线程数量大于历史最高就更新历史
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //如果创建线程成功就启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

可以看到这段代码主要是基于当前线程池状态和线程池内的线程数来决定是否创建一个新的线程。

//创建一个worker对象,其实就是用ThreadFactory来创建一个线程
Worker(Runnable firstTask) {
    setState(-1); 
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

Worker对象是一个Runnable实现类,因此调用t.start();时会调到Worker.run()

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //如果状态是STOP且STOP之后的状态,且线程设置了中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                //就响应中断,表示线程中断,不再运行
                wt.interrupt();
            try {
                //运行任务,任务运行的前后加了beforeExecute和afterExecute钩子
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                //释放锁和更新记录
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

这里需要注意的是,如果task==null,会执行getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        //拿到线程池的运行状态
        int rs = runStateOf(c);

        //异常检查
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        //拿到线程池的线程数量
        int wc = workerCountOf(c);

        //allowCoreThreadTimeOut是一个属性,当核心线程池也可以配置keepAlive时为true,默认是fasle
        //timed用于标识这个线程是否需要keepalived
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //异常检查
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //从队列里获取任务,对于timed情况就是设置一个最大阻塞时间,而非timed不需要
            //BlockingQueue我们之前已经讲过
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            //返回任务
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

5.4 ScheduledThreadPoolExecutor源码

相比于ThreadPoolExecutorScheduledThreadPoolExecutor提供了更丰富的定时功能,主要是提供了4个api:

//延迟执行任务
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay, TimeUnit unit);

//延迟执行任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay, TimeUnit unit);

//在指定的initialDelay时间后执行任务,并具有周期性
//在上一个任务启动period后再次启动一个新的任务(无论上一个任务是否完成以及何时完成)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);
//在指定的initialDelay时间后执行任务,并具有周期性
//在上一个任务执行完成period后再次启动一个新的任务(不完成就不会启动下一个)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);

在讲上面这些方法的源码前,我们先看下ScheduledThreadPoolExecutor的构造方法:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

首先ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,但在构造方法里我们不难发现ScheduledThreadPoolExecutor不支持指定BlockingQueuemaximumPoolSize以及keepAliveTime,这主要是因为ScheduledThreadPoolExecutor的阻塞队列是自己实现的DelayedWorkQueue,这个队列是无限长的队列,因此有了任务会直接入队列,自然就不需要maximumPoolSize,不需要maximumPoolSize代表线程都是核心线程,也即不需要keepAliveTime做保活判断。

有了这些基础后,我们再来看一下上述这些方法的源码:

5.4.1 schedule()

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    //将任务和一些时间属性构造成RunnableScheduledFuture对象
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    //延迟执行任务
    delayedExecute(t);
    return t;
}

任务会首先被封装成ScheduledFutureTask类,这个类是ScheduledThreadPoolExecutor的内部类,它里面有几个重要的属性:

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
    //任务和任务的运行结果
    private Callable<V> callable;
    private final long sequenceNumber;

    //任务开始运行的时间
    private long time;
    
    //任务的运行周期
    private final long period;
}

其中time就是当前系统时间+延迟运行的时间,如果当前任务是延迟任务而非周期任务时,period = 0,否则period等于周期时间。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果线程池状态是非运行态,直接拒绝任务
    if (isShutdown())
        reject(task);
    else {
        //将任务入队,但这里入队的就是DelayedWorkQueue
        super.getQueue().add(task);
        //判断当前任务是否可以在线程池SHUTDOWN的时候继续运行,可以由参数设置
        //如果不能且当前线程池状态是SHUTDOWN,则删除任务
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            //否则初始化一些线程
            ensurePrestart();
    }
}
//初始化一些线程,留着,不一定运行任务
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

DelayedWorkQueue是一个阻塞队列用于装submit的任务,这个队列本质是一个优先队列(二叉堆),排序的规则是任务的开始时间,也即队列里的元素是ScheduledFutureTask对象,它们根据属性的time排序,二叉堆的特性和实现我们在前面JUC源码详细分析之Part3——《常见的线程安全集合》已经讲过。

我们在上面调用addWorker()的时候,传入的task == null,这样当线程启动的时候,会从workQueue里获取task,由于我们的workQueue实现是DelayedWorkQueue,因此当去take()的时候

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            //获得第一个元素,因为二叉堆的特性决定了第一个元素最小,也即开始运行时间最小,最先开始运行
            RunnableScheduledFuture<?> first = queue[0];
            //这里采用Lock Condition,我们在锁那章中已经讲过,如果队列为null,就条件阻塞,会在有数据入队列的时候被唤醒
            if (first == null)
                available.await();
            else {
                //getDelay是用ScheduledFutureTask的time属性减去当前时间,也即还剩多少等待时间
                long delay = first.getDelay(NANOSECONDS);
                //如果等待时间小于0,也即可以开始运行了
                if (delay <= 0)
                    //就直接将任务poll出来,与二叉堆源码类似,走sink操作
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                //这里会涉及一个leader属性,它是一个Thread类,类似于AQS里的exclusiveOwnerThread,指的都是当前获取锁的线程
                //由于操作队列需要线程安全,因此会加锁
                //每个线程去队列里poll数据的时候,都是会poll第一条数据,如果第一条数据还未到时间,
                //对于leader属性还未设置的,就先让自己抢占leader,然后挂起等待对应等待时间,条件变量挂起的时候会释放锁
                //这样相当于第一个任务就已经属于这个线程了,其他线程再进来会看到leader是已被赋值的 != null
                //代表已经有线程对第一个任务负责了,因此直接await()挂起。
                //那为何不先判断leader != null,而是先拿第一条数据判断是否能运行,如果能直接运行呢?
                //这其实和非公平锁的原理差不多,leader线程从唤醒到执行可能还得一段时间,但如果此时插入了别的线程,就可以直接拿来执行
                //此时被唤醒的leader线程会再次获取当前队列最新的第一个任务
                if (leader != null)
                    available.await();
                else {
                    //如果leader不为null,就设置leader为自己
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //这个任务属于自己了,因此挂起相应的时间
                        available.awaitNanos(delay);
                    } finally {
                        //最后poll出任务后,将leader置空
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //leader poll出任务后 唤醒其他被阻塞的线程
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

总的来讲,当我们执行schedule()的时候,任务会被封装为ScheduledFutureTask,然后进入DelayedWorkQueueDelayedWorkQueue底层是一个二叉堆,会根据任务的开始运行时间排序,先运行的排在前面。然后当现在线程数量 workCount < corePoolSize的时候我们会创建一个tasknull的线程,这个线程创建后启动,会从DelayedWorkQueue里拉取任务。拉取的时候如果任务还未到运行时间就通过Lock Condition 条件变量挂起等待,醒来后继续拉取。拉取到任务后就运行任务。

Worker.runWorker()的运行是while true的形式,由于ScheduledThreadPoolExecutor的阻塞队列DelayedWorkQueue是无限长的,因此ScheduledThreadPoolExecutor中的线程都是core Pool,不存在销毁的情况(如果不设置core poolkeepalive的话),这时候每个线程运行完一个任务后会继续从队列里拉取下一个任务运行。整体相当于一个借助BlockingQueue的生产消费模型,我们提交任务的时候就是生产,任务入队列,线程池会从队列里poll任务来消费运行。

5.4.2 scheduleAtFixedRate()与scheduleWithFixedDelay()

scheduleAtFixedRate()scheduleWithFixedDelay()schedule()基本相同,唯一 不同的是在task的执行前判断是否是周期执行,如果是周期执行会设置下一次的运行时间,ScheduledFutureTask类继承自Runnable接口,调用Thread.start()的时候会先到达Worker.run(),然后到达ScheduledFutureTask.run(),下面是ScheduledFutureTask.run()的源码

public void run() {
    //判断是否是周期执行,我们之前说过如果是非周期执行,period是0,否则是周期值
    boolean periodic = isPeriodic();
    //异常判断
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //非周期执行的话,直接执行即可
    else if (!periodic)
        ScheduledFutureTask.super.run();
    //周期性执行的话就执行任务并设置任务的下一次运行时间然后将任务重新入队
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

//判断是否是周期运行
public boolean isPeriodic() {
    return period != 0;
}

//设置下一次的运行时间
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

//任务重新入队执行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    //如果能继续运行就入队继续运行,否则取消任务
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

这里唯一需要说的是scheduleAtFixedRate()scheduleWithFixedDelay(),这两个方法在设置周期的时候,为区分两种情况scheduleAtFixedRate()的任务ScheduledFutureTask里的period就是正常的周期值,但scheduleWithFixedDelay()的任务ScheduledFutureTask里的period周期值取了反:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}


public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 //正常的周期
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      //可以看到这里取了反
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

因此ScheduledFutureTask.period小于0的就是scheduleWithFixedDelay,等于0的是非周期任务,大于0的是scheduleAtFixedRate

这一设置会在上面的setNextRunTime()中得以体现:

private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

可以看到,当周期大于0的时候,下一次运行时间是上一次开始运行的时间+周期,但周期小于0的,是当前时间+周期,这也就对应了scheduleAtFixedRatescheduleWithFixedDelay两种情况。

6. 结尾的一些话

计算机行业的发展似乎总是日新月异,新的技术和名词层出不穷,越来越多的人评价这个行业内卷,需要持续的学习。

金庸先生的武侠小说中有一门武功叫九阳真经,这是门心法武功,学会九阳真经后再学所有天下武功都是触类旁通,一学即会。天下武功,虽千变万化但最终的心法终究是一样的,掌握住这个最核心的心法,便掌握了天下的所有武功。

计算机的学习也是如此,而计算机界的九阳真经就是我们大学学的那些《操作系统》、《数据结构》、《算法》、《计算机网络》等知识。

笔者在写这篇文档的时候是2022年,距离Doug Lea完成JUC已经17年之久,而笔者重点参考的博客《深入浅出 Java Concurrency》也距今有12年之久。

那么多年,新的技术层出不穷,但JUC依然是现在Java面试的必考题。且不论面试题的八股与否,JUC源码的学习确实让笔者成长了不少,而JUC也是笔者首次揭开源码世界的大门。

笔者也坚持认为,像JUC这样的知识,它就是Java界的九阳真经(其实JUC的知识已经远超JAVA范畴),多读这些“核心功法”的源码,提升自己的开发内功,才能触类旁通,学什么都很快。

参考文档

《深入浅出 Java Concurrency》目录 - xylz,imxylz - BlogJava

从ReentrantLock的实现看AQS的原理及应用 - 美团技术团队 (meituan.com)

不可不说的Java“锁”事 - 美团技术团队 (meituan.com)

Java 读写锁 ReentrantReadWriteLock 源码分析_Javadoop

《操作系统导论》

多读少写的场景 如何提高性能_awesome_go的博客-CSDN博客_读多写少

阅读 JDK 源码:线程池 ThreadPoolExecutor_Sumkor的博客-CSDN博客

最后修改:2023 年 07 月 23 日
如果觉得我的文章对你有用,请随意赞赏