Yang's blog Yang's blog
首页
后端开发
密码学
机器学习
命令手册
关于
友链
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

xiaoyang

尽人事,听天命
首页
后端开发
密码学
机器学习
命令手册
关于
友链
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • SpringCloud

    • 微服务架构介绍
    • SpringCloud介绍
    • Spring Cloud:生产者与消费者
    • Spring Cloud Eureka:构建可靠的服务注册与发现
    • Spring Cloud Ribbon:负载均衡
    • Spring Cloud Fegin:服务调用
    • Spring Cloud Hystrix:熔断器
    • Spring Cloud Zuul:统一网关路由
    • Spring Cloud Config:配置中心
  • Java后端框架

    • LangChain4j

      • 介绍
      • 快速开始
      • Chat and Language Models
      • Chat Memory
      • Model Parameters
      • Response Streaming
      • AI Services
      • Agent
      • Tools (Function Calling)
      • RAG
      • Structured Outputs
      • Classification
      • Embedding (Vector) Stores
      • Image Models
      • Quarkus Integration
      • Spring Boot Integration
      • Kotlin Support
      • Logging
      • Observability
      • Testing and Evaluation
      • Model Context Protocol
  • 八股文

    • 操作系统
    • JVM介绍
    • Java多线程
    • Java集合框架
    • Java反射
    • JavaIO
    • Mybatis介绍
    • Spring介绍
    • SpringBoot介绍
    • Mysql
    • Redis
    • 数据结构
    • 云计算
    • 设计模式
    • 计算机网络
    • 锁核心类AQS
    • Nginx
    • 面试场景题
    • 线程池源码解析
      • Java创建线程的几种方式
        • 继承 Thread 类,重写 run 方法
        • 实现 Runnable 接口,重写 run 方法
        • 实现 Callable 接口,重写 call 方法(带返回值)
        • 使用线程池创建线程
      • 工作中你如何使用线程的?
      • 线程池的7个核心参数
      • 线程池的工作原理
        • 线程池的创建
        • 任务提交
        • 线程分配
        • 任务执行
        • 线程回收
        • 任务完成与结果返回
        • 线程池的异常处理
      • 线程池的属性标识
        • 属性标识与状态变化
        • 线程池的状态变化
      • 线程池的execute方法
        • execute() 方法讲解
        • addWorker() 方法讲解
        • Worker 类讲解
        • runWorker() 方法讲解
      • 线程池中线程异常后,销毁还是复用?
  • 前端技术

    • 初识Vue3
    • Vue3数据双向绑定
    • Vue3生命周期
    • Vue-Router 组件
    • Pinia 集中式状态存储
  • 中间件

    • RocketMQ
  • 开发知识

    • 请求参数注解
    • 时间复杂度和空间复杂度
    • JSON序列化与反序列化
    • Timestamp vs Datetime
    • Java开发中必备能力单元测试
    • 正向代理和反向代理
    • 什么是VPN
    • 后端服务端主动推送消息的常见方式
    • 正则表达式
    • SseEmitter vs Flux 的本质区别与底层原理解析
    • Function Calling与MCP区别
    • 分布式事务
  • 后端开发
  • 八股文
xiaoyang
2025-10-31
目录

线程池源码解析

# 线程池ThreadPoolExecuter源码解析

# Java创建线程的几种方式

# 继承 Thread 类,重写 run 方法

这种方式是最基础的创建线程方法。通过继承 Thread 类并重写其中的 run() 方法来定义线程要执行的任务,启动时调用 start() 方法。

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("通过继承Thread类创建线程");
    }
}
new MyThread().start();
1
2
3
4
5
6
7

特点:实现简单,但由于 Java 不支持多继承,扩展性较差。


# 实现 Runnable 接口,重写 run 方法

实现 Runnable 接口的 run() 方法后,把该实例作为参数传入 Thread 构造函数,通过 start() 启动线程。

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("通过实现Runnable接口创建线程");
    }
}
new Thread(new MyRunnable()).start();
1
2
3
4
5
6
7

