# Java 并发概述


# 什么是进程和线程?

# 何为进程?

进程是程序的一次执行过程,是系统进行资源分配的基本单位。系统运行一个程序即是一个进程从创建,运行到消亡的过程。

在 Java 中,当我们启动 main 函数时其实就是启动了一个 JVM 的进程,而 main 函数所在的线程就是这个进程中的一个线程,也称主线程

如下图所示,在 Windows 中通过查看任务管理器的方式,我们就可以清楚看到 Windows 当前运行的进程( .exe 文件的运行)。

进程示例图片-Windows

# 何为线程?

线程属于进程中的一个实体,是一个基本的 CPU 执行单元,是系统独立调度的基本单位。一个进程在其执行的过程中可以产生多个线程。与进程不同的是,同类的多个线程共享进程的方法区资源,但每个线程有自己的程序计数器虚拟机栈本地方法栈。所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程

Java 程序天生就是多线程程序,我们可以通过 JMX 来看看一个普通的 Java 程序有哪些线程,代码如下。

public class MultiThread {
	public static void main(String[] args) {
		// 获取 Java 线程管理 MXBean
	ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
		// 不需要获取同步的 monitor 和 synchronizer 信息,仅获取线程和线程堆栈信息
		ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
		// 遍历线程信息,仅打印线程 ID 和线程名称信息
		for (ThreadInfo threadInfo : threadInfos) {
			System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.getThreadName());
		}
	}
}

上述程序输出如下(输出内容可能不同,不用太纠结下面每个线程的作用,只用知道 main 线程执行 main 方法即可):

[5] Attach Listener //添加事件
[4] Signal Dispatcher // 分发处理给 JVM 信号的线程
[3] Finalizer //调用对象 finalize 方法的线程
[2] Reference Handler //清除 reference 线程
[1] main //main 线程,程序入口

从上面的输出内容可以看出:一个 Java 程序(进程)的运行是 main 线程和多个其他线程同时运行

# 进程和线程的关系,区别,优缺点?

  • 调度:线程是调度的基本单位(PC,状态码,通用寄存器,线程栈及栈指针);进程是拥有资源的基本单位(打开文件,堆,静态区,代码段等)。
  • 并发性:一个进程内多个线程可以并发(最好和 CPU 核数相等);多个进程可以并发。
  • 拥有资源:线程不拥有系统资源,但一个进程的多个线程可以共享隶属进程的资源;进程是拥有资源的独立单位。
  • 系统开销:线程创建销毁只需要处理 PC 值,状态码,通用寄存器值,线程栈及栈指针即可;进程创建和销毁需要重新分配及销毁 task_struct 结构。

# 图解进程与线程的关系

下图是 Java 8 后的的内存结构(运行时数据区),通过下图我们从 JVM 的角度来说一下线程和进程之间的关系。

Java 运行时数据区域(JDK1.8 之后)

image-20231008002249305

从上图可以看出:一个进程中可以有多个线程,多个线程共享进程的方法区 (JDK1.8 之后的元空间) 资源,但是每个线程有自己的程序计数器虚拟机栈本地方法栈

进程与线程的对比总结:

  • 线程是进程划分成的更小的运行单位
  • 线程和进程最大的不同在于,各进程基本上是独立的,而各线程则不一定,因为同一进程中的线程极有可能会相互影响。
  • 线程执行开销小,上下文切换的成本低,但不利于资源的管理和保护;而进程正相反。

# 为什么程序计数器的私有的?

程序计数器主要有下面两个作用:

  1. 字节码解释器通过改变程序计数器来依次读取指令,从而实现代码的流程控制,如:顺序执行、选择、循环、异常处理。
  2. 在多线程的情况下,程序计数器用于记录当前线程执行的位置,从而当线程被切换回来的时候能够知道该线程上次运行到哪儿了。

需要注意的是,如果执行的是 native 方法,那么程序计数器记录的是 undefined 地址,只有执行的是 Java 代码时,程序计数器记录的才是下一条指令的地址

所以,程序计数器私有主要是为了线程切换后能恢复到正确的执行位置

# 为什么虚拟机栈和本地方法栈是私有的?

  • ** 虚拟机栈:** 每个 Java 方法在执行之前会创建一个栈帧用于存储调用方法的局部变量表、操作数栈、常量池引用等信息。从方法调用直至执行完成的过程,就对应着一个栈帧在 Java 虚拟机栈中入栈和出栈的过程。

    image-20231008001952766

  • ** 本地方法栈:** 和虚拟机栈所发挥的作用非常相似,区别是:** 虚拟机栈为虚拟机执行 Java 方法 (也就是字节码)服务,而本地方法栈则为虚拟机使用到的 Native 方法服务。** 在 HotSpot 虚拟机中和 Java 虚拟机栈合二为一。

所以,为了保证线程中的局部变量不被别的线程访问到,虚拟机栈和本地方法栈是线程私有的。

# 为什么堆和方法区是共享的?

  • :是进程中最大的一块内存,主要用于存放新创建的对象 (几乎所有对象都在这里分配内存)
  • 方法区:主要用于存放已被加载的类信息、常量等数据

# 并发和并行的区别?

  • 并发:两个及两个以上的作业在同一 时间段 内执行。
  • 并行:两个及两个以上的作业在同一 时刻 执行。

最关键的点是:是否是 同时 执行。

# 同步和异步的区别?

  • 同步:发出一个调用之后,在没有得到结果之前,该调用就不可以返回,一直等待
  • 异步:调用在发出之后,不用等待返回结果,该调用直接返回

# 为什么要使用多线程?

先从总体上来说:

  • 从计算机底层来说: 线程可以比作是轻量级的进程,是程序执行的最小单位,线程间的切换和调度的成本远远小于进程。另外,多核 CPU 时代意味着多个线程可以同时运行,这减少了线程上下文切换的开销。
  • 从当代互联网发展趋势来说: 现在的系统动不动就要求百万级甚至千万级的并发量,而多线程并发编程正是开发高并发系统的基础,利用好多线程机制可以大大提高系统整体的并发能力以及性能。

再深入到计算机底层来探讨:

  • 单核时代:在单核时代多线程主要是为了提高单进程利用 CPU 和 IO 系统的效率。 假设只运行了一个 Java 进程的情况,当我们请求 IO 的时候,如果 Java 进程中只有一个线程,此线程被 IO 阻塞则整个进程被阻塞。CPU 和 IO 设备只有一个在运行,那么可以简单地说系统整体效率只有 50%。当使用多线程的时候,一个线程被 IO 阻塞,其他线程还可以继续使用 CPU。从而提高了 Java 进程利用系统资源的整体效率。
  • 多核时代:多核时代多线程主要是为了提高进程利用多核 CPU 的能力。举个例子:假如我们要计算一个复杂的任务,我们只用一个线程的话,不论系统有几个 CPU 核心,都只会有一个 CPU 核心被利用到。而创建多个线程,这些线程可以被映射到底层多个 CPU 上执行,在任务中的多个线程没有资源竞争的情况下,任务执行的效率会有显著性的提高,约等于(单核时执行时间 / CPU 核心数)。

# 多线程会带来什么问题?

并发编程的目的就是为了能提高程序的执行效率,提高程序运行速度。但是并发编程并不总是能提高程序运行速度的,而且并发编程可能会遇到很多问题,比如:内存泄漏、死锁、线程不安全等。

# 如何理解线程安全和不安全?

线程安全和不安全是在多线程环境下对于同一份数据的访问是否能保证共享资源的正确性和一致性的描述。

  • 线程安全:在多线程环境下,对于同一份数据,不管有多少个线程同时访问,都能保证这份数据的正确性和一致性。
  • 线程不安全:在多线程环境下,对于同一份数据,多个线程同时访问时可能会导致数据混乱、错误或者丢失。

# 单核 CPU 上运行多个线程效率一定会高吗?

单核 CPU 同时运行多个线程的效率是否会高,取决于线程的类型和任务的性质

一般来说,有两种类型的线程:

  • CPU 密集型:主要进行计算和逻辑处理,需要占用大量的 CPU 资源。
  • IO 密集型:主要进行输入输出操作,如读写文件、网络通信等,需要等待 IO 设备的响应,而不占用太多的 CPU 资源。

在单核 CPU 上,同一时刻只能有一个线程在运行,其他线程需要等待 CPU 的时间片分配。如果线程是 CPU 密集型的,那么多个线程同时运行会导致频繁的线程切换,增加了系统的开销,降低了效率。如果线程是 IO 密集型的,那么多个线程同时运行可以利用 CPU 在等待 IO 时的空闲时间,提高了效率。

因此,对于单核 CPU 来说,如果任务是 CPU 密集型的,那么多线程会影响效率;如果任务是 IO 密集型的,那么多线程会提高效率。当然,这里的 “多” 也要适度,不能超过系统能够承受的上限。

# 线程

# 线程的创建方式

# 继承 Thread 类

Thread 创建线程方式:创建线程类,匿名内部类方式

  • start () 方法底层其实是给 CPU 注册当前线程,并且触发 run () 方法执行
  • 线程的启动必须调用 start () 方法,如果线程直接调用 run () 方法,相当于变成了普通类的执行,此时主线程将只有执行该线程
  • 建议线程先创建子线程,主线程的任务放在之后,否则主线程(main)永远是先执行完

Thread 构造器:

  • public Thread()
  • public Thread(String name)
public class ThreadDemo {
    public static void main(String[] args) {
        Thread t = new MyThread();
        t.start();
       	for(int i = 0 ; i < 100 ; i++ ){
            System.out.println("main线程" + i)
        }
        //main 线程输出放在上面 就变成有先后顺序了,因为是 main 线程驱动的子线程运行
    }
}
class MyThread extends Thread {
    @Override
    public void run() {
        for(int i = 0 ; i < 100 ; i++ ) {
            System.out.println("子线程输出:"+i)
        }
    }
}

继承 Thread 类的优缺点:

  • 优点:编码简单
  • 缺点:线程类已经继承了 Thread 类,无法继承其他类了,功能不能通过继承拓展(单继承的局限性)

# 实现 Runnable 接口

Runnable 创建线程方式:创建线程类,匿名内部类方式

Thread 的构造器:

  • public Thread(Runnable target)
  • public Thread(Runnable target, String name)
public class ThreadDemo {
    public static void main(String[] args) {
        Runnable target = new MyRunnable();
        Thread t1 = new Thread(target,"1号线程");
		t1.start();
        Thread t2 = new Thread(target);//Thread-0
    }
}
public class MyRunnable implements Runnable{
    @Override
    public void run() {
        for(int i = 0 ; i < 10 ; i++ ){
            System.out.println(Thread.currentThread().getName() + "->" + i);
        }
    }
}

Thread 类本身也是实现了 Runnable 接口,Thread 类中持有 Runnable 的属性,执行线程 run 方法底层是调用 Runnable#run:

public class Thread implements Runnable {
    private Runnable target;
    
    public void run() {
        if (target != null) {
          	// 底层调用的是 Runnable 的 run 方法
            target.run();
        }
    }
}

Runnable 方式的优缺点:

  • 缺点:代码复杂一点。
  • 优点:
    1. 线程任务类只是实现了 Runnable 接口,可以继续继承其他类,避免了单继承的局限性
    2. 同一个线程任务对象可以被包装成多个线程对象
    3. 适合多个线程去共享同一个资源
    4. 实现解耦操作,线程任务代码可以被多个线程共享,线程任务代码和线程独立
    5. 线程池可以放入实现 Runnable 或 Callable 线程任务对象

# 实现 Callable 接口

实现 Callable 接口:

  1. 定义一个线程任务类实现 Callable 接口,声明线程执行的结果类型
  2. 重写线程任务类的 call 方法,这个方法可以直接返回执行的结果
  3. 创建一个 Callable 的线程任务对象
  4. 把 Callable 的线程任务对象包装成一个 FutureTask 对象
  5. 把 FutureTask 对象包装成 Thread 对象
  6. 调用线程的 start () 方法启动线程
public class ThreadDemo {
    public static void main(String[] args) {
        Callable call = new MyCallable();
        FutureTask<String> task = new FutureTask<>(call);
        Thread t = new Thread(task);
        t.start();
        try {
            String s = task.get(); // 获取 call 方法返回的结果(正常 / 异常结果)
            System.out.println(s);
        }  catch (Exception e) {
            e.printStackTrace();
        }
    }
public class MyCallable implements Callable<String> {
    @Override// 重写线程任务类方法
    public String call() throws Exception {
        return Thread.currentThread().getName() + "->" + "Hello World";
    }
}

public FutureTask(Callable<V> callable) :未来任务对象,在线程执行完后得到线程的执行结果

  • FutureTask 就是 Runnable 对象,因为 Thread 类只能执行 Runnable 实例的任务对象,所以把 Callable 包装成未来任务对象
  • 线程池部分详解了 FutureTask 的源码

public V get() :同步等待 task 执行完毕的结果,如果在线程中获取另一个线程执行结果,会阻塞等待,用于线程同步

  • get () 线程会阻塞等待任务执行完成
  • run () 执行完后会把结果设置到 FutureTask 的一个成员变量,get () 线程可以获取到该变量的值

优缺点:

  • 优点:同 Runnable,并且能得到线程执行的结果
  • 缺点:编码复杂

# Runnable 和 Callable 的区别?

  1. 接口类型:Runnable 是一个接口类型,而 Callable 是一个通用接口类型
  2. 返回值:Runnable 接口的 run () 方法没有返回值,它只能执行某个任务。Callable 接口的 call () 方法可以返回一个结果
  3. 异常处理:Runnable 接口的 run () 方法不能抛出任何已检查异常,而 Callable 接口的 call () 方法可以抛出异常
  4. 使用方式:Runnable 接口通常用于创建多线程任务,它不能直接返回结果或抛出异常。Callable 接口通常用于创建线程任务,它可以返回结果或抛出异常。
  5. 多线程执行返回的结果:Runnable 接口无法直接获取执行结果,而 Callable 接口通过 Future 接口的 get () 方法可以获取线程执行的结果

# Thread 类的常用 API

方法说明
public void start()启动一个新线程,Java 虚拟机调用此线程的 run () 方法
public void run()线程启动后,调用该方法
public void setName(String name)给当前线程取名字
public void getName()获取当前线程的名字。线程存在默认名称:子线程是 Thread - 索引,主线程是 main
public final int getPriority()返回此线程的优先级
public final void setPriority(int priority)更改此线程的优先级,常用 1 5 10
public void interrupt()中断这个线程,异常处理机制
public boolean isInterrupted()判断当前线程是否被打断,不清除打断标记
public final void join()等待这个线程结束
public final void join(long millis)等待这个线程死亡 millis 毫秒,0 意味着永远等待
public final native boolean isAlive()线程是否存活(还没有运行完毕)
public final void setDaemon(boolean on)将此线程标记为守护线程或用户线程
public static boolean interrupted()判断当前线程是否被打断,清除打断标记
public static Thread currentThread()获取当前线程对象
public static void sleep(long time)让当前线程休眠 time 毫秒,同时让出 CPU 时间片。Thread.sleep(0) : 让操作系统立刻重新进行一次 CPU 竞争
public static native void yield()提示线程调度器让出当前线程对 CPU 的使用

# start () 与 run ()

直接调用 start() 方法的话,可启动新的线程(进入 Runnable 状态),通过新线程间接执行 run() 方法。

直接调用 run() 方法的话,是在 main 线程下作为普通方法执行,不会启动新线程。

run () 方法中的异常不能抛出,只能 try/catch

  • 因为父类中没有抛出任何异常,子类不能比父类抛出更多的异常
  • 异常不能跨线程传播回 main () 中,因此必须在本地进行处理

# sleep () 与 yield ()

sleep:

  • 调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(计时等待)
  • sleep () 方法的过程中,线程不会释放对象锁
  • 其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
  • 睡眠结束后的线程未必会立刻得到执行,需要抢占 CPU
  • 建议用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性

yield:

  • 调用 yield 会让提示线程调度器让出当前线程对 CPU 的使用
  • 具体的实现依赖于操作系统的任务调度器
  • 会放弃 CPU 资源,但不会释放锁资源

# sleep () 与 wait ()

共同点:两者都可以暂停线程的执行。

区别:

  • sleep() 方法没有释放锁,而 wait() 方法释放了锁
  • wait() 通常被用于线程间交互 / 通信, sleep() 通常被用于暂停执行
  • wait() 方法被调用后,线程不会自动苏醒,需要别的线程调用同一个对象上的 notify() 或者 notifyAll() 方法。 sleep() 方法执行完成后,线程会自动苏醒,或者也可以使用 wait(long timeout) 超时后线程会自动苏醒。
  • sleep()Thread 类的静态本地方法, wait() 则是 Object 类的本地方法

# 说说线程的生命周期和状态?

java.lang.Thread.State 这个枚举类中给出了 Java 线程的六种线程状态:

线程状态导致状态发生条件
NEW(初始)线程刚被创建,但还没有调用 start() 。只有线程对象,没有线程特征。
RUNNABLE(运行)线程调用了 start() ,可能正在RUNNING(运行中),也可能在READY(等待运行),这取决于操作系统处理器。
BLOCKED(锁阻塞)需要等待锁释放。当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入 Blocked 状态;当该线程持有锁时,该线程将变成 Runnable 状态。
WAITING(无限等待)表示该线程需要等待其他线程做出一些特定动作(通知或中断)。一个线程在等待另一个线程时,该线程进入 Waiting 状态,进入这个状态后不能自动唤醒,必须等待另一个线程调用 notify 或者 notifyAll 方法才能唤醒。
TIME_WAITING(计时等待)可以在指定的时间后自行返回,而不是像 WAITING 那样一直等待。有几个方法有超时参数,调用将进入 Timed Waiting 状态,这一状态将一直保持到超时期满或者接收到唤醒通知。带有超时参数的常用方法有 Thread.sleep 、Object.wait
TERMINATED(终止)表示该线程已经运行完毕。run 方法正常退出而死亡,或者因为没有捕获的异常终止了 run 方法而死亡。

Java 线程状态转移图:

图源:挑错 |《Java 并发编程的艺术》中关于线程状态的三处错误

Java 线程状态变迁图

由上图可以看出:线程创建之后它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 READY(可运行) 状态。可运行状态的线程获得了 CPU 时间片(timeslice)后就处于 RUNNING(运行) 状态。

  • 当线程执行 wait() 方法之后,线程进入 WAITING(无限等待) 状态,需要依靠其他线程的通知才能够返回到 RUNNABLE 状态。

  • TIMED_WAITING (计时等待) 状态相当于在 WAITING 状态的基础上增加了超时限制,比如通过 sleep(long millis) 方法或 wait(long millis) 方法可以将线程置于 TIMED_WAITING 状态。当超时时间结束后,线程将会返回到 RUNNABLE 状态。

  • 当线程进入 synchronized 方法 / 块或者调用 wait 后(被 notify )重新进入 synchronized 方法 / 块,但是锁被其它线程占有,这个时候线程就会进入 BLOCKED(锁阻塞) 状态。

  • 线程在执行完了 run() 方法之后将会进入到 TERMINATED(终止) 状态。

image-20220524203355448

# 什么是线程上下文切换?

线程在执行过程中会有自己的运行条件和状态(也称上下文),在上下文切换过程中,需要保存和恢复的资源包括:

  • 寄存器
  • 程序计数器(PC):保存当前任务执行的下一条指令的地址,以便在切换回来时继续执行。
  • 栈指针(SP)
  • 内存管理单元(MMU):保存当前任务的页表、段表等内存管理信息,以便在切换回来时继续使用该任务的内存映射。
  • 文件描述符表
  • 环境变量

当出现如下情况的时候,线程会从占用 CPU 状态中退出:

  • 主动让出 CPU,比如调用了 sleep() , wait() 等。
  • 时间片用完,因为操作系统要防止一个线程或者进程长时间占用 CPU 导致其他线程或者进程饿死。
  • 调用了阻塞类型的系统中断,比如请求 IO,线程被阻塞。
  • 被终止或结束运行。

这其中前三种都会发生线程切换,线程切换意味着需要保存当前线程的上下文,留待线程下次占用 CPU 时恢复现场。并加载下一个将要占用 CPU 的线程上下文。这就是所谓的上下文切换。

上下文切换是现代操作系统的基本功能,因其每次需要保存信息恢复信息,这将会占用 CPU,内存等系统资源进行处理,也就意味着效率会有一定损耗,如果频繁切换就会造成整体效率低下

# 什么是线程死锁?

面试题:说说你了解的死锁?包括死锁产生原因、必要条件、处理方法、死锁恢复以及死锁预防等(死锁相关问题大总结,超全!)

线程死锁描述的是这样一种情况:多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。死锁是指两个(多个)线程相互等待对方所占用的资源而导致同时阻塞的过程。死锁的产生会导致程序卡死,不解锁程序将永远无法进行下去。

如下图所示,线程 A 持有资源 2,线程 B 持有资源 1,他们同时都想申请对方的资源,所以这两个线程就会互相等待,而进入死锁状态

线程死锁示意图

# 死锁产生的原因

理论上认为死锁产生有以下四个必要条件,缺一不可:

  1. 互斥:该资源必须处于非共享模式,即任意时刻只有一个线程可以使用。
  2. 占有并等待:一个线程在等待一个被其他线程所占有的资源时,对自身已占有的资源保持不放。
  3. 非抢占:资源不能被抢占,只能在持有资源的线程完成任务后,该资源才会被释放。
  4. 循环等待:若干线程之间形成一种头尾相接的循环等待资源关系。

# 如何预防、避免死锁?

如何预防死锁?破坏死锁的产生的必要条件即可:

  1. 破坏请求并保持条件:一次性申请所有的资源
  2. 破坏不剥夺条件:占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放自身占有的资源
  3. 破坏循环等待条件:靠按序申请资源来预防。按某一顺序申请资源,释放资源则反序释放。

如何避免死锁?避免死锁就是在资源分配时,借助于算法(比如银行家算法)对资源分配进行计算评估,使其进入安全状态

安全状态:指的是系统能够按照某种线程推进顺序(P1、P2、P3.....Pn)来为每个线程分配所需资源,直到满足每个线程对资源的最大需求,使每个线程都可顺利完成。称 <P1、P2、P3.....Pn> 序列为安全序列。

# 为什么 wait () 定义在 Object 类中,而不是 Thread 类中?

wait() 是让获得对象锁的线程实现等待,会自动释放当前线程占有的对象锁。每个对象( Object )都拥有对象锁,既然要释放当前线程占有的对象锁,并让其进入 WAITING 状态,自然是要操作对应的对象( Object )而非当前的线程( Thread )。

类似的问题:为什么 sleep() 方法定义在 Thread 中?

因为 sleep() 是让当前线程暂停执行,不涉及到对象类,也不需要获得对象锁

# 可以直接调用 Thread 类的 run () 吗?

new 一个 Thread ,线程进入了新建状态。调用 start() 方法,会启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行了。 start() 会执行线程的相应准备工作,然后自动执行 run() 方法的内容,这是真正的多线程工作。但是,直接执行 run() 方法,会把 run() 方法当成一个 main 线程下的普通方法去执行,并不会在某个线程中执行它,所以这并不是多线程工作。

总结:

  • 直接调用 start() 方法的话,可启动新的线程(进入 Runnable 状态),通过新线程间接执行 run() 方法。
  • 直接调用 run() 方法的话,是在 main 线程下作为普通方法执行,不会启动新线程。

# 同步

# 临界区

临界资源:一次仅允许一个进程使用的资源成为临界资源

临界区:访问临界资源的代码块

竞态条件:多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件

一个程序运行多个线程是没有问题,多个线程读共享资源也没有问题,在多个线程对共享资源读写操作时发生指令交错,就会出现问题

为了避免临界区的竞态条件发生(解决线程安全问题):

  • 阻塞式的解决方案:synchronized,lock
  • 非阻塞式的解决方案:原子变量

管程(monitor):也称监视器,指的是管理共享变量以及对共享变量的操作过程,让它们支持并发。翻译为 Java 就是 **管理类的成员变量和成员方法,让这个类是线程安全的**。实现了同一时刻,只有一个线程在执行管程的某个子程序

synchronized:对象锁,保证了临界区内代码的原子性,采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其它线程获取这个对象锁时会阻塞,保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。

互斥和同步都可以采用 synchronized 关键字来完成,区别:

  • 互斥:同一时刻只能有一个线程执行临界区代码
  • 同步:是由于线程执行的先后、顺序不同,需要一个线程等待其它线程运行到某个点

性能:

  • 线程安全:性能差
  • 线程不安全:性能好,假如开发中不会存在多线程安全问题,建议使用线程不安全的设计类

以下内容,参考《JVM 笔记.md》的第五章

# 乐观锁和悲观锁

# 数据库悲观锁和乐观锁的原理和应用场景分别有什么?

  • 悲观锁先获取锁,再进行业务操作。例如 synchronizedReentrantLock 。一般就是利用类似 SELECT … FOR UPDATE 这样的语句,对数据加锁,避免其他事务意外修改数据。
    当数据库执行 SELECT … FOR UPDATE 时会获取被 select 中的数据行的行锁, select for update 获取的行锁会在当前事务结束时自动释放,因此必须在事务中使用。

  • 乐观锁先进行业务操作,只在最后实际更新数据时检查数据是否被更新过。Java 并发包中的 AtomicFieldUpdater 类似,也是利用 CAS 机制,并不会对数据加锁,而是通过对比数据的时间戳或者版本号,来实现乐观锁需要的版本判断。

# synchronized 关键字

独占锁(悲观锁)

# 为什么把 JDK 1.6 之前的 synchronized 称为重量级锁?

  1. JDK 1.6 之前的 synchronized 是重量级锁主要是因为:其实现方式会导致性能上的较大损耗。
  2. 在 JDK 1.6 之前,synchronized 关键字使用的是对象级别的锁,即每个对象都有一个相关的锁。在获取和释放锁的过程中需要进行用户态和内核态的切换,这个切换的代价比较高,消耗的时间比较多,因此被称为重量级锁。
  3. 在 JDK 1.6 之前的 synchronized 在获取锁时使用的是互斥量(Mutex)来实现,这是一种悲观锁,即当某个线程获取了锁之后,其他的线程将被阻塞,直到该线程释放锁。这种阻塞和唤醒线程的操作需要操作系统层面的支持,因此会有较高的开销。
  4. 另外,JDK 1.6 之前的 synchronized 的实现没有进行优化,每一次锁的获取和释放都需要进行同步操作,无法做到细粒度的控制,导致锁的粒度较大,同一时间只能一个线程访问被锁定的代码块,效率较低。

因此,JDK 1.6 之前的 synchronized 被称为重量级锁,主要是因为它在性能和资源消耗方面存在一些不足之处。不过随着 JDK 的不断更新,synchronized 的实现方式得到了改善,JDK 1.6 之后引入了偏向锁、轻量级锁等优化措施,使得 synchronized 的性能也得到了提升。

# synchronized 锁升级过程(偏向锁、轻量级锁、重量级锁)

  1. 偏向锁
  • 在锁对象初始化时,对象头中的 Mark Word 会记录当前线程 ID,表示该锁对象偏向于该线程。
  • 当另一个线程也尝试获取这个锁对象时,会检查该对象头的 Mark Word,
    • 如果记录的线程 ID 是自己,表示可以直接获取锁。
    • 如果记录的线程 ID 不是自己,会撤销偏向锁,升级为轻量级锁
  1. 轻量级锁
  • 当多个线程尝试获取锁对象时,JVM 会为参与竞争的各个线程的栈帧中各自分配 ** 锁记录( Lock Record )** 空间,并将锁对象的 Mark Word 拷贝到其中,称为 Displaced Mark Word
  • 然后,一个线程使用 CAS 操作尝试将对象头的 Mark Word 替换为指向自己线程栈帧中的锁记录的指针。
    • 如果 CAS 成功,表示获取锁成功。
    • 如果 CAS 失败,那么当前线程也不会阻塞,而是通过自旋的方式不断尝试获取锁,当达到一定次数仍未获得锁时,会进一步膨胀为重量级锁
  1. 重量级锁
  • 当一个线程尝试获取对象锁时,发现对象已经是轻量级锁状态,但是锁的拥有者不是自己时,此时线程会进入 BLOCKED(锁阻塞)状态,开始锁膨胀流程。
  • 升级为重量级锁时,虚拟机会在操作系统层面申请一个 ** 互斥量(Mutex)** 来保护整个对象。
  • 获取重量级锁失败的线程将会进入阻塞状态,只有当拥有锁的线程释放锁后,其他线程才有机会获取锁。

需要注意的是,锁的升级过程中是逐级升级的,即从偏向锁 -> 轻量级锁 -> 重量级锁。而且锁只能升级,不能降级。这是为了防止频繁锁的请求和释放造成的性能损耗。

# 内存

# Java 内存模型(JMM)

# volatile 关键字

# volatile 实现什么能力,怎么实现的?

  • 能保证变量的可见性:如果我们将变量声明为 volatile ,表明这个变量是共享且不稳定的,每次读写都强制发生在主内存中
  • 禁止指令重排:当一个变量被 volatile 修饰时,编译器和处理器会禁止对其进行指令重排,从而保证程序的正确性。

# happens-before 原则

# 无锁

# CAS

# Atomic 原子类

  1. Atomic 原子类是 Java.util.concurrent 包中的一个类,用于提供线程安全原子操作

  2. Atomic 原子类可以保证在多线程环境下对变量的原子操作,即操作过程不会被其他线程中断,从而避免了竞态条件的发生。

  3. Atomic 原子类提供了一系列的方法,可以对变量进行读取、写入、比较并设置等操作,如 get、set、compareAndSet 等。

  4. Atomic 原子类支持不同的数据类型

    • 基本类型的原子类: AtomicInteger (整型)、 AtomicLong (长整型)、 AtomicBoolean (布尔型)
    • 数组类型的原子类: AtomicIntegerArray (整型数组)、 AtomicLongArray (长整型数组)、 AtomicReferenceArray (引用类型数组)
    • 引用类型的原子类: AtomicReference (引用类型)、 AtomicStampedReference (原子更新带有版本号的引用类型)、 AtomicMarkableReference (原子更新带有标记的引用类型)
  5. Atomic 原子类的内部实现基于 CAS(Compare-and-Swap)算法,该算法通过比较内存中的值与期望值是否一致来判断是否更新

  6. Atomic 原子类的使用场景包括计数器、线程安全的累加器、标志位的设置等。

