ReentrantLock 源码分析 - 加锁
SunRan

前言

本系列文章通过日常中比较常见 ReentrantLock 开始,逐步揭开AQS的底层。

lock()

1
2
3
4
5
6
7
ReentrantLock lock = new ReentrantLock();
try{
lock.lock();
// 业务...
}finally {
lock.unlock();
}

日常我们会这样使用 ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
// java.util.concurrent.locks
final void lock() {
// 首先做了一个判断
// 比较并替换(CAS) 如果期望值为0则替换为1
if (compareAndSetState(0, 1))
// 如果替换成功
// 表示已经拿到了锁
setExclusiveOwnerThread(Thread.currentThread());
else
// 否则将执行一个获取锁的操作
acquire(1);
}

上面代码中 compareAndSetState  、 setExclusiveOwnerThread 、 acquire 三个方法都是AQS下的。

acquire()

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Node addWaiter(Node mode) {
// 新建了一个Node对象
Node node = new Node(Thread.currentThread(), mode);
// 拿到尾巴节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 将新创建的节点追加到链表的尾巴上
// 此处同样用到了比较替换
// 这个方法主要是对tailOffset和Expect进行比较
// 如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值
if (compareAndSetTail(pred, node)) {
pred.next = node;
// 最终返回当前节点
return node;
}
}
// 内部是一个死循环,直到设置node为尾节点为止。
enq(node);
return node;
}

关于 compareAndSetTail 方法深挖可以发现他是一个 native 修饰的方法。
高并发下难免会出现多线程并发去追加尾巴节点,防止追加错乱。使用了CAS没有使用其他锁是因为,CAS的颗粒比较小,永远只需要盯着 pred 作比较,无需锁住整个链表。

acquireQueued

上文解释了 addWaiter 方法,这个方法其实就是把对应的线程以Node的数据结构形式加入到双端队列里,返回的是一个包含该线程的Node。而这个Node会作为参数,进入到 acquireQueued 方法中。 acquireQueued 方法可以对排队中的线程进行“获锁”操作。
总的来说,一个线程获取锁失败了,被放入等待队列, acquireQueued 会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
下面我们从“何时出队列?”和“如何出队列?”两个方向来分析一下acquireQueued源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始死循环(轮询)
// 直至拿到锁,或者中断
for (;;) {
// 获取当前节点的前置节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,自己作为新的头结点
setHead(node);
// p.next 其实就是 本方法传参中的 node
// 设置为null 没有了引用 有利于gc 清理node
p.next = null; // help GC
failed = false;
return interrupted;
}
// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点
// 这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// java.util.concurrent.locks.AbstractQueuedSynchronizer

// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取头结点的节点状态
int ws = pred.waitStatus;
// 说明头结点处于唤醒状态
if (ws == Node.SIGNAL)
return true;
// 通过枚举值我们知道waitStatus>0是取消状态
if (ws > 0) {
do {
// 循环向前查找取消节点,把取消节点从队列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置前任节点等待状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

parkAndCheckInterrupt

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。

总结


从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire流程):

从队列中释放节点的疑虑打消了,那么又有新问题了:

  • shouldParkAfterFailedAcquire中取消节点是怎么生成的呢?什么时候会把一个节点的waitStatus设置为-1?
  • 是在什么时间释放节点通知到被挂起的线程呢?

cancelAcquire

通过cancelAcquire方法,将Node的状态标记为CANCELLED。接下来,我们逐行来分析这个方法的原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过取消状态的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
  • 本文标题:ReentrantLock 源码分析 - 加锁
  • 本文作者:SunRan
  • 创建时间:2022-01-06 15:51:42
  • 本文链接:https://lksun.cn/2022/01/06/ReentrantLock 源码分析 - 加锁/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
 评论