特点:避免了单继承限制,适合多个线程共享同一个任务对象。


# 实现 Callable 接口,重写 call 方法(带返回值)

Callable 接口和 Runnable 类似,但 call() 方法可以有返回值,也可以抛出异常。因为 Thread 构造函数只接收 Runnable,所以需要配合 FutureTask 使用。FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture 同时继承了 Runnable 和 Future 接口,因此既能被线程执行,又能拿到返回结果。

import java.util.concurrent.*;

class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("通过Callable创建线程");
        return 123;
    }
}

FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
new Thread(futureTask).start();
System.out.println("返回结果:" + futureTask.get());
1
2
3
4
5
6
7
8
9
10
11
12
13

特点:可以获取线程执行结果,也能捕获异常,常用于需要返回值的异步任务。


# 使用线程池创建线程

线程池是通过 Executor 框架提供的,常见实现有 ThreadPoolExecutor。
线程池可以有效复用线程资源,避免频繁创建和销毁线程造成的开销。

import java.util.concurrent.*;

ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
    int id = i;
    pool.submit(() -> {
        System.out.println("任务 " + id + " 在线程 " + Thread.currentThread().getName() + " 中执行");
    });
}
pool.shutdown();
1
2
3
4
5
6
7
8
9
10

特点:适合高并发场景下的任务提交和管理,能提高系统性能和资源利用率。

# 工作中你如何使用线程的?

在工作中我们主要通过线程池来管理和复用线程,避免频繁创建和销毁线程带来的性能开销。一般不会直接使用 new Thread() 创建线程,除非是非常临时、一次性的任务。

只在以下两种情况使用 new Thread():

  1. 非常简单的一次性任务,例如初始化时启动一个后台监控线程;
  2. 任务数量可控且确定只执行一次,例如某个模块启动阶段的单次异步操作。

我通常是自定义线程池,而不是通过 Executors 工具类创建。因为 Executors 提供的几种线程池(如 newFixedThreadPool、newCachedThreadPool 等)虽然方便,但存在潜在风险,主要是队列不可控或线程数量不可控,容易导致 OOM(内存溢出) 或系统过载。

# 线程池的7个核心参数

ThreadPoolExecutor 提供了更灵活的线程池管理方式,可以精细控制线程数、队列大小及拒绝策略,适用于高并发场景。

1. ThreadPoolExecutor 构造方法

public ThreadPoolExecutor(
    int corePoolSize,        // 核心线程数
    int maximumPoolSize,     // 最大线程数
    long keepAliveTime,      // 线程空闲时间
    TimeUnit unit,           // 时间单位
    BlockingQueue<Runnable> workQueue, // 任务队列
    ThreadFactory threadFactory,       // 线程工厂(可自定义线程命名)
    RejectedExecutionHandler handler   // 拒绝策略
)
1
2
3
4
5
6
7
8
9

2. 线程池参数解释

参数 含义 作用
corePoolSize 核心线程数 线程池会维持的最小线程数量,即使它们处于空闲状态
maximumPoolSize 最大线程数 线程池允许创建的最大线程数
keepAliveTime 线程存活时间 当线程数大于 corePoolSize 时,多余空闲线程的存活时间
workQueue 任务队列 当核心线程满后,新任务进入队列等待
threadFactory 线程工厂 创建线程的方式(可自定义线程命名)
handler 拒绝策略 当任务队列满时的处理方式

在设置时,maximumPoolSize 一定要大于等于 corePoolSize。否则会抛出 IllegalArgumentException。因为最大线程数必须大于等于核心线程数,线程池才能在高峰期扩容。
一般我会根据任务类型来区分:

  • CPU密集型:core 和 max 设成一样,等于 CPU核心数+1;
  • IO密集型:max 设为 core 的 2~3 倍,增加并发度。keepAliveTime 控制非核心线程在高峰过后能自动回收。这样既能保证性能,又防止线程过多导致资源竞争。

4. 拒绝策略(RejectedExecutionHandler)