  7. Atomic 原子类 **在高并发场景下可以提高性能,避免了使用锁造成的线程等待和上下文切换的开销**。

# ThreadLocal 类

img

# 有什么用?

ThreadLocal 类可以让每个线程拥有自己单独的变量副本,分配在堆内的 TLAB 中,使得每个线程中的变量相互独立,从而保证线程安全。

img

ThreadLocal 变量是 private static 类型的,访问这个变量的每个线程都会在 TLAB 中存储这个变量的本地副本,所以是线程安全的。他们可以使用 get()set() 方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。

# 如何使用?

下面简单演示一下如何在项目中实际使用 ThreadLocal

import java.text.SimpleDateFormat;
import java.util.Random;
public class ThreadLocalExample implements Runnable{
     // SimpleDateFormat 不是线程安全的,所以每个线程都要有自己独立的副本
    private static final ThreadLocal<SimpleDateFormat> formatter = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd HHmm"));
    public static void main(String[] args) throws InterruptedException {
        ThreadLocalExample obj = new ThreadLocalExample();
        for(int i=0 ; i<10; i++){
            Thread t = new Thread(obj, ""+i);
            Thread.sleep(new Random().nextInt(1000));
            t.start();
        }
    }
    @Override
    public void run() {
        System.out.println("Thread Name= "+Thread.currentThread().getName()+" default Formatter = "+formatter.get().toPattern());
        try {
            Thread.sleep(new Random().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //formatter pattern is changed here by thread, but it won't reflect to other threads
        formatter.set(new SimpleDateFormat());
        System.out.println("Thread Name= "+Thread.currentThread().getName()+" formatter = "+formatter.get().toPattern());
    }
}

# 原理

Thread 类源代码入手。

public class Thread implements Runnable {
    //......
    // 与此线程有关的 ThreadLocal 值。由 ThreadLocal 类维护
    ThreadLocal.ThreadLocalMap threadLocals = null;
    // 与此线程有关的 InheritableThreadLocal 值。由 InheritableThreadLocal 类维护
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    //......
}

从上面 Thread 类的源代码有一个 threadLocals 变量和一个 inheritableThreadLocals 变量,它们都是 ThreadLocalMap 类型的变量,可以理解为 ThreadLocal 类实现的定制化 HashMap

  • 默认情况下这两个变量都是 null
  • 只有当前线程调用 ThreadLocal 类的 set()get() 方法时才创建它们,实际上调用的是 ThreadLocalMap 类对应的 get()set() 方法。

ThreadLocal 类的 set() 方法

public void set(T value) {
    // 获取当前请求的线程
    Thread t = Thread.currentThread();
    // 取出 Thread 类内部的 threadLocals 变量 (哈希表结构)
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 将需要存储的值放入到这个哈希表中
        map.set(this, value);
    else
        createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

结论:最终的变量是存放在当前线程的 ThreadLocalMap 中,而不是 ThreadLocal

  • ThreadLocal 可以理解为只是 ThreadLocalMap 的封装,传递了变量值。
  • ThrealLocal 类中可以通过 Thread.currentThread() 获取到当前线程对象后,直接通过 getMap(Thread t) 可以访问到该线程的 ThreadLocalMap 对象。

每个 Thread 中都具备一个 ThreadLocalMap ,可以存储(key= ThreadLocal 对象,value= Object 对象)的键值对。

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    //......
}

比如我们在同一个线程中声明了两个 ThreadLocal 对象的话, Thread 内部都是使用仅有的那个 ThreadLocalMap 存放数据的,key 就是 ThreadLocal 对象,value 就是 ThreadLocal 对象调用 set() 方法设置的值。

ThreadLocal 数据结构如下图所示:

ThreadLocal 数据结构

ThreadLocalMapThreadLocal 的静态内部类。

ThreadLocal内部类

# 内存泄露问题

ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,而 value 是强引用。所以,如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。这样一来, ThreadLocalMap 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露

ThreadLocalMap 实现中已经考虑了这种情况,在调用 set()get()remove() 方法的时候,会清理掉 key 为 null 的记录。使用完 ThreadLocal 方法后最好手动调用 remove() 方法。

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;
    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

弱引用介绍:

如果一个对象只具有弱引用,那就类似于可有可无的生活用品。弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程, 因此不一定会很快发现那些只具有弱引用的对象。

弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java 虚拟机就会把这个弱引用加入到与之关联的引用队列中。

# 你的理解?(小结)

ThreadLocal 提供了一种方式,在多线程环境下,每个线程都可以独立地维护自己的变量副本,互不干扰

在多线程编程中,共享变量的访问可能会引发线程安全的问题。比如多个线程同时修改同一个变量,可能会导致数据不一致或者竞态条件等问题。而 ThreadLocal 则可以解决这个问题,它为每个线程提供了一个独立的变量副本,每个线程都可以访问和修改自己的变量副本,而不会影响其他线程的副本。这样就避免了多个线程之间的竞争和冲突。

ThreadLocal 的工作原理是,在每个线程内部维护一个 ThreadLocalMap ,用于存储线程局部变量的副本,(key=ThreadLocal 对象,value=Object 对象)

当线程需要访问这个变量时,首先通过 ThreadLocal 对象获取当前线程的副本,如果不存在则创建一个新的副本,并存储到 Map 中。而线程对变量的访问和修改都是通过 ThreadLocal 对象进行,保了线程间的隔离性

ThreadLocal 的典型应用场景包括:

  • 数据库连接管理:每个线程都可以拥有自己的数据库连接,避免了多个线程之间的数据库连接竞争和冲突。
  • 用户身份信息传递:将用户身份信息存储在 ThreadLocal 中,可以在多个方法中方便地获取和传递,避免了显式地传递参数。
  • 事务管理:将事务对象存储在 ThreadLocal 中,可以在多个方法中共享同一个事务,避免了事务对象的传递和管理。

需要注意的是,使用 ThreadLocal 时要注意及时清理资源,避免内存泄漏问题。一般可以通过在 ThreadLocal 使用完毕后,调用 remove() 方法进行清理。

# 同步器

# AQS 抽象类

# 简介

AQS 的全称为 AbstractQueuedSynchronizer ,翻译过来的意思就是抽象队列同步器。这个类在 java.util.concurrent.locks 包下面。

img

AQS 就是一个抽象类,主要用来简单且高效地构造出大量的同步器比如 ReentrantLockSemaphore ,其他的诸如 ReentrantReadWriteLockSynchronousQueue 等等皆是基于 AQS 的

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}

# 底层原理

# 核心思想

AQS 核心思想是:

  • 如果被请求的共享资源是空闲的,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态
  • 如果被请求的共享资源被占用了,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制是基于 CLH 队列锁 (Craig, Landin, and Hagersten locks) 实现的,即将暂时获取不到锁的线程加入到队列中

CLH 锁是对自旋锁的一种改进,是一个 **虚拟的双向 FIFO 队列**(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 将每个请求共享资源的线程封装成 CLH 队列锁的一个结点(Node)来实现锁的分配。每个结点包含:

  • thread:线程的引用
  • waitStatus:当前节点在队列中的状态
  • prev:前驱节点
  • next:后继节点

CLH 队列结构如下:

CLH 队列结构

AQS( AbstractQueuedSynchronizer ) 的核心原理图:

image-20231013130403824

AQS 使用 int 成员变量 state 表示同步状态,通过内置的 双向 FIFO 线程等待队列 来完成获取资源线程的排队工作。

  • state 变量由 volatile 修饰,保证线程可见性,用于展示当前临界资源的获锁情况。

    // 共享变量,使用 volatile 修饰保证线程可见性
    private volatile int state;
  • 另外,状态信息 state 可以通过 protected 类型的 getState()setState()compareAndSetState() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。

    // 返回同步状态的当前值
    protected final int getState() {
         return state;
    }
     // 设置同步状态的值
    protected final void setState(int newState) {
         state = newState;
    }
    // 原子地(CAS 操作)将同步状态值设置为给定值 update 如果当前同步状态的值等于 expect(期望值)
    protected final boolean compareAndSetState(int expect, int update) {
          return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

可重入式的独占锁 ReentrantLock 为例,它的内部维护了一个 state 变量,表示这个锁被多少个线程所持有

  • state初始值为 0,表示锁处于未锁定状态。

  • 当线程 A 调用 lock() 方法时,会尝试通过 tryAcquire() 方法独占该锁,并让 state 的值加 1

    • 如果成功了,那么线程 A 就获取到了锁。
    • 如果失败了,那么线程 A 就会被加入到一个等待队列(CLH 队列)中,直到其他线程释放该锁。
  • 假设线程 A 获取锁成功了,释放锁之前,A 线程自己是可以重复获取此锁的state 会累加)。

    这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让 state 的值回到 0,也就是让锁恢复到未锁定状态。只有这样,其他等待的线程才能有机会获取该锁。

线程 A 尝试获取 ReentrantLock 锁的过程如下图所示:

AQS 独占模式获取锁

(AQS 独占模式获取锁)

再以倒计时器 CountDownLatch 以例,

  • 任务分为 N 个子线程去执行, state 也初始化为 N(表示子线程的个数)。

  • 这 N 个子线程开始执行任务,每执行完一个子线程,就调用一次 countDown() 方法。该方法会尝试使用 CAS 操作,让 state 的值减少 1。

  • 当所有的子线程都执行完毕后(即 state 的值变为 0), CountDownLatch 会调用 unpark() 方法,唤醒主线程。

    park () 和 unpark () 是 LockSupport 类中的方法:

    LockSupport.park(); // 暂停当前线程
    LockSupport.unpark(暂停线程对象) // 恢复某个线程的运行
  • 主线程被唤醒后就可以从 CountDownLatch 中的 await() 方法(而非 AQS 中的)返回,继续执行后续的操作。

# 资源共享方式

AQS 定义两种资源共享方式:

  • Exclusive独占式,只有一个线程能执行,如 ReentrantLock
  • Share共享式,多个线程可同时执行,如 Semaphore / CountDownLatch

一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现 tryAcquire-tryReleasetryAcquireShared-tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock

# 自定义同步器

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样:

  1. 继承 AbstractQueuedSynchronizer (AQS 抽象类),并重写指定的钩子方法
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

AQS 使用了模板方法模式,自定义同步器时,需要重写下面几个 AQS 提供的钩子方法:

// 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
protected boolean tryAcquire(int)
// 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
protected boolean tryRelease(int)
// 共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
// 共享方式。尝试释放资源,成功则返回 true,失败则返回 false。
protected boolean tryReleaseShared(int)
// 该线程是否正在独占资源。只有用到 condition 才需要去实现它。
protected boolean isHeldExclusively()

钩子方法是一种被声明在抽象类中的方法,一般使用 protected 关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。

除了上面提到的钩子方法之外,AQS 类中的其他方法都是 final ,所以无法被其他类重写

# 常见同步工具类

下面介绍几个基于 AQS 的常见同步工具类:

  • Semaphore(信号量):共享式,AQS 的 state 值为 permits ,表示许可证的数量,只有拿到许可证的线程才能执行。可以用来控制同时访问特定资源的线程数量。
  • CountDownLatch(倒计时器):共享式,AQS 的 state 值为 count ,表示允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
  • CyclicBarrier(循环栅栏)

# Semaphore(信号量)

共享式的资源访问方式

# 简介

Semaphore (信号量) 是共享锁,默认构造 AQS 的 state 值为 permits ,理解为许可证的数量,只有拿到许可证的线程才能执行

Semaphore 的使用简单,我们这里假设有 N (N>5) 个线程来获取 Semaphore 中的共享资源。下面的代码将 permits 初始化为 5,表示同一时刻只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。

// 初始共享资源数量
final Semaphore semaphore = new Semaphore(5);
// 获取 1 个许可
semaphore.acquire();
// 释放 1 个许可
semaphore.release();

当初始的  permits 为 1 的时候, Semaphore 退化为独占锁

Semaphore 有两种模式:

  • 公平模式:调用 acquire() 方法的顺序就是获取许可证的顺序,遵循 FIFO
  • 非公平模式抢占式,也是默认的模式。

Semaphore 的两个构造方法如下,二者都必须提供 permits ,其中第二个构造方法可以通过提供 fair 指定是公平模式 / 非公平模式,默认非公平模式。

public Semaphore(int permits) {
  	sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
  	sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore 通常用于那些对资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)。

# 原理

线程调用 semaphore.acquire() 尝试获取许可证,

  • 如果 permits >= 0 的话,则表示可以获取成功。使用 CAS 操作去修改 permits 的值减 1。
  • 如果 permits < 0 的话,则表示许可证数量不足。此时会创建一个 Node 节点加入 CLH 队列锁,挂起当前线程
/**
 *  获取 1 个许可证
 */
public void acquire() throws InterruptedException {
 	 sync.acquireSharedInterruptibly(1);
}
/**
 * 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程
 */
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
      throw new InterruptedException();
        // 尝试获取许可证,arg 为获取许可证个数,当可用许可证数减当前获取的许可证数结果小于 0, 则创建一个节点加入阻塞队列,挂起当前线程。
    if (tryAcquireShared(arg) < 0)
      doAcquireSharedInterruptibly(arg);
}

线程调用 semaphore.release() 尝试释放许可证,

  • 并使用 CAS 操作去修改 permits 的值加 1。
  • 释放许可证成功之后,同时会唤醒 CLH 队列锁 中的一个线程
  • 被唤醒的线程会重新尝试获取许可证,修改 permits 的值减 1,
    • 如果 permits >= 0 则获取令牌成功
    • 如果 permits < 0 则重新进入阻塞队列,挂起线程
// 释放一个许可证
public void release() {
  	sync.releaseShared(1);
}
// 释放共享锁,同时会唤醒同步队列中的一个线程。
public final boolean releaseShared(int arg) {
    // 释放共享锁
    if (tryReleaseShared(arg)) {
      // 唤醒同步队列中的一个线程
      doReleaseShared();
      return true;
    }
    return false;
}

# CountDownLatch(倒计时器)

共享式的资源访问方式

# 简介

CountDownLatch (倒计时器)是共享锁,默认构造 AQS 的 state 值为 count ,理解为允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕

CountDownLatch 是 **一次性** 的, count 只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

# 原理
  • 当线程调用 countDown() 方法时,其实调用了 tryReleaseShared() 方法以 CAS 的操作来减少 count ,直至 count 为 0

  • 当调用 await() 方法的时候,如果 count 不为 0,那就证明任务还没有执行完毕, await() 方法就会一直阻塞。也就是说 await() 方法之后的语句不会被执行。

  • 直到 count 个线程调用了 countDown() ,使 count 值被减为 0。或者调用 await() 的线程被中断,该线程才会从阻塞中被唤醒, await() 方法之后的语句得到执行。

# 应用场景

CountDownLatch 的作用就是允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。之前在项目中,有一个使用多线程读取多个文件处理的场景,我用到了 CountDownLatch 。具体场景是下面这样的:

我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理结果进行统计整理。

为此我们定义了一个线程池和 count 为 6 的 CountDownLatch 对象 。

