ReentrantLock原理探究(一)

前言

ReentrantLock类是synchronized语义的替代品,可以实现与其相同的功能,了解其实现原理对并发编程无疑是很有帮助的。其次,ReentrantLock 的实现基础AQS(AbstractQueuedSynchronizer)也是Java并发编程中相当重要的一个类,所以无论如何,我们都要了解一番。

一. 用法及概念

1. 用法

ReentrantLock(可重入锁)由java1.5引入,被用来实现synchronized关键字语义。这样就可以从代码级别而不是语言层面实现锁的定义。Lock 是其接口,规范了若干作为锁必须实现的方法。

下面使用lock 来保证i++ 操作的线程安全。

1
2
3
4
5
6
7
8
9
10
11
private void addUseLock() {
Lock lock = new ReentrantLock();
//在进行操作前先锁定
lock.lock();
try {
flag++;
} finally {
//操作结束后一定要在finally中释放锁,否则可能导致死锁
lock.unlock();
}
}

当然,我们也可以用synchronized语义实现

1
2
3
private synchronized void addUseSync() {
flag++;
}

Lock 可以创建Condition(条件变量),用来实现wait() 、 notify()语义。从而控制线程间的通信。Condition 中的awaitwait类似,signal则相当于notify。两种机制使用也很相似,都必须在获取锁的情况下操作

下面模拟了Condition 的使用,CountDownLatch 用来使主线程等待两个工作线程结束,可以暂不研究。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Test
public void test() {

Lock lock = new ReentrantLock();

//创建一个条件
Condition condition = lock.newCondition();

CountDownLatch countDownLatch = new CountDownLatch(2);

/**
* 线程A遍历到20后自我阻塞,等待线程B唤醒
*/
new Thread(() -> {
try {
lock.lock();
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i);
if (i == 20) {
condition.await();
}
}
} catch (InterruptedException e) {
System.out.println(e.getMessage());
e.printStackTrace();
} finally {
lock.unlock();
}

countDownLatch.countDown();

}, "ThreadA").start();

/**
* 线程B休眠2秒后唤醒线程A
*/
new Thread(() -> {
try {
lock.lock();
TimeUnit.SECONDS.sleep(2);
System.out.println("唤醒线程");
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
countDownLatch.countDown();

},"ThreadB").start();

try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

2. 公平锁与非公平锁

ReentrantLock 有一个很重要的概念就是锁的公平性。所谓公平锁,就是使线程按照请求锁的顺序依次获得锁,反之,就是非公平的。也就是说,如果锁是非公平的,有的线程可能一直不断获取锁,而有的线程可能一直获取不到。而公平锁则不会,它会按请求顺序依次分配。

那为什么要设计非公平锁呢?原因是效率问题。处于CPU调度考虑,采用公平锁会消耗一些性能保证线程调度公平和同步,而且对于重复获取锁的操作中,某个线程连续获取锁的概率是很高的,而公平锁则遏制了这一点。所以,如果线程的执行顺序对你的程序不重要的话,最好使用非公平锁。另外,synchronized内置锁也是非公平锁。

1
2
3
4
5
//默认为非公平锁
Lock lock = new ReentrantLock();

//通过构造函数创建公平锁
Lock lock2 = new ReentrantLock(true);

3. 可重入性

从名字就可以看出,ReentrantLock 是可重入锁。即一个线程获取该锁后可以在后续过程中多次获取该锁,以避免被自己锁死的情况。这个语义synchronized也支持。

二、类结构解析

下面我们深入ReentrantLock 类结构分析一下

1. ReentrantLock类结构

我们刚才提到,ReentrantLock 实现了Lock接口。另外,它还有3个内部类。分别是SyncNonfairSyncFairSyncSync 是一个抽象的内部类,代表锁的基本底层实现。后两者分别是对非公平锁和公平锁的实现。下面来看该类的类继承关系和主要方法。

图中可以看到上面提到的三个内部类。深入Sync类结构,看看它的结构图

Sync类结构中,AbstractQueuedSynchronizer类(简称AQS)有着相当重要的地位。不仅如此,AQS其实是整个java并发包的基础。各个并发工具类如Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock、FutureTask 都是根据自己本身需要在AQS基础上做的定制实现。

这样一来,我们就了解了AQS与ReentrantLock的相互关系。这种实现模式其实是模板模式的典型应用。父类负责制定流程,留下若干接口交给子类具体实现。

2. AQS类概览

AbstractQueuedSynchronizer类结构比较复杂,具体源码工作我们在下面章节分析,这里先简单看看该类主要结构和方法。

AQS的内部类有2个,ConditionObjectNodeConditionObject 实现Condition接口,用于实现前面提到的条件变量await/singal

Node为等待队列的节点类。AQS实现依赖一个先进先出的队列,而Node类即是这个队列的节点。用于保存阻塞的线程引用和线程状态。

Node类主要字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

static final class Node {
//节点状态
volatile int waitStatus;

//前置节点
volatile Node prev;

//后继节点
volatile Node next;

//该节点保存的线程
volatile Thread thread;

//该队列下一个等待者
Node nextWaiter;
}

说完AQS的两个内部类,下面了解下AQS的主要字段

1
2
3
4
5
6
7
8
9
10
11
//队列头结点
private transient volatile Node head;

//队列尾结点
private transient volatile Node tail;

//同步状态,0表示未锁,>0表示某线程锁了多少次
private volatile int state;

//用于保存独占模式下的当前线程
private transient Thread exclusiveOwnerThread;

没有实例理解起来比较吃力,先不研究这个,下步解析源码时再解析。先看看AQS主要方法。

说明:获取、释放锁的若干方法中,带Shared的是共享方式,否则是以独占方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

//获取锁
public final void acquire(int arg);
public final void acquireShared(int arg);

//释放锁
public final boolean release(int arg);
public final boolean releaseShared(int arg);

//获取、设置状态
protected final int getState();
protected final void setState(int newState);

//以原子方式设置值
protected final boolean compareAndSetState(int expect, int update);

private static final boolean compareAndSetWaitStatus(Node node,int expect,int update);

private final boolean compareAndSetHead(Node update);

private final boolean compareAndSetTail(Node expect, Node update);


//尝试获取、释放锁(这些方法留在子类实现)
protected boolean tryAcquire(int arg);

protected int tryAcquireShared(int arg);

protected boolean tryReleaseShared(int arg)

protected boolean tryRelease(int arg);

AQS主要方法如上。以获取锁为例,acquire()方法定义了获取锁的主要流程,其中tryAcquire()由子类根据需要定制。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

说明:

  1. ReentrantLock使用了独占的方式获取释放锁,主要用到了tryAcquiretryRelease

  2. 方法中的compareAndSetXXX(arg0,arg1)调用底层CAS原义。即为若内存中该值为arg0,则将其设为arg1。这个设置是原子的,不会中断。

三、源码解析

终于到解析源码阶段了。

1.lock()方法

先分析非公平锁的lock方法。

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

流程很简单,若state为0(即当前没有线程争用),将其设为1。同时把当前线程写入exclusiveOwnerThread字段,表示该线程独占锁。

否则(此时表明这个锁已经被某个线程占了,我们假设为线程1,而当前线程为线程2),调用acquire()

再往下分析前先想想,既然是独占锁且锁已经被占了,那处理流程应该怎样? 大致流程应该是把该线程放在某个阻塞队列中,等待前面线程执行完后(调用unlock()),通知该线程获取锁去执行。下面来验证一下。

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  1. 先执行tryAcquire(),成功则说明获取锁成功了,不再执行。
  2. 若失败执行acquireQueued(),执行后返回一个值,用于判断是否需要自中断

流程就这样,在非公平锁中,tryAcquire()调用nonfairTryAcquire()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取状态
int c = getState();
//若为0,表明前面线程1已执行完,当前线程(线程2)执行获取锁操作
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//表明获取锁的线程是当前线程,即锁重入,status即锁重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

如果线程1还持有锁,线程2执行tryAcquire()会直接返回false,执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)判断条件。

