 线程池源码解析
线程池源码解析
  # 线程池ThreadPoolExecuter源码解析
# Java创建线程的几种方式
# 继承 Thread 类,重写 run 方法
这种方式是最基础的创建线程方法。通过继承 Thread 类并重写其中的 run() 方法来定义线程要执行的任务,启动时调用 start() 方法。
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("通过继承Thread类创建线程");
    }
}
new MyThread().start();
2
3
4
5
6
7
特点:实现简单,但由于 Java 不支持多继承,扩展性较差。
# 实现 Runnable 接口,重写 run 方法
实现 Runnable 接口的 run() 方法后,把该实例作为参数传入 Thread 构造函数,通过 start() 启动线程。
class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("通过实现Runnable接口创建线程");
    }
}
new Thread(new MyRunnable()).start();
2
3
4
5
6
7
特点:避免了单继承限制,适合多个线程共享同一个任务对象。
# 实现 Callable 接口,重写 call 方法(带返回值)
Callable 接口和 Runnable 类似,但 call() 方法可以有返回值,也可以抛出异常。因为 Thread 构造函数只接收 Runnable,所以需要配合 FutureTask 使用。FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture 同时继承了 Runnable 和 Future 接口,因此既能被线程执行,又能拿到返回结果。
import java.util.concurrent.*;
class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("通过Callable创建线程");
        return 123;
    }
}
FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
new Thread(futureTask).start();
System.out.println("返回结果:" + futureTask.get());
2
3
4
5
6
7
8
9
10
11
12
13
特点:可以获取线程执行结果,也能捕获异常,常用于需要返回值的异步任务。
# 使用线程池创建线程
线程池是通过 Executor 框架提供的,常见实现有 ThreadPoolExecutor。
线程池可以有效复用线程资源,避免频繁创建和销毁线程造成的开销。
import java.util.concurrent.*;
ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
    int id = i;
    pool.submit(() -> {
        System.out.println("任务 " + id + " 在线程 " + Thread.currentThread().getName() + " 中执行");
    });
}
pool.shutdown();
2
3
4
5
6
7
8
9
10
特点:适合高并发场景下的任务提交和管理,能提高系统性能和资源利用率。
# 工作中你如何使用线程的?
在工作中我们主要通过线程池来管理和复用线程,避免频繁创建和销毁线程带来的性能开销。一般不会直接使用 new Thread() 创建线程,除非是非常临时、一次性的任务。
只在以下两种情况使用 new Thread():
- 非常简单的一次性任务,例如初始化时启动一个后台监控线程;
- 任务数量可控且确定只执行一次,例如某个模块启动阶段的单次异步操作。
我通常是自定义线程池,而不是通过 Executors 工具类创建。因为 Executors 提供的几种线程池(如 newFixedThreadPool、newCachedThreadPool 等)虽然方便,但存在潜在风险,主要是队列不可控或线程数量不可控,容易导致 OOM(内存溢出) 或系统过载。
# 线程池的7个核心参数
ThreadPoolExecutor 提供了更灵活的线程池管理方式,可以精细控制线程数、队列大小及拒绝策略,适用于高并发场景。
1. ThreadPoolExecutor 构造方法
public ThreadPoolExecutor(
    int corePoolSize,        // 核心线程数
    int maximumPoolSize,     // 最大线程数
    long keepAliveTime,      // 线程空闲时间
    TimeUnit unit,           // 时间单位
    BlockingQueue<Runnable> workQueue, // 任务队列
    ThreadFactory threadFactory,       // 线程工厂(可自定义线程命名)
    RejectedExecutionHandler handler   // 拒绝策略
)
2
3
4
5
6
7
8
9
2. 线程池参数解释
| 参数 | 含义 | 作用 | 
|---|---|---|
| corePoolSize | 核心线程数 | 线程池会维持的最小线程数量,即使它们处于空闲状态 | 
| maximumPoolSize | 最大线程数 | 线程池允许创建的最大线程数 | 
| keepAliveTime | 线程存活时间 | 当线程数大于 corePoolSize时,多余空闲线程的存活时间 | 
| workQueue | 任务队列 | 当核心线程满后,新任务进入队列等待 | 
| threadFactory | 线程工厂 | 创建线程的方式(可自定义线程命名) | 
| handler | 拒绝策略 | 当任务队列满时的处理方式 | 
在设置时,maximumPoolSize 一定要大于等于 corePoolSize。否则会抛出 IllegalArgumentException。因为最大线程数必须大于等于核心线程数,线程池才能在高峰期扩容。
一般我会根据任务类型来区分:
- CPU密集型:core 和 max 设成一样,等于 CPU核心数+1;
- IO密集型:max 设为 core 的 2~3 倍,增加并发度。keepAliveTime 控制非核心线程在高峰过后能自动回收。这样既能保证性能,又防止线程过多导致资源竞争。
4. 拒绝策略(RejectedExecutionHandler)
当任务超出 最大线程数 + 队列容量 时,线程池会触发拒绝策略:
| 拒绝策略 | 作用 | 
|---|---|
| AbortPolicy(默认) | 丢弃任务,抛出异常(适用于重要任务,需手动处理异常) | 
| CallerRunsPolicy | 由调用线程执行任务,不会抛异常(适用于主线程能承担任务执行,小心阻塞) | 
| DiscardPolicy | 丢弃任务,不抛异常(适用于不重要的任务,如日志) | 
| DiscardOldestPolicy | 丢弃队列中最早的任务,然后尝试执行新任务 | 
实际项目中可以定义 RejectedExecutionHandler,比如把被拒绝的任务写入本地缓存或消息队列,稍后由异步线程重试,样可以保证任务不丢失,同时避免系统直接崩溃。
# 线程池的工作原理