  • 使用线程池处理读取任务
  • 每一个线程处理完,调用 CountDownLatch 对象的 countDown() 将 count 减 1
  • 当所有线程处理完,调用 CountDownLatch 对象的 await() 方法。直到所有文件读取完之后,才会接着执行后面的逻辑。
public class CountDownLatchExample1 {
    // 处理文件的数量
    private static final int threadCount = 6;
    public static void main(String[] args) throws InterruptedException {
        // 创建一个具有固定线程数量的线程池对象(推荐使用构造方法创建)
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadnum = i;
            threadPool.execute(() -> {
                try {
                    // 处理文件的业务操作
                    //......
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 表示一个文件已经被完成
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        threadPool.shutdown();
        System.out.println("finish");
    }
}

有没有可以改进的地方呢?** 可以使用 CompletableFuture 类来改进!**Java8 的 CompletableFuture 提供了很多对多线程友好的方法,使用它可以很方便地为我们编写多线程程序,什么异步、串行、并行或者等待所有线程执行完任务什么的都非常方便。

CompletableFuture<Void> task1 =
    CompletableFuture.supplyAsync(()->{
        // 自定义业务操作
    });
......
CompletableFuture<Void> task6 =
    CompletableFuture.supplyAsync(()->{
    // 自定义业务操作
    });
......
CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);
try {
    headerFuture.join();
} catch (Exception ex) {
    //......
}
System.out.println("all done. ");

上面的代码还可以继续优化,当任务过多的时候,把每一个 task 都列出来不太现实,可以考虑通过循环来添加任务

// 文件夹位置
List<String> filePaths = Arrays.asList(...)
// 异步处理所有文件
List<CompletableFuture<String>> fileFutures = filePaths.stream()
    .map(filePath -> doSomeThing(filePath))
    .collect(Collectors.toList());
// 将他们合并起来
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
    fileFutures.toArray(new CompletableFuture[fileFutures.size()])
);

# CyclicBarrier(循环栅栏)

共享式的资源访问方式

# 简介

CyclicBarrier (循环栅栏)和 CountDownLatch 非常类似,也是共享锁,内部通过一个 count 变量作为计数器,其初始值也是 parties 属性(表示拦截的线程数量)的初始值

CyclicBarrier 也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大,主要应用场景类似。

CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于 ReentrantLock ( ReentrantLock 也属于 AQS 同步器) 和 Condition

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

# 原理

难!

每当一个线程到了栅栏这里了,那么就将 count 减 1。当 count 值为 0 时,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。

// 每次拦截的线程数
private final int parties;
// 计数器
private int count;

结合源码来简单看看。

1、 CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties) ,参数 parties 表示屏障拦截的线程数量,每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

public CyclicBarrier(int parties) {
    this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

其中, parties 就代表了需要拦截的线程数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过

2、当调用 CyclicBarrier 对象调用 await() 方法时,实际上调用的是 dowait(false, 0L) 方法。 await() 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties 的值时,栅栏才会打开,线程才得以通过执行。

public int await() throws InterruptedException, BrokenBarrierException {
  try {
    	return dowait(false, 0L);
  } catch (TimeoutException toe) {
   	 throw new Error(toe); // cannot happen
  }
}

dowait(false, 0L) 方法源码分析如下:

// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
    private int count;
    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // 锁住
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果线程中断了,抛出异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //cout 减 1
            int index = --count;
            // 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行 await 方法之后的条件
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 将 count 重置为 parties 属性的初始化值
                    // 唤醒之前等待的线程
                    // 下一波执行开始
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

# 【面试题】AQS 的底层原理是什么?

互联网、美团 / 阿里、后端、Java。

**AQS(AbstractQueuedSynchronizer,抽象队列同步器)** 的底层是通过 Java 中的 Lock 接口和 Condition 接口实现的。AQS 是一个抽象类,提供了一种灵活的机制来实现线程间的同步和互斥操作。它提供了两种同步模式:独占模式(Exclusive)共享模式(Shared)

在 AQS 的底层实现中,主要使用了一个 ** 虚拟的双向 FIFO 队列(CLH 队列锁)** 来维护等待获取锁的线程队列。当一个线程请求获取锁时,如果锁已经被其他线程占用,则该线程会被加入到队列中,并进入等待状态。当持有锁的线程释放锁时,AQS 会从队列中选择一个线程唤醒并允许其获取锁。

AQS 还提供了一些核心的方法,如 acquire 、 release 和 tryAcquire 等,用于实现具体的同步操作。这些方法通过内置的 ** 状态变量(state)** 来记录锁的状态,并根据不同的情况进行相应的操作。

# 讲讲你对 AQS 的理解

AQS(AbstractQueuedSynchronizer)是 Java 并发包中一个重要的工具类,它提供了实现锁、同步器的基础框架

  1. AQS 是一个抽象类,它定义了锁和同步器的基本接口和方法。它的子类可以通过继承 AQS 并重写其中的方法来实现自定义的同步器。
  2. AQS 通过一个 ** 状态变量(state)** 来表示锁的状态。state 的具体含义对于不同的同步器会有不同的解释,例如,ReentrantLock 中 的 state 表示持有锁的线程数量。
  3. AQS 使用一个 ** 双向队列(CLH 队列锁)** 来管理等待获取资源的线程。每个等待线程会被包装成一个 "Node" 对象,并且会按照一定的顺序排队。
  4. AQS 提供了两个主要的方法: acquire()release()
    • acquire () 方法用于获取资源,而 release () 方法用于释放资源。
    • 当一个线程调用 acquire () 方法时,如果资源已经被其他线程占用,那么该线程将会被加入到 CLH 队列 中进行等待。
  5. Condition 是 AQS 的一个补充接口,它提供了更为细粒度的等待 / 通知机制。Condition 对象可以通过 AQS 的 newCondition () 方法创建。Condition 底层的实现依赖于 AQS ,并且使用 AQS 队列来管理等待线程。
  6. Condition 的底层原理是基于 AQS 的状态和 CLH 队列。当一个线程调用 Condition 的 await 方法时,该线程会释放持有的 AQS 状态(比如锁),并且进入 Condition 维护的等待队列中等待通知。当另一个线程执行相应的通知操作时,被唤醒的线程可以重新竞争资源。

总的来说,

  • AQS 是 Java 并发包中实现锁和同步器的基础框架。它使用一个双向队列来管理等待获取资源的线程,并提供 acquire 和 release 等方法来获取和释放资源。
  • Condition 是 AQS 的补充接口,提供了更为细粒度的等待 / 通知机制。Condition 底层的实现依赖于 AQS 的状态和 CLH 队列。

# ReentrantLock 类

可重入式的独占锁(悲观锁)

# 简介

ReentrantLock 实现了 Lock 接口,是一个可重入独占式的锁,和 synchronized 关键字类似。不过, ReentrantLock 更灵活、更强大,增加了轮询、超时、中断、公平锁和非公平锁等高级功能。

public class ReentrantLock implements Lock, java.io.Serializable {}

ReentrantLock 里面有一个继承自 AQS( AbstractQueuedSynchronizer )的内部类 Sync ,添加锁和释放锁的大部分操作实际上都是其中实现的。 Sync 有两个子类:

  • 公平锁 FairSync
  • 非公平锁 NonfairSync

img

ReentrantLock 默认使用非公平锁,也可以通过构造器来显式的指定使用公平锁。

// 传入一个 boolean 值,true 时为公平锁,false 时为非公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

从上面的内容可以看出, ReentrantLock 的底层就是由 AQS 来实现的。

# 公平锁和非公平锁有什么区别?

公平锁:

  • 锁被释放之后,先申请的线程先得到锁
  • 性能较差一些,因为公平锁为了保证时间上的绝对顺序,上下文切换更频繁。

非公平锁

  • 锁被释放之后,后申请的线程可能会先获取到锁,是随机或者按照其他优先级排序的。
  • 性能更好,但可能会导致某些线程永远无法获取到锁。
  • 例如: synchronized

# synchronized 和 ReentrantLock 有何异同?

# 都是可重入锁

可重入锁也叫递归锁,是指线程可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,此时这个对象锁还没有释放,当其再次想要获取这个对象的锁的时候还是可以获取的,如果是不可重入锁的话,就会造成死锁

JDK 提供的所有现成的 Lock 实现类,包括 synchronized 关键字锁都是可重入的。


在下面的代码中, method1()method2() 都被 synchronized 关键字修饰, method1() 调用了 method2()

public class SynchronizedDemo {
    public synchronized void method1() {
        System.out.println("方法1");
        method2();
    }
    public synchronized void method2() {
        System.out.println("方法2");
    }
}

由于 synchronized 锁是可重入的,同一个线程在调用 method1() 时可以直接获得当前对象的锁,执行 method2() 的时候可以再次获取这个对象的锁,不会产生死锁问题。假如 synchronized 是不可重入锁的话,由于该对象的锁已被当前线程所持有且无法释放,这就导致线程在执行 method2() 时获取锁失败,会出现死锁问题。

# synchronized 依赖于 JVM,而 ReentrantLock 依赖于 API

synchronized 是依赖于 JVM 实现的,前面我们也讲到了虚拟机团队在 JDK1.6 为 synchronized 关键字进行了很多优化,但是这些优化都是在虚拟机层面实现的,并没有直接暴露给我们。

ReentrantLock 是 JDK 层面实现的(也就是 API 层面,需要 lock () 和 unlock () 方法配合 try/finally 语句块来完成),所以我们可以通过查看它的源代码,来看它是如何实现的。

# ReentrantLock 比 synchronized 增加了一些高级功能

相比 synchronizedReentrantLock 增加了一些高级功能。主要来说主要有三点:

  • 等待可中断 : ReentrantLock 提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly() 来实现这个机制。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情
  • 可实现公平锁 : ReentrantLock 可以指定是公平锁还是非公平锁。synchronized 只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。 ReentrantLock 默认情况是非公平的,可以通过 ReentrantLock 类的 ReentrantLock(boolean fair) 构造方法来指定是否是公平的。
  • 可实现选择性通知(锁可以绑定多个条件): synchronized 关键字与 wait()notify() / notifyAll() 方法相结合可以实现等待 / 通知机制。 ReentrantLock 类当然也可以实现,但是需要借助于 Condition 接口与 newCondition() 方法。

关于 Condition 接口的补充:

Condition 接口是 JDK1.5 之后才有的,它具有很好的灵活性,比如可以实现多路通知功能。也就是在一个 Lock 对象中可以创建多个 Condition 实例(即对象监视器),线程对象可以注册在指定的 Condition 中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。 在使用 notify()/notifyAll() 方法进行通知时,被通知的线程是由 JVM 选择的,用 ReentrantLock 类结合 Condition 实例可以实现 “选择性通知” ,这个功能非常重要,而且是 Condition 接口默认提供的。

synchronized 关键字就相当于整个 Lock 对象中只有一个 Condition 实例,所有的线程都注册在它一个身上。如果执行 notifyAll() 方法的话就会通知所有处于等待状态的线程,这样会造成很大的效率问题。而 Condition 实例的 signalAll() 方法,只会唤醒注册在该 Condition 实例中的所有等待线程。

# 可中断锁和不可中断锁有什么区别?

等待可中断

可中断锁:获取锁的过程中可以被中断,不需要一直等到获取锁之后 才能进行其他逻辑处理。 ReentrantLock 就属于是可中断锁。

不可中断锁:一旦线程申请了锁,就只能等到拿到锁以后才能进行其他的逻辑处理。 synchronized 就属于是不可中断锁。

# 介绍下 ReentrantLock 的底层原理(可重入、公平锁和非公平锁的原理)

  1. 可重入的原理:ReentrantLock 通过一个计数器(状态变量 state)来记录锁的持有线程数。当一个线程第一次获取锁时,计数器加 1,当该线程再次获取锁时,计数器再次加 1。当线程释放锁时,计数器减 1。只有当计数器值为 0 时,表示锁完全释放,其他线程可以获取该锁。
  2. 公平锁和非公平锁的原理:ReentrantLock 提供了公平锁( FairSync )和非公平锁( NonfairSync )两种模式。
  • 公平锁模式:当多个线程等待获取锁时,按照线程的申请顺序来获取锁。即先到先得的原则,保证等待时间越久的线程越早获取锁。
  • 非公平锁模式:当多个线程等待获取锁时,不按照线程的申请顺序来获取锁。即存在一个竞争机制,新来的线程有机会抢占锁,这样可以提高吞吐量。
  1. 在底层实现上,公平锁和非公平锁的区别在于线程获取锁的方式:
  • 公平锁先检查队列中是否有正在等待的线程,如果有,则按照 FIFO 的顺序选择锁的持有者。
  • 非公平锁先尝试直接获取锁,
    • 如果失败,则再进入同步队列等待。
    • 如果失败,则进入队列等待,但在这个等待过程中,可能会允许新来的线程抢占锁。

总结:

  • ReentrantLock 通过计数器实现可重入的机制,保证同一个线程可以多次获取同一个锁。
  • 公平锁和非公平锁的区别在于锁的获取方式,公平锁按照线程等待的顺序进行获取,而非公平锁允许在锁释放时新来的线程有机会优先获取锁。

# 线程池

# 线程池

# 简介

顾名思义,线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务

池化技术的主要思想:为了减少每次获取资源的消耗,提高对资源的利用率。

借用《Java 并发编程的艺术》来说一下使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

# Executor 框架

# 简介

Executor 框架是 Java5 之后引进的,通过 Executor 来启动线程比使用 Threadstart 方法更好。除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题

this 逃逸:在构造函数返回之前,其他线程就持有该对象的引用,调用尚未构造完全的对象的方法,可能引发令人疑惑的错误。

Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列、拒绝策略等, Executor 框架让并发编程变得更加简单。

# 三大部分

# 任务( Runnable / Callable

执行任务需要实现 Runnable 接口Callable 接口,对应的实现类可以被 ThreadPoolExecutor ScheduledThreadPoolExecutor 执行。

# 任务的执行( Executor

如下图所示,包括任务执行机制的核心接口 Executor ,以及继承自它的 ExecutorService 接口。

ThreadPoolExecutorScheduledThreadPoolExecutor 是两个关键的实现类。

img

ThreadPoolExecutor 类描述:

//AbstractExecutorService 类实现了 ExecutorService 接口
public class ThreadPoolExecutor extends AbstractExecutorService

ScheduledThreadPoolExecutor 类描述:

//ScheduledExecutorService 继承 ExecutorService 接口
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService
# 异步计算的结果( Future

Future 接口其实现类 FutureTask都可以代表异步计算的结果。

当我们把 Runnable 接口或 Callable 接口的实现类提交给 ThreadPoolExecutorScheduledThreadPoolExecutor 执行时,在内部调用 submit() 方法时会返回一个 FutureTask 对象。

# 如何使用

Executor 框架的使用示意图:

image-20231020143912018

  1. 主线程首先要创建实现了 Runnable 接口或者 Callable 接口的任务对象。

  2. 把创建完成的实现了 Runnable / Callable 接口的对象直接交给 ExecutorServiceexecute() 或者 submit() 执行。

    由于 FutureTask 实现了 Runnable ,我们也可以直接创建 FutureTask ,然后交给 ExecutorService 执行。

  3. 如果执行的是 ExecutorService.submit() ,将返回一个实现了 Future 接口的 FutureTask 对象。

  4. 最后,主线程可以执行 FutureTask.get() 方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning) 来取消此任务的执行。

# ThreadPoolExecutor 类

线程池实现类 ThreadPoolExecutorExecutor 框架最核心的类。

# 构造方法

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法,比如默认制定拒绝策略是什么)。

/**
     * 用给定的初始参数创建一个新的 ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,// 线程池的核心线程数量
                              int maximumPoolSize,// 线程池的最大线程数
                              long keepAliveTime,// 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,// 时间单位
                              BlockingQueue<Runnable> workQueue,// 任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,// 线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler// 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                               ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

# 常见参数

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 核心线程数量当任务队列未达到队列容量时,最大可以同时运行的线程数量。如果没有全局设置池内线程的过期时间,池内会维持此数量线程。
  • maximumPoolSize : 最大线程数量当核心线程都在运行任务,并且任务队列中的任务数量已满,此时会创建非核心线程,当前池内可以同时运行的线程数量变为最大线程数。
  • workQueue 用于存放池内任务的任务队列。新任务到来时会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会先被存放在任务队列中。

其他常见参数:

  • keepAliveTime 当线程数大于  corePoolSize 时,多余的空闲线程能够存活的最长时间
    • 当线程池中的线程数量大于 corePoolSize 时,如果这时没有新的任务提交,多余的空闲线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime 才会被回收销毁。
    • 线程池回收线程时,会对核心线程和非核心线程一视同仁,直到线程池中线程的数量等于 corePoolSize ,回收过程才会停止。
  • unit keepAliveTime 参数的时间单位
  • threadFactory :线程工厂,用来创建线程(包括优先级、名称、守护状态等属性的初始化),一般默认即可。
  • handler 拒绝策略 / 饱和策略。当池内线程都在运行,同时任务队列也满时,可以制定相关策略来处理新来的任务。JDK 线程池中实现了四种拒绝策略,默认 AbortPolicy,抛出异常。

对线程池中各个参数的相互关系的理解:

image-20231018163149255

# 饱和策略

如果当前同时运行的线程数量达到 maximumPoolSize ,并且 workQueue 也被放满任务时, ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor. AbortPolicy 默认的饱和策略。抛出 RejectedExecutionException 异常来拒绝新任务的处理
  • ThreadPoolExecutor. CallerRunsPolicy :它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务。直接在调用 execute 方法的调用者线程中运行 ( run ) 任务。如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor. DiscardPolicy :不处理新任务,直接丢弃掉
  • ThreadPoolExecutor. DiscardOldestPolicy :此策略将丢弃最早的未处理的任务请求

举个例子:

Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略来配置线程池的时候,默认使用的是 AbortPolicy 。在这种饱和策略下,如果队列满了, ThreadPoolExecutor 将抛出 RejectedExecutionException 异常来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。

如果不想丢弃任务的话,可以使用 CallerRunsPolicy 。和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // 直接主线程执行,而不是线程池中的线程执行
                r.run();
            }
        }
}

# 线程池的创建方式

# 方式 1:ThreadPoolExecutor 类的构造函数

推荐!

image-20231015003053159

# 方式 2:Executor 框架的 Executors 工具类

不推荐~

我们可以创建多种类型的 ThreadPoolExecutor

  • FixedThreadPool :该方法返回一个固定线程数量的线程池。
    • 该线程池中的线程数量始终不变。
    • 当有一个新的任务提交时,
      • 若线程池中有空闲线程,则立即执行。
      • 若线程池中没有空间线程,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor 该方法返回一个只有一个线程的线程池。
    • 若多于一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool 该方法返回一个会根据需要创建线程的线程池。
    • 初始大小为 0。
    • 当有新任务提交时,如果当前线程池中没有线程可用,它会创建一个新的线程来处理该任务。
    • 如果在一段时间内(默认为 60 秒)没有新任务提交,核心线程会超时并被销毁,从而缩小线程池的大小。
  • ScheduledThreadPool :该方法返回一个用来在给定的延迟后,或者定期执行任务的线程池。

对应 Executors 工具类中的方法如图所示:

img

在《阿里巴巴 Java 开发手册》“并发处理” 这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显式创建线程

原因:使用线程池可以减少在创建和销毁线程上所消耗的开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者 “过度切换” 的问题。

另外,《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 工具类去创建,而是通过 ThreadPoolExecutor 构造函数的方式。这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽(OOM)的风险

Executors 工具类返回线程池对象的弊端如下 (后文会详细介绍到):

  • FixedThreadPoolSingleThreadExecutor :使用的是无界的 LinkedBlockingQueue ,任务队列最大长度为 Integer.MAX_VALUE可能堆积大量的请求,从而导致 OOM
  • CachedThreadPool :使用的是同步队列 SynchronousQueue ,允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM
  • ScheduledThreadPoolSingleThreadScheduledExecutor : 使用的无界的延迟阻塞队列 DelayedWorkQueue ,任务队列最大长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM 。
// 无界队列 LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
// 无界队列 LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
// 同步队列 SynchronousQueue,没有容量,最大线程数是 Integer.MAX_VALUE`
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
// DelayedWorkQueue(延迟阻塞队列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

# 常用的阻塞队列

即任务队列。难!

新任务来的时候会先判断当前运行的线程数量是否达到 corePoolSize ,如果达到的话,新任务就会被存放在阻塞队列 workQueue 中。

不同的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析:

  • 无界阻塞队列 LinkedBlockingQueue

    • 容量为 Integer.MAX_VALUE
    • FixedThreadPoolSingleThreadExector 这两种线程池都是用的是无界队列,因此二者的任务队列永远不会被放满。
    • FixedThreadPool 最多只能创建 corePoolSize 个线程
    • SingleThreadExector 只能创建 1 个线程
  • 同步队列 SynchronousQueue

    • 容量为 0,不存储元素
    • 目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。
    • CachedThreadPool 线程池使用的是同步队列。
    • 也就是说, CachedThreadPool 的最大线程数是 Integer.MAX_VALUE ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM 。
  • 延迟阻塞队列 DelayedWorkQueue

    • 该队列的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序。内部采用的是 “堆” 的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。
    • ScheduledThreadPoolSingleThreadScheduledExecutor 这两种线程池使用的是延迟阻塞队列。
    • DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE ,所以最多只能创建 corePoolSize 个线程。

# 线程池原理分析

我们上面讲解了 Executor 框架以及 ThreadPoolExecutor 类,下面让我们实战一下,来通过写一个 ThreadPoolExecutor 的小 Demo 来回顾上面的内容。

# ThreadPoolExecutor 示例代码

首先创建一个 Runnable 接口的实现类(也可以是 Callable 接口的实现类)

MyRunnable.java

import java.util.Date;
/**
 * 这是一个简单的 Runnable 类,需要大约 5 秒钟来执行其任务。
 * @author shuang.kou
 */
public class MyRunnable implements Runnable {
    private String command;
    public MyRunnable(String s) {
        this.command = s;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }
    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    @Override
    public String toString() {
        return this.command;
    }
}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

ThreadPoolExecutorDemo.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorDemo {
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {
        // 使用阿里巴巴推荐的创建线程池的方式
        // 通过 ThreadPoolExecutor 构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 10; i++) {
            // 创建 WorkerThread 对象(WorkerThread 类实现了 Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            // 执行 Runnable
            executor.execute(worker);
        }
        // 终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

上面的代码指定了:

  • corePoolSize : 核心线程数为 5。
  • maximumPoolSize :最大线程数 10
  • keepAliveTime : 等待时间为 1L。
  • unit : 等待时间的单位为 TimeUnit.SECONDS。
  • workQueue :任务队列为 ArrayBlockingQueue ,并且容量为 100;
  • handler : 饱和策略为 CallerRunsPolicy

输出结构:

pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
Finished all threads  // 任务全部执行完了才会跳出来,因为 executor.isTerminated () 判断为 true 了才会跳出 while 循环,当且仅当调用 shutdown () 方法后,并且所有提交的任务完成后返回为 true

# 线程池原理分析(处理任务的流程)

在示例代码中,我们使用 executor.execute(worker) 来提交一个任务到线程池中去。为了搞懂线程池的原理,我们需要首先分析一下 execute() 方法,源码如下:

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }
    // 任务队列
    private final BlockingQueue<Runnable> workQueue;
    public void execute(Runnable command) {
        // 如果任务为 null,则抛出异常。
        if (command == null)
            throw new NullPointerException();
        //ctl 中保存的线程池当前的一些状态信息
        int c = ctl.get();
        //  下面会涉及到 3 步 操作
        // 1. 首先判断当前线程池中执行的任务数量是否小于 corePoolSize
        // 如果小于的话,通过 addWorker (command, true) 新建一个线程,并将任务 (command) 添加到该线程中;然后,启动该线程从而执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2. 如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里,表明创建新的线程失败。
        // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果当前工作线程数量为 0,新创建一个线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3. 通过 addWorker (command, false) 新建一个线程,并将任务 (command) 添加到该线程中;然后,启动该线程从而执行任务。
        // 传入 false 代表增加线程时判断当前线程数是否少于 maxPoolSize
        // 如果 addWorker (command, false) 执行失败,则通过 reject () 执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }

简单分析一下整个流程

图解线程池实现原理

  1. 如果 当前运行的线程数 < corePoolSize ,那么就会新建一个线程来执行任务。

  2. 如果 corePoolSize ≤ 当前运行的线程数 < maximumPoolSize ,那么就把该任务放入到任务队列里等待执行

    1. 如果向任务队列添加任务失败(即任务队列已经满了),就新建一个线程来执行任务。
  3. 如果 当前运行的线程数 == maximumPoolSize ,此时任务队列、线程池都满了,那么会根据饱和策略来处理无法接收的任务,比如抛出异常或者丢弃任务。


execute() 方法中,多次调用 addWorker() 方法,该方法主要用来创建新的工作线程,如果创建和启动工作线程成功则返回 true ,否则返回 false。

// 全局锁,并发操作必备
    private final ReentrantLock mainLock = new ReentrantLock();
    // 跟踪线程池的最大大小,只有在持有全局锁 mainLock 的前提下才能访问此集合
    private int largestPoolSize;
    // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁 mainLock 的前提下才能访问此集合
    private final HashSet<Worker> workers = new HashSet<>();
    // 获取线程池状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 判断线程池的状态是否为 Running
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    /**
     * 添加新的工作线程到线程池
     * @param firstTask 要执行
     * @param core 参数为 true 的话表示使用线程池的基本大小,为 false 使用线程池最大大小
     * @return 添加成功就返回 true 否则返回 false
     */
   private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            // 这两句用来获取线程池的状态
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
               // 获取线程池中工作的线程的数量
                int wc = workerCountOf(c);
                //core 参数为 false 的话表明队列也满了,线程池大小变为 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               // 原子操作将 workcount 的数量加 1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果线程的状态改变了就再次执行上述操作
                c = ctl.get();
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 标记工作线程是否启动成功
        boolean workerStarted = false;
        // 标记工作线程是否创建成功
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
              // 加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   // 获取线程池状态
                    int rs = runStateOf(ctl.get());
                   //rs < SHUTDOWN 如果线程池状态依然为 RUNNING, 并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
                  //(rs=SHUTDOWN && firstTask == null) 如果线程池状态小于 STOP,也就是 RUNNING 或者 SHUTDOWN 状态下,同时传入的任务实例 firstTask 为 null,则需要添加到工作线程集合和启动新的 Worker
                   //firstTask == null 证明只新建线程而不执行任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                       // 更新当前工作线程的最大容量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                      // 工作线程是否启动成功
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                //// 如果成功添加工作线程,则调用 Worker 内部的线程实例 t 的 Thread#start () 方法启动真实的线程实例
                if (workerAdded) {
                    t.start();
                  /// 标记线程启动成功
                    workerStarted = true;
                }
            }
        } finally {
           // 线程启动失败,需要从工作线程中移除对应的 Worker
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

现在,对于上述示例代码,分析如下:我们模拟了 10 个任务,配置的核心线程数为 5、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。

# 几个常见的对比

# Runnable vs Callable

Runnable 接口自 Java 1.0 以来一直存在,但 Callable 接口仅在 Java 1.5 中引入,目的是为了处理 Runnable 接口不支持的用例。

Runnable 接口无返回值,且不会向上抛出异常,但是 Callable 接口可以

所以,如果任务不需要返回结果或抛出异常,则推荐使用 Runnable 接口,这样代码看起来会更加简洁。

工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象。

  • Executors.callable(Runnable task)
  • Executors.callable(Runnable task, Object result)
@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值,也无法抛出异常
    */
    public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

# execute() vs submit()

execute()submit()
声明位置Executor 接口ExecutorService 接口
接收参数RunnableRunnableCallable<T>
返回值类型voidFuture
能否处理异常无法处理异常借助 Future.get() 可以捕获并处理异常
  • execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功

  • submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功。

    • 可以通过 Futureget() 方法来获取返回值,该方法会阻塞当前线程直到任务完成。
    • 而使用 Futureget(long timeout,TimeUnit unit) 方法的话,如果在 timeout 时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException

示例 1:使用 get() 方法获取返回值。

ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<String> submit = executorService.submit(() -> {
    try {
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "abc";
});
String s = submit.get();
System.out.println(s);
executorService.shutdown();

输出:

abc

示例 2:使用 get(long timeout,TimeUnit unit) 方法获取返回值。

ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<String> submit = executorService.submit(() -> {
    try {
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "abc";
});
String s = submit.get(3, TimeUnit.SECONDS);
System.out.println(s);
executorService.shutdown();

输出:

Exception in thread "main" java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)

# shutdown() vs shutdownNow()

  • shutdown()

    • 关闭线程池,线程池的状态变为 SHUTDOWN
    • 线程池不再接受新任务了,但是队列里的任务得执行完毕
  • shutdownNow()

    • 关闭线程池,线程池的状态变为 STOP
    • 线程池会终止当前正在运行的任务,并停止处理排队的任务,并返回正在等待执行的 List

# isShutdown() vs isTerminated()

  • isShutDown() :当调用 shutdown() 方法后,返回为 true

  • isTerminated() :当调用 shutdown() 方法后,并且所有提交的任务完成后,返回为 true

# 几种常见的内置线程池

即上文提到的 Executor 框架的 Executors 工具类中所提供的线程池

# FixedThreadPool

# 介绍

FixedThreadPool 被称为可重用固定线程数的线程池。通过 Executors 类中的相关源代码来看一下相关实现:

/**
     * 创建一个可重用固定数量线程的线程池
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

可以看出 corePoolSizemaximumPoolSize 都被设置为 nThreads ,这个 nThreads 参数是我们使用的时候自己传递的。

即使 maximumPoolSize 的值比 corePoolSize 大,也至多只会创建 corePoolSize 个线程。这是因为 FixedThreadPool 使用的是容量为 Integer.MAX_VALUELinkedBlockingQueue无界队列),任务队列永远不会被放满

# 处理任务的流程

FixedThreadPoolexecute() 方法运行示意图:

FixedThreadPool的execute()方法运行示意图

  1. 如果当前运行的线程数小于 corePoolSize , 如果再来新任务的话,就创建新的线程来执行任务;

  2. 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入 LinkedBlockingQueue

  3. 线程池中的线程执行完手头的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行;

# 为什么不推荐使用?

FixedThreadPool 使用无界队列 LinkedBlockingQueue (队列的容量为 Integer.MAX_VALUE )作为线程池的工作队列会对线程池带来如下影响:

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize
  2. 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool 的源码可以看出创建的 FixedThreadPoolcorePoolSizemaximumPoolSize 被设置为同一个值。
  3. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
  4. 运行中的 FixedThreadPool (未执行 shutdown()shutdownNow() )不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)

# SingleThreadExecutor

# 介绍

SingleThreadExecutor只有一个线程的线程池。下面看看 SingleThreadExecutor 的实现:

/**
     * 返回只有一个线程的线程池
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

可以看出 corePoolSizemaximumPoolSize 都被设置为 1,其他参数和 FixedThreadPool 相同。

# 处理任务的流程

SingleThreadExecutor 的运行示意图:

SingleThreadExecutor的运行示意图

  1. 如果当前运行的线程数少于 corePoolSize ,则创建一个新的线程执行任务;
  2. 当前线程池中有一个运行的线程后,将任务加入 LinkedBlockingQueue
  3. 线程执行完当前的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行;
# 为什么不推荐使用?

SingleThreadExecutorFixedThreadPool 一样,使用的都是容量为 Integer.MAX_VALUELinkedBlockingQueue (无界队列)作为线程池的工作队列。说简单点,就是可能会导致 OOM

# CachedThreadPool

# 介绍

CachedThreadPool 是一个会根据需要创建新线程的线程池。下面通过源码来看看实现:

/**
     * 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

corePoolSize 被设置为空(0), maximumPoolSize 被设置为 Integer.MAX.VALUE ,即它是无界的。这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时, CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源

# 处理任务的流程

CachedThreadPoolexecute() 方法的执行示意图:

image-20231021003533603

  1. 首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) ,那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行, execute() 方法执行完成,否则执行下面的步骤 2;
  2. 当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) 。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成;
# 为什么不推荐使用?

CachedThreadPool 使用的是同步队列 SynchronousQueue允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM

# ScheduledThreadPool

# 介绍

ScheduledThreadPool 用来在给定的延迟后运行任务或者定期执行任务。这个在实际项目中基本不会被用到,也不推荐使用,大家只需要简单了解一下即可。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

ScheduledThreadPool 是通过 ScheduledThreadPoolExecutor 创建的,** 使用的 DelayedWorkQueue (延迟阻塞队列)** 作为线程池的任务队列。

DelayedWorkQueue 的内部元素并不是按照放入时间排序的,而是按照延迟时间长短对任务进行排序的,内部采用的是 “堆” 的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE ,所以最多只能创建核心线程数的线程。

ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor ,所以创建 ScheduledThreadExecutor 本质也是创建一个 ThreadPoolExecutor 线程池,只是传入的参数不相同。

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService
# ScheduledThreadPoolExecutor 和 Timer 对比
  • Timer 对系统时钟的变化敏感, ScheduledThreadPoolExecutor 不是;

  • Timer 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor 可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory ),你可以完全控制创建的线程;

  • TimerTask 中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机即计划任务将不再运行。 ScheduledThreadExecutor 不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 afterExecute 方法 ThreadPoolExecutor )。抛出异常的任务将被取消,但其他任务将继续运行。

# 如何设计一个能根据任务优先级来执行的线程池?

这是一个常见的面试问题,本质其实还是在考察求职者对于线程池以及阻塞队列(任务队列)的掌握。上面也提到了,不同的线程池会选用不同的阻塞队列作为任务队列。比如 FixedThreadPool 使用的是 LinkedBlockingQueue (无界队列),由于该队列永远不会被放满,因此 FixedThreadPool 最多只能创建 corePoolSize 个线程。

假如我们需要实现一个优先级任务线程池的话,那可以考虑使用 PriorityBlockingQueue (优先级阻塞队列)作为任务队列ThreadPoolExecutor 的构造函数有一个 workQueue 参数可以传入任务队列)。

ThreadPoolExecutor构造函数

优先级阻塞队列 PriorityBlockingQueue 一个支持优先级的无界阻塞队列,可以看作是线程安全PriorityQueue ,两者底层都是使用小顶堆形式的二叉堆,即值最小的元素优先出队。不过, PriorityQueue支持阻塞操作

要想让 PriorityBlockingQueue 实现对任务的排序,传入的任务必须是具备排序能力的,方式有两种:

  1. 让任务实现 Comparable 接口,并重写 compareTo 方法来指定任务之间的优先级比较规则。
  2. (推荐!) 创建 PriorityBlockingQueue 时传入一个 Comparator 对象来指定任务之间的排序规则

不过,这存在一些风险和问题,比如:

  • PriorityBlockingQueue 是无界的,可能堆积大量的请求,从而导致 OOM

    解决方法:继承 PriorityBlockingQueue重写一下 offer (入队) 方法的逻辑,当插入的元素数量超过指定值就返回 false

  • 可能会导致饥饿问题,即低优先级的任务长时间得不到执行。

    解决方法:可以通过优化设计来解决(比较麻烦),比如等待时间过长的任务会被移除并重新添加到队列中,但是优先级会被提升

  • 由于需要对队列中的元素进行排序操作以及保证线程安全(并发控制采用的是可重入锁 ReentrantLock ),因此会降低性能

    解决方法:性能方面的影响是没法避免的,毕竟需要对任务进行排序操作。并且,对于大部分业务场景来说,这点性能影响是可以接受的

# 【面试题】线程池有了解吗?线程池大概的原理?

互联网、米哈游、后端、Java。考察的是线程池的常见参数、处理任务的流程

# 线程池的核心参数

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • workQueue:任务的阻塞队列
  • keepAliveTime:当线程池中的线程数量超过 corePoolSize 时,多余的空闲线程在超过 keepAliveTime 时间后会被终止。
  • threadFactory:线程工厂,用于创建新的线程。
  • handler:拒绝策略(饱和策略),当线程池无法接收新的任务时,采取的处理方式。

image-20231018163149255

# 线程池的任务处理流程

图解线程池实现原理

  1. 如果 当前运行的线程数 < corePoolSize ,那么就会新建一个线程来执行任务。

  2. 如果 corePoolSize ≤ 当前运行的线程数 < maximumPoolSize ,那么就把该任务放入到任务队列里等待执行

    随着不断添加任务,如果任务队列满了,就新建一个线程来执行任务。

  3. 如果 当前运行的线程数 == maximumPoolSize ,此时任务队列、线程池都满了,那么根据拒绝策略(饱和策略)来处理无法接收的任务,比如抛出异常或者丢弃任务。

  4. 当线程执行完任务后,会从任务队列中获取下一个任务继续执行,直到任务队列为空。

  5. 当空闲线程的时间超过 keepAliveTime 时,如果线程池中的线程数量超过 corePoolSize ,则多余的空闲线程会被终止,直到线程数量等于 corePoolSize 为止。

# 【面试题】设计一个线程池需要考虑哪些因素?

互联网、米哈游、后端情景题、系统设计。考察的是线程池的特征,比较综合全面

设计一个线程池需要考虑以下几个方面:

  1. 线程池的大小:线程池的大小应该根据系统的负载情况和任务的性质来确定。
    • 如果线程池太小,会导致任务排队等待,影响系统的性能
    • 如果线程池太大,会造成过多的上下文切换。
  2. 任务队列:线程池中的等待任务需要有一个队列来存储。任务队列可以是阻塞队列或非阻塞队列,根据具体的需求来选择。
  3. 线程工厂:用于创建新的线程。
  4. 拒绝策略:当线程池中的线程都处于忙碌状态时,新提交的任务会被放入任务队列中等待执行。此时需要设置一个拒绝策略,防止任务一直被放入队列中而无法执行。常见的拒绝策略有直接抛出异常、丢弃任务等。
  5. 饱和策略:当线程池中的线程数量达到最大值时,新提交的任务可能会被拒绝执行。此时需要设置一个饱和策略,防止线程池过度扩展。常见的饱和策略有直接抛出异常、丢弃任务等。
  6. 线程池监控:为了方便对线程池进行监控和管理,可以添加一些监控功能,如获取当前线程池的状态、获取当前正在执行的任务等。

基于以上方面,可以设计一个简单的线程池示例代码如下:

import java.util.concurrent.*;
public class ThreadPoolExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个固定大小的线程池
        ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
        // 提交任务到线程池中执行
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            executorService.execute(() -> {
                System.out.println("Task " + taskId + " is running by thread " + Thread.currentThread().getName());
            });
        }
        // 关闭线程池
        executorService.shutdown();
    }
}

# Java 线程池最佳实践

# 1、正确声明线程池

** 线程池必须手动通过 ThreadPoolExecutor 的构造函数来声明,避免使用 Executors 类创建线程池,会有 OOM 风险。** 说白了就是:使用有界队列,控制线程创建数量。

除了避免 OOM 的原因之外,不推荐使用 Executors 提供的两种快捷的线程池的原因还有:

  • 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
  • 我们应该显示地给我们的线程池命名,这样有助于我们定位问题。

# 2、监测线程池运行状态

你可以通过一些手段来检测线程池的运行状态比如 SpringBoot 中的 Actuator 组件

除此之外,我们还可以利用 ThreadPoolExecutor 的相关 API 做一个简陋的监控。从下图可以看出, ThreadPoolExecutor 提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等。

image-20231021094845929

下面是一个简单的 Demo。 printThreadPoolStatus() 会每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数。

/**
 * 打印线程池的状态
 *
 * @param threadPool 线程池对象
 */
public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
    ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-images/thread-pool-status", false));
    scheduledExecutorService.scheduleAtFixedRate(() -> {
        log.info("=========================");
        log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
        log.info("Active Threads: {}", threadPool.getActiveCount());
        log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
        log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
        log.info("=========================");
    }, 0, 1, TimeUnit.SECONDS);
}

# 3、建议不同类别的业务用不同的线程池

很多人在实际项目中都会有类似这样的问题:我的项目中多个业务需要用到线程池,是为每个线程池都定义一个还是说定义一个公共的线程池呢?

一般建议是不同的业务使用不同的线程池,配置线程池的时候根据当前业务的情况对当前线程池进行配置,因为不同的业务的并发以及对资源的使用情况都不同,重心优化系统性能瓶颈相关的业务。

我们再来看一个真实的事故案例! (本案例来源自:《线程池运用不当的一次线上事故》 ,很精彩的一个案例)

案例代码概览

上面的代码可能会存在死锁的情况,为什么呢?画个图给大家捋一捋。

试想这样一种极端情况:假如我们线程池的核心线程数为 n,父任务(扣费任务)数量为 n,父任务下面有两个子任务(扣费任务下的子任务),其中一个已经执行完成,另外一个被放在了任务队列中。由于父任务把线程池核心线程资源用完,所以子任务因为无法获取到线程资源无法正常执行,一直被阻塞在队列中。父任务等待子任务执行完成,而子任务等待父任务释放线程池资源,这也就造成了 "死锁"

解决方法也很简单,就是专门为执行子任务新增加一个线程池为其服务。

线程池使用不当导致死锁

# 4、别忘记给线程池命名

初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。

默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

给线程池里的线程命名通常有下面两种方式:

1、利用 guava 的 ThreadFactoryBuilder

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);