当任务超出 最大线程数 + 队列容量 时,线程池会触发拒绝策略:

拒绝策略 作用
AbortPolicy(默认) 丢弃任务,抛出异常(适用于重要任务,需手动处理异常)
CallerRunsPolicy 由调用线程执行任务,不会抛异常(适用于主线程能承担任务执行,小心阻塞)
DiscardPolicy 丢弃任务,不抛异常(适用于不重要的任务,如日志)
DiscardOldestPolicy 丢弃队列中最早的任务,然后尝试执行新任务

实际项目中可以定义 RejectedExecutionHandler,比如把被拒绝的任务写入本地缓存或消息队列,稍后由异步线程重试,样可以保证任务不丢失,同时避免系统直接崩溃。

# 线程池的工作原理

image-20250220162925362

# 线程池的创建

在使用线程池之前,首先需要创建线程池。线程池的创建通常涉及以下参数:

  • 核心线程数(Core Pool Size):始终存活的线程数量,即使没有任务也不会销毁。
  • 最大线程数(Maximum Pool Size):线程池能创建的最大线程数量。
  • 任务队列(Blocking Queue):用于存储等待执行的任务。
  • 线程存活时间(Keep Alive Time):当线程数超过核心线程数时,空闲线程的存活时间。
  • 线程工厂(Thread Factory):用于创建新线程的工厂方法。
  • 拒绝策略(Rejected Execution Handler):当任务队列已满且线程数达到最大值时的处理策略。

# 任务提交

当有任务需要执行时,线程池提供两种方式提交任务:

  • execute(Runnable command):适用于不需要返回结果的任务。
  • submit(Callable<T> task):适用于需要返回结果的任务,会返回 Future<T> 对象,可通过 get() 方法获取执行结果。

# 线程分配

  • 任务提交后,线程池优先使用核心线程执行任务。
  • 如果核心线程已满,任务会进入任务队列等待执行。
  • 当任务队列也满了,并且线程数未达到最大线程数时,线程池会创建新临时线程执行任务。
  • 如果线程数已达最大值并且任务队列满了,则触发拒绝策略。

# 任务执行

线程池中的工作线程会不断从任务队列中取出任务并执行。当任务执行完成后,线程不会立即销毁,而是继续等待新任务。

# 线程回收

线程池的线程不会无限增长,而是根据配置参数进行回收:

  • 核心线程默认不会被回收,即使空闲也会存活。
  • 非核心线程(当线程数超过核心线程数时创建的临时线程)在**keepAliveTime** 设定的时间内没有新任务时,会被销毁。

# 任务完成与结果返回

  • 如果任务是 Callable 类型,线程池会返回 Future<T> 对象,可通过 get() 方法获取结果。
  • 如果是 Runnable 任务,则无返回值,任务执行完即结束。

# 线程池的异常处理

线程池内部会对任务执行过程中抛出的异常进行处理:

  • 使用 execute() 提交任务:如果任务中有未捕获的异常,线程会终止,线程池会创建新线程替换它。
  • 使用 submit() 提交任务:异常会被封装在 Future 中,线程不会终止,需要调用 get() 方法捕获 ExecutionException。

# 线程池的属性标识

# 属性标识与状态变化

线程池通过一个 AtomicInteger ctl 同时表示两种信息:

  • 高3位:线程池的运行状态(runState)
  • 低29位:当前线程池中工作线程的数量(workerCount)
// ctl 是一个原子整型,用来同时表示两个信息:线程池的运行状态 + 当前工作线程数
// 高3位表示线程池状态(runState)
// 低29位表示线程数(workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// Integer.SIZE = 32,所以 COUNT_BITS = 29,代表低29位用于表示线程数量
private static final int COUNT_BITS = Integer.SIZE - 3;

// 000111...111(二进制共29个1)——用于屏蔽高3位,只保留线程数
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

/* 
 * ---------------- 线程池运行状态(runState) ----------------
 * 线程池的状态用高3位表示,总共有5种状态:
 * 111:RUNNING
 * 000:SHUTDOWN
 * 001:STOP
 * 010:TIDYING
 * 011:TERMINATED
 */
