ReentrantLock原理探究(二)

前言

上篇ReentrantLock原理探究(一)介绍了ReentrantLock类的使用说明,详细解析了关于非公平锁的lock()过程。这篇我们继续分析。

三、源码解析

2.unlock()方法

公平锁与非公平锁的unlock()方法相同,就不用区别了。unlock()方法调用了release()方法。

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

看看 tryRelease(),这个方法实现在ReentrantLock的内部类Sync而不是分别留在FairSyncNonfairSync中,这也说明了释放锁的过程与锁的公平性无关。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
//每次释放锁状态减1,若为0说明锁已释放完毕
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//若锁已释放,将表示占有锁的线程变量设为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

这个方法比较简单,仅仅是检查并设置状态。不过仍需注意的是,释放锁的方法必须由占有锁的线程调用,否则,则会抛出IllegalMonitorStateException异常。

假设线程1已执行完,调用unlock()执行到此,会将表示阻塞线程个数状态的state设置0。返回true,进if语句。

1
2
3
4
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;

此时head节点是没有线程的空节点,其后跟着表示线程2的线程节点。而head节点的waitStatusSIGNAL,值为-1。

进入unparkSuccessor(h)方法,h是队列的头节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void unparkSuccessor(Node node) {
/*
若节点`waitStatus`为负值,设置0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
这里有JDK注释,大意是,解锁阻塞队列中的线程。一般情况下是下一个节点(head后面的节点),但若这个节点表示的线程被取消了,则往后遍历直到找到需释放的(waitStatus为负值的)。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//解锁head节点后的第一个等待节点
if (s != null)
LockSupport.unpark(s.thread);
}

LockSupport.unpark(s.thread)方法应该不陌生,上篇中就是用LockSupport.park(this)方法将当前线程锁住的,底层调用的是Unsafe类方法。这里暂不研究。

这里和lock()的过程对接一下,当线程2完成unlock()过程。唤起线程1。

此时线程1在acquireQueued()的循环中,

1
2
3
4
5
6
7
8
9
10
11
12
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}

走循环的第一个if,执行完tryAcquire()后线程2已获取锁。这里进if,设置头节点

1
2
3
4
5
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

经过设置,阻塞队列重新恢复成只有一个空的头节点的状态。这样获取、释放锁的流程就走完了。其他类似。

3. 公平锁的获取方式

公平锁与非公平锁的解锁流程是一致的,区别只在锁的过程,即FairSynctryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

公平锁比非公平的只在hasQueuedPredecessors()方法。此方法会判阻塞队列前面是否有其他线程在等待。

1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

其他步骤与非公平锁一致。

4. 线程取消

前面一直提到线程取消,这里解析一下

acquireQueued()方法中finally代码段是处理异常发送后取消线程的,主要调用了cancelAcquire(node)。node为要取消的线程表示的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

private void cancelAcquire(Node node) {
//节点为空直接返回
if (node == null)
return;

node.thread = null;

// 跳过被取消的(即waitStatus大于0)节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

//node前置节点的后置节点,不一定是node
Node predNext = pred.next;

// 将节点状态改为取消状态
node.waitStatus = Node.CANCELLED;

//若节点是尾节点,直接设为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
//前置节点不是head,表明该节点前有阻塞的线程
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//连接该节点的前置节点和后续节点,即去掉该节点
compareAndSetNext(pred, predNext, next);
} else { //前置节点是head
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

注释都说差不多了,最后调用的unparkSuccessor前面解锁过程已经提过。可以发现,如果外界有异常导致当前线程被取消时,会设置状态,并根据该节点所在位置恰当退出队列。

四、Condition研究

上篇示例中演示了Condition()的用法,这里简要分析一下它的底层。

1. 类结构

AQS内部类ConditionObject实现了Condition接口,主要有下列两个字段.

1
2
3
4
5
//条件队列第一个节点
private transient Node firstWaiter;

//条件队列最后一个节点
private transient Node lastWaiter;

2.await()方法

await()方法需结合Lock类使用,上篇示例中已经提过,此方法与Object.wait()功能相近,阻塞当前线程,直到调用相同锁的signal()方法。

await()方法有众多版本,如awaitNanos(long nanosTimeout)、awaitUntil(Date date)、await(long time, TimeUnit unit)。这里以await()为例。

为表述方便,我们以Condition测试用例 为实例,有线程A、B两个线程。线程A遍历到20调用await()阻塞,线程B休眠2秒唤醒线程A。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void await() throws InterruptedException {
//显式检查中断
if (Thread.interrupted())
throw new InterruptedException();
//用当前线程构造等待条件节点
Node node = addConditionWaiter();
//当前线程释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果不在阻塞队列中,将自己锁住
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//以下是被唤醒后重新竞争锁,与lock过程类似
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

await()方法为暂停该线程,等待唤醒。线程A调用后会阻塞在上面的while循环中。等待唤醒。

首先会检查中断标志,这也是await()方法会响应中断的实现点。addConditionWaiter()以当前线程为节点,fullyRelease()释放当前线程持有的锁,内部调用的是release()。再判断节点是否在队列中,若不在,在循环中阻塞该线程。循环后面的语句是经signal()唤醒后竞争锁的过程。与lock就关联起来了。

3.signal()方法

signal()主要作用就是唤醒被await的线程。与await()一样,该方法必须在获取锁的情况下使用。

1
2
3
4
5
6
7
8
public final void signal() {
//当前线程是否是锁的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

主要处理逻辑在doSignal(first)first是条件队列的首节点。

1
2
3
4
5
6
7
8
9
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//遍历找到第一个可以释放的节点(不能是被取消的)
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

看看transferForSignal(first)

1
2
3
4
5
6
7
8
9
10
11
12
13
final boolean transferForSignal(Node node) {
//
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

// 将要唤醒的线程节点加入阻塞队列等待唤醒
Node p = enq(node);
int ws = p.waitStatus;
//该节点被取消,直接释放解锁
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

总结一下signal() 的流程。

线程A已被阻塞(处于while循环),此时在等待队列中(由Condition维护,不同于AQS的同步阻塞队列)。线程B调用signal()后线程A从等待队列取出,放到同步阻塞队列。此时正好破解了线程A的while循环。

1
2
3
// await()方法

while (!isOnSyncQueue(node)) //是否处于阻塞队列

这样一来流程就清晰了。等线程B执行完后(执行完unlock()释放锁),由于同步阻塞队列没有其他阻塞线程,线程A就可获取锁继续执行了。

相反,假设阻塞队列有排队的线程,即不止一个线程(这里只有线程A)等待释放,走await()后面的竞争锁过程,其实是lock阻塞的过程,这里就不提了。

参考文章

  1. 怎么理解Condition
  2. ReentrantLock实现原理深入探究
  3. AbstractQueuedSynchronizer的介绍和原理分析
  4. Java线程(九):Condition-线程通信更高效的方式
分享到:
0%