2、自己实现 ThreadFactory

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;
    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name; // TODO consider uniquifying this
    }
    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

# 5、正确配置线程池大小

# 常规操作

线程池大小设置过大或者过小都会有问题,合适的才是最好。

  • ** 线程过少会导致大量任务堆积,造成 OOM **。如果同一时间有大量任务 / 请求需要处理,可能会导致大量的请求 / 任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务 / 请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的,CPU 根本没有得到充分利用。

  • 线程过多会增加上下文切换成本。大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

    上下文切换:任务从保存到再加载的过程

有一个简单并且适用面较广的公式:

  • CPU 密集型任务 (N+1):这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间

    CPU 密集型任务:简单理解就是利用 CPU 计算能力的任务。比如你在内存中对大量数据进行排序。

  • I/O 密集型任务 (2N):这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

    IO 密集型任务:但凡涉及到网络读取,文件读取,这类任务都是 IO 密集型,特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上

# 美团骚操作

美团技术团队的思路是线程池的 3 个核心参数实现自定义可配置

  • corePoolSize :核心线程数量。当任务队列未达到容量时,可以同时运行的最大线程数量。
  • maximumPoolSize :最大线程数量。当任务队列达到容量时,可以同时运行的最大线程数量。
  • workQueue :任务队列。当新任务来时,如果当前运行的线程数量达到了 corePoolSize 的话,新任务就会被存放在该队列中。