# 线程池的创建
在使用线程池之前,首先需要创建线程池。线程池的创建通常涉及以下参数:
- 核心线程数(Core Pool Size):始终存活的线程数量,即使没有任务也不会销毁。
- 最大线程数(Maximum Pool Size):线程池能创建的最大线程数量。
- 任务队列(Blocking Queue):用于存储等待执行的任务。
- 线程存活时间(Keep Alive Time):当线程数超过核心线程数时,空闲线程的存活时间。
- 线程工厂(Thread Factory):用于创建新线程的工厂方法。
- 拒绝策略(Rejected Execution Handler):当任务队列已满且线程数达到最大值时的处理策略。
# 任务提交
当有任务需要执行时,线程池提供两种方式提交任务:
- execute(Runnable command):适用于不需要返回结果的任务。
- submit(Callable<T> task):适用于需要返回结果的任务,会返回- Future<T>对象,可通过- get()方法获取执行结果。
# 线程分配
- 任务提交后,线程池优先使用核心线程执行任务。
- 如果核心线程已满,任务会进入任务队列等待执行。
- 当任务队列也满了,并且线程数未达到最大线程数时,线程池会创建新临时线程执行任务。
- 如果线程数已达最大值并且任务队列满了,则触发拒绝策略。
# 任务执行
线程池中的工作线程会不断从任务队列中取出任务并执行。当任务执行完成后,线程不会立即销毁,而是继续等待新任务。
# 线程回收
线程池的线程不会无限增长,而是根据配置参数进行回收:
- 核心线程默认不会被回收,即使空闲也会存活。
- 非核心线程(当线程数超过核心线程数时创建的临时线程)在**keepAliveTime** 设定的时间内没有新任务时,会被销毁。
# 任务完成与结果返回
- 如果任务是 Callable类型,线程池会返回Future<T>对象,可通过get()方法获取结果。
- 如果是 Runnable任务,则无返回值,任务执行完即结束。
# 线程池的异常处理
线程池内部会对任务执行过程中抛出的异常进行处理:
- 使用 execute()提交任务:如果任务中有未捕获的异常,线程会终止,线程池会创建新线程替换它。
- 使用 submit()提交任务:异常会被封装在Future中,线程不会终止,需要调用get()方法捕获ExecutionException。
# 线程池的属性标识
# 属性标识与状态变化
线程池通过一个 AtomicInteger ctl 同时表示两种信息:
- 高3位:线程池的运行状态(runState)
- 低29位:当前线程池中工作线程的数量(workerCount)
// ctl 是一个原子整型,用来同时表示两个信息:线程池的运行状态 + 当前工作线程数
// 高3位表示线程池状态(runState)
// 低29位表示线程数(workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE = 32,所以 COUNT_BITS = 29,代表低29位用于表示线程数量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000111...111(二进制共29个1)——用于屏蔽高3位,只保留线程数
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
/* 
 * ---------------- 线程池运行状态(runState) ----------------
 * 线程池的状态用高3位表示,总共有5种状态:
 * 111:RUNNING
 * 000:SHUTDOWN
 * 001:STOP
 * 010:TIDYING
 * 011:TERMINATED
 */
