0%

线程池`ThreadPoolExecutor`详解

线程池ThreadPoolExecutor详解

内置四种构造器,但最终重载的是下面的构造函数

1
2
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit
,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

构造函数的参数说明

  • corePoolSize
    • 线程池子的核心线程数
  • maximumPoolSize
    • 池子的最大线程数,是指最大正在运行中的数量;当核心线程和阻塞队列都满了,若此时有新的线程过来,则会创建线程
  • keepAliveTime
    • 超过核心线程数据的多余空闲线程的最大存活时间
  • unit
    • 时间单位
  • workQueue
    • 线程排队等待的阻塞队列,当核心线程都满了,此时若有新的线程过来,则会在这个队列中排队
  • threadFactory
    • 线程构建工厂,默认使用default工厂
  • handler
    • 拒绝策略,当核心线程满了,阻塞队列也满了,运行中的线程数量也达到最大数,此时若有线程进来,则会根据该策略进行拒绝服务
      • 默认提供有四种拒绝策略:
        • CallerRunsPolicy由调用者执行新的线程
        • AbortPolicy 直接拒绝并抛异常(默认策略)
        • DiscardPolicy 直接丢弃新线程
        • DiscardOldestPolicy 丢弃最旧的线程(最先提交而没有得到执行的任务)

corePoolSizemaximumPoolSizeworkQueue三者之间的关系

1.若无空闲线程执行该任务且当前运行中线程数少于corePoolSize,则直接创建新线程执行该任务。

2.若无空闲线程执行该任务且当前运行中线程数等于corePoolSize且阻塞队列未满,则将任务入队列等待,而不创建新线程

3.若无空闲线程执行该任务且阻塞队列已满同时池中的线程数小于maximumPoolSize,则创建新线程执行该任务

4.若无空闲线程执行该任务且阻塞队列已满同时池中的线程数等于maximumPoolSize,则根据构造函数中的handler指定的策略来拒绝新的任务

ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()afterExecute()terminated()三个接口实现的,

  • beforeExecute:线程池中任务运行前执行
  • afterExecute:线程池中任务运行完毕后执行
  • terminated:线程池退出后执行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    ExecutorService pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5)
    , (runnable)-> new Thread(runnable, "threadPool" + runnable.hashCode())
    , new ThreadPoolExecutor.CallerRunsPolicy()) {
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
    System.out.println("准备执行:" + t.getName());
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
    System.out.println("执行完毕:" + t.getMessage());
    }
    @Override
    protected void terminated() {
    System.out.println("线程池退出");
    }
    };

工作原理

主要参数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 接收新任务,并且执行缓存任务队列中的任务
private static final int RUNNING = -1 << 29 ;// 1010 0000 0000 0000 0000 0000 0000 0000
// 不在接收新的任务,但是会执行缓存中的任务
private static final int SHUTDOWN = 0 << 29 ;// 0000 0000 0000 0000 0000 0000 0000 0000
// 不接收新的任务,不执行缓存中的任务,中断正在运行的任务
private static final int STOP = 1 << 29 ;// 0010 0000 0000 0000 0000 0000 0000 0000
// 所有任务已经终止,workCount = 0;
private static final int TIDYING = 2 << 29 ;// 0100 0000 0000 0000 0000 0000 0000 0000
// terminated 方法调用完成
private static final int TERMINATED = 3 << 29 ;// 0110 0000 0000 0000 0000 0000 0000 0000

// 缓存任务阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 线程池主锁
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// mainLock上的终止条件量,用于支持awaitTermination
private final Condition termination = mainLock.newCondition();
// 记录曾经创建的最大线程数
private int largestPoolSize;
// 已经完成任务数
private long completedTaskCount;
// 新线程创建工厂
private volatile ThreadFactory threadFactory;
// 任务拒绝策略
private volatile RejectedExecutionHandler handler;
// 超过核心线程数据的多余空闲线程的最大存活时间
private volatile long keepAliveTime;
// 是否允许核心线程也使用 keepAliveTime 策略
private volatile boolean allowCoreThreadTimeOut;
// 核心线程
private volatile int corePoolSize;
// 池子的最大线程数
private volatile int maximumPoolSize;
// 设置默认任务拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

核心方法submit

1
2
3
4
5
6
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);//任务包装
execute(ftask);//执行任务
return ftask;
}

