并发编程之JUC篇
在笔者看来,后端的技术点无非两个,一个是并发一个是IO。其中并发涉及很多知识点:线程安全、锁、线程池等等。想了解它们是什么吗?想知道它们是怎样实现的吗?想自己手写一个锁,写一个线程池吗?本篇笔记就带你走进Java最经典的内容——JUC。
本篇笔记主要讲的是JUC源码,笔者详细的讲解JUC中核心类的源码,其中基于的JDK版本为
1.8.0_291
。另外,本篇笔记部分内容参考自《深入浅出 Java Concurrency》
1. CAS
1.1 原子操作
假设疫情期间,你们老板要做一款微信扫描场所码的功能,其中有个功能需要你统计一下一天共扫了多少次码。
作为资深程序员的你很快就写好了代码:
public class ScanCode {
private int count = 0;
//注:打印语句打印出的并非是当前线程加过后的count值,这两句也并非原子操作,这里加打印语句只是方便展示问题
public void scanCode(){
count++;
System.out.println("当前扫码:"+count);
}
}
这段代码在单线程下没有任何问题,但是如果多线程一起请求就会有想不到的错误,我们假设当前有1000个用户一起来扫码:
public static void main(String[] args) {
ScanCode scanCode = new ScanCode();
Thread[] threads = new Thread[1000];
for(int i = 0; i < 1000; i++){
threads[i] = new Thread(() -> scanCode.scanCode());
}
for(Thread thread:threads){
thread.start();
}
}
打印结果中最大的一条==“当前扫码”==的记录往往是小于1000的,且每次运行得到的结果也不相同。
这是为什么呢?
首先我们需要知道现代计算机为了弥补CPU与存储介质的速度差异,往往采用多级缓存的架构。一般CPU在做计算的时候会将内存中的数据加载到寄存器,在寄存器中对数据操作计算后,再写回到内存。如果在这之间发生中断,进行了线程上下文切换,会有可能发生意想不到的情况。我们模拟两个线程A,B的执行来解释这一情况:
OS | 线程A | 线程B |
---|---|---|
从内存中取count,此时count=50,将50写入寄存器 | ||
在寄存器中做++操作,此时寄存器中的值是51 | ||
发生中断,保存线程A状态,恢复线程B执行 | ||
从内存中取count,此时count=50,将50写入寄存器 | ||
在寄存器中做++操作,此时寄存器中的值是51 | ||
将51写回内存,此时内存中count=51 | ||
发生中断,保存线程B状态,恢复线程A执行 | ||
将寄存器中的值写回内存,此时count=51 |
可以看到,我们扫了两次码,但count只加了1。
根本原因在于count++
这一操作不是原子操作。那么什么是原子操作?《Java并发编程实战》一书中给出了定义,但描述的依然生涩难懂。笔者在此简单解释下:所谓原子操作就是不可再切分的操作,不能在运行期间插入别的任务的,一个要么一口气执行完要么不执行的操作。
以上面的线程AB例子来看:在执行count++的过程中,需要做如下几件事:
- 从内存中将数据加载到寄存器
- 在寄存器中做操作
- 将值写回内存
上述任意一步之间均可被中断,此时就不是一个原子操作。
1.2 CAS
我们需要一个超级指令,当这个指令执行时,它会像期望那样执行更新。它不能在指令中间中断,如果发生中断,指令根本没有运行,或者运行完成,没有中间状态。
而这一超级指令是由现代CPU实现的(注意是硬件CPU,而非软件层)。
现代CPU提供了一个叫CAS的功能,CAS全称compare and swap,即比较并交换,其用伪代码大概如下:
int compareAndSwap(int * ptr, int expected, int new){
int actual = *ptr;
if(actual == expected){
*ptr = new;
}
return actual;
}
基本思路是检测 ptr 指向的值是否和 expected 相等;如果是,更新 ptr 所指的值为新值。否则,什么也不做。不同的是这一过程会原子的执行。
CAS有一个高频的面试问题——ABA问题:
比如说一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且two进行了一些操作变成了B,然后two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后one操作成功。尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的。如果链表的头在变化了两次后恢复了原值,但是不代表链表就没有变化。
对于ABA问题,也有一些解决方案,比较常见的一种是加版本号,也即每次更改都对应一个唯一的版本号,通过判断前后两次版本号是否一致来判断当前值是否与expected相等。
CAS的思想异常重要,说它是Java并发的基石也不为过。整个JUC都是建立在CAS上的。如果你还不了解CAS有什么用或者该怎么用,没关系,我们下面很快会讲到。
1.3 整体认识
上图为JUC的整体内容,我们将围绕原子类,锁,集合和线程池来讲解。
2. 原子类型
原子类型是JUC并发中一个相对简单的知识点,它会加强我们对CAS的理解,同时又是后续的锁,线程池等知识点的基础。
2.1 AtomicInteger
在第一章中,我们以一个count++
的例子引出了原子操作和CAS。很多时候我们往往就是需要一个线程安全的count++的方案,并且这个方案要足够的简单高效。首先加锁可以排除掉,因为一般情况下我们认为加锁这个操作太重了,不符合简单高效的思想,那能不能借助CAS呢?
答案是肯定的,在Java中,AtomicInteger
就是这一实现。我们先看下AtomicInteger
是如何做到线程安全的累加的
public class ScanCode {
private AtomicInteger count = new AtomicInteger();
public void scanCode(){
int thisCount = count.incrementAndGet();
System.out.println("当前扫码:"+thisCount);
}
}
public static void main(String[] args) {
ScanCode scanCode = new ScanCode();
Thread[] threads = new Thread[1000];
for(int i = 0; i < 1000; i++){
threads[i] = new Thread(() -> scanCode.scanCode());
}
for(Thread thread:threads){
thread.start();
}
}
这个方法无论多少线程测试多少遍结果都是正确的。
其中incrementAndGet()
就是线程安全的count++操作。那么AtomicInteger是如何实现的呢?
public class AtomicInteger {
private static final long valueOffset;
//内部维持一个数字,这就是我们的count值
private volatile int value;
//这段代码看不懂没关系,你只需要知道valueOffset和this(这个对象)加起来可以原子的获取到value的值就可以了
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
public final int incrementAndGet() {
//调用getAndAddInt操作
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
//getAndAddInt会以一个do while的形式不断调用CAS操作,直到某一次执行成功为止
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
}
上述代码中最复杂的应该是getAndAddInt
函数。首先我们先回顾一下第一章中CAS的伪代码
int compareAndSwap(int * ptr, int expected, int new){
int actual = *ptr;
if(actual == expected){
*ptr = new;
}
return actual;
}
这段代码与getAndAddInt
中的this.compareAndSwapInt(var1, var2, var5, var5 + var4)
做个对比,其中:
var1和var2对应上述伪代码的*ptr,var5对应的是expected,var5+var4对应的是new。
另外getAndAddInt
中的getIntVolatile
是获得当前value的值。
有了这些概念后我们再来看getAndAddInt
函数。我们首先得到当前count的值,将它赋值给var5,然后由于传入的var4是1,因此上述代码可以描述为:
- 获得当前value的值
- 如果刚才获得的value的值与我们现在的value的值相等,那么就将value更新为value+1,结束执行。
- 如果不相等,那么就重新回到步骤1执行。
do while就是一个不断尝试,如果某次执行不成功,那肯定代表在执行getIntVolatile
与compareAndSwapInt
之间有其他线程修改了value的值,此时我们就需要重新获取最新的value值,在此基础上再做线程安全的value++操作。
CAS与循环基本是成对出现的,因为我们知道CAS只保证了原子的执行一个操作,但不保证一定执行成功,此时就需要重复执行,直到某次执行成功为止。单次不循环的CAS基本没什么意义。
另外Java中的CAS是通过JNI实现的,底层是C++,因此上述代码中的getIntVolatile
与compareAndSwapInt
都是native方法,对于native方法,本笔记不做更多说明。
public native int getIntVolatile(Object var1, long var2);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
了解了AtomicInteger#incrementAndGet
之后其实基本就懂了JUC下所有的原子类型操作的底层,除了incrementAndGet
,AtomicInteger
还支持如下常用方法:
//以原子方式将给定值与当前值相加。 实际上就是等于线程安全版本的i =i+delta操作。
int addAndGet(int delta)
//如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。 如果成功就返回true,否则返回false,并且不修改原值。
boolean compareAndSet(int expect, int update)
//以原子方式将当前值减 1。 相当于线程安全版本的--i操作。
int decrementAndGet()
//获取当前值。
int get()
//以原子方式将给定值与当前值相加。 相当于线程安全版本的t=i;i+=delta;return t;操作。
int getAndAdd(int delta)
// 以原子方式将当前值减 1。 相当于线程安全版本的i--操作。
int getAndDecrement()
//以原子方式将当前值加 1。 相当于线程安全版本的i++操作。
int getAndIncrement()
//以原子方式设置为给定值,并返回旧值。 相当于线程安全版本的t=i;i=newValue;return t;操作。
int getAndSet(int newValue)
//以原子方式将当前值加 1。 相当于线程安全版本的++i操作。
int incrementAndGet()
这其中的原子操作都是通过CAS来完成的,我想当你看到方法名的时候就已经猜到了底层大概的实现逻辑了。
另外除了AtomicInteger
,JUC还支持AtomicLong
,AtomicReference
与AtomicBoolean
等类型,其实现与AtomicInteger
基本相似,此处不再做介绍,感兴趣的可以自行查看JDK的源码。
2.2 数组的原子操作
以AtomicIntegerArray
为例,我们看看其常用的API:
//将索引i处的值设置为newValue
void set(int i, int newValue)
//返回索引i处的值并将其设置为newValue
int getAndSet(int i, int newValue)
//原子的方式,如果索引i处的值是expect,就将索引i处的值设置为update
boolean compareAndSet(int i, int expect, int update)
//原子的方式,返回索引i处的值并做++操作
int getAndIncrement(int i)
//原子的方式,返回索引i处的值并做--操作
int getAndDecrement(int i)
//原子的方式,返回索引i处的值并做+delta操作
int getAndAdd(int i, int delta)
//原子的方式,将索引i处的值做++操作并返回
int incrementAndGet(int i)
//原子的方式,将索引i处的值做--操作并返回
int decrementAndGet(int i)
//原子的方式,将索引i处的值并做+delta操作并返回
int addAndGet(int i, int delta)
其底层实现与AtomicInteger
基本相似,只不过包了一层数组,需要做数组越界的校验,我们以incrementAndGet
为例来看下源码:
public final int incrementAndGet(int i) {
return getAndAdd(i, 1) + 1;
}
//此处delta为1,代表+1操作(++)
public final int getAndAdd(int i, int delta) {
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}
//校验数组越界
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
//这个函数我们在AtomicInteger时讲过了,一模一样,就是循环加CAS,原子的操作,直到成功
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
JUC下的AtomicLongArray
与AtomicReferenceArray
实现也基本相同(核心都是CAS),不再赘述。
2.3 引用的原子操作
引用的原子操作难点不在于源码,而在于怎么用。我们先以一个例子来说明:
还是第一章中微信扫码的例子,你需要统计一下一天共扫了多少次码,我们假设你没有使用AtomicInteger类型,就是使用了int类型,且现在不允许将int改为AtomicInteger,但你还是想高效简单的线程安全的做count++操作,可以吗?
public class ScanCode {
int count = 0;
}
可以,此时就需要引用的原子操作,所谓引用的原子操作就是对一个类里的某一个引用来进行原子的修改,拿我们本例来说就是对ScanCode
类里的count
值做原子的++操作。
public class ScanCode {
volatile int count;
AtomicIntegerFieldUpdater<ScanCode> fieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(ScanCode.class, "count");
public void scanCode(){
int newCount = fieldUpdater.incrementAndGet(this);
System.out.println("当前扫码:"+newCount);
}
}
可以看到,我们使用了AtomicIntegerFieldUpdater
类型对原来的int类型做了包装,并利用这个包装类来做++操作,看到这里很多同学基本也猜到了其底层其实就是反射,通过传入class和fieldName,反射的获取属性然后原子的更改属性的值。
但是引用的原子操作对类中的引用字段有一些要求:
- 字段必须是volatile类型的
- 字段的描述类型(修饰符public/protected/default/private)是与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
- 只能是实例变量,不能是类变量,也就是说不能加static关键字。
- 只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。
- 对于
AtomicIntegerFieldUpdater
和AtomicLongFieldUpdater
只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater
2.4 volatile
其实如果你观察的够仔细的话会发现我们一直在使用volatile
关键字,但却未讲解其是什么,以及为什么要用。由于volatile的讲解涉及到了Java的内存模型,笔者虽看过些许书籍但依然无法理解透,因此对于volatile,本文档也只是简单的说一下,后续笔者会再补上对volatile更详细的说明。
volatile主要解决了多线程间可见性这一问题,什么是可见性?一个线程修改了变量,另一个线程也需要立马能够看到这个变量的修改就是可见性。那为什么不加volatile就是不可见的?
首先Java的内存模型规定,每个线程在运行的时候都会缓冲一份堆内存的部分字段的值,因此线程修改堆内存都是先保存到线程栈内存,然后再刷新到堆,这就会存在短暂的不可见性。比如一个线程已经修改了堆内存的值,但由于还未刷新到堆,且其他线程未感知到,因此其他线程还持有的是旧的值。
一种典型的场景如下:
public class VolatileTest {
boolean flag = true;
}
public void test() {
VolatileTest volatileTest = new VolatileTest();
new Thread(()->{
while (volatileTest.flag){
//do something
}
}).start();
new Thread(()->{
volatileTest.flag = false;
}).start();
}
上述代码虽然线程2修改了volatileTest.flag
的值,但很有可能线程1永远都不会停止,因为线程2的修改对线程1不可见。
volatile就解决了这一问题,每个线程的修改都会立马被其他线程看到。
另外volatile不同于锁,其只保证了可见性,未保证原子性,但锁可以保证原子性和可见性。
那么volatile适合什么场景下呢?
- 对变量的写操作不依赖于当前值(每次操作都是幂等操作)。
- 只有一个线程在修改这个值,其余线程都是读。
如果存在一个线程修改,其他线程读的情况,就可以只使用volatile关键字,而无需加锁,但是如果是多线程的修改(前提是非幂等的修改),为保证线程安全性就需要加锁。
3. 锁
锁是JUC中最核心的内容,在本章我们将讲解JUC下常见的锁,以及介绍下著名的AQS框架。
在讲解源码前,我们先来介绍一些基本的但常见的概念。
锁是什么?解决了什么问题?
在多线程下,我们希望有些代码是线程安全的执行的,我们将这样的代码行叫临界区。锁保证了临界区的代码可以像单条指令一样原子的执行,当某个线程持有锁的时候,就可以执行临界区的代码,其他线程此时是不可以的,只有这个持有锁的线程释放锁才能继续由下一个线程执行。
悲观锁,乐观锁
悲观锁:每次都以悲观的态度假设最坏的情况,每次拿数据的时候都认为别人会修改,所以每次在操作前会先上锁,操作后再将锁释放,Java中的
synchronized
和ReentrantLock
都是悲观锁。乐观锁:每次都是乐观的态度假设当前是最好的情况,每次拿数据的时候都认为别人不会修改,所以不会上锁,但是在修改的时候会判断别人有没有修改这个数据(通过CAS和版本号实现),Java中的
AtomicInteger
就是乐观锁的一种实现,依赖了CAS。我们一般也认为悲观锁是阻塞锁,而乐观锁是非阻塞锁。非阻塞指的是不阻塞其他线程,即自己可以失败可以阻塞挂起,但不能因为自己的失败和挂起影响其他线程导致其他线程失败挂起,或者说自己在执行临界区代码的时候不影响别人的执行。
很多乐观锁的实现都依赖于CAS,但CAS本质是没有锁的,虽然叫乐观锁但它并没有锁。CAS只是通过比较和替换的方式,只有比较之后符合要求才进行替换,其中比较和替换是个原子操作,这其中并没有上锁以及并不会干扰任何其他线程,通常CAS都是自旋的,不断的尝试比较和替换,所以这里可以看到CAS是非阻塞锁,他的执行并不会干扰其他线程。由于CAS没有加锁,所以需要通过CAS同步修改的变量都应该使用volatile修饰符,这样保证变量的修改可以被其他线程看到。
通常而言乐观锁应用于写少读多的情况,悲观锁应用于写多读少的情况。
可重入锁,不可重入锁
所谓可重入锁就是如果一个线程持有了锁,那么它可以重复执行需要这种锁的临界区代码,可以进行重复加锁。反之为不可重入锁。
如:
public void test(){ lock.lock(); try { while (x > 10){ x--; test(); } }finally { lock.unlock(); } }
独占锁,共享锁
所谓独占锁和共享锁是对资源而言的,如果某个资源只能同时被一个线程访问,那么就需要使用独占锁。而如果可以被所有线程一起访问,那么就可以使用共享锁。其中独占锁又叫互斥锁。大多数情况下我们在说锁的时候都是默认独占锁。
公平锁,非公平锁
如果获取一个锁是按照请求的顺序得到的,那么就是公平锁,否则就是非公平锁。
3.1 Lock与AQS
Java下的Lock接口定义如下:
public interface Lock {
//获取锁,如果锁不可用,出于线程调度目的,将禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
Condition newCondition();
}
JUC下大部分锁的实现均是继承自该接口,其中Lock比较重要的两个函数是lock
和unlock
,我们在讲具体实现类的时候也主要讲这两个接口的实现。
在讲JUC下锁的实现类前,我们先想下如果让你设计一个锁,实现lock()
方法,你需要做什么?
首先要判断是否允许当前线程获取锁,对于是否允许获取锁,得有一个状态位,这个状态位的修改必须是线程安全的。其次如果允许获取锁,就获取,否则就阻塞等待或者直接获取失败。若线程需要阻塞等待,那么就需要一个阻塞等待的队列。最后阻塞的时候,线程往往需要挂起不消耗资源。当状态位由不允许获取变更到允许获取时,得唤醒队列里的线程,并且线程得从阻塞队列中移除出去。
综上我们发现lock()
方法需要以下条件:
- 一个标志锁同步状态的原子操作的状态位。
- 一个有序的队列,方便管理所有阻塞等待的线程。
- 线程未获得锁,等待时可能需要挂起,当可以获得锁时再唤醒等待的线程。即线程的挂起和唤醒功能。
而这便是AQS框架的核心。AQS全称AbstractQueuedSynchronizer
,它是JUC中最复杂最核心的一个类,后面我们讲到的CountDownLatch/FutureTask/ReentrantLock/RenntrantReadWriteLock/Semaphore等都是在其基础上实现的。
在AQS中锁的同步状态是通过volatile int
来实现的
public abstract class AbstractQueuedSynchronizer{
//...
private volatile int state;
//...
}
看到volatile的时候很多同学应该也明白这个状态位线程安全的修改都是依赖CAS。
AQS中提供了如下方法来访问状态位:
//获得状态位
protected final int getState()
//直接修改状态位
protected final void setState(int newState)
//通过CAS线程安全的修改状态位
protected final boolean compareAndSetState(int expect, int update)
线程的挂起和唤醒通过LockSupport
类来实现的,LockSupport
底层是JNI,其性能比Java原本的线程挂起和唤醒效率更高。LockSupport的API很简单:
//挂起当前线程
public static void park()
//唤醒当前线程
public static void unpark(Thread thread)
而阻塞队列在AQS中使用CLH队列的变体,简单来讲就是一个FIFO的包含头尾节点的双向的队列。队列节点主要包含属性如下:
class Node {
//节点的等待状态,主要包含SIGNAL/CANCELLED/CONDITION/PROPAGATE等状态
volatile int waitStatus;
//前置节点
volatile Node prev;
//后置节点
volatile Node next;
//绑定的线程
volatile Thread thread;
//主要用于Condition Queue,这里不讲
Node nextWaiter;
}
同时AQS持有队列的头尾节点:
public abstract class AbstractQueuedSynchronizer{
//...
private transient volatile Node head;
private transient volatile Node tail;
//...
}
有了这些基础后,我们就可以简单的实现一个锁了,我们使用AQS框架,模拟一下lock()
的简单实现
- 调用lock()方法,通过锁的同步状态位判断当前线程是否允许获得锁,如果允许获取就结束,否则进入步骤2
- 由于不允许获取锁,将当前线程封装为Node节点,加入到CLH队列,判断当前线程是否需要挂起等待
- 如果需要挂起等待就调用LockSupport.park()方法挂起当前线程,否则就持续不断的尝试获取锁。
而unlock()
流程也很简单:
- 调用unlock,释放锁,修改锁的状态位
- 如果后继节点是挂起等待的,就将后继节点唤醒,让他继续重新尝试获取锁。
看,借助AQS我们已经很轻松的实现了一个锁,后面我们讲的所有不同锁的实现类都与上述流程基本相同。另外贴一下AQS框架的整体架构图:
上图中有颜色的为Method,无颜色的为Attribution。
我们将AQS框架分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。对于自定义的锁继承AQS的时候只需要重写第一层的方法即可,不需要关心底层的实现逻辑。自定义锁对于加锁和释放锁会进入第二层,如果锁获取失败就会进入第三和第四层对等待队列 进行处理,而这些处理方式均依赖于第五层的基础数据提供层。
如果你还是不太了解AQS,没关系,看完下面的实现类我想你会对AQS有更清晰的认识。
3.2 ReentrantLock
ReentrantLock是JUC提供的一个可重入锁和独占锁。其使用场景大致如下:
public class LockDemo{
private int value;
private Lock lock = new ReentrantLock();
public int get(){
this.lock.lock();
try {
return this.value;
}finally {
this.lock.unlock();
}
}
public void set(int value){
this.lock.lock();
try {
this.value = value;
}finally {
this.lock.unlock();
}
}
public int getAndSet(int value){
this.lock.lock();
try {
int temp = this.value;
this.value = value;
return temp;
}finally {
this.lock.unlock();
}
}
public boolean compareAndSet(int expect,int update){
this.lock.lock();
try {
if(value==expect){
value = update;
return true;
}else {
return false;
}
}finally {
this.lock.unlock();
}
}
public int getAndIncrement(){
this.lock.lock();
try {
return this.value++;
}finally {
this.lock.unlock();
}
}
public int getAndDecrement(){
this.lock.lock();
try {
return this.value--;
}finally {
this.lock.unlock();
}
}
public int incrementAndGet(){
this.lock.lock();
try {
return ++this.value;
}finally {
this.lock.unlock();
}
}
public int decrementAndGet(){
this.lock.lock();
try {
return --this.value;
}finally {
this.lock.unlock();
}
}
}
可以看到我们通过锁实现了一个AtomicInteger
。
3.2.1 lock
下面我们就重点分析一下ReentrantLock加锁的原理
public class ReentrantLock implements Lock{
private final Sync sync;
}
ReentrantLock内部只有一个有意义的属性sync,其中Sync类是其内部类,这是个抽象类,继承自AQS
,该类有两个实现类分别是FairSync
和NonfairSync
,即公平锁和非公平锁。
默认的new ReentrantLock
构造方法构造的是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
非公平锁的加锁会比公平锁稍微复杂一丢丢,为了源码流程的方便,我们先看公平锁的加锁过程:
当调用lock.lock()
时,本质上调用的是sync.lock()
public void lock() {
sync.lock();
}
我们现在看公平锁,因此调用的是公平锁的lock()
static final class FairSync extends Sync {
//...
final void lock() {
acquire(1);
}
//...
}
因此核心就是acquire(1)
方法,这个方法是AQS实现的:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法实际做了四个操作:
- tryAcquire(1) 尝试获取锁,如果能获得就直接返回,否则进行步骤2
- addWaiter(Node.EXCLUSIVE) 将当前线程封装成Node节点,并将节点加入到CLH队列的最尾端
- acquireQueued() 自旋尝试获得锁,如果获得不了,就判断自己是否应park,如果应该就park,否则就一直尝试获取锁
- selfInterrupt()设置中断位
下面我们会一个方法一个方法的看源码:
tryAcquire由FairSync实现,其源码如下:
//tryAcquire方法用于尝试获取锁
protected final boolean tryAcquire(int acquires) {
//得到当前请求线程
final Thread current = Thread.currentThread();
//获得当前AQS的状态(其实也是锁的状态,即前面说的一个状态位标识当前是否允许获取锁)
int c = getState();
if (c == 0) { //当前锁没被占用,可以获取锁
//如果阻塞队列确实是空(或头结点是当前线程),并且将锁的状态设置为1成功后(即更改锁的状态,避免其他线程进来)
//这里对于状态值的修改是通过CAS实现的
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//那么就将当前线程设为当前正在独占(运行)的线程,即AQS中会有个exclusiveOwnerThread字段,用于存储独占锁当前正在运行的线程
//exclusiveOwnerThread这个字段用于下面的可重用线程
setExclusiveOwnerThread(current);
//此时线程获取锁成功,返回true,即可以获得锁
return true;
}
}
//否则如果当前获取锁的线程是当前线程的话,那么也允许获取锁,可见这是一个可重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; //对于重复获取锁的线程,将状态位+1
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//这里对于状态值得修改不需要加锁或CAS,因为能走到这里,肯定正在运行的线程是当前线程,必不可能有其他线程进来一起修改这个值
setState(nextc);
//可重入锁也返回true,代表获取锁成功
return true;
}
//否则返回失败,代表获取锁失败
return false;
}
//这个方法用于判断当前线程是否满足获取锁
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
//h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
//翻译过来即 如果头尾不相等并且(头的下一个节点是空或者头结点的下一个节点不等于当前线程) 则返回true
//这样不利于理解,我们对这个逻辑整体取反,即如果(头尾相等) || (头节点的下一个节点不是空 并且头结点的下一个节点是当前线程)
//这就很好理解了
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
addWaiter源码如下:
//会执行这个函数意味着获取锁失败了,那么就开始将当前线程加入CLH队列,排队等待获取锁
//将当前线程封装为Node对象(Node对象即为我们之前说的CLH队列每个元素的类型),并将其加入到CLH队列的队尾,并返回构造的这个Node对象
private Node addWaiter(Node mode) {
//这里的传参mode是Node.EXCLUSIVE,表示是独占模式
//在new Node中,第一个参数被赋予了thread字段,第二个参数被赋予了nextWaiter字段,也即认为独占锁的nextWaiter是空的,也即独占锁不具有条件
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果当前队列的尾节点不等于空
if (pred != null) {
//那么就让我们之前构造的节点(该节点盛装当前线程)的pre指针指向队列尾节点
node.prev = pred;
//同时通过CAS将当前节点设为新的队列尾节点
if (compareAndSetTail(pred, node)) {
//成功后将之前尾节点的next指针指向当前节点
pred.next = node;
//完全完成后当前节点已经是尾节点,将构造的当前节点返回
return node;
}
}
//否则,这里的否则包括:
//1.尾节点等于空
//2.尾节点不等于空但是CAS操作将当前节点设置为尾节点失败了
//那么就执行enq函数,enq函数做的也是将当前节点置为尾节点
enq(node);
return node;
}
//在将当前节点设置为尾节点失败后,通过enq函数,保证将当前节点设为尾节点一定要成功
//首先将当前节点设为尾节点失败有两个原因:
//1.队列是空的(尾节点不存在)
//2.队列不为空,但CAS操作将当前节点设为尾节点失败了,一般是有其他线程在抢
//针对第一个问题就创建一个队列(创建一个头尾傀儡节点),这里为了保证创建成功会死循环一直创建
//针对第二个问题 也是无限尝试,直到通过CAS操作尝试成功。
private Node enq(final Node node) {
//死循环,无限尝试,直到成功
for (;;) {
//获得当前尾节点
Node t = tail;
if (t == null) {
//如果尾节点是空的话(代表整个CLH队列是空的不存在的或还没初始化的)
//那么就创建一个无意义节点,我们将这个无意义节点称之为傀儡节点
//通过CAS操作将傀儡节点设为CLH队列的头
if (compareAndSetHead(new Node()))
//并让尾节点也等于头
//此时完成了CLH的初始化,即只有一个无意义的傀儡节点,这个节点既是队列头也是队列尾巴
tail = head;
} else {
//否则
//一旦进入了这个else语句代表尾节点不再是空,也即CLH完成了初始化,如果初始化未成功(因为是CAS操作很可能不成功),那么就死循环初始化直到成功,成功后才能进入else
//将当前节点的pre指针指向队列的尾节点
node.prev = t;
//CAS操作设置当前节点为队列尾节点(如果不成功,死循环操作,无限尝试,直到成功)
if (compareAndSetTail(t, node)) {
//之前队列节点的next指针
t.next = node;
return t;
}
}
}
}
这里需要说明一下:
在初始的时候AQS内的CLH队列是没被创建的,只有需要的时候才会被创建。另外我们知道AQS具有队列的头尾指针,其中这个头指针指向的是个傀儡节点,永远一直都是傀儡节点。也即初始的时候,会创建一个无意义节点,这时头指针和尾指针都指向这个无意义节点,当有Node入队的时候,就将tail.next = node; tail = node;
,替换新的尾节点,但此时头节点的指向依然没有变,还是无意义节点。因此真正的头节点其实是head.next
,头节点是假的这个信息十分重要,在前面的hasQueuedPredecessors
函数其实就已经使用到了这一信息,后面我们还会频繁的用到,希望大家能在此处就记住。
acquireQueued源码如下:
//走到这里意味着线程获取锁失败,并且线程已经封装为节点,当前节点已经在CLH队列最尾端了
//参数node即addWaiter方法返回的节点,也即封装了当前线程的节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//中断标志位
boolean interrupted = false;
//无限尝试
for (;;) {
//获得当前节点的前置节点(即pre指针指向的节点)
final Node p = node.predecessor();
//如果当前节点的前置节点是头结点,那么就尝试获取锁
//这里我们又用到了头节点是傀儡节点这一信息
//如果当前节点的前置节点是头结点,那么就可以认为当前节点是真正的头结点,也即队列的第一个元素
//那么既然是队列的第一个元素,就具有获取锁的资格,因此执行tryAcquire函数获取锁
if (p == head && tryAcquire(arg)) {
//锁获取成功后,将当前节点设为头节点(新的傀儡节点,因为当前节点已经获得锁了,所以这个节点本身已经没有意义了,只是傀儡节点的意义了。)
setHead(node);
//将之前的头节点释放掉
p.next = null; // help GC
failed = false;
return interrupted;
}
//走到这里意味着当前节点不是真实头结点或当前节点是真实头结点但获取锁失败
//判断当前节点是否应该挂起,如果应该挂起就挂起当前节点并检查当前节点的中断位
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//理论上如果线程获得了锁,那么failed就必然是false,如果线程没有获得锁,那么就应该for死等待获得锁
//会进入到这里,就说明线程没有获得锁并且被中断了,此时我们就需要更改节点的状态并从CLH队列中移除
cancelAcquire(node);
}
}
//判断当前节点是否应该挂起,如果需要挂起就返回true,否则返回false
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//得到当前节点的前置节点状态
int ws = pred.waitStatus;
//如果是signal状态,也即当前节点的前置节点也是阻塞未获取锁的,正常来说只有前面节点获得了锁,并且释放了锁,当前节点才能获得锁
//因此既然前置节点也是阻塞的,那么当前节点应该挂起等待(等到被前置节点唤醒)
if (ws == Node.SIGNAL)
return true;
//如果当前节点的前置节点状态大于0,也即cancelled状态,那么从当前节点开始,往前推,删除中间所有的状态大于0(即被cancelled的节点)
//相当于每次走到这个函数时都会重新整合CLH链表,剔除无意义的节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//走到这个else代表当前节点的前置节点状态是0(因为ReentrantLock无-2,-3状态)
//因为前置节点后面有节点(即当前节点)需要被处理(适当的时候被唤醒),因此将当前节点的前置节点状态由0改为-1
//这里0和-1的区别是:
//如果一个节点状态是0,那么当他获得锁并且执行完指令释放锁之后,他会认为自己没有需要处理的继任节点,也即释放完锁不需要唤醒其他节点
//但对于节点状态是-1,那么当他获得锁执行完任务并且释放锁后,他会唤醒自己的继任节点,让继任节点获得锁开始运行
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//这里需要明确的一点是由于acquireQueued()方法是for(;;)形式,也即可能会不断的调用当前函数,所以某次返回false不代表当前线程不需要休眠,可能某次执行的是ws> 0 的分支,重排完队列后,下一次再进来就是执行ws == Node.SIGNAL,此时就需要挂起了。
return false;
}
//取消获取锁,在锁获取失败时执行
private void cancelAcquire(Node node) {
if (node == null)
return;
//释放当前节点绑定的线程
node.thread = null;
//获得当前节点的前置节点,判断当前节点前置节点是否是CANCELLED状态,
//如果是那么再往前找,如此循环,一直找到一个状态小于等于0的节点,将当前节点的前置节点设为那个节点
//这里其实就是在剔除当前节点前面的无效等待节点(状态大于0的节点都是无效节点)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//此时pred已经是最靠近当前节点的有意义的前节点
Node predNext = pred.next;
//将当前节点状态设为CANCELLED,这样可以下次整理队列时删除这个节点
node.waitStatus = Node.CANCELLED;
//如果当前节点是尾节点,由于我们做了一些无意义节点的剔除工作,此时就需要重设一个有意义的尾节点
//很容易得出pred节点就是现在真实有意义的尾节点(前提是当前节点是尾节点)
if (node == tail && compareAndSetTail(node, pred)) {
//同时将pred节点(新的尾节点)的next指针指向空
compareAndSetNext(pred, predNext, null);
} else { //如果当前节点不是尾节点或者将pred节点设为尾节点失败
int ws;
//这个if比较长,但其实就说了一句话,如果pred节点是SIGNAL(有后续需要通知的节点),那么就如何如何
//我们可以一个一个看
//pred != head 前置节点不是傀儡头 并且
//pred.waitStatus== Node.SIGNAL 前置节点的状态是SIGNAL 或者
//ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 即如果前置节点不是SIGNAL但他是<=0的,那么就把它设置成SIGNAL状态
//所以归根结底,就是让pred状态为SIGNAL状态
//这里将pred状态设置为SIGNAL的原因也很简单,因为当前节点不是尾节点,那么就代表当前节点还有后继待唤醒的节点,但当前节点以及前面的一些连续无效节点要被删了,此时就需要新的节点来作为它们的前置节点,然后唤醒这些后继节点。因此将waitStatus设为SIGNAL,代表有后继需要唤醒的节点。
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//走到这里代表pred的状态是SIGNAL了,如果当前节点的下一个节点不是CANCELLED,那么就将pred节点的next指针设为当前节点的next
//这样,链表就删除掉了当前节点和当前节点之前的那些CANCELLED状态节点,
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//否则唤醒继任节点 unparkSuccessor这个函数我们会在unlock时分析,目前只需要记住他是用来唤醒继任节点的。
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
//这里做个总结,这个函数其实就是在做一件事:整理CLH队列,剔除无意义节点。当然它只是剔除了自己以及自己之前的那些连续的无意义头节点,及时的剔除无意义节点,有利于提高CLH队列的遍历效率
selfInterrupt源码如下:
//复原线程的中断标志
//只有线程中断过才会进这个函数 之前的parkAndCheckInterrupt()函数清空了线程的中断位
//这里相当于重新赋值线程中断位 之前清空的现在赋值回去
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
看完了公平锁整个加锁逻辑,我们再来看下非公平锁:
非公平锁的加锁只有两处代码不同,第一处是lock方法:
static final class NonfairSync extends Sync {
//...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//...
}
在公平锁中,lock方法会直接调用acquire,而acquire会调用tryAcquire方法,这个方法在获得锁的时候会判断当前CLH队列是否为空或者当前节点是否为头节点,换句话说,公平锁严格要求获取锁的节点是队列的头节点,别的节点不能获得锁。但非公平锁不同,非公平锁在lock的时候会直接立马尝试获得锁,要是能立马得到锁成功就返回,否则才会执行acquire
,另外两者的tryAcquire方法也不同:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
不同点只有一处,公平锁没有执行hasQueuedPredecessors
函数,而这个函数就是保证公平的核心函数。
后面所有的内容均与公平锁一致,比如入CLH队列的末尾节点,自旋获得锁,挂起等待等。
可以看见,公平锁与非公平锁主要区别就是在获取锁的时候,非公平锁可以先尝试着直接获取,不等其他任何节点,如果要是获取锁失败,依然会乖乖的将自己置为等待队列的尾节点等着获取锁。公平锁比较守规矩,每次获取锁的时候只有自己是队列的第一个时才会尝试获取锁。不公平锁有可能提高并发的性能,这是因为通常情况下挂起的线程重新开始与它真正开始运行,二者之间会产生严重的延时。因此非公平锁就可以利用这段时间完成操作。这是非公平锁在某些时候比公平锁性能要好的原因之一。
3.2.2 unlock
与lock一样,当我们调用lock.unlock时,实际调的是 sync.release()
public void unlock() {
sync.release(1);
}
其核心实现为AQS的release方法:
//锁释放的核心方法,其步骤就两步:第一步是修改锁的状态位,第二步是唤醒等待的下一个线程
public final boolean release(int arg) {
//尝试释放锁,由子类实现,其本质是修改状态位state
if (tryRelease(arg)) {
//如果锁释放成功并且当前节点的状态不等于0(其实就是<0,代表还有后继节点需要唤醒),那么就直接线程的唤醒操作
Node h = head;
if (h != null && h.waitStatus != 0)
//线程的唤醒操作
unparkSuccessor(h);
return true;
}
//否则,也即尝试释放锁失败直接返回false
return false;
}
//子类Sync实现,尝试释放锁,本质就是在修改state值,将state-1
protected final boolean tryRelease(int releases) {
//由于传入的releases == 1,因此这里就是state-1
int c = getState() - releases;
//如果释放锁的线程不是当前持有锁的线程直接抛错
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//c代表的是当前持有锁的数量(可重入的时候每次重入也会+1),如果c==0其实就是代表当前持有锁的线程是0个,那么就代表当前锁已经被释放掉了,那么将exclusiveOwnerThread清空并返回true。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//线程的唤醒,非常非常非常核心的代码,本质就是从CLH队列中找到下一个有效头节点唤醒它
private void unparkSuccessor(Node node) {
//将当前头节点的状态进行修改,设为0。
//这里的意义我也没看懂,可能是怕多线程同时执行release方法,这样CAS操作设为0后就避免别的线程也操作后续代码
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//这是核心逻辑代码,这里的目的就是从CLH队列中找出新的有意义头节点(会有些没意义的节点,比如状态为CANCEL的节点)
Node s = node.next;
//其逻辑与cancelAcquire方法中的逻辑一模一样。
//我们需要先理解变量s现在等于node.next,由于node已经执行完释放锁了,那么node也就没了意义,此时node.next就是这个CLH的头节点,但这个头节点有没有意义呢?如果s==null或者s.waitStatus>0,此时s就是无意义节点,就需要寻找真正的有意义节点
if (s == null || s.waitStatus > 0) {
s = null;
//真正有意义节点的寻找也很简单,就是从尾节点开始往前遍历,一直遍历到最后一个有意义的节点,就是头节点。
//也即离尾节点最远的那个有意义节点就是新的头节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//找到头节点后,唤醒头节点
if (s != null)
LockSupport.unpark(s.thread);
}
这里可能需要解释的为:
tryRelease
中的if(c==0)
,这里可能很多人会有疑问,既然是独占锁,那么应该只有一个线程会持有锁,c应该只等于1,那么c-1不一定等于0吗,为什么还判断c==0。这个原因很简单,就是可重入,在可重入的时候c也会增加。如果可重入的锁释放了,并不代表没有线程占用锁了,可能这个线程还在占用,没完全释放锁。
看完了ReentrantLock的加锁与释放锁源码,我们再举个例子说明一下多线程下使用独占锁时线程的生命周期:
假设你是一个线程小Y,你需要执行一段代码,但这段代码被上了锁,每个线程都必须拿到锁才能执行。也即你需要先执行
lock.lock()
这代表申请锁,当你执行后,会先判断当前锁有没有被别人持有(也即这段代码有没有别的线程在运行)。很不幸,你没能获得锁,那按照规则,你被包装成了一个节点,加进了等待队列,由于你是后来的你进了队列的最尾端。这一进队列不要紧,你发现原来队列里有那么多跟你一模一样的兄弟,它们也都在排着队等待获得锁然后执行代码。
现在队列中正在获得锁执行的是线程A。
由于你没有获得锁,也不是当前的头节点,很快有一个凶神恶煞的管理员过来跟你说:“小子,赶紧给我睡会,别在这不干活又消耗CPU的。”,你也不敢多言,只好准备睡了。只不过在睡之前,你发现你前面的兄弟早就已经熟睡了,并且并不知道什么时候,你前面兄弟的节点状态由0改为了-1。
注 就是对应这段代码:
for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; }
你也不知道睡了多久,总之当你醒来的时候,现在队列已经是这样了:
你睡眼朦胧的问你的前置节点X大哥,是它把你唤醒的吗,X回答是的,并说道:现在到你了小Y,你现在是新的头节点了,你可以获得锁执行代码了。
由于从刚才睡着的位置被唤醒,因此你还是在执行
for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; }
不同的是,你现在是新的头节点了,此时再一次for循环的时候,你会进入if分支,你将自己设为头节点代表获得了锁,然后开始噼里啪啦的执行加锁的那段代码。
当你执行完代码后,紧接着执行一个叫
lock.unlock()
的代码,有人告诉你,这是代表你现在执行完了,可以释放锁了,然后你要像X大哥那样,也唤醒你的后继节点,让它持有锁来执行代码吧。你如是照做,释放了锁,并执行
unparkSuccessor
函数,唤醒了你的后继节点Z。此时Z睡眼惺忪的问你:是你把我唤醒的吗Y大哥。你摸摸它的头,觉得这个场景似曾相识,然后回答道:是的,现在到你了小Z,你现在是新的头节点了,你可以获得锁执行代码了。
3.3 Condition
3.4 CountDownLatch
ReentrantLock是一个典型的独占锁,而CountDownLatch是共享锁的思想。独占锁和共享锁其实都可以做到线程间的同步机制。CountDownLatch核心功能也是同步多个线程。
假设我们需要等待某个(或多个)条件成熟,在这个条件(或所有条件)未成熟前,所有依赖于这个条件的线程都需要阻塞,一旦条件成熟,这些挂起的线程就都可以唤醒执行。
CountDownLatch翻译为闭锁。一个闭锁相当于一扇大门,在大门打开之前所有线程都被阻断,一旦大门打开所有线程都将通过,但是一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。也就是说闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成。
//创建 CountDownLatch,下面代码的CountDownLatch均是这个创建的对象
CountDownLatch countDownLatch = new CountDownLatch(2);
//线程1.创建100个线程,挂起等待两个条件成熟,两个条件均成熟后才被唤醒执行
public void test(){
for(int i = 0; i < 100; i++){
Thread thread = new Thread(()->{
System.out.println("条件未成熟,线程被挂起阻塞");
countDownLatch.await();
System.out.println("条件成熟,线程被唤醒执行");
});
thread.start();
}
}
//线程2. 条件1成熟
public void notify1(){
countDownLatch.countDown();
}
//线程3. 条件2成熟
public void notify1(){
countDownLatch.countDown();
}
上述代码中,先创建了一个需要等待两个条件的CountDownLatch。
线程1创建了100个线程,这100个线程执行countDownLatch.await()
,代表会一直等待,等到countDownLatch的两个条件均成熟才继续执行。
线程2执行countDownLatch.countDown();
,表示一个条件成熟
线程3执行countDownLatch.countDown();
,表示另一个也成熟。
此时两个条件均成熟,挂起的100个线程均会被唤醒然后执行自己后续的代码。可以看到我们通过CountDownLatch实现了100个线程的同步,让100个线程均阻塞等待,直到两个条件成熟,一旦条件成熟,100个线程均会被唤醒(不像独占锁一个一个唤醒)。
下面我们看一下CountDownLatch是如何实现await()和countDown()的。
3.4.1 await
//当创建一个闭锁时,传入count,这个count值会传给AQS的state值
//AQS的state值就是我们之前说的锁的状态位,比如在独占锁中我们进行lock和unlock的时候也是在修改这个state值(+1,-1)
//在CountDownLatch中,count的数量就是需要等待条件的数量,只有所有条件均成熟,才能放行等待的线程。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
protected final void setState(int newState) {
state = newState;
}
//调用await操作本质是调用sync.acquireSharedInterruptibly(1);sync是CountDwonLatch内部类
//当前方法是先判断是否满足获得锁(其实没有锁的概念,就是看state是否等于0,如果等于0就代表无需等待,因为没有需要等待的条件,条件均成熟了),如果不满足就将自己挂起
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//判断自己是否满足获得锁,如果state==0代表等待条件全部满足 返回正值代表获取锁成功否则代表获取锁失败
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//走到这里必然代表获取锁失败(有未成熟的条件,需要挂起等待),所以同独占锁,将自己变为节点加入到CLH队列自旋挂起
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//addWaiter方法与独占锁代码相同,都是将当前线程变为节点加入到CLH队列的末尾
//只不过对于独占锁,是以Node.SHARED方式加入,而同步锁是null
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//与独占锁挂起相同,for循环不断尝试挂起
for (;;) {
//如果当前节点是真实头节点,那么调用tryAcquireShared尝试获取锁
final Node p = node.predecessor();
if (p == head) {
//tryAcquireShared会返回1或-1,上面我们已经看了它的源码,1代表获取锁成功(无等待条件),-1代表获取锁失败(有等待条件)
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果获取锁成功,将当前节点设为新的头结点并唤醒继任节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//如果当前线程不是头结点或者获取锁失败,判断当前线程是否应该挂起
//这段代码也许独占锁完全相同,就是清理CLH队列的节点,如果前面节点是SIGNAL状态,那么就将自己挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//这里也是一样的,如果线程中断就将这个节点CANCELLED
if (failed)
cancelAcquire(node);
}
}
//设置新的头结点 将当前节点设为头结点并且唤醒继任节点,走到这其实propagate必然等于1
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//获取继任节点,并判断如果是共享模式,就执行doReleaseShared唤醒继任节点。
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
//唤醒头结点的继任节点
//这里的逻辑有些复杂,但核心就是走到unparkSuccessor(h)这句话
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒继任节点,这个函数我们在独占锁讲过
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
3.4.2 countDown
//countDownLatch的countDown
//本质就是将state状态位-1然后如果减为了0 就调用doReleaseShared,唤醒CLH队列头结点的继任节点
public final boolean releaseShared(int arg) {
//尝试将当前状态位-1 如果状态位归0返回true否则返回false
if (tryReleaseShared(arg)) {
//尝试释放当前头结点的继任节点
doReleaseShared();
return true;
}
return false;
}
//将state状态位-1,如果state==0就返回true,否则返回false
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//通过CAS操作将state-1
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
现在我们就看完了CountDownLatch的源码,那么我们可以对比一下独占锁和共享锁:
首先在ReentrantLock中,锁的状态位state代表当前持有锁的线程数量,但在CountDownLatch中代表需要等待的条件的数量。
其次独占锁的逻辑是这样的:
lock.lock();
dosomething();
lock.unlock();
因此当独占锁获得锁后会先执行自己的业务代码,在执行完业务代码后再释放锁唤醒下一个线程。所以这个流程就是:获得锁->执行业务代码->释放锁唤醒下一个线程->获得锁->执行业务代码->释放锁唤醒下一个线程-> ......
但共享锁完全不同,共享锁在获得锁后,会先唤醒下一个线程再执行自己的代码,也即:
获得锁->唤醒下一个线程后再执行自己的业务代码->获得锁->唤醒下一个线程后再执行自己的业务代码->获得锁->唤醒下一个线程后再执行自己的业务代码-> ......
这样一旦头节点获得了锁,大家的所有逻辑都是先唤醒继任节点再干自己的事,因此CLH队列内的所有阻塞线程会在很快的时间均被唤醒。这样就会出现一种一旦锁释放,所有线程都可以运行自己代码的现象,也就可以理解为所有线程共享一个锁。
独占锁和共享锁最大的区别就是唤醒继任节点的时间点。
3.5 CyclicBarrier
想象这样一个场景:
过年了,大家一起都等着吃年夜饭。现在规定饭没做好的时候大家都会等它做好再吃,如果已经做好了就可以直接吃。假设年夜饭会在六点钟做好,小孩5点就开始等了,5点半的时候女人也开始加入一起等的人里面,一直到六点,这时老人也来了,饭正好也做好,大家不用等其他人,饭做好了就直接吃。
上面这个场景我们可以很容易的使用一个CountDownLatch模拟出。CountDownLatch只需要一个条件,就是年夜饭做好。每个小孩,老人或女人都是一个单独的线程,小孩在5点的时候await(),女人在5点半的时候await(),老人在6点的时候await()。CountDownLatch在6点的时候执行countDown()此时所有被等待的人均可以开始吃饭了。
但在七点的时候男人回来了,男人回来后提出了一些新的规矩:
男人觉得年夜饭大家应该一起开动一起吃,怎么能不等他们就直接开吃呢?因此男人规定,不管年夜饭什么时候做好,只要人凑齐了,大家就可以开始吃了。
现在难办了,因为CountDownLatch只会关心等待的条件有没有成熟,但同时有多少线程在等待(人有没有齐)并不关心。此时就是CyclicBarrier大展身手的时候了,Barrier翻译栅栏或屏障,就好像前面有一个屏障,只有大家都到齐了,才能手拉手一起过去,谁走的快一点谁就在屏障点那等着。上面吃饭的场景代码如下:
/**
* @author coderZoe
* @date 2022/6/23 13:26
*/
public class NewYearDinner {
private final CyclicBarrier cyclicBarrier;
private final int count;
public NewYearDinner(int count) {
this.count = count;
this.cyclicBarrier = new CyclicBarrier(count);
}
public void readyToEat(){
new Thread(() ->{
try {
System.out.println("准备吃饭");
cyclicBarrier.await();
System.out.println("太好了,大家都准备好了,可以吃饭了");
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
//测试方法
public static void main(String[] args) throws InterruptedException {
int personCount = 10;
NewYearDinner newYearDinner = new NewYearDinner(personCount);
for(int i = 0; i < personCount; i++){
newYearDinner.readyToEat();
Thread.sleep(1000);
}
}
}
CyclicBarrier在构建的时候需要传参count
,这代表需要等待的线程数,只有一共count个线程在等待时,才代表到齐了,可以放行了。
我们之前在CountDownLatch时讲过:
一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。也就是说闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成。
CyclicBarrier中有一个单词是Cyclic,翻译为循环或周期的,这代表CyclicBarrier可以循环的时候,不仅只能用一次,这也是与CountDownLatch的不同。
还以上面的吃年夜饭为例:
现在大家都已经开始吃年夜饭了,小孩子一般吃的快一些而男人与老人一般吃的慢一些,吃完饭后往往要放烟花,自然而然的,放烟花也是要等大家一起的,先吃完的必须等待还没吃完的,等到大家都吃完了,才能放烟花。
可以看到放烟花也是一个CyclicBarrier的使用场景,我们其实可以复用之前的CyclicBarrier,其代码如下:
package cn.com.coderZoe.Module10JUC.cyclicbarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @author coderZoe
* @date 2022/6/23 13:26
*/
public class NewYearDinner {
private final CyclicBarrier cyclicBarrier;
private final int count;
public NewYearDinner(int count) {
this.count = count;
this.cyclicBarrier = new CyclicBarrier(count);
}
public void readyToEat(){
new Thread(() ->{
try {
System.out.println("准备吃饭");
cyclicBarrier.await();
System.out.println("太好了,大家都准备好了,可以吃饭了");
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
public void readyToFireworks(){
new Thread(() ->{
try {
System.out.println("准备放烟花");
cyclicBarrier.await();
System.out.println("太好了,大家都准备好了,可以放烟花了");
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
//测试函数
public static void main(String[] args) throws InterruptedException {
int personCount = 10;
NewYearDinner newYearDinner = new NewYearDinner(personCount);
for(int i = 0; i < personCount; i++){
newYearDinner.readyToEat();
Thread.sleep(1000);
}
for(int i = 0; i < personCount; i++){
newYearDinner.readyToFireworks();
Thread.sleep(1000);
}
}
}
可以看到我们使用了同一个CyclicBarrier。
在讲CyclicBarrier#await()
实现之前,我们先来看下CyclicBarrier的部分源码实现:
CyclicBarrier的属性:
//独占锁
private final ReentrantLock lock = new ReentrantLock();
//条件变量 用于线程间的同步
private final Condition trip = lock.newCondition();
//需要彼此等待的线程的个数
private final int parties;
//在一组任务执行完,就会执行这个runnable任务
private final Runnable barrierCommand;
//代; 一组任务代表一代或者说一次CyclicBarrier的使用代表一代
private Generation generation = new Generation();
//当前还需要等待的线程数 如果为0就代表无需等待,所有人都完成了。
private int count;
CyclicBarrier的构造方法
//构造方法,parties代表要彼此等待的线程数,同时支持传入一个runnable方法,在最后一个线程到达屏障点后会执行这个方法,如果为空则不执行
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
通过上面的部分源码和例子,我们可以总结CyclicBarrier特性如下:
- await()方法将挂起线程,直到同组的其它线程执行完毕才能继续
- await()方法有一个返回值,会返回一组内还剩多少线程未到屏障点
- 可循环使用
- CyclicBarrier 的构造函数允许携带一个任务,这个任务将在0%屏障点执行,它将在await()==0后执行。
3.5.1 await
await
有几个重载方法,其本质都是会调用到dowait()
方法,可以看到dowait()
才是核心方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
//实际执行线程阻塞和挂起的核心方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,TimeoutException {
//整个dowait操作都是线程安全的,ReentrantLock将整个函数包了起来
//原因是组内的线程调用await肯定是并发调用的,为了保证对CyclicBarrier内数据的访问和修改是线程安全的,所有加独占锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当前的代或者当前组
final Generation g = generation;
//异常判断
if (g.broken)
throw new BrokenBarrierException();
//如果当前调用线程中断,直接中断整个这一组的CyclicBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//因为调用dowait时必然是指到了屏障点,此时将count-1
int index = --count;
//如果是0代表所有一组内的所有线程都到了屏障点
if (index == 0) { // tripped
boolean ranAction = false;
try {
//因为count现在等于0,那么代表当前正在执行await的线程是到达屏障点的最后一个线程
//最后一个线程需要执行一开始构造方法放进来的barrierAction,
//就是我们刚才说的每组任务执行完会由最后一个线程执行barrierAction,对应上面特性4
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//因为当前组都执行完了,所以需要唤醒其他线程,并且复位CyclicBarrier供下一次使用。
nextGeneration();
return 0;
} finally {
//这里是判断执行barrierAction失败了,如果执行失败也会终止当前一组的CyclicBarrier
if (!ranAction)
breakBarrier();
}
}
//会走到这代表当前await线程不是最后一个到达屏障点的线程,他就需要挂起等待。
for (;;) {
try {
//如果不关心超时,直接执行条件变量的await将自己挂起
if (!timed)
trip.await();
//否则就使用条件变量的awaitNanos判断是否超时
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//要是线程被中断了(或超时了,其实也是被中断),就中断掉当前barrier,即结束本组。
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
//异常处理 如果本轮被挂掉了,就抛出异常
if (g.broken)
throw new BrokenBarrierException();
//会走到这代表正常被唤醒了(被最后一个线程全唤醒了,g!=generation的原因是因为最后一个线程new了一个新的Generation)
//返回自己的序号就可以了
if (g != generation)
return index;
//如果超时就抛出超时异常并终止当前一组的CyclicBarrier
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
//当最后一个线程到达屏障点时会调用这个函数
private void nextGeneration() {
//通知其他线程 放行大家
trip.signalAll();
//复位
count = parties;
generation = new Generation();
}
//中断和复位
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
可以看到CyclicBarrier对于多线程的同步是通过Lock与Condition这一组合实现的。
3.6 Semaphore
Semaphore也即信号量,很多同学可能在学习操作系统的时候学习过它,如果没学过也没关系。Semaphore最多的使用场景就是池化,比如我们熟知的线程池,连接池都是用到了池化技术。
Semaphore 是一个计数信号量。从概念上讲,信号量维护了一个许可集。在许可可用前会阻塞每一个 acquire()
的线程。每个 release()
添加一个许可,从而可能释放一个正在阻塞的获取线程。但是,Semaphore不使用实际的许可对象, 只对可用许可的号码进行计数,并采取相应的行动。
也即Semaphore是一个计数器,每有一个请求计数器会减1,每次释放计数器就会+1,在计数器大于0时允许线程通过,在计数器等于0时会阻塞线程,即使已经获得许可的线程也会阻塞,因此Semaphore是不可重入的。
我们可以通过Semaphore实现一个对象池:
//对象池
public class ObjectPool <T>{
public interface ObjectFactory<T>{
T makeObject();
}
class Node{
T obj;
Node next;
}
private final int capacity;
private final ObjectFactory<T> factory;
private final Lock lock = new ReentrantLock();
private final Semaphore semaphore;
private Node head;
private Node tail;
public ObjectPool(int capacity, ObjectFactory<T> factory) {
this.capacity = capacity;
this.factory = factory;
this.semaphore = new Semaphore(this.capacity);
this.head = null;
this.tail = null;
}
//从池内获取一个资源
public T getObject() throws InterruptedException {
semaphore.acquire();
return getNextObject();
}
//将一个资源返回给资源池
public void returnObj(T t){
returnObjToPool(t);
semaphore.release();
}
private T getNextObject(){
lock.lock();
try {
if(head==null){
return factory.makeObject();
}else {
Node ret = head;
head = head.next;
if(head==null){
tail=null;
}
ret.next = null;
return ret.obj;
}
}finally {
lock.unlock();
}
}
private void returnObjToPool(T t){
lock.lock();
try {
Node node = new Node();
node.obj = t;
if(tail==null){
head = tail = node;
}else {
tail.next = node;
tail = node;
}
}finally {
lock.unlock();
}
}
}
上面的对象池逻辑比较简单,首先对象池内部维护了一个链表,链表代表了对象池,在构造方法时传入对象池的大小,并构造Semaphore。其次通过Semaphore来控制资源的获取数量,只有当前被获取的资源小于capacity,才能进入getNextObject()
获取资源,对象的获取就是返回对象池的头结点。当对象返回给对象池时就是执行将节点加入对象池的尾端并执行semaphore.release(),当对象池的对象用完后再请求会被阻塞直到有新的对象被返回回来。
3.6.1 信号量与独占锁
想一下如果我们将Semaphore的初始化大小设为1,即只允许有一个可用的许可,如果现在有人在用,那必须等到它返回其他人才能用。这样看信号量起到了一个互斥锁的作用。这种信号量我们称为二进制信号量(二值信号量),因为只有两个状态可用和不可用。所以与锁相似,信号量也存在公平信号量和非公平信号量,对于公平信号量就是说一定会按请求的顺序获取锁而非公平信号量就是先获取如果获取不得再加入队列等待。
3.6.2 信号量与条件变量
信号量其实还可以作为条件变量使用,我们假设现在有两个线程A和B,A调用信号量的acquire()
希望能够阻塞挂起,然后B调用信号量的release()
将挂起的A唤醒,这就类似于A线程在等待条件B。在这种情况下,信号量的初始值需要赋为0,这似乎很奇怪,一个可用资源是0的信号量。其实只有这样,当A执行acquire()
才会被挂起,此时信号量-1,然后B调用release()
,此时信号量+1,线程A被唤醒。
注:在JUC下无法通过信号量为0实现条件变量,因为JUC下的信号量在唤醒线程的时候做了是否有可用资源的判断。之所以加进来信号量与条件变量是因为笔者在阅读《操作系统导论》时书中提及的知识,笔者认为可能是不同语言下信号量的实现不同,而且信号量为0这一思路也很新奇,因此加进了笔记。
3.6.3 acquire
//当调用这里的时候会走到Sync(内部类)的acquireSharedInterruptibly
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//这里的意思是说线程会优先尝试获取锁,如果获取成功(计数器大于等于0,即存在可用资源)就直接返回,否则阻塞挂起线程。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//tryAcquireShared会返回当前池内可用资源,如果小于0说明池内无可用资源
if (tryAcquireShared(arg) < 0)
//如果池内无可用资源,就将线程挂起,否则直接返回
doAcquireSharedInterruptibly(arg);
}
//对于公平锁和非公平锁其区别就在tryAcquireShared这个函数
//与ReentrantLock的相同信号量的公平锁和非公平锁也是在获取锁的一瞬间有区别,非公平锁会先尝试获取,获取成功就直接用,不成功再入队列,公平锁会先判断自己是不是头节点
//公平锁
protected int tryAcquireShared(int acquires) {
for (;;) {
//与Reentrant公平锁内的实现相似
//如果不是头结点(当前节点有前置节点) 直接返回-1
if (hasQueuedPredecessors())
return -1;
//走到这代表当前节点有可能是头节点,具有获得锁的权力
//获得state,将state减1,如果剩余小于0则直接返回,否则将state的状态设为减去1后的值(CAS加for)
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//非公平锁
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
//直接获取,如果获取失败(剩余小于0)就返回,否则就不断获取直到通过CAS操作将state设为减1后的值成功返回
//也即与公平锁的区别是不需要考虑当前节点是否是头结点
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
上面需要解释的是for(;;)
,之所以加for循环是因为如果当前remaining不小于0且CAS操作失败,那么可以继续重新执行。
doAcquireSharedInterruptibly()
函数源码在CountDownLatch时已经介绍过。
3.6.4 release
//释放资源
public void release() {
sync.releaseShared(1);
}
//尝试释放锁,释放成功后唤醒头结点的继任节点
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//这里说过很多次了,共享模式下的继任节点唤醒
doReleaseShared();
return true;
}
return false;
}
//就是简单的将sate+1然后通过CAS操作设置state的值
protected final boolean tryReleaseShared(int releases) {
//为避免CAS操作失败,for循环不断尝试
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared
在CountDownLatch中讲过。
3.6.5 一些总结
Semphore的源码已经看完了,简单来讲还是内部维持的一个state,每次请求资源的时候就执行state-1操作,当state-1小于0就挂起线程,当state-1大于等于0就将state设为sate-1,然后返回。当资源释放的时候就执行state+1操作并唤醒继任节点。
还回到锁本身讨论,这些锁或锁的应用类本质都是维护了state值,这个值得是线程安全的,所以通过CAS操作这个值。
对于ReentrantLock,这个state代表的是持有线程的锁的个数,如果state大于0并且不是正在拥有锁的线程就挂起,当unlock时state-1,并唤醒阻塞线程(如果state==0才唤醒)。对于CountDownLatch和CyclicBarrier这个state代表的是剩余条件个数,state的值大小是初始化时指定的。每有一个条件完成state就-1,当减为0代表条件成熟唤醒所有阻塞线程。对于Semaphore这个state代表的是资源可用数,也是在初始化时指定。每次资源请求时state会-1,当资源减为0还在请求时就会被挂起等待,而每次资源返回时state就会+1,并且唤醒等待的节点。
对于线程的唤醒,ReentrantLock、CountDownLatch、CyclicBarrier和Semaphore本质都是相同的,就是一旦满足条件就唤醒CLH队列的首个线程,不同之处在于唤醒的时机。
ReentrantLock是在执行完临界代码,然后unlock的时候才唤醒,即临界代码执行完才会释放锁,符合独占的思想。而对于CountDownLatch或Semaphore,会在线程一旦获得锁后也唤醒继任节点,符合共享的思想(一人得到锁,通知其他人也试着去得到)。
举个例子,如果是ReentrantLock,一个线程获得锁后会先执行自己的代码,执行完后会手动释放锁,释放锁时唤醒继任节点,继任节点也重复上述操作,每个时间只有一个线程拥有锁。而对于CountDownLatch,一个线程获得锁后代表条件成熟,他会先唤醒其他线程再执行自己的操作,当继任线程被唤醒,也会尝试获得锁,自然也可以得到锁,那么一个传一个,CLH队列内的所有节点都会被唤醒,大家醒来都是先唤醒别人再干自己的事,所以CountDownLatch是一醒都醒。Semaphore有一点不同,他也是共享锁,一个线程获得锁后会先唤醒继任节点再做其他事,但不同于CountDownLatch所有继任节点都可以获得锁,Semaphore的继任节点可能不一定能获得锁,这时这个不能获得锁的节点的后续节点均不能被唤醒了,即Semaphore不能做到一醒都醒,只能是一醒,在一定范围内的节点都醒(醒来的节点数等于可获取资源数,非公平锁不算)。
3.7 ReadWriteLock
读写锁是一种使用非常频繁的锁,比如MySQL事务的串行化隔离级别就是通过读写锁实现的。读写锁的思想很简单,读锁是一个共享锁,也即读操作可以多线程同时进行。写锁是一个独占锁,写操作需要串行的进行。
JUC中读写锁的接口如下:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
同时其具体实现为ReentrantReadWriteLock
,我们以一个例子来说明读写锁的使用方法:
public class SimpleConcurrentMap<K,V> implements Map<K,V> {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
private final Map<K,V> map;
public SimpleConcurrentMap(Map<K, V> map) {
this.map = map;
}
@Override
public int size() {
readLock.lock();
try {
return map.size();
}finally {
readLock.unlock();
}
}
@Override
public boolean isEmpty() {
readLock.lock();
try {
return map.isEmpty();
}finally {
readLock.unlock();
}
}
@Override
public boolean containsKey(Object key) {
readLock.lock();
try {
return map.containsKey(key);
}finally {
readLock.unlock();
}
}
@Override
public boolean containsValue(Object value) {
readLock.lock();
try {
return map.containsValue(value);
}finally {
readLock.unlock();
}
}
@Override
public V get(Object key) {
readLock.lock();
try {
return map.get(key);
}finally {
readLock.unlock();
}
}
@Override
public V put(K key, V value) {
writeLock.lock();
try {
return map.put(key,value);
}finally {
writeLock.unlock();
}
}
@Override
public V remove(Object key) {
writeLock.lock();
try {
return map.remove(key);
}finally {
writeLock.unlock();
}
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
writeLock.lock();
try {
map.putAll(m);
}finally {
writeLock.unlock();
}
}
@Override
public void clear() {
writeLock.lock();
try {
map.clear();
}finally {
writeLock.unlock();
}
}
@Override
public Set<K> keySet() {
readLock.lock();
try {
return map.keySet();
}finally {
readLock.unlock();
}
}
@Override
public Collection<V> values() {
readLock.lock();
try {
return map.values();
}finally {
readLock.unlock();
}
}
@Override
public Set<Entry<K, V>> entrySet() {
readLock.lock();
try {
return map.entrySet();
}finally {
readLock.unlock();
}
}
}
利用读写锁我们实现了一个简单的并发Map,可以看到这里的使用很简单,就是要读的时候加读锁,要写的时候加写锁。ReadWriteLock需要严格区分读写操作,如果读操作使用了写入锁,那么降低读操作的吞吐量,如果写操作使用了读取锁,那么就可能发生数据错误。
接下来我们看下读写锁的实现:
首先读写锁内部属性如下:
//读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
//写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
//锁
final Sync sync;
ReadLock和WriteLock是ReentrantReadWriteLock的两个内部类,我们在执行lock.readLock()
和lock.writeLock()
的时候就是在获取readerLock和writerLock:
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
Sync也是ReentrantReadWriteLock的内部类,其底下有两个具体实现类FairSync和NonfairSync(这与ReentrantLock和Semaphore等相似)。
读锁和写锁内部都有一个Sync属性:
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
而Sync是ReentrantReadWriteLock的内部抽象类,其有两个实现类是公平锁和非公平锁也是ReentrantReadWriteLock的内部类(这与ReentrantLock一模一样)。Sync继承自AQS,即从这里我们可以看出,Sync(及其实现类)才是那个实际能做锁操作的对象。
ReentrantReadWriteLock
的构造方法如下,默认构造方法是非公平锁(与ReentrantLock一模一样)
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
这里有个非常关键的信息,在构造ReadLock和WriteLock时,我们传入的是同一个对象(this),那么也就代表writerLock和readerLock对象持有的sync属性是同一个对象,我们之前说过sync才是那个根本的锁,既然写锁是独占锁,读锁是共享锁,那为什么可以持有同一个锁对象呢?也即为什么读写锁是同一把锁呢?
之前的学习中,我们知道锁判断的本质就是AQS里的state标志位,state标志位标识了当前线程使用锁的情况,但现在我们将一个锁劈成了读锁和写锁两个锁来用,读线程和写线程肯定有不同的判断逻辑,那我们要如何记录呢?ReentrantReadWriteLock的实现方式是将state拆为高16位字节和低16位字节(state是个32字节的int变量),高16位用来描述读锁,低16位用来描述写锁的情况,我们知道16位能表示的最大整数是65535。所以无论是读锁还是写锁,能描述的最大持有锁的线程只能是65535个(其实写锁影响不大,因为写锁是独占锁,除非不断的重入)。
了解了这个之后,我们再来看读写锁的加锁和锁释放源码:
3.7.1 writeLock.lock()
//写锁lock的时候,其会调用到AQS的acquire,本质就是一个独占锁或者说使用锁的独占模式
public void lock() {
sync.acquire(1);
}
//通过独占的形式先尝试获得锁,获得不了就排队等待
//这段代码与ReentrantLock的lock相同,只是tryAcquire的实现有些许区别
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//写锁的tryAcquire(或者理解为读写锁在独占模式下的tryAcquire)
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
//这里是做了位操作,获得C的低16位
int w = exclusiveCount(c);
//c是由读锁和写锁同时构成,c!=0有三种情况:
//1. 读锁不为0,当前有在读的线程
//2. 写锁不为0,当前有在写的线程
//3. 读写锁均不为0,当前有正在读和写的线程(肯定是同一个线程)
if (c != 0) {
//如果写锁为0,那么读锁肯定不为0,此时因为有在读的线程,所以无法写入
//这里我们可以看到锁是不支持升级的,也即当有线程在读取的时候是不能写入的。不能从读取锁升级为写入锁(无论正在读取的线程是不是当前线程)
//或者 首先会走到或者代表w!=0,也即当前有线程在写入,如果当前写入线程不是自己,也返回失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//走到这里代表w!=0,同时当前在写入的线程是自己
//这里是对可重入的判断,我们说过,16位最多表示65535,因此这是在判断独占锁可重入次数大于65535的错误
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//走到这里代表着:
//1.没有读线程只有写线程,且写线程是自己
//2.可重入的计数器没超过65535
//其实走到这里就是重入,而且一定是重入,因为w!=0
//更新c的值
setState(c + acquires);
return true;
}
//走到这代表c=0 因为c!=0的情况都判断完了。也即当前读锁和写锁都没人用
//判断当前线程是否需要阻塞,如果不需要阻塞就直接返回false代表获取锁失败
//如果不需要阻塞,将c值更新 如果更新失败返回false代表锁获取失败
//否则代表锁获取成功,将当前线程设为独占线程,返回成功
//这里可能有人有疑问,c都已经等于0了,那直接获得锁不就行了,咋还需要判断是不是需要挂起
//原因很简单,有可能其他线程一起进来竞争写锁,你刚才读到的c=0不代表现在还是
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
其中writerShouldBlock()
的判断就是公平锁 与非公平锁的区别:
//公平锁会执行hasQueuedPredecessors,如果当前节点是真实头结点就不需要挂起,否则就得挂起
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
//非公平锁获取锁的时候不需要考虑顺序,直接返回false直接获取就可以了。
final boolean writerShouldBlock() {
return false;
}
可以看到公平锁还是得先判断当前节点是队列头节点才允许获取锁,非公平锁直接获取。
acquireQueued()
与addWaiter()
、selfInterrupt()
上面均讲过(在ReentrantLock一节)。
3.7.2 writeLock.unlock()
//写锁释放的时候会调用AQS的release函数 与独占锁相同
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//尝试释放锁,如果释放成功就判断当前节点是否有后继节点需要唤醒,如果需要唤醒就唤醒后继节点
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
//如果释放锁的线程不是当前独占的线程就抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//将c值减1 由于写锁是低16字节,因此直接-1即可
int nextc = getState() - releases;
//通过位操作判断当前写锁是否等于0,等于0就代表当前线程执行完了,可以换下一个线程了
boolean free = exclusiveCount(nextc) == 0;
//如果写锁state==0代表当前写锁已经没有人占用,将独占的字段清空
if (free)
setExclusiveOwnerThread(null);
//同时更新state字段
setState(nextc);
//返回当前锁是否释放
return free;
}
后续锁释放流程与独占锁相同,不再赘述。
3.7.3 readLock.lock()
//调用共享锁的lock
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//尝试获取锁
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//如果写锁不等于0并且写锁的使用线程不是当前线程,那么直接返回失败
//如果是当前线程则代表写入的线程也可以同时执行获得读锁读取
//但是读取的时候是不能写入的
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//会走到这代表可以获得读锁 要么是写锁为空要么是写锁的独占线程就是当前线程
//获得读锁的值
int r = sharedCount(c);
//如果读锁不需要挂起并且当前读锁没超过65535并且更新读锁的数据成功就获得锁
//否则执行fullTryAcquireShared操作,不断循环获得锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//关于if里面的东西我们下面再单独说
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
通过之前的学习我们基本知道锁的状态位state其实就是一个计数器,在获得锁的时候计数器+1,释放锁的时候计数器-1。共享锁不同于独占锁,独占锁在获取锁的时候,会有个exclusiveOwnerThread变量标识当前持有锁的是哪个线程,这样当某个线程在执行lock.unlock()
时会先判断它是不是真的持有锁的那个线程,如果不是就抛出异常。但是共享锁不同,共享锁会有很多线程在持有锁,如果此时某个线程执行lock.unlock()
该如何记录这个线程是不是真的曾持有共享锁,以及如何修改计数器的值呢?因此我们需要一个信息,这个信息是记录当前线程是不是真的持有共享锁以及如果持有共享锁则持有多少个(可重入下每次重入都会+1),上面if语句的作用就是做这个的,要理解if语句内的内容,需要先看下Sync内部的属性
//当前线程持有的读锁的数量(可重入)
private transient ThreadLocalHoldCounter readHolds;
//CLH队列中最后一个节点的持有的读锁的数量(可重入)
private transient HoldCounter cachedHoldCounter;
//CLH队列中第一个节点对应的线程(第一个获得读锁的线程)
private transient Thread firstReader = null;
//CLH队列中第一个节点对应的线程持有的读锁的数量(可重入)
private transient int firstReaderHoldCount;
其中ThreadLocalHoldCounter如下:
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
可以看到ThreadLocalHoldCounter继承自ThreadLocal,而HoldCounter类如下:
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
HoldCounter内的count是一个计数器,计的是持有读写锁的数量。因此ThreadLocalHoldCounter就是当前线程持有读写锁的数量,知道ThreadLocal的同学可能会明白,ThreadLocal是线程安全的,ThreadLocal作用域是线程,每个线程都持有一份自己的HoldCounter。
同样,公平锁和非公平锁的不同也就是readShouldBlock的实现:
//公平锁会执行hasQueuedPredecessors,如果当前节点是真实头结点就不需要挂起,否则就得挂起
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
//非公平锁会调用AQS的apparentlyFirstQueuedIsExclusive
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
3.7.4 readLock.unlock()
4. 常用线程安全的数据结构
5. 线程池
6. 结尾的一些话
计算机行业的发展似乎总是日新月异,新的技术和名词层出不穷,越来越多的人评价这个行业内卷,需要持续的学习。
金庸先生的武侠小说中有一门武功叫九阳真经,这是门心法武功,学会九阳真经后再学所有天下武功都是触类旁通,一学即会。天下武功,虽千变万化但最终的心法终究是一样的,掌握住这个最核心的心法,便掌握了天下的所有武功。
计算机的学习也是如此,而计算机界的九阳真经就是我们大学学的那些《操作系统》、《数据结构》、《算法》、《计算机网络》等知识。
笔者在写这篇文档的时候是2022年,距离Doug Lea完成JUC已经17年之久,而笔者重点参考的博客《深入浅出 Java Concurrency》也距今有12年之久。
那么多年,新的技术层出不穷,但JUC依然是现在Java面试的必考题。且不论面试题的八股与否,JUC源码的学习确实让笔者成长了不少,而JUC也是笔者首次揭开源码世界的大门。
笔者也坚持认为,像JUC这样的知识,它就是Java界的九阳真经(其实JUC的知识已经远超JAVA范畴),多读这些“核心功法”的源码,提升自己的开发内功,才能触类旁通,学什么都很快。
参考文档
《深入浅出 Java Concurrency》目录 - xylz,imxylz - BlogJava