private static final int RUNNING    = -1 << COUNT_BITS; // 高3位是111,表示运行中(接受新任务 + 处理队列任务)
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 高3位是000,表示不接收新任务,但继续执行队列任务
private static final int STOP       =  1 << COUNT_BITS; // 高3位是001,表示不接收任务,也中断正在执行的任务
private static final int TIDYING    =  2 << COUNT_BITS; // 高3位是010,表示所有任务结束,正在清理资源
private static final int TERMINATED =  3 << COUNT_BITS; // 高3位是011,表示线程池完全终止
/* 
 * ---------------- 状态与线程数的打包与拆解 ----------------
 * ctl 的高3位存状态,低29位存线程数,通过位运算实现分离。
 */
private static int runStateOf(int c)     { return c & ~COUNT_MASK; } // 提取高3位(状态)
private static int workerCountOf(int c)  { return c & COUNT_MASK; }  // 提取低29位(线程数)
private static int ctlOf(int rs, int wc) { return rs | wc; }         // 将状态与线程数打包成一个整型
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 线程池的状态变化
- RUNNING:正常运行,接收新任务并执行。
- SHUTDOWN:调用 shutdown()后进入此状态,不再接收新任务,但会继续执行队列中任务。
- STOP:调用 shutdownNow()后进入此状态,不接收新任务,中断正在执行的线程。
- TIDYING:所有任务执行完毕,线程池清理资源中。
- TERMINATED:完全终止状态,线程池生命周期结束。

# 线程池的execute方法
# execute() 方法讲解
// ThreadPoolExecutor.execute() 核心入口
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException(); // 参数校验,防止空任务
    int c = ctl.get(); // 获取线程池当前状态和线程数
    // 核心线程数未满,优先创建核心线程处理任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) // true 表示核心线程
            return; // 创建成功直接返回
        c = ctl.get(); // 创建失败,重新读取 ctl
    }
    // 核心线程已满,任务尝试加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get(); // 再次检查状态,避免 race condition
        if (!isRunning(recheck) && remove(command))
            reject(command); // 队列加入后线程池已关闭,触发拒绝策略
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // 队列中没有线程运行,创建非核心线程处理任务
    }
    // 队列已满且无法创建线程,直接拒绝任务
    else if (!addWorker(command, false))
        reject(command);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
关键点:
- 优先创建核心线程 → 再入队列 → 再创建非核心线程 → 都失败执行拒绝策略
- 保证任务不会丢失,同时控制线程数和队列容量
# addWorker() 方法讲解
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // 检查线程池状态,如果处于 SHUTDOWN 或以上且不允许创建线程,返回 false
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        for (;;) {
            // 核心线程数或最大线程数限制
            if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // CAS 原子增加线程数
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // CAS 失败重试
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建 Worker 对象,同时生成 Thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); // 锁住 workers 集合,保证线程安全
            try {
                int c = ctl.get();
                // 核心线程池运行中,或者非核心线程且 firstTask 为 null
                if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException(); // 线程必须是 NEW 状态
                    workers.add(w); // 加入线程池集合
                    workerAdded = true;
                    if (workers.size() > largestPoolSize)
                        largestPoolSize = workers.size(); // 更新历史最大线程数
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // 启动线程执行 run()
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w); // 创建失败回滚
    }
    return workerStarted;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