这 3 个参数之所以如此重要,是因为它们基本决定了线程池对任务的处理策略。

ThreadPoolExecutor 提供的下面这些方法支持参数的动态配置。

img

  • 需要注意的是 corePoolSize , 程序运行期间的时候,我们调用 setCorePoolSize() 方法的话,线程池会首先判断当前工作线程数是否大于 corePoolSize ,如果大于的话就会回收工作线程。

  • 另外,上面并没有动态指定任务队列长度的方法,因此美团自定义了一个叫做 ResizableCapacityLinkedBlockIngQueue 的队列。

    主要就是把 LinkedBlockingQueue 的 capacity 字段的 final 关键字修饰给去掉了,让它变为可变的

最终实现的可动态修改线程池参数效果如下。👏👏👏

动态配置线程池参数最终效果

如果我们的项目也想要实现这种效果的话,可以借助现成的开源项目:

  • Hippo4j:异步线程池框架,支持线程池动态变更 & 监控 & 报警,无需修改代码轻松引入。支持多种使用模式,轻松引入,致力于提高系统运行保障能力。
  • Dynamic TP:轻量级动态线程池,内置监控告警功能,集成三方中间件线程池管理,基于主流配置中心(已支持 Nacos、Apollo,Zookeeper、Consul、Etcd,可通过 SPI 自定义实现)。

# 6、记得关闭线程池

当线程池不再需要使用时,应该显式地关闭线程池,释放线程资源。

线程池提供了两个关闭方法:

  • shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN 。线程池不再接受新任务了,但是队列里的任务得执行完毕
  • shutdownNow() :关闭线程池,线程池的状态变为 STOP 。线程池会终止当前正在运行的任务,停止处理排队的任务,并返回正在等待执行的 List。

调用完 shutdownNowshuwdown 方法后,并不代表线程池已经完成关闭操作,它只是异步的通知线程池进行关闭处理。

如果要同步等待线程池彻底关闭后才继续往下执行,需要调用 awaitTermination() 方法进行同步等待

  • 在调用 awaitTermination() 方法时,应该设置合理的超时时间,以避免程序长时间阻塞而导致性能问题。
  • 另外,由于线程池中的任务可能会被取消或抛出异常,因此在使用 awaitTermination() 方法时还需要进行异常处理。该方法会抛出 InterruptedException 异常,需要捕获并处理该异常,以避免程序崩溃或者无法正常退出。
// ...
// 关闭线程池
executor.shutdown();
try {
    // 等待线程池关闭,最多等待 5 分钟
    if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
        // 如果等待超时,则打印日志
        System.err.println("线程池未能在5分钟内完全关闭");
    }
} catch (InterruptedException e) {
    // 异常处理
}

# 7、线程池尽量不要放耗时任务

线程池本身的目的是为了提高任务执行效率,避免因频繁创建和销毁线程而带来的性能开销。如果将耗时任务提交到线程池中执行,可能会导致线程池中的线程被长时间占用,无法及时响应其他任务,甚至会导致线程池崩溃或者程序假死。

因此,在使用线程池时,我们应该尽量避免将耗时任务提交到线程池中执行。对于一些比较耗时的操作,如网络请求、文件读写等,可以采用异步操作的方式来处理,以避免阻塞线程池中的线程

# 8、线程池使用的一些小坑

# 不要重复创建线程池

线程池是可以复用的,一定不要频繁创建线程池(比如一个用户请求到了就单独创建一个线程池)。

@GetMapping("wrong")
public String wrong() throws InterruptedException {
    // 自定义线程池
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,1L,TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),new ThreadPoolExecutor.CallerRunsPolicy());
    //  处理任务
    executor.execute(() -> {
      // ......
    }
    return "OK";
}

出现这种问题的原因还是对于线程池认识不够,需要加强线程池的基础知识。

# 使用 Spring 内部线程池时一定要手动自定义

使用 Spring 内部线程池时,一定要手动自定义线程池,配置合理的参数,不然会出现生产问题(一个请求创建一个线程)。

@Configuration
@EnableAsync
public class ThreadPoolExecutorConfig {
    @Bean(name="threadPoolExecutor")
    public Executor threadPoolExecutor(){
        ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
        int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的 Java 虚拟机的数量
        int corePoolSize = (int) (processNum / (1 - 0.2));
        int maxPoolSize = (int) (processNum / (1 - 0.5));
        threadPoolExecutor.setCorePoolSize(corePoolSize); // 核心池大小
        threadPoolExecutor.setMaxPoolSize(maxPoolSize); // 最大线程数
        threadPoolExecutor.setQueueCapacity(maxPoolSize * 1000); // 队列程度
        threadPoolExecutor.setThreadPriority(Thread.MAX_PRIORITY);
        threadPoolExecutor.setDaemon(false);
        threadPoolExecutor.setKeepAliveSeconds(300);// 线程空闲时间
        threadPoolExecutor.setThreadNamePrefix("test-Executor-"); // 线程名字前缀
        return threadPoolExecutor;
    }
}
# 线程池和 ThreadLocal 不要共用

线程池和 ThreadLocal 共用,可能会导致线程从 ThreadLocal 获取到的是旧值 / 脏数据。因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的 ThreadLocal

不要以为代码中没有显示使用线程池就不存在线程池了,像常用的 Web 服务器 Tomcat 处理任务为了提高并发量,就使用到了线程池,并且使用的是基于原生 Java 线程池改进完善得到的自定义线程池。

当然了,你可以将 Tomcat 设置为单线程处理任务。不过,这并不合适,会严重影响其处理任务的速度。

server.tomcat.max-threads=1

解决上述问题比较建议的办法是使用阿里巴巴开源的 TransmittableThreadLocal ( TTL ) 来代替 ThreadLocal

该类继承并加强了 JDK 内置的 InheritableThreadLocal 类,在使用线程池等会池化复用线程的执行组件情况下,提供 ThreadLocal 的值传递功能,解决异步执行时上下文传递的问题。

项目地址:https://github.com/alibaba/transmittable-thread-local

# Future 接口

# 有什么用?

Future 类是 **异步调用思想** 的典型运用,可以将这个耗时任务交给一个子线程去异步执行,同时我们可以干点其他事情,不用傻傻等待耗时任务执行完成。等我们的事情干完后,再通过 Future 类获取到耗时任务的执行结果。可以提高程序的执行效率

在 Java 中, Future 类只是一个 **泛型接口**,位于 java.util.concurrent 包下,其中定义了 5 个方法,主要包括下面这 4 个功能:

  • 取消任务
  • 判断任务是否被取消
  • 判断任务是否已经执行完成
  • 获取任务执行结果
// V 代表了 Future 执行的任务返回值的类型
public interface Future<V> {
    // 取消任务执行
    // 成功取消返回 true,否则返回 false
    boolean cancel(boolean mayInterruptIfRunning);
    // 判断任务是否被取消
    boolean isCancelled();
    // 判断任务是否已经执行完成
    boolean isDone();
    // 获取任务执行结果
    V get() throws InterruptedException, ExecutionException;
    // 指定时间内没有返回计算结果就抛出 TimeOutException 异常
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

简单理解就是:

  • 我有一个任务,提交给了 Future 来处理。
  • 任务执行期间,我可以去做任何想做的事情,还可以取消任务或者获取任务的执行状态。
  • 一段时间之后,我就可以 Future 那里直接取出任务执行结果。

# 与 Callable 的关系

FutureTask 类基本实现了 Future 接口,常用来封装 CallableRunnable ,具有取消任务、查看任务是否执行完成、获取任务执行结果的方法。 ExecutorService.submit() 方法返回的其实就是 Future 的实现类 FutureTask

<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);

FutureTask 不光实现了 Future 接口,还实现了 Runnable 接口,因此可以作为任务直接被线程执行。

img

FutureTask 有两个构造函数,可传入 Callable 或者 Runnable 对象。实际上,传入 Runnable 对象也会在方法内部转换为 Callable 对象。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
    // 通过适配器 RunnableAdapter 来将 Runnable 对象转换成 Callable 对象
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;
}

FutureTask 相当于对 Callable 进行了封装,管理着任务执行的情况,存储了 Callablecall 方法的任务执行结果

# CompletableFuture 类

Future 在实际使用过程中存在一些局限性,比如不支持异步任务的编排组合、获取计算结果的 get() 方法为阻塞调用。

Java 8 引入 CompletableFuture 类来解决 Future 的这些缺陷。除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。

下面我们来简单看看 CompletableFuture 类的定义。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}

可以看到, CompletableFuture 同时实现了 Future 接口和 CompletionStage 接口。

img

CompletionStage 接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。

CompletionStage 接口中的方法比较多, CompletableFuture 的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程。

# 并发包(J.U.C)

java.util.concurrent

# Java 常见并发容器

JDK 提供的这些容器大部分在 java.util.concurrent 包中。

  • ConcurrentHashMap : 线程安全的 HashMap
  • CopyOnWriteArrayList : 线程安全的 List ,在读多写少的场合性能非常好,远远好于 Vector
  • ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList ,这是一个非阻塞队列
  • BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道
  • ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找

# ConcurrentHashMap

线程安全的 HashMap

HashMap 不是线程安全的,在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap() 方法来包装我们的 HashMap 。但这是通过使用一个全局的锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。

所以就有了 HashMap 的线程安全版本 —— ConcurrentHashMap 的诞生。

  • 在 JDK 1.7 的时候, ConcurrentHashMap 对整个桶数组进行了分段(Segment),采用的是分段锁,每把锁对应一个 Segment。每个 Segment 都是一个类似 HashMap 数组的结构,它可以扩容,它的冲突会转化为链表。但是 Segment 的个数一但初始化就不能改变。
  • 到了 JDK 1.8 的时候, ConcurrentHashMap 摒弃了 Segment 的概念,而是采用 Node 数组 + 链表 / 红黑树 的数据结构来实现,使用 synchronized 锁 + CAS 来控制并发。Node 是类似于一个 HashEntry 的结构。它的冲突再达到一定大小时会转化成红黑树,在冲突小于一定数量时又退回链表。

# ConcurrentHashMap 1.7

Java 7 ConcurrentHashMap 存储结构

Java 7 中 ConcurrentHashMap 的存储结构如上图,

  • ConcurrnetHashMap 由很多个 Segment 组合
  • 每一个 Segment 是一个类似于 HashMap 的结构 HashEntry 数组,所以每一个 HashEntry 的内部可以进行扩容。
  • 但是 Segment 的个数一旦初始化就不能改变,默认 Segment 的个数是 16 个,你也可以认为 ConcurrentHashMap 默认支持最多 16 个线程并发。

# ConcurrentHashMap 1.8

# 存储结构

Java8 ConcurrentHashMap 存储结构(图片来自 javadoop)

可以发现 Java8 的 ConcurrentHashMap 相对于 Java7 来说变化比较大,不再是之前的 Segment 数组 + HashEntry 数组 + 链表,而是 Node 数组 + 链表 / 红黑树当冲突链表达到一定长度时,链表会转换成红黑树