其中newTaskFor是用来包装传入的任务,注意入口方法execute如期而至,下面我们就一点点揭开它那神秘的面纱以观其美妙的。。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断当前运行中的线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//则直接创建一个线程并将核心线程计数加一,不是对线程进行标记是否为核心
if (addWorker(command, true))
return;//若添加成功则直接返回,若失败则进入下一步
c = ctl.get();
}
//判断当前是否可运行且队列是否未满
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//二次校验当前是否可运行
if (! isRunning(recheck) && remove(command))
reject(command);//若不可则调用拒绝策略
//判定核心线程运行数是否为0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果核心与队列均满则直接添加任务
else if (!addWorker(command, false))
reject(command);
}

待揭开后发现还是未能触那美好的。。。别急,继续揭开核心方法addWorker(Runnable firstTask, boolean core)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private boolean addWorker(Runnable firstTask, boolean core) {
retry://标签,break或者continue到这里
for (;;) { //外层 死循环
int c = ctl.get();
//当前线程的状态
int rs = runStateOf(c);
//1. 如果线程池已经已经STOP,或者TIDYING,或者TERMINATED,并且firstTask == null,并且队列并不为空的情况下,
// 就直接return false
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()) )
return false;
for (;;) { //内层 死循环
int wc = workerCountOf(c);
//2. 当线程池中的线程数量达到上限以后,返回false
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//3. 如果线程数量能够增加,则break到retry标签,(跳过内外层两层循环),继续往下执行
if (compareAndIncrementWorkerCount(c)) // CAS操作
break retry;
c = ctl.get(); // 重新读取 ctl 的值
//4. 如果线程池的状态发生了改变,就continue到retry标签,重新执行外层循环
if (runStateOf(c) != rs)
continue retry;
//5. 如果是因为工作线程数量的变化导致CAS操作失败,那么继续执行内层循环
}
}
//6. 开始添加Worker的逻辑
//标记新添加的工作线程是否启动
boolean workerStarted = false;
//标记新的线程是否添加成功
boolean workerAdded = false;
//工作线程
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {//说明线程工厂成功的创建了一个线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//获取锁
try {
int rs = runStateOf(ctl.get());//再次检查
//如果线程池状态是RUNNING 或 线程池状态是SHUTDOWN 并且fistTask==null
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 表示线程已经被启动过了,则抛出异常
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)//修改最大线程数
largestPoolSize = s;
workerAdded = true;//标记任务添加成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {//如果成功添加任务,启动任务
t.start();
workerStarted = true;
}
}
} finally {
//最后判断启动任务如果失败,就把任务从工作集中移除,减小最大线程数largestPoolSize,并试着终止线程池
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;//返回任务是否成功启动
}

继续分析Worker中的runWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); //允许中断
boolean completedAbruptly = true;
try {
//1. 从队列中取出任务可能会导致阻塞。
while (task != null || (task = getTask()) != null) {//如果task不为null或者从队列中取出的任务不为null
w.lock();
//2. 线程中断,下面有详细说明
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try { task.run(); //3. 执行任务
}catch (RuntimeException x) { thrown = x; throw x;
}catch (Error x) { thrown = x; throw x;
}catch (Throwable x) { thrown = x; throw new Error(x);
}finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//如果异常终止,则把当前任务从工作集中移除,试着结束线程池,添加一个新线程等等
processWorkerExit(w, completedAbruptly);
}
}

线程中断wt

  • 如果线程池处于STOP,TIDYING,TERMINATED状态并且wt没有被中断,则中断 wr
  • 如果当前线程被中断了并且线程池处于STOP,TIDYING,TERMINATED状态并且wt没有被中断,则中断 wr

getTask()从队列得到一个task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 执行阻塞或定时等待任务,具体取决于当前配置设置,或者当前worker由于以下原因必须停止的时候返回null;
* 1. worker的数量大于maximumPoolSize
* 2. 线程池停止了(STOP)
* 3. 线程池 shutdown 了并且任务队列empty
* 4. worker在等待任务的时候超时(非核心线程池超过存活时间,或者核心线程池超过存活时间),
* 超过存活时间的worker会终止,如果任务队列不为空,则当前线程不是线程池中最后一个线程
*
* @return 返回一个任务;或者当 worker必须退出的时候返回null,在这种情况下,workerCount 会减少。
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 在必要的条件下检查任务队列是否为empty
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//减少线程池中线程数量,返回null
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//获取任务并返回,如果是定时等待任务,则使用poll方法,否则使用take方法
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}