private static final int RUNNING    = -1 << COUNT_BITS; // 高3位是111,表示运行中(接受新任务 + 处理队列任务)
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 高3位是000,表示不接收新任务,但继续执行队列任务
private static final int STOP       =  1 << COUNT_BITS; // 高3位是001,表示不接收任务,也中断正在执行的任务
private static final int TIDYING    =  2 << COUNT_BITS; // 高3位是010,表示所有任务结束,正在清理资源
private static final int TERMINATED =  3 << COUNT_BITS; // 高3位是011,表示线程池完全终止

/* 
 * ---------------- 状态与线程数的打包与拆解 ----------------
 * ctl 的高3位存状态,低29位存线程数,通过位运算实现分离。
 */
private static int runStateOf(int c)     { return c & ~COUNT_MASK; } // 提取高3位(状态)
private static int workerCountOf(int c)  { return c & COUNT_MASK; }  // 提取低29位(线程数)
private static int ctlOf(int rs, int wc) { return rs | wc; }         // 将状态与线程数打包成一个整型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 线程池的状态变化

  • RUNNING:正常运行,接收新任务并执行。
  • SHUTDOWN:调用 shutdown() 后进入此状态,不再接收新任务,但会继续执行队列中任务。
  • STOP:调用 shutdownNow() 后进入此状态,不接收新任务,中断正在执行的线程。
  • TIDYING:所有任务执行完毕,线程池清理资源中。
  • TERMINATED:完全终止状态,线程池生命周期结束。

image-20251031104956734

# 线程池的execute方法

# execute() 方法讲解

// ThreadPoolExecutor.execute() 核心入口
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException(); // 参数校验,防止空任务

    int c = ctl.get(); // 获取线程池当前状态和线程数
    // 核心线程数未满,优先创建核心线程处理任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) // true 表示核心线程
            return; // 创建成功直接返回
        c = ctl.get(); // 创建失败,重新读取 ctl
    }

    // 核心线程已满,任务尝试加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get(); // 再次检查状态,避免 race condition
        if (!isRunning(recheck) && remove(command))
            reject(command); // 队列加入后线程池已关闭,触发拒绝策略
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // 队列中没有线程运行,创建非核心线程处理任务
    }
    // 队列已满且无法创建线程,直接拒绝任务
    else if (!addWorker(command, false))
        reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

关键点:

  • 优先创建核心线程 → 再入队列 → 再创建非核心线程 → 都失败执行拒绝策略
  • 保证任务不会丢失,同时控制线程数和队列容量

# addWorker() 方法讲解

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // 检查线程池状态,如果处于 SHUTDOWN 或以上且不允许创建线程,返回 false
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            // 核心线程数或最大线程数限制
            if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // CAS 原子增加线程数
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // CAS 失败重试
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建 Worker 对象,同时生成 Thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); // 锁住 workers 集合,保证线程安全
            try {
                int c = ctl.get();
                // 核心线程池运行中,或者非核心线程且 firstTask 为 null
                if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException(); // 线程必须是 NEW 状态
                    workers.add(w); // 加入线程池集合
                    workerAdded = true;
                    if (workers.size() > largestPoolSize)
                        largestPoolSize = workers.size(); // 更新历史最大线程数
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // 启动线程执行 run()
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w); // 创建失败回滚
    }
    return workerStarted;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

关键点:

  • 核心逻辑:状态检查 → 线程数限制 → CAS 增加线程数 → Worker 创建 → 启动线程
  • 保证线程池并发安全,避免超过最大线程数

# Worker 类讲解

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;      // Worker 对应的线程对象
    Runnable firstTask;       // 第一个任务,可能为空
    volatile long completedTasks; // 已完成任务计数

    Worker(Runnable firstTask) {
        setState(-1);         // 先禁止中断
        this.firstTask = firstTask;
        // 通过线程工厂创建 Thread 对象
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this); // 委托线程池执行任务循环
    }
  	/**
  	......
  	*/
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

