且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

【Java 并发编程】线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )

更新时间:2021-09-28 19:59:13

文章目录

一、线程池执行任务细节分析

二、线程池执行 execute 源码分析





一、线程池执行任务细节分析


线程池执行细节分析 :


核心线程数 10 1010 , 最大小成熟 20 2020 , 非核心线程数 10 1010 , 非核心线程空闲存活时间 60 6060 秒 , 阻塞队列大小 10 1010 个 ;


当有 Runnable 任务进入线程池后 ;


先查看 " 核心线程 " , 如果没有核心线程 , 先 创建核心线程 ;


如果有核心线程 , 则 查看核心线程是否有空闲的 ;


如果有空闲的核心线程 , 直接将该任务分配给该空闲核心线程 ;


如果没有空闲核心线程 , 则 查看核心线程数有没有满 ;


如果核心线程没有满 , 则 创建一个核心线程 , 然后执行该任务 ;


如果核心线程满了 , 将该任务放入 " 阻塞队列 " 中 , 查看阻塞队列是否已满 ;


如果阻塞队列没有满 , 直接 将任务放入阻塞队列中 ;


如果阻塞队列满了 , 则 查看是否能创建 " 非核心线程 " ;


如果能创建非核心线程 , 则 创建非核心线程 , 并执行该任务 ;


如果不能创建非核心线程 , 则 执行 " 拒绝策略 " ;






二、线程池执行 execute 源码分析


查看传入的 Runnable 任务是否为空 , 如果为空 , 就报异常 ;


   

if (command == null)
            throw new NullPointerException();


获取当前线程池的状态 , 根据不同的状态 , 执行不同的操作 ;


       

/*
         * 进行以下三个步骤处理:
         *
         * 1. 如果当前运行的线程 , 小于核心线程数 , 那么创建一个新的核心线程 , 
         * 将传入的任务作为该线程的第一个任务 . 
         * 调用 addWorker 方法 , 会原子性检查运行状态和任务数量 ; 
         * 如果在不应该添加线程的情况下执行添加线程操作 , 就会发出错误警报 ; 
         * 如果该方法返回 false , 说明当前不能添加线程 , 此时就不要执行添加线程的操作了 ; 
         *
         * 2. 如果任务被成功放入 线程池任务 队列 , 不管我们此时是否应该添加线程 , 都需要进行双重验证 ;
         * 双重验证 : 添加到任务队列时验证一次 , 添加到线程执行时验证一次 ; 
         * 可能存在这种情况 , 在上次验证线程运行状态之后 , 有可能该线程就立刻被销毁了 ;
         * 也可能存在进入该方法后 , 线程池被销毁的情况 ; 
         * 因此我们反复验证线程状态 , 如果需要在线程停止时回滚队列 , 如果没有线程就创建新线程 ;
         *
         * 3. 如果不能将任务放入队列中 , 尝试创建一个新线程 ; 
         * 如果创建线程失败 , 说明当前线程池关闭 , 或者线程池中线程饱和 , 此时拒绝执行该任务 ; 
         */
        int c = ctl.get();


上述 AtomicInteger ctl 线程池状态是很关键的原子变量 , 该原子变量中同时包含了线程池的线程数量 , 该值是一个组合的数值 ; 该 int 值 4 44 字节 32 3232 位 , 前 3 33 位是线程池的状态位 , 剩下的 29 2929 位是线程数 ;


/**
  * 主池控制状态ctl是一个原子整数
  * 两个概念领域
  * workerCount,指示有效线程数
  * 运行状态,指示是否运行、关闭等
  * 
  * 为了将它们打包成一个整数,我们将workerCount限制为
  * (2^29)-1(约5亿)个线程,而不是(2^31)-1(2
  * 10亿)否则可代表。如果这曾经是一个问题
  * 将来,变量可以更改为原子长度,
  * 下面的移位/遮罩常数已调整。但在需要之前
  * 因此,此代码使用int更快更简单。
  * 
  * workerCount是已注册的工人数
  * 允许启动,不允许停止。该值可能是
  * 与活动线程的实际数量暂时不同,
  * 例如,ThreadFactory在以下情况下无法创建线程:
  * 当退出线程仍在执行时
  * 终止前的簿记。用户可见池大小为
  * 报告为工作集的当前大小。
  * 
  * 运行状态提供主要的生命周期控制,具有以下值:
  * 
  * 正在运行:接受新任务和处理排队的任务
  * 关机:不接受新任务,但处理排队的任务
  * 停止:不接受新任务,不处理排队的任务,
  * 并中断正在进行的任务
  * 整理:所有任务都已终止,workerCount为零,
  * 正在转换为状态整理的线程
  * 将运行终止的()钩子方法
  * 终止:终止()已完成
  * 
  * 这些值之间的数字顺序很重要,以允许
  * 有序比较。运行状态随时间单调增加
  * 时间,但不需要击中每个状态。这些转变是:
  * 
  * 运行->关机
  * 在调用shutdown()时,可能隐式地在finalize()中
  * (运行或关闭)->停止
  * 在调用shutdownNow()时
  * 关机->整理
  * 当队列和池都为空时
  * 停止->整理
  * 当池为空时
  * 清理->终止
  * 当终止的()钩子方法完成时
  * 
  * 等待终止()的线程将在
  * 国家终止。
  * 
  * 检测从关闭到清理的过渡较少
  * 比您希望的简单,因为队列可能会
  * 非空后为空,关机状态下为空,但
  * 只有在看到它是空的之后,我们才能终止
  * workerCount为0(有时需要重新检查——请参阅
  * 下)。
  */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }


简单的机翻了下 , 如果查看详细的英文注释 , 查看 libcore/ojluni/src/main/java/java/util/concurrent/ThreadPoolExecutor.java 源码 ;



线程池的状态如下 , 有 5 55 种状态 ;


 

// runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;


判断当前的工作线程数 workerCountOf(c) 是否小于核心线程数 corePoolSize ;


如果小于 , 则添加核心线程 addWorker(command, true) ;


这里注意 , 来了新任务后 , 不是先将任务放入阻塞队列 , 而是检查核心线程 , 先尝试将核心线程部署满 ;


     

if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }


判断当前的线程池状态 isRunning(c) 是否正在执行处于 RUNNING 状态 , 如果当前线程池处于 RUNNING 状态 , 说明所有的核心线程都满了 , 则将任务队列放入阻塞队列中 workQueue.offer(command) ;


如果可以入队 , 重新检查状态 , 如果必要 回滚排队 ! isRunning(recheck) && remove(command) , 重新检查状态通过后 , addWorker(null, false) 将任务添加如阻塞队列中 ;


入队失败 , 尝试添加非核心线程 !addWorker(command, false) , 如果非核心线程也失败 , 则执行拒绝策略 reject(command) ;

if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);