Java多线程
# Java多线程
Java 提供了强大的并发支持,包括基础的 Thread
、Runnable
机制,以及更高级的JUC(java.util.concurrent)并发工具包,涵盖线程池、锁机制、并发集合等,极大地简化了高并发场景下的开发。合理利用多线程不仅能充分发挥多核 CPU 的计算能力,还能优化资源管理,提高系统吞吐量。然而,多线程编程也伴随着线程安全、资源竞争等挑战,需要开发者在设计时合理选择并发工具,确保程序的稳定性和高效性。
# 1. 线程和进程的区别
在操作系统中,进程(Process) 是程序的运行实例,每个进程拥有独立的内存空间,是操作系统进行资源分配的基本单位,而 线程(Thread) 是进程中的执行单元,多个线程共享同一进程的资源,是CPU调度的基本单位。主要区别如下:
对比项 | 进程(Process) | 线程(Thread) |
---|---|---|
资源独立性 | 进程之间相互独立,拥有独立的内存空间 | 线程共享进程的堆、方法区等资源 |
通信方式 | 进程间通信(IPC)复杂,如管道、消息队列 | 线程间共享数据,通信更简单 |
切换开销 | 进程切换开销大,需要操作系统调度 | 线程切换开销小,效率更高 |
稳定性 | 一个进程崩溃不会影响其他进程 | 线程崩溃可能影响整个进程 |
在 Java 中,每个 Java 应用程序运行在 JVM 进程中,并可创建多个线程以实现并发执行。
# 2. 并发和并行的区别
- 并发 适用于 单核 CPU,指的是多个任务交替执行,每个任务在一定时间内执行一部分,然后切换到下一个任务。这是因为单核 CPU 在同一时刻只能执行一个任务。并发的主要目的是 提高系统的响应性和吞吐量,使多个任务能够共享 CPU 的时间片,从而提升整体效率。
- 并行 适用于 多核 CPU,指的是多个任务真正同时执行,每个任务都由独立的处理器核心负责,并行处理不同的指令。并行的主要目的是 提升计算能力和执行性能,使多个任务能同步处理,加快任务完成速度。
单核 CPU 只能实现并发,无法实现并行。换句话说,并行计算只有在 多核 CPU 环境下才可能发生。而在 多核 CPU 中,并发和并行通常会 同时存在:多个任务可以在不同的核心上并行执行,而每个任务内部可能还包含并发逻辑,以处理不同的子任务。这种组合方式能够最大程度地提升系统性能和响应效率。
# 3. 线程的创建方式
在 Java 中,创建线程的方式有以下几种:继承 Thread
类、实现 Runnable
接口、实现 Callable
接口(配合 FutureTask
使用)。无论采用哪种方式,本质上所有线程最终都会通过 new Thread(Runnable target)
创建并执行,底层都是实现Runable接口。
# 3.1 继承 Thread 类
通过继承 Thread
类并重写 run()
方法来创建线程。
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread running: " + Thread.currentThread().getName());
}
}
public class ThreadTest {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start(); // 启动线程
}
}
2
3
4
5
6
7
8
9
10
11
12
13
特点
- 直接继承
Thread
,代码简洁。 run()
方法定义线程的执行逻辑。- 局限性:Java 只支持单继承,继承
Thread
后无法继承其他类。
# 3.2 实现 Runnable 接口
创建线程的推荐方式。通过实现 Runnable
接口,并将 Runnable
对象传递给 Thread
来启动线程。
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable thread running: " + Thread.currentThread().getName());
}
}
public class RunnableTest {
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
特点
Runnable
方式更灵活,不影响类的继承(可同时继承其他类)。- 适用于 多个线程共享同一个任务 的情况。
Thread
类本质上也是依赖Runnable
来执行任务。
# 3.3 实现 Callable 接口
Callable
接口比 Runnable
更加强大,它可以返回执行结果,并抛出异常。
import java.util.concurrent.*;
class MyCallable implements Callable<String> {
@Override
public String call() {
return "Callable task executed by " + Thread.currentThread().getName();
}
}
public class CallableTest {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new MyCallable());
System.out.println(future.get()); // 获取任务执行结果
executor.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
特点
call()
方法可以返回值,相比Runnable
更加强大。- 需要使用
FutureTask
或ExecutorService.submit()
来管理Callable
任务。 - 适用于需要获取执行结果的任务(如异步计算)。
# 4. 线程的 6 种状态详解
在 Java 中,线程在生命周期中只可能处于以下 6 种状态之一,并会在不同状态之间切换。
状态 | 说明 | 如何进入 | 如何退出 |
---|---|---|---|
NEW(新建) | 线程创建但未启动 | new Thread() | 调用 start() 进入 RUNNABLE |
RUNNABLE(可运行) | 线程已启动,等待 CPU 调度 | 调用 start() | 获得 CPU 资源变为 RUNNING,或因等待变为 BLOCKED/WAITING/TIMED_WAITING |
BLOCKED(阻塞) | 线程等待获取锁 | 竞争 synchronized 锁失败 | 锁释放后进入 RUNNABLE |
WAITING(等待) | 线程无限期等待其他线程通知 | 调用 wait() 、join() 、LockSupport.park() | 其他线程 notify() 、notifyAll() 或 interrupt() |
TIMED_WAITING(超时等待) | 线程等待固定时间 | 调用 sleep(time) 、join(time) 、wait(time) 、LockSupport.parkNanos() 、parkUntil() | 计时结束或提前被唤醒 |
TERMINATED(终止) | 线程执行完成或异常退出 | run() 方法结束 | 线程无法再次启动 |
# 5. wait/notify 机制
在 Java 多线程编程中,wait/notify
机制是线程间通信的核心手段之一。它允许多个线程协作,使得线程能够有序地等待和唤醒,避免不必要的资源浪费。
由前面线程的状态转化图可知,当调用wait()方法后,线程会进入WAITING(等待状态),后续被notify()后,并没有立即被执行,而是进入等待获取锁的阻塞队列。
# 5.1 wait/notify 是什么?
wait/notify
机制是基于 Object
类 的方法,包括:
wait()
:当前线程释放锁,并进入等待状态,直到其他线程调用notify()
或notifyAll()
唤醒它。notify()
:随机唤醒一个正在wait()
的线程。notifyAll()
:唤醒所有正在wait()
的线程,但只有一个线程能获取锁,其余线程继续等待。
⚠️ 注意:wait()
、notify()
和 notifyAll()
必须在 synchronized
代码块中调用,否则会抛出 IllegalMonitorStateException
异常!
# 5.2 工作原理
当一个线程调用 wait()
方法时,它会进入 等待队列,同时释放锁,进入阻塞状态; 当其他线程调用 notify()
或 notifyAll()
,等待的线程会被唤醒,重新争夺锁。
示例:生产者-消费者模型
class SharedResource {
private int data;
private boolean available = false;
public synchronized void produce(int value) throws InterruptedException {
while (available) { // 若已有数据,则等待消费
wait();
}
data = value;
available = true;
System.out.println("Produced: " + value);
notify(); // 唤醒等待的消费者
}
public synchronized void consume() throws InterruptedException {
while (!available) { // 若无数据,则等待生产
wait();
}
System.out.println("Consumed: " + data);
available = false;
notify(); // 唤醒等待的生产者
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
生产者线程
class Producer extends Thread {
private SharedResource resource;
public Producer(SharedResource resource) { this.resource = resource; }
public void run() {
try {
for (int i = 1; i <= 5; i++) {
resource.produce(i);
Thread.sleep(1000);
}
} catch (InterruptedException e) { e.printStackTrace(); }
}
}
2
3
4
5
6
7
8
9
10
11
12
消费者线程
class Consumer extends Thread {
private SharedResource resource;
public Consumer(SharedResource resource) { this.resource = resource; }
public void run() {
try {
for (int i = 1; i <= 5; i++) {
resource.consume();
Thread.sleep(1500);
}
} catch (InterruptedException e) { e.printStackTrace(); }
}
}
2
3
4
5
6
7
8
9
10
11
12
启动线程
public class WaitNotifyExample {
public static void main(String[] args) {
SharedResource resource = new SharedResource();
new Producer(resource).start();
new Consumer(resource).start();
}
}
2
3
4
5
6
7
# 5.3 wait/notify 需要注意的点
- 必须在
synchronized
代码块/方法中调用,否则会抛出IllegalMonitorStateException
。 wait()
释放锁,而notify()
只是唤醒,不会立刻释放锁,被唤醒的线程仍需等待锁释放后才能继续执行。- 避免死锁:应使用
while
而非if
进行条件判断,防止虚假唤醒(spurious wakeup)。 notify()
不能保证唤醒哪个线程,如果有多个wait()
线程,notify()
可能随机选择一个线程。
# 6. 多线程的并发安全问题
线程安全(Thread Safety) 指的是当多个线程并发访问共享资源时,不会导致数据不一致、程序异常或系统崩溃。线程不安全的情况通常会引发以下问题:
# 6.1 竞态条件(Race Condition)
发生在多个线程并发访问和修改共享资源时,资源访问缺乏互斥。程序的执行结果依赖于线程的调度顺序,导致不可预测的错误。例如,两个线程同时对一个变量 x
递增,最终 x
的值可能小于预期。
示例(Java):
class Counter {
private int count = 0;
public void increment() {
count++; // 非线程安全
}
public int getCount() {
return count;
}
}
public class RaceConditionDemo {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.increment();
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.increment();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Final count: " + counter.getCount()); // 可能小于 2000
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
问题: count++
不是原子操作,多个线程可能同时读取 count
,导致最终结果不正确。
# 6.2 死锁(Deadlock)
发生在多个线程互相等待对方释放资源,导致程序无法继续执行。例如,线程 A 持有资源 X,并等待资源 Y,而线程 B 持有资源 Y,并等待资源 X,最终两个线程都无法继续执行。
死锁的四个必要条件(同时满足才会发生死锁)
- 互斥条件(Mutual Exclusion)
- 资源一次只能被一个线程占有,其他线程必须等待。
- 占有且等待(Hold and Wait)
- 线程已持有部分资源,同时又在等待其他线程释放它需要的资源。
- 不可抢占(No Preemption)
- 已经被占有的资源不能被强制剥夺,只能由持有线程主动释放。
- 循环等待(Circular Wait)
- 线程之间形成环形依赖,即线程 A 等待线程 B 的资源,线程 B 等待线程 C,最后某个线程又等待线程 A,形成死锁。
⚠️ 解决死锁的关键是 打破其中一个条件!
示例(Java):
class DeadlockExample {
private static final Object resource1 = new Object();
private static final Object resource2 = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (resource1) {
System.out.println("Thread 1: Locked resource 1");
try { Thread.sleep(100); } catch (InterruptedException e) {}
synchronized (resource2) {
System.out.println("Thread 1: Locked resource 2");
}
}
});
Thread t2 = new Thread(() -> {
synchronized (resource2) {
System.out.println("Thread 2: Locked resource 2");
try { Thread.sleep(100); } catch (InterruptedException e) {}
synchronized (resource1) {
System.out.println("Thread 2: Locked resource 1");
}
}
});
t1.start();
t2.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
问题:
- 线程 1 先锁住
resource1
,然后等待resource2
。 - 线程 2 先锁住
resource2
,然后等待resource1
。 - 结果:两个线程相互等待,形成死锁,程序卡死。
# 6.3 资源争用(Resource Contention)
发生在多个线程需要访问同一个资源(如 CPU、内存、文件、数据库连接)时,导致性能下降。例如,大量线程同时访问数据库,可能会导致连接池耗尽,影响系统吞吐量。
示例(Java):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class DatabaseConnection {
public void queryDatabase() {
System.out.println(Thread.currentThread().getName() + " querying database...");
try { Thread.sleep(500); } catch (InterruptedException e) {}
System.out.println(Thread.currentThread().getName() + " finished querying.");
}
}
public class ResourceContentionExample {
public static void main(String[] args) {
DatabaseConnection db = new DatabaseConnection();
ExecutorService executor = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
executor.execute(db::queryDatabase);
}
executor.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
问题:
- 线程池大小为 2,但有 5 个任务同时查询数据库。
- 可能出现连接池耗尽,影响系统吞吐量。
这些问题会导致程序行为异常、运行效率降低,甚至系统崩溃,因此在多线程编程中需要特别关注。
# 7. 如何解决线程的并发安全问题?
在 Java 开发中,为了确保线程安全,常见的方法包括以下几类:
# 7.1 使用 synchronized 关键字(互斥同步)
synchronized
关键字用于确保在同一时刻只有一个线程可以访问同步代码块或同步方法,从而避免数据竞争。
- 锁的机制:
- 一把锁只能同时被一个线程持有,未获得锁的线程只能等待。
- 每个实例对象都有自己的锁(即
this
),不同实例之间的锁互不影响。 - 当锁对象是
*.class
或synchronized
修饰的是静态方法时,所有对象共享同一把锁。 - 无论同步方法正常执行完毕还是抛出异常,都会释放锁。
synchronized
的使用方式:- 同步实例方法:锁住当前实例对象
this
,进入同步代码前要获得当前对象实例的锁。 - 同步静态方法:锁住类对象
Class
,进入同步代码前要获得当前类对象的锁。 - 同步代码块:可指定具体的锁对象,对给定对象加锁,进入同步代码前要获得给定对象的锁。
- 同步实例方法:锁住当前实例对象
# 1. 同步实例方法(锁住当前实例 this
)
class SynchronizedExample {
public synchronized void instanceMethod() {
System.out.println(Thread.currentThread().getName() + " 获取到了实例锁 this");
try {
Thread.sleep(2000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 释放实例锁");
}
}
public class Main {
public static void main(String[] args) {
SynchronizedExample obj = new SynchronizedExample();
// 创建两个线程访问同一个对象的同步方法
new Thread(obj::instanceMethod, "线程1").start();
new Thread(obj::instanceMethod, "线程2").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
说明:
synchronized
修饰实例方法,相当于对this
加锁。- 同一对象的多个线程访问此方法时会阻塞,但不同实例的对象之间互不影响。
# 2. 同步静态方法(锁住类对象 Class
)
class StaticSyncExample {
public static synchronized void staticMethod() {
System.out.println(Thread.currentThread().getName() + " 获取到了类锁 Class");
try {
Thread.sleep(2000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 释放类锁");
}
}
public class Main {
public static void main(String[] args) {
// 两个线程调用静态方法
new Thread(StaticSyncExample::staticMethod, "线程1").start();
new Thread(StaticSyncExample::staticMethod, "线程2").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
说明:
synchronized
修饰静态方法,相当于对StaticSyncExample.class
加锁。- 所有对象共享同一把锁,同一时刻只能有一个线程执行该方法。
# 3. 同步代码块(可指定具体的锁对象)
class SyncBlockExample {
private final Object lock = new Object();
public void syncBlockMethod() {
System.out.println(Thread.currentThread().getName() + " 进入方法,但未加锁");
synchronized (lock) { // 这里使用自定义的锁对象
System.out.println(Thread.currentThread().getName() + " 获取了自定义锁 lock");
try {
Thread.sleep(2000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 释放自定义锁 lock");
}
System.out.println(Thread.currentThread().getName() + " 方法执行结束");
}
}
public class Main {
public static void main(String[] args) {
SyncBlockExample obj = new SyncBlockExample();
// 两个线程访问同步代码块
new Thread(obj::syncBlockMethod, "线程1").start();
new Thread(obj::syncBlockMethod, "线程2").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
说明:
synchronized (lock)
1使代码块加锁,多个线程访问
syncBlockMethod()
1时:
- 在进入
synchronized (lock)
代码块之前,多个线程可以同时访问方法的非同步部分。 - 只有一个线程可以进入
synchronized (lock)
代码块,其余线程必须等待。 lock
也可以是this
,但使用独立的lock
对象可以更灵活控制同步范围。
- 在进入
# 7.2 使用 volatile
关键字
volatile
关键字用于保证多线程访问共享变量时的可见性,但不能保证操作的原子性。
# 1. volatile
关键字的作用
- 保证可见性:当一个线程修改
volatile
变量后,其他线程能够立即看到最新值。 - 防止指令重排序:
volatile
变量的读写具有一定的顺序性,避免 CPU 进行指令重排优化导致线程安全问题。 - 不保证原子性:如果
volatile
变量的修改涉及复合操作(如count++
),则可能出现竞态条件(race condition)。
# 2. volatile
保证可见性
class VolatileVisibility {
private volatile boolean running = true; // 共享变量,保证可见性
public void runTask() {
System.out.println("任务开始...");
while (running) {
// 如果没有 volatile 关键字,可能会一直循环,不会看到其他线程修改的值
}
System.out.println("任务结束");
}
public void stop() {
running = false; // 另一个线程修改变量
System.out.println("任务已请求停止");
}
public static void main(String[] args) throws InterruptedException {
VolatileVisibility task = new VolatileVisibility();
Thread thread1 = new Thread(task::runTask);
thread1.start();
Thread.sleep(1000); // 让线程1 先运行
task.stop(); // 修改 `running` 变量,thread1 应该能看到
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
⚠️ 没有 volatile
时的问题:
thread1
可能无法看到running = false
,导致无法退出循环(JVM 可能缓存running
变量)。volatile
关键字可以强制线程每次都从主内存中读取最新值,避免缓存导致的不可见问题。
# 3. volatile
不能保证原子性
虽然 volatile
可以保证可见性,但不能保证原子性,例如以下代码:
class VolatileAtomicity {
private volatile int count = 0;
public void increase() {
count++; // 不是原子操作,可能出现竞态条件
}
}
2
3
4
5
6
7
为什么 count++
不是原子操作?
count++实际上包含了 3 个步骤:
- 读取
count
的值 - 计算
count + 1
- 写回
count
多个线程执行 count++
时,可能会出现丢失更新的问题,即两个线程都读取了旧值,然后都加 1
,最终 count
只增加 1
,而不是 2
。
# 4. 指令重排序问题
as-if-serial 语义规定:单线程程序的执行结果不能因重排序而改变,即编译器和 CPU 可以优化执行顺序,但最终效果必须与按顺序执行相同。
单线程下的重排序
在单线程环境下,编译器可以调整无依赖关系的语句顺序,以优化性能。例如:
a = 1; // 1
b = 2; // 2
c = a + 1; // 3
2
3
可能会被优化为:
b = 2; // 2
a = 1; // 1
c = a + 1; // 3
2
3
因为语句 2 和 1、3 无依赖关系,重排序不会影响最终结果。
但 c = a + 1
不能提前执行,因为 c
依赖 a
。
多线程下的指令重排序问题
多线程环境中,CPU 和编译器无法感知线程间的数据依赖关系,可能导致非预期的执行顺序。
示例:
private static int value;
private static boolean flag;
public static void init() {
value = 8; // 语句 1
flag = true; // 语句 2
}
public static void getValue() {
if (flag) {
System.out.println(value);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
如果 init()
方法发生指令重排序:
flag = true; // 语句 2
value = 8; // 语句 1
2
在多线程环境下,线程 A 调用 init()
,线程 B 调用 getValue()
,可能导致 flag
为 true
但 value
仍是默认值 0
,造成线程安全问题。
如何防止指令重排序?
✅ 使用 volatile
:
volatile
关键字禁止指令重排序,保证写入 volatile
变量前的代码不会被重排到 volatile
变量之后。
private static volatile boolean flag;
private static volatile int value;
2
这样,init()
的 value=8
一定会先执行,避免 getValue()
读取到未初始化的 value
。
# 7.3 使用 ThreadLocal(线程隔离,每个线程独立变量)
ThreadLocal
是 Java 提供的一个用于实现线程本地存储(Thread Local Storage, TLS)的工具类。它允许在同一个线程内存储和访问独立的变量,而不会被其他线程干扰。
# 1. ThreadLocal 的原理
ThreadLocal
通过 Thread 类中的 ThreadLocalMap
来存储每个线程的独立变量:
Thread
类内部有一个ThreadLocalMap
类型的变量,每个线程访问ThreadLocal
时,都会在自己的ThreadLocalMap
里存储变量,而不是共享全局变量。ThreadLocalMap
采用 弱引用(WeakReference) 作为 key,这样可以避免一定程度上的内存泄露,但仍然需要注意使用方式。
# 2. ThreadLocal 使用示例
public class ThreadLocalExample {
private static ThreadLocal<Integer> threadLocalValue = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) {
Runnable task = () -> {
int currentValue = threadLocalValue.get();
threadLocalValue.set(currentValue + 1);
System.out.println(Thread.currentThread().getName() + " -> " + threadLocalValue.get());
};
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
thread1.start();
thread2.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
执行结果(可能不同):
Thread-0 -> 1
Thread-1 -> 1
2
这里每个线程都有自己独立的 ThreadLocal
变量,互不干扰,如果需要存储多个变量可以创建多个ThreadLocal
。
# 3. ThreadLocal 可能导致的内存泄露问题
尽管 ThreadLocalMap
采用弱引用存储 ThreadLocal
变量的 key,但 ThreadLocalMap
的 value(实际存储的对象)是强引用,如果 ThreadLocal
没有手动清理,可能会导致内存泄露(Memory Leak),尤其是在**线程池(ThreadPool)**环境下。
- 线程池的线程不会立即销毁:在线程池中,线程可能会被复用,而不会立刻销毁。如果
ThreadLocal
变量未被手动清除,下次复用这个线程时,ThreadLocalMap
可能仍然持有之前的 value,导致对象无法被回收。 - ThreadLocalMap 的 value 没有被自动清除:即使
ThreadLocal
被 GC 了,ThreadLocalMap
的 value 仍然是强引用,无法释放,造成内存泄露。
示例:导致内存泄露的错误使用方式
public class MemoryLeakExample {
private static ThreadLocal<byte[]> localVariable = new ThreadLocal<>();
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
localVariable.set(new byte[1024 * 1024 * 10]); // 10MB 数据
System.out.println(Thread.currentThread().getName() + " set data");
// localVariable.remove(); // 没有手动 remove,可能导致内存泄露
});
}
executor.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
分析:
- 线程池中的线程不会立即销毁,
ThreadLocalMap
可能仍然持有10MB
的对象,无法被 GC 释放,导致 内存泄露。 - 解决方案是 手动调用
remove()
。
# 4. 如何避免 ThreadLocal 内存泄露?
使用后手动清除:
threadLocal.remove();
1在
finally
代码块中执行remove()
,确保不管是否发生异常,都会清理ThreadLocal
数据。在短生命周期线程中使用 ThreadLocal: 避免在线程池或长生命周期的线程中使用
ThreadLocal
,如果必须使用,确保手动清理变量。避免存放大对象: 避免在
ThreadLocal
变量中存放大对象,如果存放的是List
、Map
等集合对象,要及时清空内容。使用
InheritableThreadLocal
(仅适用于父子线程共享变量):InheritableThreadLocal
允许子线程继承父线程的ThreadLocal
变量,但仍然需要注意清理,否则可能导致子线程也持有无用的引用。
# 8. JUC 并发工具概述
java.util.concurrent
(简称 JUC)是 Java 并发工具包,它提供了一套高性能的并发工具,包括 线程池、原子变量、锁机制、并发容器、并发工具类等,可以更高效地处理并发编程问题。
在 Java 5 之前,并发编程主要依赖:
synchronized
关键字(方法锁、代码块锁)wait()
、notify()
进行线程通信volatile
关键字保证变量可见性Thread
类直接操作线程
问题:
synchronized
是重量级锁,容易导致性能下降。wait()
/notify()
使用不直观,容易出错。Thread
的创建和管理开销大,不适合高并发场景。
JUC 解决方案: JUC 提供了高效的并发工具,如:
- 无锁原子变量(CAS 机制)
- 高性能锁(如
ReentrantLock
) - 线程池(高效管理线程)
- 并发安全的容器(
ConcurrentHashMap
、CopyOnWriteArrayList
) - 并发工具类(
CountDownLatch
、Semaphore
)
为什么 JUC 比 synchronized 强?
特性 | synchronized | JUC(Lock、CAS) |
---|---|---|
实现方式 | JVM 内置锁 | 基于 CAS(无锁)或 AQS |
是否阻塞 | 线程竞争时阻塞 | CAS 失败时自旋,不阻塞 |
粒度控制 | 只能锁整个代码块或方法 | 可以尝试锁、公平锁、读写分离 |
性能 | 竞争激烈时,线程切换成本高 | CAS 轻量级、锁优化提高吞吐量 |
功能 | 仅支持互斥锁 | 读写锁、可重入锁、超时获取等 |
# 9. 线程安全的原子操作
# 9.1 CAS(Compare-And-Swap)
# 1. 什么是 CAS?
CAS(Compare-And-Swap,比较并交换)是一种 无锁(Lock-Free)同步机制,用于 实现原子操作,常见于 AtomicInteger
、AtomicLong
、AtomicReference
等类。
工作原理:
- CAS 操作涉及 三个值:
- 期望值(expected):当前变量的原值
- 新值(newValue):希望更新的值
- 内存地址(memoryAddress):变量在内存中的位置
- 如果当前变量值 == 期望值,则更新为新值,否则不更新。
示例:
AtomicInteger atomicInteger = new AtomicInteger(10);
boolean success = atomicInteger.compareAndSet(10, 20); // 期望值是10,修改为20
System.out.println(success); // true,修改成功
2
3
如果 atomicInteger
当前值是 10
,那么 compareAndSet(10, 20)
会成功更新为 20
,否则更新失败。
# 2. CAS 的底层实现
CAS 依赖于 Unsafe
类和 CPU 指令,保证操作的原子性。
1️⃣ CAS 在 Unsafe
类中的实现
AtomicInteger
的 compareAndSet
方法底层使用 Unsafe
类:
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
2
3
其中:
unsafe
:Unsafe
类的实例,提供底层操作。compareAndSwapInt()
:用的本地方法其他语言编写的,使用 CAS 指令 来修改变量的值。valueOffset
:变量在内存中的偏移量,确保 CAS 作用在正确的内存位置。
2️⃣ CAS 底层的 CPU 指令
在 x86 处理器 上,CAS 依赖于 CPU 的原子指令,如:
cmpxchg
(Compare and Exchange)lock cmpxchg
(带锁的比较交换,防止多个 CPU 并发修改)
💡 CPU 原子性保证:
CAS 操作不需要加锁,而是直接依赖 CPU 指令 确保数据修改的原子性,因此性能比 synchronized
更高效。
# 3. CAS 的优缺点
✅ 优势
- 无锁并发,效率高
- CAS 不会引起线程阻塞,相比
synchronized
具有更好的性能。 - 适用于 高并发场景,如
AtomicInteger
、ConcurrentHashMap
。
- CAS 不会引起线程阻塞,相比
- 减少线程上下文切换
synchronized
可能导致线程阻塞,线程切换成本高。- CAS 采用 自旋操作(失败时重试),避免不必要的上下文切换。
❌ 缺点
1️⃣ ABA 问题
问题:如果某个变量从
A → B → A
,CAS 认为变量没变,但实际上中途发生了修改,可能导致错误。示例:
AtomicReference<Integer> atomicRef = new AtomicReference<>(100); atomicRef.compareAndSet(100, 200); // 线程1修改为200 atomicRef.compareAndSet(200, 100); // 线程2又改回100 boolean success = atomicRef.compareAndSet(100, 300); // 线程3认为100没变,但其实变过
1
2
3
4解决方案:
AtomicStampedReference
(带版本号)
AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 1); int stamp = ref.getStamp(); // 获取版本号 ref.compareAndSet(100, 200, stamp, stamp + 1); // 修改时增加版本号
1
2
3
2️⃣ 自旋消耗 CPU
- 问题:CAS 操作失败时,会不断重试(自旋),如果 高并发竞争激烈,线程可能长时间 占用 CPU。
- 解决方案:
- 结合
synchronized
(自旋失败一定次数后,使用synchronized
) - JVM 优化:例如
LongAdder
通过 分段累加,减少 CAS 冲突。
- 结合
3️⃣ 只能保证单个变量的原子性
- 问题:CAS 仅能保证 一个变量 的原子性,无法保证 多个变量同时修改的原子性。
- 解决方案:
- 使用
AtomicReference
封装多个变量 - 使用
ReentrantLock
或synchronized
- 使用
# 4. 乐观锁和悲观锁
1. 悲观锁(Pessimistic Lock)
悲观锁的核心思想是“假设最坏情况”,即认为每次访问共享资源时,都会有其他线程进行修改,因此必须在访问前先加锁,确保数据的一致性。
特点:
- 适用于高并发、冲突频繁的场景,例如数据库中的
SELECT ... FOR UPDATE
语句会触发行锁(Row Lock)。 - 线程在持有锁的情况下操作资源,其他线程必须等待锁释放后才能访问该资源,可能会影响系统吞吐量。
- 一般依赖数据库本身的锁机制(如行锁、表锁)或 Java 中的
synchronized
关键字和ReentrantLock
等机制来实现。
示例:
synchronized (lock) {
// 线程安全的操作
}
2
3
或使用 ReentrantLock
:
Lock lock = new ReentrantLock();
lock.lock();
try {
// 线程安全的操作
} finally {
lock.unlock();
}
2
3
4
5
6
7
2. 乐观锁(Optimistic Lock)
乐观锁的核心思想是“假设最好情况”,即认为大多数情况下,多个线程不会同时修改共享资源,因此不会上锁,而是采用一种“先操作、再验证”的方式来保证数据的一致性。
特点:
- 适用于读多写少、冲突概率较低的场景,例如商品库存更新、用户账户余额更新等。
- 采用版本号(Version)机制或**CAS(Compare-And-Swap)**机制实现。
- 可能会出现数据更新失败的情况,需要业务层进行重试逻辑。
CAS 示例(Java AtomicInteger):
import java.util.concurrent.atomic.AtomicInteger;
AtomicInteger count = new AtomicInteger(0);
count.compareAndSet(0, 1); // 只有当 count 仍然是 0 时,才会更新为 1
2
3
4
在数据库中,可以通过版本号机制来实现乐观锁:
UPDATE account
SET balance = balance - 100, version = version + 1
WHERE id = 1 AND version = 当前版本号;
2
3
如果 version
发生了变化,表示数据已被其他线程修改,更新失败,需要重试。
3. 总结
悲观锁 | 乐观锁 | |
---|---|---|
加锁策略 | 访问前加锁 | 访问后检查冲突 |
适用场景 | 高并发、冲突频繁 | 读多写少、冲突概率低 |
实现方式 | synchronized 、数据库行锁等 | 版本号、CAS 操作 |
性能 | 可能影响吞吐量 | 可能导致重试消耗 CPU |
选择建议:
- 如果系统中写操作较多,且冲突概率高,建议使用悲观锁,避免数据不一致。
- 如果系统以读操作为主,且冲突概率低,建议使用乐观锁,减少锁竞争,提高性能。
# 9.2 线程安全类(Atomic原子操作)
Java 并发包(JUC)的 java.util.concurrent.atomic
提供了一组**无锁(Lock-Free)**的原子类,利用 CAS(Compare-And-Swap) 机制,确保线程安全,比 synchronized
性能更高。
# 1. 基本原子类
AtomicInteger
、AtomicLong
、AtomicBoolean
这些类提供了 整数、长整型、布尔值 的原子操作,避免了 synchronized
的开销。
常见方法:
get()
:获取当前值set(x)
:设置值incrementAndGet()
/decrementAndGet()
:自增/自减compareAndSet(expect, update)
:CAS 方式更新值
示例:
AtomicInteger atomicInt = new AtomicInteger(0);
atomicInt.incrementAndGet(); // +1,变成 1
atomicInt.compareAndSet(1, 10); // 期望 1,修改为 10
System.out.println(atomicInt.get()); // 10
2
3
4
# 2. 引用类型原子类
AtomicReference<T>
适用于 对象 类型的原子操作,常用于无锁共享数据。
示例:
AtomicReference<String> atomicRef = new AtomicReference<>("Hello");
atomicRef.compareAndSet("Hello", "World"); // CAS 修改
System.out.println(atomicRef.get()); // World
2
3
# 3. 解决 ABA 问题
问题: CAS 存在 ABA 问题(值从 A → B → A,CAS 认为没变)。
解决方案: AtomicStampedReference
通过版本号(stamp) 解决 ABA 问题。
示例:
AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 1);
int stamp = ref.getStamp(); // 获取版本号
ref.compareAndSet(100, 200, stamp, stamp + 1); // 修改时增加版本号
2
3
✅ 保证每次 CAS 操作都检查版本号,避免 ABA 问题。
# 4. 高并发优化
LongAdder
(比 AtomicLong
更适合高并发)
问题: AtomicLong
竞争激烈时,CAS 失败会导致 自旋重试,降低性能。
解决方案: LongAdder
分段存储多个值,减少竞争,适用于 高并发计数(如请求计数、流量统计)。
示例:
LongAdder counter = new LongAdder();
counter.increment(); // 线程安全自增
System.out.println(counter.sum()); // 获取最终值
2
3
✅ 比 AtomicLong
更快,适合高并发计数场景。
# 10. 并发容器
# 10.1 CopyOnWriteArrayList(适用于读多写少)
# 1. ArrayList 线程不安全的问题
ArrayList 是非线程安全的集合类,主要体现在以下几个方面:
- 并发修改问题:多线程同时调用 add() 方法时,底层数组可能只会添加其中一个元素
// ArrayList 的 add 方法
public boolean add(E e) {
ensureCapacityInternal(size + 1); // 确保容量足够
elementData[size++] = e; // 非原子操作,涉及读写两步,size++ 可能导致元素覆盖
return true;
}
2
3
4
5
6
- 扩容问题:扩容过程中的数组复制操作不是原子的,可能导致数据丢失或不一致
- 迭代问题:在迭代过程中如果另一个线程修改了集合,会抛出 ConcurrentModificationException
# 2. CopyOnWriteArrayList 的写时复制原理
CopyOnWriteArrayList 采用写时复制(Copy-On-Write)策略解决并发问题:
// 核心属性
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
2
3
4
5
6
写操作源码分析:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 获取锁,保证写操作的互斥性
try {
Object[] elements = getArray(); // 获取当前数组
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 创建新数组,长度+1
newElements[len] = e; // 在新数组中添加元素
setArray(newElements); // 原子性地替换老数组
return true;
} finally {
lock.unlock(); // 释放锁
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
读操作源码分析:
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index]; // 无锁读取,直接访问数组元素
}
2
3
4
5
6
7
优缺点:
- 优点:读操作完全无锁,性能高;写操作互斥但不影响读;不会抛出 ConcurrentModificationException
- 缺点:每次写操作都会复制整个数组,内存占用高;写操作开销大;只能保证最终一致性,不能保证实时一致性
# 10.2 ConcurrentHashMap
# 1. HashTable遗弃的原因
Hashtable
是早期 Java 提供的线程安全 Map
实现,但由于同步机制过于粗暴,在高并发环境下效率极低,逐渐被淘汰。
Hashtable
所有方法都加了 synchronized
,导致多个线程访问时,必须串行执行,并发效率极低,这里就提到了之前说的**synchronized
锁住当前实例对象 this
**。
示例:线程 1 在 put()
时,线程 2 不能 get()
,必须等待
public synchronized V put(K key, V value) {
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % table.length;
table[index] = value; // 修改操作
return value;
}
2
3
4
5
6
- 锁住整个
Hashtable
,即使只是读数据,也可能被阻塞。 - 线程越多,竞争越激烈,性能越低。
# 2. ConcurrentHashMap 的分段锁
JDK 1.7 使用分段锁(Segment)在高并发环境下,Hashtable
的性能较差,因为所有线程都必须竞争同一把锁。ConcurrentHashMap
通过锁分段技术提高并发性能,它将数据分成多个片段(Segment
),每个片段有自己的锁。这样,多个线程可以同时访问不同的片段,减少锁竞争,提高并发效率。
此外,ConcurrentHashMap
读取数据时不需要加锁,写入数据时也仅锁定对应的片段,而不是整个 ConcurrentHashMap
,从而进一步优化性能。
JDK 1.7 中的 ConcurrentHashMap 采用 Segment 分段锁设计,其大致工作流程如下:
- 分段锁设计:
- 整个 Map 被分为若干个 Segment(默认16个),每个 Segment 相当于一个小的 HashMap
- 每个 Segment 都有自己的锁(继承自 ReentrantLock)
- 不同 Segment 可以并发修改,最大并发度等于 Segment 数量
- 数据结构:
- 二级哈希表:第一级是 Segment 数组,第二级是 HashEntry 数组
- 每个 Segment 维护一个 HashEntry<K,V>[] table 数组(两次hash的方式找到)
- 元素以链表形式存储在 HashEntry 数组中
- 写操作流程:
- 确定 key 所在的 Segment
- 对整个 Segment 加锁(而非 JDK 1.8 的桶级别锁定)
- 在 Segment 内部完成元素查找、添加或更新操作
- 释放 Segment 锁
- 读操作特点:
- 读操作不需要加锁,利用 volatile 变量保证可见性
- HashEntry 中的 value 和 next 指针使用 volatile 修饰
- 扩容机制:
- 每个 Segment 独立扩容,只需锁住当前 Segment
- 扩容门槛为 Segment 容量的 0.75(负载因子)
- size 操作:
- 先尝试无锁统计两次,如果两次结果相同则返回
- 如果不同,则对所有 Segment 加锁后再统计
ConcurrentHashMap 的内部结构
- Segment(分段)
- 继承自
ReentrantLock
,用于管理数据片段的锁。 - 维护
HashEntry
数组,每个HashEntry
是一个链表的头结点。 - 只对某个片段加锁,不影响其他片段,提高并发性能。
- 继承自
- HashEntry(键值存储单元)
- 采用不可变结构(除
value
变量外,其他变量如key
、next
都是final
)。 - 读操作不会修改结构,写操作会创建新的
HashEntry
并更新引用,避免锁冲突。
- 采用不可变结构(除
读(get)操作
get()
方法不会加锁,直接读取数据。- 使用
volatile
变量保证可见性,确保线程可以读取最新数据。 - 由于
HashEntry
除value
外的字段都是final
,所以多线程读取不会影响数据完整性。 - 仅当读取到
null
值时,才会加锁进行二次检查,以保证获取的数据是完整的。
写(put)操作
- 加锁:写入前必须获取
Segment
的锁,以保证线程安全。 - 扩容:如果元素数量超过阈值,则会触发
rehash()
进行扩容,但仅对某个片段操作,而不会影响整个ConcurrentHashMap
。 - 头插法:新元素会被插入链表头部,而不是修改原有链表结构,以减少锁冲突。
- 更新值:如果键已存在,则更新
value
,并使用volatile
确保可见性。
# 3. ConcurrentHashMap 的分段锁 & CAS 优化
JDK1.8 改用 CAS + synchronized 对每个桶位独立加锁 ConcurrentHashMap 采用了全新设计:
流程:
- 懒加载初始化:第一次使用时才初始化表(通过 CAS 操作确保只初始化一次)
- 写入操作优化:
- 桶位为 null:直接使用 CAS 无锁插入,避免不必要的锁竞争
- 桶位不为 null:对当前桶的头节点加锁(分段锁思想,粒度更细)
- 数据结构处理:
- 普通链表:遍历查找,更新已有节点或在尾部插入新节点
- 红黑树:按照红黑树规则查找、更新或插入节点,必要时进行树的平衡调整
- 优化操作:
- 链表过长(默认阈值为 8)时转为红黑树,提高查询效率
- 元素数量超过阈值时触发扩容,多线程可协同扩容(transfer 方法)
这种设计比起 JDK 1.7 的 Segment 分段锁有了很大的改进,锁粒度更细,并发度更高,同时利用了 CAS 操作减少了锁竞争,显著提升了性能。
- 数据结构:与 HashMap 相同,采用数组 + 链表 + 红黑树结构
// 核心字段
/**
* bin 数组(桶数组)。在首次插入时懒初始化。
* 大小始终为 2 的幂。迭代器可直接访问。
*/
transient volatile Node<K,V>[] table;
/**
* 下一个要使用的表;仅在扩容时非空。
*/
private transient volatile Node<K,V>[] nextTable;
/**
* 基础计数器值,主要在无竞争时使用,
* 但在表初始化竞争时也可作为备用方案。
* 通过 CAS 进行更新。
*/
private transient volatile long baseCount;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
put 操作源码分析:
/**
* 插入键值对的方法。
*
* @param key 键,不能为空
* @param value 值,不能为空
* @param onlyIfAbsent 如果为 true,则仅在键不存在时插入
* @return 如果键已存在,返回旧值;否则返回 null
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // 计算哈希值并扩散
int binCount = 0; // 记录 bin( 内的节点数量
for (Node<K,V>[] tab = table;;) { //自旋操作
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 如果表未初始化,则进行初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果 bin(数组桶的节点) 为空,直接尝试 CAS 插入新节点
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // 无需加锁,直接插入
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 处理扩容中的数据迁移
else {
V oldVal = null;
synchronized (f) { // 对 bin 头节点加锁
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 普通链表结构
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 如果 key 相同,更新值
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 追加新节点到链表末尾
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 处理红黑树结构
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>) f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 如果 bin 节点数达到阈值,转为红黑树
if (oldVal != null)
return oldVal; // 返回旧值
break;
}
}
}
addCount(1L, binCount); // 计数 +1,并检查是否需要扩容
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
size 操作优化:
public int size() {
long n = sumCount(); // 使用 baseCount 和 counterCells 数组计算总数
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
2
3
4
5
6
特点:
- 锁粒度更细(桶级别)
- 使用 CAS 操作减少锁竞争
- 支持高并发读写
- 扩容过程支持多线程协作
# 5.3 BlockingQueue
阻塞队列是并发编程中常用的数据结构,主要用于生产者-消费者模式。
# ArrayBlockingQueue
基于数组实现的有界阻塞队列:
// 核心属性
/** 存储元素的数组 */
final Object[] items;
/** 队列中的元素数量 */
int count;
/** 获取元素的索引 */
int takeIndex;
/** 添加元素的索引 */
int putIndex;
/** 控制并发访问的锁 */
final ReentrantLock lock;
/** 等待获取元素的条件 */
private final Condition notEmpty;
/** 等待添加元素的条件 */
private final Condition notFull;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
put 操作源码分析:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 获取可中断锁
try {
while (count == items.length) // 队列已满,等待
notFull.await(); // 等待队列不满条件
enqueue(e); // 入队
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x; // 放入元素
if (++putIndex == items.length) // 环形数组,索引到达末尾则重置为0
putIndex = 0;
count++; // 增加计数
notEmpty.signal(); // 通知等待的消费者
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
take 操作源码分析:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) // 队列为空,等待
notEmpty.await(); // 等待队列不空条件
return dequeue(); // 出队
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; // 获取元素
items[takeIndex] = null; // 置空,帮助 GC
if (++takeIndex == items.length) // 环形数组
takeIndex = 0;
count--; // 减少计数
notFull.signal(); // 通知等待的生产者
return x;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# LinkedBlockingQueue
基于链表实现的可选有界阻塞队列:
// 核心属性
/** 链表容量,默认为 Integer.MAX_VALUE */
private final int capacity;
/** 元素数量 */
private final AtomicInteger count = new AtomicInteger();
/** 链表头节点 */
transient Node<E> head;
/** 链表尾节点 */
private transient Node<E> last;
/** 取元素的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 等待获取元素的条件 */
private final Condition notEmpty = takeLock.newCondition();
/** 放元素的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 等待放入元素的条件 */
private final Condition notFull = putLock.newCondition();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
特点:
- 使用两把锁,takeLock 和 putLock,实现头尾操作并行
- count 使用 AtomicInteger,保证计数准确性
# DelayQueue(定时任务)
一个支持延迟获取元素的无界阻塞队列:
// 核心属性
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
private Thread leader = null;
2
3
4
5
元素必须实现 Delayed 接口:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit); // 返回剩余延迟时间
}
2
3
offer 操作源码分析:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // 添加到优先队列
if (q.peek() == e) { // 如果是队首元素
leader = null; // 重置 leader 线程
available.signal(); // 唤醒等待的消费者
}
return true;
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
take 操作源码分析:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); // 查看队首元素
if (first == null) // 如果队列为空
available.await(); // 等待
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) // 到期了,可以取出
return q.poll();
// 还没到期,需要等待
first = null; // 避免保持引用
if (leader != null) // 已有线程等待
available.await(); // 等待通知
else {
Thread thisThread = Thread.currentThread();
leader = thisThread; // 设置当前线程为 leader
try {
available.awaitNanos(delay); // 等待指定时间
} finally {
if (leader == thisThread)
leader = null; // 重置 leader
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); // 唤醒下一个等待线程
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
应用场景:
- 定时任务调度
- 缓存超时清理
- 请求超时处理
这三种阻塞队列各有特点:
- ArrayBlockingQueue:有界,基于数组,FIFO
- LinkedBlockingQueue:可选有界,基于链表,FIFO,头尾操作解耦
- DelayQueue:无界,基于优先队列,按延迟时间排序
通过这些并发容器,JDK 为我们提供了丰富的线程安全集合工具,适用于不同的并发场景。
# 11. 显式锁 Lock(如 ReentrantLock)
# 1. 类的继承关系
ReentrantLock
实现了 Lock
接口,Lock
接口定义了 lock
和 unlock
相关操作,并提供了 newCondition
方法用于生成 Condition
条件对象。
public class ReentrantLock implements Lock, java.io.Serializable
ReentrantLock
主要依赖 AQS (AbstractQueuedSynchronizer) 实现同步机制。Lock
接口提供基本锁操作,如lock()
、unlock()
,以及newCondition()
用于创建条件变量。
# 2. 类的内部类
ReentrantLock
内部有 三个相关的内部类:
Sync
(抽象类):继承自AbstractQueuedSynchronizer
(AQS)。NonfairSync
(非公平锁):继承Sync
。FairSync
(公平锁):继承Sync
。
继承关系:
AbstractQueuedSynchronizer
↑
Sync
↑
┌───────┐
│ │
FairSync NonfairSync
2
3
4
5
6
7
# 3. Sync 类
Sync
继承 AbstractQueuedSynchronizer
,它定义了 ReentrantLock
的核心锁机制。
private final Sync sync; // ReentrantLock 的核心同步器
Sync 类源码
// AQS中获取锁的方法
public final void acquire(int arg) {
if (!tryAcquire(arg) && // ① 尝试获取锁,失败则进入等待队列(必须子类实现)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // ② 进入 AQS 阻塞队列并等待
selfInterrupt(); // ③ 处理中断
}
// AQS中释放锁的方法
public final boolean release(int arg) {
// 尝试释放锁(必须子类重写,减少 state 计数或完全释放)
if (tryRelease(arg)) {
Node h = head; // 获取同步队列的头结点
// 如果头结点存在且其 waitStatus 不为 0(表示有线程在等待)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒同步队列中的下一个等待线程
return true; // 释放成功
}
return false; // 释放失败
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 抽象方法,由子类 FairSync / NonfairSync 具体实现锁的获取逻辑
abstract void lock();
/**
* 非公平获取锁的核心逻辑
* @param acquires 需要获取的锁计数
* @return true:成功获取锁,false:获取失败
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获取当前锁的状态(0 代表未被占用)
// ① 如果锁当前是空闲状态(state == 0),直接 CAS 竞争锁(可能发生抢占)
if (c == 0) {
// CAS 方式设置 state,如果成功就认为获取到了锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current); // 记录当前线程为持有者
return true;
}
}
// ② 如果当前线程已经持有锁(可重入)
else if (current == getExclusiveOwnerThread()) {
setState(c + acquires); // 递增 state,记录重入次数
return true;
}
return false; // 其他线程获取失败
}
/**
* 释放锁的逻辑
* @param releases 释放的锁计数
* @return true:锁完全释放,false:仍然持有锁(重入锁)
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 计算释放后的 state 值
// 如果当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean free = false; // 标记是否完全释放锁
if (c == 0) { // 释放到 0,表示锁完全释放
free = true;
setExclusiveOwnerThread(null); // 清空持有线程
}
setState(c); // 更新锁的状态
return free;
}
// 判断当前线程是否独占资源
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 生成Condition对象
final ConditionObject newCondition() {
return new ConditionObject();
}
// 获取锁拥有者
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 获取锁的重入次数
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 判断锁是否被占用
final boolean isLocked() {
return getState() != 0;
}
// 反序列化时重置状态
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
Sync 关键方法:
nonfairTryAcquire(int acquires)
:非公平尝试获取锁。tryRelease(int releases)
:释放锁。isHeldExclusively()
:判断当前线程是否持有锁。newCondition()
:生成ConditionObject
,用于条件等待。getOwner()
:获取当前持有锁的线程。
# 4. NonfairSync 类(非公平锁)
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
} else {
acquire(1);
}
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
2
3
4
5
6
7
8
9
10
11
12
核心解析:
- 第一步:CAS 直接抢占锁
compareAndSetState(0, 1)
:如果锁未被占用,直接获取锁。
- 第二步:失败后进入 AQS 队列
- 如果 CAS 失败,则调用
acquire(1)
,进入 AQS 阻塞队列,等待锁释放。
- 如果 CAS 失败,则调用
- 非公平锁允许插队
- 线程可以直接竞争锁,即使队列中已有等待的线程。
适用场景:
- 适用于高并发场景,线程竞争激烈时,非公平锁能够减少上下文切换,提高吞吐量。
# 5. FairSync 类(公平锁)
**特点:**按照 FIFO 规则获取锁,不会发生“插队”。
// 公平锁的实现,保证等待时间最长的线程最优先获取锁
static final class FairSync extends Sync {
// 公平锁的获取方法
final void lock() {
acquire(1); // 调用 AQS 的 acquire() 方法,尝试获取锁
}
/**
* 尝试获取锁(公平策略)
* @param acquires 获取锁的数量(一般是 1)
* @return true 表示成功获取锁,false 表示获取失败
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获取锁的当前状态(state=0 表示未被占用)
if (c == 0) { // 锁未被占用
// 关键点:检查当前线程前面是否有等待的线程(FIFO 先来先得)
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // CAS 修改 state,成功获取锁
setExclusiveOwnerThread(current); // 设置当前线程为持有锁的线程
return true;
}
}
// 可重入锁逻辑:如果当前线程已经持有锁,则增加 state(重入次数)
else if (current == getExclusiveOwnerThread()) {
setState(c + acquires); // 增加重入计数
return true;
}
return false; // 其他线程无法获取锁,返回 false,进入 AQS 阻塞队列等待
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
核心解析
- 第一步:先检查是否有等待线程
hasQueuedPredecessors()
:如果队列中已有等待的线程,则当前线程不会抢占锁。
- 第二步:CAS 竞争锁
- 只有队列前面没有线程,且
compareAndSetState(0, acquires)
成功,才允许获取锁。
- 只有队列前面没有线程,且
适用场景:
- 适用于任务处理较为均衡的场景,保证线程获取锁的先后顺序。
- 减少线程“饥饿”问题,适合任务执行时间较长的应用。
# 6. ReentrantLock 类
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
// 默认构造函数(非公平锁)
public ReentrantLock() {
sync = new NonfairSync();
}
// 指定公平性
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- 默认使用非公平锁 (
NonfairSync
)。 - 也可使用 公平锁 (
FairSync
)。
# 7. 示例分析
示例:使用公平锁
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class MyThread extends Thread {
private Lock lock;
public MyThread(String name, Lock lock) {
super(name);
this.lock = lock;
}
public void run() {
lock.lock();
try {
System.out.println(Thread.currentThread() + " running");
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class LockDemo {
public static void main(String[] args) {
Lock lock = new ReentrantLock(true); // 使用公平锁
MyThread t1 = new MyThread("t1", lock);
MyThread t2 = new MyThread("t2", lock);
MyThread t3 = new MyThread("t3", lock);
t1.start();
t2.start();
t3.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
运行结果 (某次执行):
Thread[t1,5,main] running
Thread[t2,5,main] running
Thread[t3,5,main] running
2
3
公平策略:线程按顺序执行,避免“插队”现象。
# 12. Synchronized vs. ReentrantLock
在 Java 多线程编程中,锁是解决并发问题的核心机制。Synchronized
和 ReentrantLock
是两种常见的同步工具,它们在锁的获取方式、公平性、灵活性等方面存在明显区别。
# 12.1 锁的获取方式
# Synchronized(隐式锁)
- 当线程进入 同步代码块或方法 时,JVM 自动加锁,退出时自动释放。
- 无需手动管理,由 JVM 处理锁的加解锁,代码简洁,但缺乏灵活性。
示例:同步方法
public synchronized void method() {
// 线程安全代码
}
2
3
示例:同步代码块
public void method() {
synchronized (this) {
// 线程安全代码
}
}
2
3
4
5
# ReentrantLock(显式锁)
- 需要 手动调用
lock()
获取锁,再手动unlock()
释放锁。 - 提供更灵活的控制,例如支持超时获取锁、可中断锁、Condition 变量等。
- 适用于复杂并发场景,但如果
unlock()
释放锁的代码遗漏,会导致死锁问题。
示例:手动加锁/释放锁
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// 线程安全代码
} finally {
lock.unlock(); // 确保锁释放
}
2
3
4
5
6
7
# 12.2 公平锁 vs. 非公平锁
# 什么是公平锁?
- 公平锁:按照线程的等待顺序依次获取锁,先等待的线程优先执行。
- 非公平锁:新来的线程可能会直接抢占锁,即使其他线程已经等待了更长时间。
# Synchronized 的公平性
- Synchronized 只能使用非公平锁,无法手动设置公平性。
- 因为锁竞争时,JVM 允许新来的线程插队,可能导致某些线程长期得不到执行(线程饥饿)。
# ReentrantLock 的公平性
默认是非公平锁(吞吐量更高)。
支持公平锁,可以通过构造参数设定:
ReentrantLock lock = new ReentrantLock(true); // 创建公平锁
1公平锁的优点:避免线程饥饿,提高系统稳定性。
公平锁的缺点:性能稍差,吞吐量比非公平锁低。
# 对比
特性 | 非公平锁(NonfairSync) | 公平锁(FairSync) |
---|---|---|
加锁方式 | 允许直接竞争锁 | 必须排队等待 |
是否有序 | 线程可能插队 | 线程严格 FIFO |
性能 | 吞吐量高 | 公平性好 |
适用场景 | 适用于高并发 | 适用于任务均衡 |
✅ ReentrantLock
使用建议
- 默认使用非公平锁(默认构造
ReentrantLock()
使用NonfairSync
)。 - 高并发场景优先非公平锁,减少线程切换,提高性能。
- 有严格先后顺序需求时使用公平锁,避免“线程饥饿”问题。
✅ 核心源码流程梳理
lock()
加锁- 非公平锁:先 CAS 竞争,失败则进入队列。
- 公平锁:先检查队列,有等待线程则排队。
acquire()
失败进入 AQS 队列tryRelease()
释放锁state == 0
,释放锁,唤醒队列中的下一个线程。
# 12.3 ReentrantLock 提供的高级功能
相较于 Synchronized
,ReentrantLock
具备更强的灵活性,提供如下高级功能:
# ✅ 1. 可中断锁
- Synchronized 不支持中断等待,如果线程一直无法获取锁,只能等待。
- ReentrantLock 支持
lockInterruptibly()
,允许线程在等待锁时响应中断。
lock.lockInterruptibly(); // 允许在等待锁时被中断
适用场景:避免线程无限制阻塞,可以在业务需要时终止等待锁的线程。
# ✅ 2. 超时获取锁
- Synchronized 不支持超时机制,如果获取不到锁,只能一直等待。
- ReentrantLock 提供
tryLock(time, unit)
,如果在设定时间内无法获取锁,就返回false
,防止线程长时间等待。
if (lock.tryLock(2, TimeUnit.SECONDS)) { // 2秒内尝试获取锁
try {
// 执行业务逻辑
} finally {
lock.unlock();
}
} else {
System.out.println("获取锁失败,执行其他操作");
}
2
3
4
5
6
7
8
9
适用场景:适用于高并发环境,避免线程无限等待导致死锁。
# ✅ 3. Condition 变量(等待/通知机制)
- Synchronized 依赖
wait()
和notify()
进行线程通信。 - ReentrantLock 提供 Condition 变量,可以更灵活地控制多个线程的等待/唤醒逻辑。
Condition condition = lock.newCondition();
lock.lock();
try {
condition.await(); // 线程等待
condition.signal(); // 唤醒等待线程
} finally {
lock.unlock();
}
2
3
4
5
6
7
8
9
适用场景:生产者-消费者模型、线程通信机制等。
# 12.4 适用场景
适用情况 | Synchronized | ReentrantLock |
---|---|---|
简单同步(代码块/方法) | ✅ 推荐 | ❌ 过于复杂 |
需要公平锁 | ❌ 不支持 | ✅ 支持 |
支持中断 | ❌ 不支持 | ✅ 支持 lockInterruptibly() |
超时等待锁 | ❌ 不支持 | ✅ 支持 tryLock(time, unit) |
等待/通知机制 | ⚠️ 依赖 wait/notify | ✅ Condition 更灵活 |
# 13. 工作中你如何使用多线程
我通过通过使用线程池管理线程资源,避免手动创建线程导致资源浪费。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
优点:
- 减少资源消耗:线程池复用已有线程,避免频繁创建销毁。
- 提高响应速度:任务提交后可快速分配线程执行,提高系统吞吐量。
- 统一管理线程:支持线程超时回收、任务队列等机制,增强控制能力。
# 14.线程池的创建方式
在 Java 中,我们可以通过 ThreadPoolExecutor
或 Executors
来创建线程池,其中 推荐使用 ThreadPoolExecutor
进行精确控制,以避免 Executors
默认配置可能带来的问题(如 CachedThreadPool
可能导致 OOM)。
# 方式一:使用 ThreadPoolExecutor构造函数创建线程池(推荐)
ThreadPoolExecutor
提供了更灵活的线程池管理方式,可以精细控制线程数、队列大小及拒绝策略,适用于高并发场景。
1. ThreadPoolExecutor
构造方法
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 线程空闲时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂(可自定义线程命名)
RejectedExecutionHandler handler // 拒绝策略
)
2
3
4
5
6
7
8
9
2. 代码示例
import java.util.concurrent.*;
public class CustomThreadPool {
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
10, TimeUnit.SECONDS, // 线程存活时间
new LinkedBlockingQueue<>(3), // 任务队列,最大可排队 3 个任务
Executors.defaultThreadFactory(), // 线程工厂,默认
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
// 提交任务
for (int i = 1; i <= 10; i++) {
int taskNumber = i;
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskNumber);
try {
Thread.sleep(2000); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
threadPool.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
3. 线程池参数解释
参数 | 含义 | 作用 |
---|---|---|
corePoolSize | 核心线程数 | 线程池会维持的最小线程数量,即使它们处于空闲状态 |
maximumPoolSize | 最大线程数 | 线程池允许创建的最大线程数 |
keepAliveTime | 线程存活时间 | 当线程数大于 corePoolSize 时,多余空闲线程的存活时间 |
workQueue | 任务队列 | 当核心线程满后,新任务进入队列等待 |
threadFactory | 线程工厂 | 创建线程的方式(可自定义线程命名) |
handler | 拒绝策略 | 当任务队列满时的处理方式 |
4. 拒绝策略(RejectedExecutionHandler)
当任务超出 最大线程数 + 队列容量 时,线程池会触发拒绝策略:
拒绝策略 | 作用 |
---|---|
AbortPolicy (默认) | 丢弃任务,抛出异常(适用于重要任务,需手动处理异常) |
CallerRunsPolicy | 由调用线程执行任务,不会抛异常(适用于主线程能承担任务执行) |
DiscardPolicy | 丢弃任务,不抛异常(适用于不重要的任务,如日志) |
DiscardOldestPolicy | 丢弃队列中最早的任务,然后尝试执行新任务 |
# 方式二:使用 Executors 工具类创建线程池(不推荐)
Executors
提供了一些简单的方法来创建常见类型的线程池,但默认配置容易导致资源问题,例如:
CachedThreadPool
线程数无限增长,可能导致 OOM。FixedThreadPool
和SingleThreadExecutor
使用无界队列,可能导致任务堆积,内存占满。
1. 常见线程池
// 1. 固定大小线程池(适用于任务量稳定场景)
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 2. 缓存线程池(适用于短任务高并发场景)
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 3. 单线程池(适用于串行执行任务)
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
// 4. 调度线程池(适用于定时任务)
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
2
3
4
5
6
7
8
9
10
11
2. 示例代码
import java.util.concurrent.*;
public class ExecutorsExample {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 5; i++) {
int taskNumber = i;
fixedThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 执行任务 " + taskNumber);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
fixedThreadPool.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 15. 为什么推荐 ThreadPoolExecutor而不推荐 Executors?
Executors
提供的方法容易导致资源管理问题:
Executors.newFixedThreadPool(nThreads)
- 使用
LinkedBlockingQueue
(无界队列),可能导致任务堆积,占用大量内存,最终 OOM。
- 使用
Executors.newCachedThreadPool()
- 使用
SynchronousQueue
(无容量队列),如果任务提交过快,线程会无限创建,可能导致 OOM。
- 使用
Executors.newSingleThreadExecutor()
- 适用于单任务串行执行,但同样使用
LinkedBlockingQueue
,任务积压可能导致 OOM。
- 适用于单任务串行执行,但同样使用
# 16. 线程池的底层工作原理
# 16.1 线程池的创建
在使用线程池之前,首先需要创建线程池。线程池的创建通常涉及以下参数:
- 核心线程数(Core Pool Size):始终存活的线程数量,即使没有任务也不会销毁。
- 最大线程数(Maximum Pool Size):线程池能创建的最大线程数量。
- 任务队列(Blocking Queue):用于存储等待执行的任务。
- 线程存活时间(Keep Alive Time):当线程数超过核心线程数时,空闲线程的存活时间。
- 线程工厂(Thread Factory):用于创建新线程的工厂方法。
- 拒绝策略(Rejected Execution Handler):当任务队列已满且线程数达到最大值时的处理策略。
# 16.2 任务提交
当有任务需要执行时,线程池提供两种方式提交任务:
execute(Runnable command)
:适用于不需要返回结果的任务。submit(Callable<T> task)
:适用于需要返回结果的任务,会返回Future<T>
对象,可通过get()
方法获取执行结果。
# 16.3 线程分配
- 任务提交后,线程池优先使用核心线程执行任务。
- 如果核心线程已满,任务会进入任务队列等待执行。
- 当任务队列也满了,并且线程数未达到最大线程数时,线程池会创建新临时线程执行任务。
- 如果线程数已达最大值并且任务队列满了,则触发拒绝策略。
# 16.4 任务执行
线程池中的工作线程会不断从任务队列中取出任务并执行。当任务执行完成后,线程不会立即销毁,而是继续等待新任务。
# 16.5 线程回收
线程池的线程不会无限增长,而是根据配置参数进行回收:
- 核心线程默认不会被回收,即使空闲也会存活。
- 非核心线程(当线程数超过核心线程数时创建的临时线程)在**
keepAliveTime
** 设定的时间内没有新任务时,会被销毁。
# 16.6 任务完成与结果返回
- 如果任务是
Callable
类型,线程池会返回Future<T>
对象,可通过get()
方法获取结果。 - 如果是
Runnable
任务,则无返回值,任务执行完即结束。
# 16.7 线程池的异常处理
线程池内部会对任务执行过程中抛出的异常进行处理:
- 使用
execute()
提交任务:如果任务中有未捕获的异常,线程会终止,线程池会创建新线程替换它。 - 使用
submit()
提交任务:异常会被封装在Future
中,线程不会终止,需要调用get()
方法捕获ExecutionException
。
# 17. 线程池中线程异常后,销毁还是复用?
直接说结论,需要分两种情况:
- 使用execute()提交任务:当任务通过execute()提交到线程池并在执行过程中抛出异常时,如果这个异常没有在任务内被捕获,那么该异常会导致当前线程终止,并且异常会被打印到控制台或日志文件中。线程池会检测到这种线程终止,并创建一个新线程来替换它,从而保持配置的线程数不变。 -
- 使用submit()提交任务:对于通过submit()提交的任务,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由submit()返回的Future对象中。当调用Future.get()方法时,可以捕获到一个ExecutionException。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。
简单来说:使用execute()时,未捕获异常导致线程终止,线程池创建新线程替代;使用submit()时,异常被封装在Future中,线程继续复用。 这种设计允许submit()提供更灵活的错误处理机制,因为它允许调用者决定如何处理异常,而execute()则适用于那些不需要关注执行结果的场景。
具体的源码分析可以参考这篇:线程池中线程异常后:销毁还是复用? - 京东技术 (opens new window)。
# 18. 线程的关闭和线程池的关闭
在多线程编程中,线程的关闭和线程池的关闭有一些关键区别。
- 线程的关闭:
interrupt()
:这个方法用于通知线程中断,线程的执行状态会变成“中断”,但是它不会立即停止线程的执行。线程需要在适当的位置检查是否被中断,并响应中断请求,例如通过抛出InterruptedException
或检查Thread.interrupted()
标志。stop()
:这个方法曾经是用来强制终止线程的,但它已经被标记为已弃用(自 Java 1.2 起)。使用stop()
方法来停止线程是不安全的,因为它会强制线程退出,可能会导致不一致的状态或资源泄漏。因此,不推荐使用该方法。
- 线程池的关闭:
shutdown()
:这个方法会平滑地关闭线程池,停止接收新任务,并且在完成所有已提交的任务后退出。shutdownNow()
:这个方法会尝试立即停止所有正在执行的任务,并返回尚未执行的任务列表。