关键点:

  • 封装线程和任务,每个线程绑定一个 Worker
  • run() 委托 runWorker() 循环处理任务
  • 内部维护完成任务计数,用于统计和管理线程生命周期

# runWorker() 方法讲解

// runWorker 是每个 Worker 线程的主执行逻辑
// 当线程池启动线程时,线程会进入这里不断从队列中取任务执行
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread(); // 当前执行任务的线程
    Runnable task = w.firstTask;        // Worker 创建时传入的第一个任务(可能为空)
    w.firstTask = null;                 // 释放引用,帮助 GC
    w.unlock();                         // 允许被中断(Worker 构造时 setState(-1) 禁止中断)
    boolean completedAbruptly = true;   // 标记线程是否异常退出(用于后续回收处理)

    try {
        // 主循环:不断从任务队列中取任务执行
        // 若 firstTask 不为空,则先执行它
        // 若为空,则调用 getTask() 从队列取任务(可能阻塞)
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 获取 Worker 独占锁,保证同一线程执行任务过程的同步

            // 判断线程池状态:如果线程池在 STOP 状态,需要中断当前线程
            // 否则要保证线程未被误中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            try {
                // 钩子方法:任务执行前调用(可用于监控、日志、统计)
                beforeExecute(wt, task);
                try {
                    task.run(); // 真正执行任务逻辑(调用 Runnable.run())
                    // 钩子方法:任务执行后调用(用于记录耗时或异常监控)
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // 如果任务执行中抛异常,依然调用 afterExecute
                    afterExecute(task, ex);
                    throw ex; // 再抛出异常让外层 finally 处理
                }
            } finally {
                // 当前任务执行完毕
                task = null;
                w.completedTasks++; // 增加任务完成计数
                w.unlock(); // 释放 Worker 锁
            }
        }
        // 能执行到这里说明:getTask() 返回了 null(队列空 + 无需新任务)
        completedAbruptly = false; // 正常退出
    } finally {
        // 线程退出时的清理逻辑
        // 包括从 worker 集合移除,调整线程池大小等
        processWorkerExit(w, completedAbruptly);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

关键点

  • 核心逻辑:获取任务 → 执行任务(含钩子) → 检查状态 → 正常或异常退出
  • 任务来源:优先执行 firstTask,其后不断从阻塞队列 getTask() 获取
  • 执行前后调用钩子:beforeExecute() 和 afterExecute(),可扩展监控逻辑
  • processWorkerExit():统一清理退出线程,维护线程池稳定性
  • 保证线程池在任务执行、异常退出和关闭时都能安全过渡

# 线程池中线程异常后,销毁还是复用?

直接说结论,需要分两种情况:

  • 使用execute()提交任务:当任务通过execute()提交到线程池并在执行过程中抛出异常时,如果这个异常没有在任务内被捕获,那么该异常会导致当前线程终止,并且异常会被打印到控制台或日志文件中。线程池会检测到这种线程终止,并创建一个新线程来替换它,从而保持配置的线程数不变。 -
  • 使用submit()提交任务:对于通过submit()提交的任务,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由submit()返回的Future对象中。当调用Future.get()方法时,可以捕获到一个ExecutionException。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。

简单来说:使用execute()时,未捕获异常导致线程终止,线程池创建新线程替代;使用submit()时,异常被封装在Future中,线程继续复用。 这种设计允许submit()提供更灵活的错误处理机制,因为它允许调用者决定如何处理异常,而execute()则适用于那些不需要关注执行结果的场景。

具体的源码分析可以参考这篇:线程池中线程异常后:销毁还是复用? - 京东技术 (opens new window)。

编辑 (opens new window)
上次更新: 2025/10/31, 08:51:39

← 面试场景题 初识Vue3→

最近更新
01
分布式事务
10-27
02
面试场景题
08-22
03
Function Calling与MCP区别
06-25
更多文章>
Theme by Vdoing | Copyright © 2023-2025 xiaoyang | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式