1. 基本使用
// 默认是非公平锁
ReentrantLock lock = new ReentrantLock();
// 公平锁:ReentrantLock lock = new ReentrantLock(true);
public void method() {
lock.lock();
try {
// 加锁后的操作
} finally {
lock.unlock();
}
}
2. 原理
ReentrantLock
的底层就是通过 AQS
来实现的,但是它并不是直接继承 AQS
抽象类,而是实现了 Lock
接口。真正继承 AQS
的是它的内部类 Sync
。
public class ReentrantLock implements Lock {
// 只有一个Sync同步变量
private final Sync sync;
// Sync继承自AQS,主要逻辑都在这里面
abstract static class Sync extends AbstractQueuedSynchronizer {
}
// Sync的两个子类,分别实现了公平锁和非公平锁
static final class FairSync extends Sync {
}
static final class NonfairSync extends Sync {
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.acquire(1);
}
}
当调用 lock()
方法进行加锁时,内部调用的是 sync.acquire(1)
方法。
2.1 AQS
AQS
核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH
队列的变体实现的,将暂时获取不到锁的线程加入到队列中。这个队列是一个虚拟的双向队列,即 不存在队列实例,仅存在结点之间的关联关系(前后指针)。waitStatus
表示节点在队列中的状态,有以下几个枚举值:
0:当一个 Node 被初始化的时候的默认值
CANCELLED
:为 1,表示线程获取锁的请求已经取消了CONDITION
:为-2,表示节点在等待队列中,节点线程等待唤醒PROPAGATE
:为-3,当前线程处在 SHARED 情况下,该字段才会使用SIGNAL
:为-1,表示线程已经准备好了,就等资源释放了
AQS
使用 int 成员变量 state
表示同步状态,通过内置的 FIFO 线程等待/等待队列 来完成获取资源线程的排队工作。state
变量由 volatile
修饰,用于展示当前临界资源的获锁情况。且对改变量操作的方法由关键字 final
修饰,无法被子类重写,但是能调用方法修改 state
表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
以可重入的互斥锁 ReentrantLock
为例,它使用 state
变量来表示锁的占用状态。state
的初始值为 0,表示锁处于未锁定状态。当线程 A 调用 lock()
方法时,会尝试通过 tryAcquire()
方法独占该锁,并让 state
的值加 1。如果成功了,那么线程 A 就获取到了锁。如果失败了,那么线程 A 就会被加入到一个等待队列(CLH
队列)中,直到其他线程释放该锁。
2.2 非公平锁源码
非公平锁对应的同步器为 NonfairSync
,加锁的调用逻辑如下:
通过
ReentrantLock
的加锁方法lock
进行加锁操作。会调用到内部类
Sync
的acquire
方法,这个方法在AQS
的里面。AQS
的acquire
方法会执行tryAcquire
方法,但是由于tryAcquire
需要自定义同步器实现,因此执行了ReentrantLock
中的tryAcquire
方法,由于ReentrantLock
是通过公平锁和非公平锁内部类实现的tryAcquire
方法,因此会根据锁类型不同,执行不同的tryAcquire
。非公平锁调用的是
NonfairSync
的tryAcquire
方法,最终调用到Sync
的nonfairTryAcquire
方法。获取锁失败,会执行框架
AQS
的后续逻辑,跟ReentrantLock
自定义同步器无关。
// ReentrantLock
public void lock() {
sync.acquire(1);
}
// AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ReentrantLock.NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
2.2.1 加锁逻辑
主要分析最后调用的方法 nonfairTryAcquire
逻辑。
// ReentrantLock.Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 1. 获取同步状态
int c = getState();
// 2. state=0 表示无锁,则先通过 CAS 操作尝试加锁(使用CAS设置state=1)
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 3. 加锁成功,就把当前线程设置为持有锁线程
setExclusiveOwnerThread(current);
return true;
}
}
// 4. 当前有锁,且持有锁线程就是当前线程,执行可重入的操作
else if (current == getExclusiveOwnerThread()) {
// 5. 加锁次数 + acquires
int nextc = c + acquires;
// 6. 超过tnt类型最大值,溢出了
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 7. 更新 state 变量值
setState(nextc);
return true;
}
// 8. 加锁失败,返回 false
return false;
}
整个逻辑还是很清晰的:
判断同步状态state,如果state=0表示无锁,就尝试加锁,如果加锁成功,就更新state和线程owner。
如果线程owner就是当前线程,执行可重入锁的逻辑,更新state值。
否则加锁失败,返回false。
2.2.2 加入等待队列
如果加锁失败, tryAcquire
方法返回 false
,则 !tryAcquire(arg)
为 true
,进入下一步入队 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
操作。
// AQS
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
// 初始化队列
initializeSyncQueue();
}
}
}
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}
private final boolean compareAndSetTail(Node expect, Node update) {
return TAIL.compareAndSet(this, expect, update);
}
通过当前线程和锁模式新建一个节点。
判断尾节点
tail
是否为null
,若为null
,则说明队列中没有元素,需要初始化队列。通过
CAS
操作初始化一个头结点出来。但是要注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。并将尾节点tail
指向头结点head
。
队列初始化完毕后,再次进入
for
方法体,此时tail
不为null
,则将tail
节点设置为当前线程节点的前一个节点。并通过CAS
操作将当前线程节点设置为尾节点tail
,同时,由于是双向的,也需要将前一个节点指向尾节点。如果再有线程要获取锁,依次在队列中往后排队即可。
2.2.3 再次尝试获取锁
调用上面的 addWaiter
方法使得新节点已经成功入队了,然后调用 acquireQueued
再次尝试获取锁。
final boolean acquireQueued(final Node node, int arg) {
// 中断标记
boolean interrupted = false;
try {
// 自旋,要么获取锁,要么中断
for (;;) {
// 当前节点的前一个节点
final Node p = node.predecessor();
// 如果当前节点的前一个节点为head节点,则说明轮到自己获取锁了
// 调用ReentrantLock.NonfairSync.tryAcquire()方法再次尝试获取锁
if (p == head && tryAcquire(arg)) {
// 尝试获取锁成功
// 这里同时只会有一个线程在执行,所以不需要用CAS更新
// 把当前节点设置为新的头节点
setHead(node);
// 并把上一个节点从链表中删除
p.next = null; // help GC
return interrupted;
}
// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
// 如果发生异常,取消获取锁
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// 第一次调用会把前一个节点的等待状态设置为SIGNAL,并返回false
// 第二次调用才会返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前一个节点的状态
// static final int CANCELLED = 1;
// static final int SIGNAL = -1;
// static final int CONDITION = -2;
// static final int PROPAGATE = -3;
int ws = pred.waitStatus;
// 如果前一个节点的状态是 SIGNAL(等待唤醒),直接返回 true,表示需要中断当前节点对应的线程
if (ws == Node.SIGNAL)
return true;
// 如果前一个节点的状态大于0,即 已取消 状态,
if (ws > 0) {
// 把前面所有取消状态的节点都从链表中删除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前一个节点的状态小于等于0,则把其状态设置为等待唤醒
// 这里可以简单地理解为把初始状态0设置为SIGNAL
// CONDITION是条件锁的时候使用的
// PROPAGATE是共享锁使用的
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this);
// 返回是否已中断
return Thread.interrupted();
}
整体逻辑如下:
调用
acquireQueued
再次尝试获取锁,如果成功了,直接返回;如果再次失败,再调用
shouldParkAfterFailedAcquire
将节点的等待状态置为等待唤醒(SIGNAL
);调用
parkAndCheckInterrupt
阻塞当前线程;如果被唤醒了,会继续在
acquireQueued
的for
循环再次尝试获取锁,如果成功了就返回;如果不成功,再次阻塞,重复(1)(2)(3)直到成功获取到锁。
举个例子:假如有一个线程已经获取到锁,第二个线程再去请求这个锁。
第二个线程去请求锁的时候,首先通过
tryAcquire()
方法去尝试获取锁,会返回false
;然后通过
addWaiter()
方法去添加一个尾部的节点,这个时候队列还没有初始化会添加一个空的节点同时指向head
和tail
;再然后调用
acquireQueued()
方法尝试获取锁,这个方法是一个死循环,第一次循环首先判断前一个节点是否是head
,这种情况下就是head
,所以还会再次通过tryAcquire()
方法去尝试获取锁,假定第一个线程还没释放锁,这次获取锁失败,然后进入shouldParkAfterFailedAcquire()
方法去判断是否需要阻塞当前线程,由于head
节点默认的waitStatus
是 0,所以会被修改成SIGNAL
。同时进入下次循环,假定第一个线程依旧没释放锁,还是获取失败,又进入shouldParkAfterFailedAcquire()
方法,这次由于head
的状态被上次修改成SIGNAL
了,所以会返回true
,然后会进入parkAndCheckInterrupt()
方法完成当前线程的阻塞。然后进入第三次循环,我们假定这次获取锁成功,当前线程的Node
就会被修改成head
,然后返回lock()
方法,最终获取锁成功。
那么,CANCELLED
和 SIGNAL
状态的节点是怎么生成的呢?来看下面的代码。
2.2.4 获取锁异常后取消获取锁
acquireQueued
方法有个 try/catch
块,出现异常后会进入到异常处理逻辑。第一步就是调用 cancelAcquire
方法。
提前声明:
cancelAcquire
方法是出现异常后才会执行的,那么try
代码块执行过程中哪里会抛出异常呢?整个流程看下来,只有两个地方有可能:predecessor()
、tryAcquire()
,看下源码,都会抛出异常。
1、node.predecessor()
final Node predecessor() {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
虽然这里有抛出异常的代码,但是这段代码永远不会执行到,因为方法 predecessor
是入队后执行的。注释里也有这样一句话 The null check could be elided, but is present to help the VM
。
2、tryAcquire()
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcqiure()
方法是 AQS
提供给子类重写的,它本身是直接抛出异常的。对于 ReentrantLock
来说,tryAcqiure()
方法也不可能出现异常。
第一,假设
ReentrantLock
没有重写tryAcquire
方法,那程序在执行acquireQueued()
方法前就会抛出UnsupportedOperationException
异常,根本执行不到。不会有线程拿到锁,不会有线程因抢不到锁入队列。第二,不论是非公平锁还是公平锁,
ReentrantLock
中的重写的tryAcquire
方法都不会出现异常。
综上,
ReentrantLock
中并不会触发cancelAcquire
方法,再广一点,JDK
的AQS
源码不会触发。但是别忘了,AQS
是模版方法设计模式,tryAcquire()
是需要定义者实现的,JDK
自带的对应实现不会触发,我们自己重写就可能出现异常了,AQS
的作者不会知道你的代码会怎么写,为了代码的健壮性,所以才做异常处理。再者,tryAcquire()
以及tryAcquireShared()
已经在方法文档里明确指出了可能会抛出IllegalMonitorStateException
, 代码必然要针对异常情况处理。
好了,虽然知道 ReentrantLock
实际不会触发 cancelAcquire
方法,但是我们还是来分析一下:
private void cancelAcquire(Node node) {
// 如果当前节点为 null,直接返回
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
// 向前遍历,跳过取消的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 将当前节点设置为 取消 状态
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入 else,如果更新成功,将 tail 的后继节点设置为 null
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
// 当前节点不是尾节点或更新尾节点失败(有其他线程节点入队且更新成功)
int ws;
// 如果当前节点不是head的后继节点
// 1. 判断当前节点前驱节点的是否为SIGNAL 2. 如果不是,则把前驱节点设置为SIGNAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为 null
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
// 当前节点是head的后继节点,或者前面的条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
// 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
总体流程如下:
获取当前节点的前驱节点,如果前驱节点的状态是
CANCELLED
,那就一直往前遍历,找到第一个waitStatus <= 0
的节点,将找到的prev
节点和当前node
关联,将当前node
设置为CANCELLED
。然后,根据当前节点在队列中的位置,需要做不同的处理:
如果当前节点是
n5
节点(node == tail
),且更新尾节点成功。假设n3
和n4
都是取消节点,则最终队列如下:n3
、n4
节点也是取消节点,说明它们也在执行cancelAcquire
方法,但是n5
节点先一步执行判断逻辑。n3,n4
后一步执行的逻辑会使得它们没有没引用了,那么下次GC
时会被回收。如果当前节点是
n3
,其他节点都是不取消状态,则最终队列如下:那么
n3
节点何时被回收呢?要等n2
节点释放锁的时候,n2
节点释放锁的时候会调用LockSupport.unpark
方法唤醒n4
节点,n4
节点会重新执行acquireQueued
方法中的for
代码块,并获取锁成功,调用setHead
方法把自己设为头结点。private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
注意
node.prev = null;
,将n4
指向n3
的prev
指针设置为null
,这个时候n3
没有任何地方被引用,可以被JVM
回收了。如果当前节点是
n2
,则队列如下:n4
唤醒后,会重新执行acquireQueued
方法中的for
代码块,但是此前它的前一个节点n3
不是头结点,所以会执行shouldParkAfterFailedAcquire
方法,会把前面所有取消状态的节点都从链表中删除。执行完后队列如下:此时
n2
和n3
节点就可以被JVM
回收了。
注意:
unparkSuccessor
找第一个非CANCELLED
的节点时是从后往前找,为什么呢?原因如下:
先回顾一下 addWaiter()
方法:
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
// 初始化队列
initializeSyncQueue();
}
}
}
addWaiter()
方法中节点入队并不是原子操作,node.setPrevRelaxed(oldTail);compareAndSetTail(oldTail, node)
这两处可以看作尾节点入队的原子操作,但是 oldTail.next = node;
还没执行,如果这个时候执行了 unparkSuccessor
方法,就没办法从前往后找了,所以需要从后往前找。
此外,在产生 CANCELLED
状态节点的时候,先断开的是 next
指针,prev
指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的 Node
。
2.3 公平锁源码
// FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
对比非公平锁的 nonfairTryAcquire
方法,可以发现公平锁在获取锁的时候仅仅是多了 !hasQueuedPredecessors()
这个逻辑,来看一下这个方法是干嘛的。
public final boolean hasQueuedPredecessors() {
Node h, s;
if ((h = head) != null) {
if ((s = h.next) == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != h && p != null; p = p.prev) {
if (p.waitStatus <= 0)
s = p;
}
}
if (s != null && s.thread != Thread.currentThread())
return true;
}
return false;
}
hasQueuedPredecessors
是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回 false
,说明当前线程可以争取共享资源;如果返回 true
,说明队列中存在有效节点,当前线程必须加入到等待队列中。
判断头结点
head
是否为null
,如果是,说明等待队列为空,当前线程可以获取锁,返回false
;如果头结点不为空,获取头节点的下一个节点
s
,如果s
为null
,或者s
的waitStatus
大于 0,说明下一个节点可能已经取消了(即不再处于排队状态);如果
(s = h.next) == null
,说明等待队列正在有线程进行初始化,但只是进行到了tail
指向hea
,没有将head
指向tail
,此时队列中有元素,这种情况下也需要将相关线程加入队列中,为了解决极端情况下的并发问题。
如果
s
为null
或者状态不正常,则遍历从tail
开始到head
的所有节点,寻找一个有效的前驱节点p
。有效的前驱节点是指waitStatus
小于或等于 0 的节点。如果找到了有效的前驱节点
s
,且该节点的线程不是当前线程,则说明当前线程后面还有其他线程在排队,因此返回true
。如果没有找到有效的前驱节点,或者前驱节点是当前线程,则返回
false
。
总结:通过对比公平锁和非公平锁
tryAcquire
的代码可以看到,非公平锁的获取略去了!hasQueuedPredecessors()
这一操作,也就是说它不会判断当前线程是否还有前节点在等待获取锁,而是直接去进行锁获取操作。
2.4 释放锁
不管是公平锁还是非公平锁,释放锁的逻辑都是一样的。
// ReentrantLock
public void unlock() {
sync.release(1);
}
// AQS
public final boolean release(int arg) {
// 如果tryRelease返回true,说明该锁没有被任何线程持有
if (tryRelease(arg)) {
// 获取头结点
Node h = head;
// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
// 这个方法前面分析过,不再赘述
unparkSuccessor(h);
return true;
}
return false;
}
// ReentrantLock.Sync
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
释放锁的逻辑很简单,相关解释在注释里面说明了。主要看下 release
方法唤醒后续节点的判断条件:h != null && h.waitStatus != 0
,为什么判断条件是 头结点不为空且头结点状态不是初始化节点状态?
h == null
说明队列没有初始化,队列中没有排队节点,没有其他线程在等待获取锁,也就不需要唤醒,返回true
。h != null && h.waitStatus == 0
说明后继节点对应的线程仍在运行中,会自动尝试获取锁,不需要唤醒。h != null && h.waitStatus < 0
说明后继节点可能被阻塞了,需要唤醒。2.2.3
章节最后提到过head
的waitStatus
会被修改成SIGNAL
(值为 -1),head
的后继节点会被阻塞挂起。
2.5 关于中断
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
可以看到在 acquireQueued
获取锁成功且返回 true
时,就会执行 selfInterrupt
方法中断自己这个线程。为什么获取了锁以后还要中断线程呢?因为当线程调用 LockSupport.park();
挂起后,被唤醒,它自己并不知道被唤醒的原因,因此通过 Thread.interrupted()
方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为 false
)。
如果是因为其他线程调用
LockSupport.unpark
而唤醒,此时acquireQueued
返回false
,表明不需要中断当前线程。如果是因为当前线程在等待中被中断(当一个线程被
LockSupport.park()
挂起时,如果在挂起期间该线程接收到中断请求,即其他线程调用了interrupt()
,那么park()
方法会立即返回),此时线程的中断状态会被设置为true
,acquireQueued
方法抢到锁后最终也会返回true
。但是注意,即使park()
返回了,线程还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
评论区