关键点:
- 核心逻辑:状态检查 → 线程数限制 → CAS 增加线程数 → Worker 创建 → 启动线程
- 保证线程池并发安全,避免超过最大线程数
# Worker 类讲解
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;      // Worker 对应的线程对象
    Runnable firstTask;       // 第一个任务,可能为空
    volatile long completedTasks; // 已完成任务计数
    Worker(Runnable firstTask) {
        setState(-1);         // 先禁止中断
        this.firstTask = firstTask;
        // 通过线程工厂创建 Thread 对象
        this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
        runWorker(this); // 委托线程池执行任务循环
    }
  	/**
  	......
  	*/
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
关键点:
- 封装线程和任务,每个线程绑定一个 Worker
- run()委托- runWorker()循环处理任务
- 内部维护完成任务计数,用于统计和管理线程生命周期
# runWorker() 方法讲解
// runWorker 是每个 Worker 线程的主执行逻辑
// 当线程池启动线程时,线程会进入这里不断从队列中取任务执行
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread(); // 当前执行任务的线程
    Runnable task = w.firstTask;        // Worker 创建时传入的第一个任务(可能为空)
    w.firstTask = null;                 // 释放引用,帮助 GC
    w.unlock();                         // 允许被中断(Worker 构造时 setState(-1) 禁止中断)
    boolean completedAbruptly = true;   // 标记线程是否异常退出(用于后续回收处理)
    try {
        // 主循环:不断从任务队列中取任务执行
        // 若 firstTask 不为空,则先执行它
        // 若为空,则调用 getTask() 从队列取任务(可能阻塞)
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 获取 Worker 独占锁,保证同一线程执行任务过程的同步
            // 判断线程池状态:如果线程池在 STOP 状态,需要中断当前线程
            // 否则要保证线程未被误中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 钩子方法:任务执行前调用(可用于监控、日志、统计)
                beforeExecute(wt, task);
                try {
                    task.run(); // 真正执行任务逻辑(调用 Runnable.run())
                    // 钩子方法:任务执行后调用(用于记录耗时或异常监控)
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // 如果任务执行中抛异常,依然调用 afterExecute
                    afterExecute(task, ex);
                    throw ex; // 再抛出异常让外层 finally 处理
                }
            } finally {
                // 当前任务执行完毕
                task = null;
                w.completedTasks++; // 增加任务完成计数
                w.unlock(); // 释放 Worker 锁
            }
        }
        // 能执行到这里说明:getTask() 返回了 null(队列空 + 无需新任务)
        completedAbruptly = false; // 正常退出
    } finally {
        // 线程退出时的清理逻辑
        // 包括从 worker 集合移除,调整线程池大小等
        processWorkerExit(w, completedAbruptly);
    }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
关键点
- 核心逻辑:获取任务 → 执行任务(含钩子) → 检查状态 → 正常或异常退出
- 任务来源:优先执行 firstTask,其后不断从阻塞队列 getTask() 获取
- 执行前后调用钩子:beforeExecute()和afterExecute(),可扩展监控逻辑
- processWorkerExit():统一清理退出线程,维护线程池稳定性
- 保证线程池在任务执行、异常退出和关闭时都能安全过渡
# 线程池中线程异常后,销毁还是复用?
直接说结论,需要分两种情况:
- 使用execute()提交任务:当任务通过execute()提交到线程池并在执行过程中抛出异常时,如果这个异常没有在任务内被捕获,那么该异常会导致当前线程终止,并且异常会被打印到控制台或日志文件中。线程池会检测到这种线程终止,并创建一个新线程来替换它,从而保持配置的线程数不变。 -
- 使用submit()提交任务:对于通过submit()提交的任务,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由submit()返回的Future对象中。当调用Future.get()方法时,可以捕获到一个ExecutionException。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。
简单来说:使用execute()时,未捕获异常导致线程终止,线程池创建新线程替代;使用submit()时,异常被封装在Future中,线程继续复用。 这种设计允许submit()提供更灵活的错误处理机制,因为它允许调用者决定如何处理异常,而execute()则适用于那些不需要关注执行结果的场景。
具体的源码分析可以参考这篇:线程池中线程异常后:销毁还是复用? - 京东技术 (opens new window)。