# 初始化 initTable

从源码中可以发现 ConcurrentHashMap 的初始化是通过自旋CAS 操作完成的。里面需要注意的是变量 sizeCtl ,它的值决定着当前的初始化状态。

  • -1 说明正在初始化

  • -N 说明有 N-1 个线程正在进行扩容

  • 0 表示 table 初始化大小,如果 table 没有初始化

  • >0 表示 table 扩容的阈值,如果 table 已经初始化。

# put
  1. 根据 key 计算出 hashcode

  2. 判断是否需要进行初始化

  3. 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功

  4. 如果当前位置的 hashcode == MOVED == -1 ,则需要进行扩容

  5. 如果都不满足,则利用 synchronized 锁写入数据

  6. 如果数量大于 TREEIFY_THRESHOLD 则要执行树化方法,在 treeifyBin 中会首先判断当前数组长度 ≥64 时才会将链表转换为红黑树

# get
  1. 根据 hash 值计算位置。
  2. 查找到指定位置,如果头节点就是要找的,直接返回它的 value.
  3. 如果头节点 hash 值小于 0 ,说明正在扩容或者是红黑树,查找之。
  4. 如果是链表,遍历查找之。

# 【面试题】concurrentHashMap 相对于 hashMap 好在哪里?

ConcurrentHashMap 是 Java 中的一个线程安全的 hashMap 实现,它可以在多线程环境下并发地进行读写操作,而不需要像传统的 hashMap 那样在读写时加锁

ConcurrentHashMap 的实现原理主要基于分段锁和 CAS 操作。它将整个哈希表分成了多个 Segment(段),每个 Segment 都类似于一个小的 HashMap,它拥有自己的数组和一个独立的锁。在 ConcurrentHashMap 中,读操作不需要锁,可以直接对 Segment 进行读取,而写操作则只需要锁定对应的 Segment,而不是整个哈希表,这样可以大大提高并发性能。

# CopyOnWriteArrayList

线程安全的 List ,适合读多写少的场景

在 JDK1.5 之前,如果想要使用并发安全的 List 只能选择 Vector 。而 Vector 是一种老旧的集合,已经被淘汰。 Vector 对于增删改查等方法基本都加了 synchronized ,这种方式虽然能够保证同步,但这相当于对整个 Vector 加上了一把大锁,使得每个方法执行的时候都要去获得锁,导致性能非常低下。

JDK1.5 引入了 Java.util.concurrent (JUC)包,其中提供了很多线程安全且并发性能良好的容器,其中唯一的线程安全 List 实现就是 CopyOnWriteArrayList

对于大部分业务场景来说,读取操作往往是远大于写入操作的。由于读取操作不会对原有数据进行修改,因此,对于每次读取都进行加锁其实是一种资源浪费。相比之下,我们应该允许多个线程同时访问 List 的内部数据,毕竟对于读取操作来说是安全的。

这种思路与 ReentrantReadWriteLock 读写锁的设计思想非常类似,即读读不互斥、读写互斥、写写互斥(只有读读不互斥)。 CopyOnWriteArrayList 更进一步地实现了这一思想。为了将读操作性能发挥到极致, CopyOnWriteArrayList 中的读取操作是完全无需加锁的。更加厉害的是,写入操作也不会阻塞读取操作,只有写写才会互斥。这样一来,读操作的性能就可以大幅度提升。

CopyOnWriteArrayList 线程安全的核心在于其采用了 写时复制(Copy-On-Write) 的策略,从 CopyOnWriteArrayList 的名字就能看出了。

当需要修改( addsetremove 等操作) CopyOnWriteArrayList 的内容时,不会直接修改原数组,而是会先创建底层数组的副本,对副本数组进行修改,修改完之后再将修改后的数组赋值回去,这样就可以保证写操作不会影响读操作了。

# ConcurrentLinkedQueue

线程安全的非阻塞队列

Java 提供的线程安全的 Queue 可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue ,非阻塞队列的典型例子是 ConcurrentLinkedQueue ,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。

阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。

从名字可以看出, ConcurrentLinkedQueue 这个队列使用链表作为其数据结构。 ConcurrentLinkedQueue 应该算是在高并发环境中性能最好的队列了。它之所有能有很好的性能,是因为其内部复杂的实现。

ConcurrentLinkedQueue 主要使用 CAS 非阻塞算法来实现线程安全就好了。

ConcurrentLinkedQueue 适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁ConcurrentLinkedQueue 来替代。

# BlockingQueue 接口

线程安全的阻塞队列,可用作线程池中的工作队列 workQueue

# 简介

阻塞队列( BlockingQueue )被广泛使用在 “生产者 - 消费者” 问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。

  • 当队列容器已满,生产者线程会被阻塞,直到队列未满
  • 当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止

BlockingQueue 是一个接口,继承自 Queue ,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。下面是 BlockingQueue 的实现类:

BlockingQueue 的实现类

下面主要介绍一下 3 个常见的 BlockingQueue 的实现类: ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue

# ArrayBlockingQueue

ArrayBlockingQueueBlockingQueue 接口的有界阻塞队列实现类,底层采用数组来实现。

public class ArrayBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable{}

ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock ,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。

  • 当队列容量满时,尝试将元素放入队列将导致操作阻塞
  • 尝试从一个空队列中取一个元素也会同样阻塞

ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue 。而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue 可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue 。如果保证公平性,通常会降低吞吐量。

如果需要获得公平性的 ArrayBlockingQueue ,可采用如下代码:

private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);

# LinkedBlockingQueue

LinkedBlockingQueue 底层基于 **单向链表实现的阻塞队列,可以当做无界阻塞队列 ** 也可以当做有界阻塞队列来使用,同样满足 FIFO 的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增大,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定大小,容量等于 Integer.MAX_VALUE

相关构造方法:

/**
 * 某种意义上的无界队列
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
/**
 * 有界队列
 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity} is not greater
 *         than zero
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

# PriorityBlockingQueue

线程安全的 PriorityQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。

PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock ,队列为无界队列ArrayBlockingQueue 是有界队列, LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

# ConcurrentSkipListMap

使用跳表实现的 Map

首先要清楚什么是 “跳表”:对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O (logn) 。所以在并发数据结构中,JDK 使用跳表来实现一个 Map。

跳表的本质:同时维护了多个链表,并且链表是分层的最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集。跳表内的所有链表的元素都是排序的

2级索引跳表

(2级索引跳表)

从顶级链表开始查找。一旦发现被查找的元素大于当前链表中的所有取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。

如下图所示,在跳表中查找元素 18。查找 18 的时候原来需要遍历 18 次,现在只需要 7 次即可。针对链表长度比较大的时候,构建索引对查找效率的提升就会非常明显。

从上面很容易看出,跳表是一种利用空间换时间的算法。

在跳表中查找元素18

(在跳表中查找元素18)

使用跳表实现 Map 和使用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是 ConcurrentSkipListMap

# Java IO

# 基础知识

Java IO 流的 40 多个类都是从如下 4 个抽象类基类中派生出来的:

  • InputStream :字节输入流
  • OutputStream :字节输出流
  • Reader : 字符输入流
  • Writer : 字符输出流

# 设计模式

从 IO 中能够学习到的设计模式

# 装饰器模式(Decorator)

装饰器模式可以在不改变原有对象的情况下,拓展其功能。通过组合替代继承,来扩展原始类的功能,在一些继承关系比较复杂的场景更加实用。

对于字节流来说, FilterInputStreamFilterOutputStream 是装饰器模式的核心,分别用于增强 InputStreamOutputStream 子类对象的功能。

我们常见的 BufferedInputStream (字节缓冲输入流)、 DataInputStream 等等都是 FilterInputStream 的子类, BufferedOutputStream (字节缓冲输出流)、 DataOutputStream 等等都是 FilterOutputStream 的子类。

# 适配器模式(Adapter)

适配器(Adapter Pattern)模式主要用于接口互不兼容的类的协调工作,你可以将其联想到我们日常经常使用的电源适配器。

适配器模式中存在被适配的对象或者类称为适配者 (Adaptee) ,作用于适配者的对象或者类称为适配器 (Adapter) 。适配器分为对象适配器和类适配器。

  • 类适配器:使用继承关系来实现
  • 对象适配器:使用组合关系来实现

# 与装饰器模式的区别

装饰器模式:更侧重于动态地增强原始类的功能,装饰器类需要跟原始类继承相同的抽象类或者实现相同的接口。并且,装饰器模式支持对原始类嵌套使用多个装饰器。

适配器模式:更侧重于让接口不兼容而不能交互的类可以一起工作,当我们调用适配器对应的方法时,适配器内部会调用适配者类或者和适配类相关的类的方法,这个过程透明的。就比如说 StreamDecoder (流解码器)和 StreamEncoder (流编码器)就是分别基于 InputStreamOutputStream 来获取 FileChannel 对象并调用对应的 read 方法和 write 方法进行字节数据的读取和写入。

# 工厂模式(Factory)

工厂模式用于创建对象,NIO 中大量用到了工厂模式,比如 Files 类的 newInputStream 方法用于创建 InputStream 对象(静态工厂)、 Paths 类的 get 方法创建 Path 对象(静态工厂)、 ZipFileSystem 类( sun.nio 包下的类,属于 java.nio 相关的一些内部实现)的 getPath 的方法创建 Path 对象(简单工厂)。

# 观察者模式

NIO 中的文件目录监听服务使用到了观察者模式。

NIO 中的文件目录监听服务基于 WatchService 接口和 Watchable 接口。 WatchService 属于观察者, Watchable 属于被观察者

Watchable 接口定义了一个用于将对象注册到 WatchService (监控服务) 并绑定监听事件的方法 register

常用的监听事件有 3 种:

  • StandardWatchEventKinds.ENTRY_CREATE :文件创建。
  • StandardWatchEventKinds.ENTRY_DELETE : 文件删除。
  • StandardWatchEventKinds.ENTRY_MODIFY : 文件修改。

# IO 模型

# 何为 IO ?

从应用程序的角度来解读一下 I/O。

根据操作系统相关知识:为了保证操作系统的稳定性和安全性,一个进程的地址空间划分为用户空间(User space)内核空间(Kernel space)

为了限制不同的程序之间的访问能力,防止它们获取别的程序的内存数据,或者获取外围设备的数据,并发送到网络,CPU 划分出两个权限等级:

  • 用户态:只能受限地访问内存,且不允许访问外围设备,占用 cpu 的能力被剥夺,cpu 资源可以被其他程序获取。

  • 内核态:cpu 可以访问内存的所有数据,包括外围设备,例如硬盘,网卡,cpu 也可以将自己从一个程序切换到另一个程序。

最大的区别:权限不同,运行在用户态下的程序不能直接访问操作系统内核数据结构和程序。

像我们平常运行的应用程序都是运行在用户空间,只有内核空间才能进行系统态级别的资源有关的操作,比如文件管理、进程通信、内存管理等等。也就是说,想要进行 IO 操作,一定是要依赖内核空间的能力。并且,用户空间的程序不能直接访问内核空间。当想要执行 IO 操作时,由于没有执行这些操作的权限,只能发起系统调用请求操作系统帮忙完成。

因此,用户进程想要执行 IO 操作的话,必须通过系统调用来间接访问内核空间。

我们在平常开发过程中接触最多的就是磁盘 IO(读写文件)网络 IO(网络请求和响应)

从应用程序的视角来看,应用程序对操作系统的内核发起 IO 调用(系统调用),操作系统负责的内核执行具体的 IO 操作。也就是说,应用程序实际上只是发起了 IO 操作的调用而已,具体 IO 的执行是由操作系统的内核来完成的

当应用程序发起 I/O 调用后,会经历两个步骤:

  1. 内核等待 I/O 设备准备好数据
  2. 内核将数据从内核空间拷贝到用户空间

# 常见的 IO 模型

UNIX 系统下,IO 模型一共有 5 种:

  • 同步阻塞 I/O
  • 同步非阻塞 I/O
  • I/O 多路复用
  • 信号驱动 I/O
  • 异步 I/O

# Java 中 3 种常见 IO 模型

# BIO(Blocking)

BIO 属于同步阻塞 IO 模型,应用程序发起 read 调用后,会一直阻塞,直到内核把数据拷贝到用户空间。这种阻塞模型在处理多个并发连接时可能会导致性能瓶颈,因为需要为每个连接创建一个线程,而线程的创建和切换都是有开销的。

图源:《深入拆解Tomcat & Jetty》

# NIO(Non-blocking/New)

# NIO 简介

为了解决 BIO 同步阻塞 IO 模型导致在高并发下的性能瓶颈问题,Java 1.4 引入了 NIO,对应 java.nio 包,提供了 Channel , SelectorBuffer 等抽象。它在标准 Java 代码中提供了非阻塞、面向缓冲、基于通道的 I/O,在高负载、高并发情况下可以使用少量的线程来处理多个连接,大大提高了 I/O 效率和并发。

NIO 可以看作是 I/O 多路复用模型。也有很多人认为,Java 中的 NIO 属于同步非阻塞 IO 模型。


先来看看同步非阻塞 IO 模型。应用程序会一直发起 read 调用,等待数据从内核空间拷贝到用户空间的这段时间里,线程依然是阻塞的,直到在内核把数据拷贝到用户空间。通过 轮询 操作,避免了一直阻塞

但是存在问题:应用程序不断进行 I/O 系统调用轮询数据是否已经准备好的过程是十分消耗 CPU 资源的

图源:《深入拆解Tomcat & Jetty》


这个时候,I/O 多路复用模型 就上场了。线程首先发起 select 调用,询问内核数据是否准备就绪,等内核把数据准备好了,用户线程再发起 read 调用。read 调用的过程(数据从内核空间 -> 用户空间)还是阻塞的。

目前支持 IO 多路复用的系统调用,有 selectepoll 等等。select 系统调用,目前几乎在所有的操作系统上都有支持。

  • select 调用:内核提供的系统调用,它支持一次查询多个系统调用的可用状态。几乎所有的操作系统都支持。
  • epoll 调用:linux 2.6 内核,属于 select 调用的增强版本,优化了 IO 的执行效率。

IO 多路复用模型,通过减少无效的系统调用,减少了对 CPU 资源的消耗

img


# NIO 三大组件

NIO 主要包括以下三个核心组件:

  • Buffer(缓冲区):NIO 读写数据都是通过缓冲区进行操作的。读操作的时候将 Channel 中的数据填充到 Buffer 中,而写操作时将 Buffer 中的数据写入到 Channel 中。
  • Channel(通道):Channel 是一个双向的、可读可写的数据传输通道,NIO 通过 Channel 来实现数据的输入输出。通道是一个抽象的概念,它可以代表文件、套接字或者其他数据源之间的连接。
  • Selector(选择器):也被称为多路复用器允许一个线程处理多个 Channel。所有的 Channel 都可以注册到 Selector 上,由 Selector 来分配线程来处理事件。

三者的关系如下图所示(暂时不理解没关系,后文会详细介绍):

Buffer、Channel和Selector三者之间的关系

(Buffer、Channel和Selector三者之间的关系)
# Buffer(缓冲区)

在传统的 BIO 中,数据的读写是面向流的, 分为字节流和字符流。

在 Java 1.4 的 ** NIO 库中,所有数据都是用缓冲区处理的**,这是新库和之前的 BIO 的一个重要区别,有点类似于 BIO 中的缓冲流。

  • NIO 在读取数据时,它是直接读取缓冲区中的
  • 在写入数据时,是写入到缓冲区中

Buffer 的子类如下图所示,其中最常用的是 ByteBuffer ,它可以用来存储和操作字节数据。

Buffer 的子类

(Buffer的子类)

可以将 Buffer 理解为数组IntBufferFloatBufferCharBuffer 等分别对应 int[]float[]char[] 等。


为了更清晰地认识缓冲区,我们来简单看看 Buffer 类中定义的四个成员变量

public abstract class Buffer {
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;
}

这四个成员变量满足如下的关系:0 <= mark <= position <= limit <= capacity,具体含义如下:

  1. 容量( capacityBuffer 可以存储的最大数据量Buffer 创建时设置且不可改变
  2. 界限( limitBuffer 中可以读 / 写数据的边界
    • 写模式下,表示最多能写入的数据,一般等于 capacity
    • 读模式下,表示 Buffer 中实际写入的数据大小
  3. 位置( position下一个可以被读写的数据的索引。从写操作模式到读操作模式切换的时候(flip), position 都会归零,这样就可以从头开始读写了。
  4. 标记( markBuffer 允许将位置直接定位到该标记处,这是一个可选属性;

另外,Buffer 有读模式和写模式这两种模式,分别用于从 Buffer 中读取数据或者向 Buffer 中写入数据。

  • Buffer 被创建之后默认是写模式,调用 flip() 可以切换到读模式
  • 通过调用 clear() 或者 compact() 方法可以再次切换回写模式

position 、limit 和 capacity 之前的关系

position 、limit 和 capacity 之前的关系


Buffer 对象不能通过 new 调用构造方法创建对象,只能通过静态方法实例化 Buffer

ByteBuffer 为例进行介绍:

// 分配堆内存
public static ByteBuffer allocate(int capacity); 
// 分配直接内存
public static ByteBuffer allocateDirect(int capacity);

Buffer 最核心的两个方法

  1. get() : 读取缓冲区的数据
  2. put() :向缓冲区写入数据

除上述两个方法之外,其他的重要方法:

  • flip() :将缓冲区从写模式切换到读模式,将 limit 的值设置为当前 position 的值,将 position 的值设置为 0。
  • clear() : 清空缓冲区,将缓冲区从读模式切换到写模式,并将 position 的值设置为 0,将 limit 的值设置为 capacity 的值。
  • ……

Buffer 中数据变化的过程

import java.nio.*;
public class CharBufferDemo {
    public static void main(String[] args) {
        // 分配一个容量为 8 的 CharBuffer
        CharBuffer buffer = CharBuffer.allocate(8);
        System.out.println("初始状态:"); 
        printState(buffer); 
        // 向 buffer 写入 3 个字符
        buffer.put('a').put('b').put('c');
        System.out.println("写入3个字符后的状态:");
        printState(buffer);
        // 调用 flip () 方法,准备读取 buffer 中的数据,将 position 置 0,limit 的置 3
        buffer.flip();
        System.out.println("调用flip()方法后的状态:");
        printState(buffer);
        // 读取字符
        while (buffer.hasRemaining()) { 
            System.out.print(buffer.get());
        }
        // 调用 clear () 方法,清空缓冲区,将 position 的值置为 0,将 limit 的值置为 capacity 的值
        buffer.clear();
        System.out.println("调用clear()方法后的状态:");
        printState(buffer);
    }
    // 打印 buffer 的 capacity、limit、position、mark 的位置
    private static void printState(CharBuffer buffer) {
        System.out.print("capacity: " + buffer.capacity());
        System.out.print(", limit: " + buffer.limit());
        System.out.print(", position: " + buffer.position());
        System.out.print(", mark 开始读取的字符: " + buffer.mark());
        System.out.println("\n");
    }
}

输出:

初始状态:
capacity: 8, limit: 8, position: 0
写入3个字符后的状态:
capacity: 8, limit: 8, position: 3
准备读取buffer中的数据!
调用flip()方法后的状态:
capacity: 8, limit: 3, position: 0
读取到的数据:abc
调用clear()方法后的状态:
capacity: 8, limit: 8, position: 0

为了帮助理解,我绘制了一张图片展示 capacitylimitposition 每一阶段的变化。

capacity、limit和position每一阶段的变化

# Channel(通道)

Channel 是一个通道,它建立了与数据源(如文件、网络套接字等)之间的连接。我们可以用它来读取和写入数据,就像打开了一条自来水管,让数据在 Channel 中自由流动。

BIO 中的流是单向的,分为各种 InputStream (输入流)和 OutputStream (输出流),数据只是在一个方向上传输。通道与流的不同之处在于通道是双向的,它可以用于读、写或者同时用于读写。

Channel 与前面介绍的 Buffer 打交道,读操作的时候将 Channel 中的数据填充到 Buffer 中,而写操作时将 Buffer 中的数据写入到 Channel 中。

Channel 和 Buffer之间的关系

(Channel 和 Buffer之间的关系)

另外,因为 Channel 是全双工的,所以它可以比流更好地映射底层操作系统的 API。特别是在 UNIX 网络编程模型中,底层操作系统的通道都是全双工的,同时支持读写操作。


Channel子类如下图所示。

Channel 的子类

其中,最常用的是以下几种类型的通道:

  • FileChannel :文件访问通道;
  • SocketChannelServerSocketChannel :TCP 通信通道;
  • DatagramChannel :UDP 通信通道;

Channel继承关系图

(Channel继承关系图)

Channel 最核心的两个方法

  1. read()读取数据并写入到 Buffer 中
  2. write()将 Buffer 中的数据写入到 Channel 中

这里我们以 FileChannel 为例演示一下是读取文件数据的。

RandomAccessFile reader = new RandomAccessFile("/Users/guide/Documents/test_read.in", "r")) 
FileChannel channel = reader.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
# Selector(选择器)

Selector 是 NIO 中的一个关键组件,是基于事件驱动的 I/O 多路复用模型,它允许一个线程处理多个 Channel。其主要运作原理是:

  • 将 Channel 的事件注册在 Selector 上
  • Selector 会轮询注册在它上的 Channel。当某个 Channel 的事件发生时,该 Channel 就处于就绪状态,会被 Selector 轮询出来,并将相关的 Channel 加入就绪集合中
  • 通过 SelectionKey 集合可以获取就绪 Channel 的集合(一个 SelectionKey 对应一个 Channel),然后对这些就绪的 Channel 进行响应的 I/O 操作

Selector 选择器工作示意图

(Selector 选择器工作示意图)

一个多路复用器 Selector 可以同时轮询多个 Channel,由于 JDK 使用了 epoll() 代替传统的 select() 实现,所以它并没有最大连接句柄 1024/2048 的限制。这也就意味着只需要一个线程负责 Selector 的轮询,就可以接入成千上万的客户端


Selector 可以监听四种事件类型

  1. SelectionKey.OP_ACCEPT :表示通道接受连接的事件,这通常用于 ServerSocketChannel
  2. SelectionKey.OP_CONNECT :表示通道完成连接的事件,这通常用于 SocketChannel
  3. SelectionKey.OP_READ :表示通道准备好进行读取的事件,即有数据可读。
  4. SelectionKey.OP_WRITE :表示通道准备好进行写入的事件,即可以写入数据。

Selector 是抽象类,可以通过调用此类的静态方法 open() 来创建 Selector 实例。Selector 可以同时监控多个 SelectableChannelIO 状况,是非阻塞 IO 的核心。

一个 Selector 实例有三种  SelectionKey 集合

  1. 所有的 SelectionKey 集合:代表了注册在该 Selector 上的 Channel ,这个集合可以通过 keys() 方法返回。
  2. 被选择的 SelectionKey 集合:代表了所有可通过 select() 方法获取的、需要进行 IO 处理的 Channel,这个集合可以通过 selectedKeys() 返回。
  3. 被取消的 SelectionKey 集合:代表了所有被取消注册关系的 Channel ,在下一次执行 select() 方法时,这些 Channel 对应的 SelectionKey 会被彻底删除,程序通常无须直接访问该集合,也没有暴露访问的方法。

简单演示一下如何遍历被选择的 SelectionKey 集合并进行处理:

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if (key != null) {
        if (key.isAcceptable()) {
            // ServerSocketChannel 接收了一个新连接
        } else if (key.isConnectable()) {
            // 表示一个新连接建立
        } else if (key.isReadable()) {
            // Channel 有准备好的数据,可以读取
        } else if (key.isWritable()) {
            // Channel 有空闲的 Buffer,可以写入数据
        }
    }
    keyIterator.remove();
}

Selector 还提供了一系列select() 相关的方法

  • int select() 阻塞线程,监控所有注册的 Channel 中是否有需要进行  IO  处理的。如果有,该方法会将对应的 SelectionKey 加入到被选择的 SelectionKey 集合中,并返回这些 Channel 的数量
  • int select(long timeout) :可以设置超时时长的 select() 操作。
  • int selectNow() :执行一个立即返回的 select() 操作,相对于无参数的 select() 方法而言,该方法不会阻塞线程
  • Selector wakeup() :使一个还未返回的 select() 方法立刻返回。
  • ……

使用 Selector 实现网络读写的简单示例:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioSelectorExample {
  public static void main(String[] args) {
    try {
      // 实例化一个用于处理连接事件的 Channel :ServerSocketChannel
      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
      serverSocketChannel.configureBlocking(false);
      serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 设置 Channel 去监听 8080 端口
      // 实例化一个 Selector 对象
      Selector selector = Selector.open();
      // 将 ServerSocketChannel 注册到 Selector 并监听 OP_ACCEPT 事件(通道接受连接)
      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
      while (true) { // 轮询
        int readyChannels = selector.select(); // 一直阻塞,直到 Selector 上存在需要进行 IO 处理的 Channel ,立即返回这些 Channel 的数量
        if (readyChannels == 0) {
          continue; // 此时 Selector 上没有需要进行 IO 处理的 Channel,继续轮询
        }
        // 此时 Selector 上有需要进行 IO 处理的 Channel
        Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 获取 SelectionKey 集合,借此可获取就绪的 Channel 集合,从而得以处理它们的 IO 操作
        Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); // 获取 SelectionKey 的迭代器
        while (keyIterator.hasNext()) { // 遍历 SelectionKey
          SelectionKey key = keyIterator.next();
          if (key.isAcceptable()) {
            // 处理连接事件:ServerSocketChannel
            ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获取当前 SelectionKey 对应的 Channel
            // 完成连接,返回一个 SocketChannel 对象
            SocketChannel client = server.accept(); 
            client.configureBlocking(false);
            // 将 SocketChannel 对象注册到 Selector 并监听 OP_READ 事件
            client.register(selector, SelectionKey.OP_READ);
          } else if (key.isReadable()) {
            // 处理读事件:SocketChannel
            SocketChannel client = (SocketChannel) key.channel(); // 获取当前 SelectionKey 对应的 Channel
            ByteBuffer buffer = ByteBuffer.allocate(1024); // 新建 Buffer 帮助读取数据
            int bytesRead = client.read(buffer); // 通过 Channel 读取 Buffer 中的数据
            if (bytesRead > 0) {
              buffer.flip(); // 将 Buffer 切换为读模式(position 置为 0,limit 置为当前 Buffer 中的数据大小)
              System.out.println("收到数据:" +new String(buffer.array(), 0, bytesRead));
              // 将 SocketChannel 注册到 Selector 并监听 OP_WRITE 事件
              client.register(selector, SelectionKey.OP_WRITE);
            } else if (bytesRead < 0) {
              // 客户端断开连接
              client.close();
            }
          } else if (key.isWritable()) {
            // 处理写事件:SocketChannel
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.wrap("Hello, Client!".getBytes());
            client.write(buffer); // 通过 Channel 将数据写入 Buffer 中
            // 将 SocketChannel 注册到 Selector 并监听 OP_READ 事件
            client.register(selector, SelectionKey.OP_READ);
          }
          keyIterator.remove(); // 移除当前已处理完毕的 SelectionKey ,准备遍历下一个 SelectionKey 
        }
        // 当前 select () 处理完毕,进入下一次轮询
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

在示例中,我们创建了一个简单的服务器,监听 8080 端口,使用 Selector 处理连接、读取和写入事件。当接收到客户端的数据时,服务器将读取数据并将其打印到控制台,然后向客户端回复 "Hello, Client!"。

# NIO 零拷贝

零拷贝是提升 IO 操作性能的一个常用手段,像 ActiveMQ、Kafka 、RocketMQ、QMQ、Netty 等顶级开源项目都用到了零拷贝。

零拷贝是指计算机执行 IO 操作时,CPU 不需要将数据从一个存储区域复制到另一个存储区域,从而可以减少 CPU 拷贝、上下文切换的时间。也就是说,零拷贝主主要解决操作系统在处理 I/O 操作时频繁复制数据的问题。零拷贝的常见实现技术有: mmap+writesendfilesendfile + DMA gather copy

下图展示了各种零拷贝技术的对比图:

CPU 拷贝DMA 拷贝上下文切换系统调用
传统方法 (read + write)224read+write
mmap + write124mmap+write
sendfile122sendfile
sendfile + DMA gather copy022sendfile

可以看出,无论是传统的 I/O 方式,还是引入了零拷贝之后,2 次 DMA(Direct Memory Access)拷贝是都少不了的。因为两次 DMA 都是依赖硬件完成的。零拷贝主要是减少了 CPU 拷贝及上下文的切换。

Java 对零拷贝的支持:

  • MappedByteBuffer :是 NIO 基于 ** 内存映射( mmap )** 这种零拷⻉⽅式的提供的⼀种实现,底层实际是调用了 Linux 内核的 mmap 系统调用。它可以将一个文件或者文件的一部分映射到内存中,形成一个虚拟内存文件,这样就可以直接操作内存中的数据,而不需要通过系统调用来读写文件。

  • FileChannel :其 transferTo()/transferFrom() 是 NIO 基于 ** 发送文件( sendfile )** 这种零拷贝方式的提供的一种实现,底层实际是调用了 Linux 内核的 sendfile 系统调用。它可以直接将文件数据从磁盘发送到网络,而不需要经过用户空间的缓冲区

    关于 FileChannel 的用法可以看看这篇文章:Java NIO 文件通道 FileChannel 用法

代码示例:

private void loadFileIntoMemory(File xmlFile) throws IOException {
  FileInputStream fis = new FileInputStream(xmlFile);
  // 创建 FileChannel 对象
  FileChannel fc = fis.getChannel();
  // FileChannle.map () 将文件映射到直接内存并返回 MappedByteBuffer 对象
  MappedByteBuffer mmb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
  xmlFileBuffer = new byte[(int)fc.size()];
  mmb.get(xmlFileBuffer);
  fis.close();
}
# 小结

这篇文章我们主要介绍了 NIO 的核心组件、零拷贝。

如果我们需要使用 NIO 构建网络程序的话,不建议直接使用原生 NIO,编程复杂且功能性太弱,推荐使用一些成熟的基于 NIO 的网络编程框架比如 Netty 。Netty 在 NIO 的基础上进行了一些优化和扩展,比如支持多种协议、支持 SSL/TLS 等等。

# AIO(Asynchronous)

Java 7 中引入了 NIO 的改进版 AIO(即 NIO 2),它是异步 IO 模型。基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。

目前来说 AIO 的应用还不是很广泛。Netty 之前也尝试使用过 AIO,不过又放弃了。这是因为,Netty 使用了 AIO 之后,在 Linux 系统上的性能并没有多少提升。

img

# 小结

BIO、NIO 和 AIO 对比

(BIO、NIO 和 AIO 对比)

# 【面试题】NIO 是如何实现同步非阻塞的?主线程是只有一个吗?

NIO 底层是用 SelectorChannelBuffer 来实现的。主线程在循环调用 select() 方法进行阻塞等待,当有 acceptable 、 readable 或者 writable 事件发生的时候,循环就会往下走,将对应的事件交给对应的事件处理器进行处理。

它可以多线程的,可以有多个 accept () 线程和多个 worker 线程。

补充:

在 NIO 中,使用了多路复用器 Selector 来实现同步非阻塞的 IO 操作。 Selector 可以监控多个 Channel 是否需要 IO 处理,当一个或多个 Channel 准备好读或写时,Selector 会通知程序进行读写操作,而不像 BIO 一样阻塞等待 IO 操作完成。

在 NIO 中,主线程通常只有一个,但是可以使用 Selector 来管理多个 Channel,实现多个连接的非阻塞读写操作。当有多个 Channel 需要进行 IO 操作时,Selector 会轮询这些 Channel,检查它们的状态是否可读或可写。如果有可读或可写的 Channel,就将其加入到一个已选择键集合中,等待程序处理。这样,一个线程就可以同时处理多个 Channel,提高了系统的并发处理能力。

# 【面试题】BIO/NIO/AIO 的区别?

BIO(同步阻塞 IO):在进行 IO 操作时,必须等待 IO 操作完成后才能进行下一步操作,这时线程会被阻塞。适用于连接数比较小且固定的架构,由于线程阻塞等待 IO 操作,所以并发处理能力不强

NIO(同步非阻塞 IO):支持多个连接同时进行读写操作,因此可以用较少的线程来处理大量的连接。NIO 通过 Selector 来监听多个 Channel 的状态,当 Channel 中有数据可读或可写时,Selector 会通知程序进行读写操作。适用于连接数多且连接时间较短的场景。

AIO(异步非阻塞 IO):与 NIO 不同的是,AIO 不需要用户线程等待 IO 操作完成,而是由操作系统来完成 IO 操作,操作系统完成 IO 操作后会利用回调机制通知用户线程处理。适用于连接数较多且连接时间较长的场景,如高性能网络服务器等。

image-20231025210135977

更新于 阅读次数