按猜想,这时应该把线程2(当前线程)加到一个阻塞队列存起来,等待线程1执行完后再尝试获取锁。观察代码,该执行addWaiter()方法了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addWaiter(Node mode) {
//构建当前线程节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

以线程2(当前线程)创建一个节点,由于此时阻塞队列为空,直接走enq(node)入队操作。

说明:参数mode表示队列的模式,这里传的值是Node.EXCLUSIVE,表示互斥锁,还有一个变量是SHARED,表示共享锁。

看看enq()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 判断尾节点是否为空,为空表示队列无值
if (t == null) { // Must initialize
//设置头节点
if (compareAndSetHead(new Node()))
//设置尾节点
tail = head;
} else {
//尾节点不为空,将当前线程节点入队
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

方法里面是个for()循环。此时队列为空,走if逻辑,设置队列头、尾节点。然后再循环一次,此时队列不为空,走else,将当前线程节点加入队列,注意形成的是双向队列,而且是头节点为空节点的双向队列。

下面该是acquireQueued()方法了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//取出当前节点的前一节点,记为p
final Node p = node.predecessor();
//若p是头节点,再次尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取失败,执行`shouldParkAfterFailedAcquire()`方法
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

主要是一个循环两个if。第一个if里,判断当前线程节点前一个节点是否是head(因为head节点总指向空节点),若是表明该节点是阻塞队列第一个节点,这时再次尝试获取锁。成功重设队列head节点,返回。失败走第二个if。

这里涉及node.predecessor()、setHead(node)两个方法。node.predecessor()返回的是node节点的prev节点。setHead(node)将node设为head节点,thread、prev属性置空。

第二个if,执行shouldParkAfterFailedAcquire()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

waitStatus表示节点状态,有以下几种状态

属性 说明
CANCELLED 1 表示当前的线程被取消,处于这种状态的Node会被踢出队列,被GC回收
SIGNAL -1 表示当前节点的后继节点表示的线程需要解除阻塞并执行
CONDITION -2 表示这个节点在条件队列中,因为等待某个条件而被阻塞
PROPAGATE -3 使用在共享模式可能处于此状态,表示后续节点能够得以执行
初始状态 0 表示当前节点在sync队列中,等待着获取锁。

此方法中prev参数是node参数的前置节点。在我们的例子中,prev是head节点。因未赋值,waitStatus为0,走compareAndSetWaitStatus(pred, ws, Node.SIGNAL), 将prev的waitStatus值设为Node.SIGNAL,即-1。返回flase

继续循环,获取不到锁后仍走第二个if,此时shouldParkAfterFailedAcquire()返回true,进parkAndCheckInterrupt()

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

调用 LockSupport.park(this)将当前线程锁住。最终调用的是Unsafe类的park()方法。Unsafe类提供了一些java底层硬件级别的原子操作。可以提供分配内存、修改对象内存位置、挂起恢复线程等操作。我们暂且不研究这个。

篇幅问题,这篇文章先到这里,等待下篇再来分析公平锁与非公平锁以及unlock过程。

——————未完待续————————

参考资料

  1. JDK 5.0 中更灵活、更具可伸缩性的锁定机制
  2. ReentrantLock实现原理深入探究
  3. AbstractQueuedSynchronizer的介绍和原理分析
  4. 深入浅出 Java Concurrency (8): 锁机制 part 3
分享到:
0%