AbstractQueuedSynchronizer整体解析

AbstractQueuedSynchronizer整体解析

前言

在此之前,我们深入源码分析过ReentrantLock系列,在那里就探讨过AbstractQueuedSynchronizer(下称AQS)类,称其是同步组件乃至整个并发包的基础类。这篇文章就深入AQS,从AQS的角度了解同步器以及ReentrantLock、ReentrantReadWriteLock等的实现机制,实现自定义的同步组件,以窥探整个同步框架的全貌。

AQS及同步器整体介绍

有关类字段及方法的介绍,在ReentrantLock原理探究(一)就已说过,今天我们从整体上来了解AQS。

从AQS类的注释中,我们可以了解到:该类是一个用于构建锁或其他同步器的基础框架,使用一个int的成员变量表示同步状态。另外,还有一个内置的先进先出的队列可储存竞争同步状态时排队的线程。

将上面描述的翻译成通俗的语言就是:有一个共享资源state(int类型的变量),各个线程去竞争这个资源,竞争到的拥有资源,去处理自己的逻辑;没竞争到去排队(进入先进先出队列),等拥有资源的线程释放共享资源后,队列中线程的再去竞争。

一图胜千言,画成流程图就像下面的样子:

有4个线程去竞争同步变量(锁的内在表示)。这里我们假设线程A得到了,其他竞争失败的线程进入同步队列等待,得到同步变量的线程A执行自己的逻辑。执行完毕后通知同步队列的线程再去竞争锁。

AQS基本实现了以上通用的功能,包括获取锁后的同步处理,释放锁后通知事件等。但有关获取、释放锁的条件等业务相关代码留给了子类去实现。即AQS搭好了整体框架,子类去实现某个业务点。以下是2个具体实例。

  1. ReentrantLock,是排他锁,某个线程获取锁后其他线程就会阻塞直至锁的释放。共享资源state初始值为0,表示资源未被占有。某线程访问并设置state为1,表示该线程占有了锁。当其他线程读取到state不为0后进入队列等待,直到占有锁的线程将其设为0后,队列线程才会得到通知,重新竞争锁。(事实上ReentrantLock作为可重入锁,占有锁的线程再次进入锁会使state加1,退出一次state减1,不会把自己锁死)

  2. CountDownLatch,共享锁。可用于控制线程执行、结束的时机。如我们想要主线程在2个子线程执行完后再结束,这时使用CountDownLatch通过构造函数将共享变量state设为2,将主线程锁住,每个子线程结束后state减一,state为0后表示两子线程执行完毕,此时主线程才得以释放。

也即是说,通过AQS,我们将能很简单的实现同步的要求。这也是模板方法模式的运用。

一个简单的锁

根据上面提到的,我们来自制一个独占类型的锁。

根据AQS的建议,实现AQS的类最好为同步器的内部类,外部类方法再去引用其内部类的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

public class MyLock {

private Sync sync = new Sync();

//AQS的子类,由于是独占锁,实现tryAcquire和tryRelease两方法
private static class Sync extends AbstractQueuedSynchronizer {

@Override
protected boolean tryAcquire(int arg) {
//若状态为1,说明有其他线程已占有锁,直接返回false
if(getState()==arg){
return false;
}
//若状态为0,将其设为1,表示占有锁
return compareAndSetState(0, arg);
}

@Override
protected boolean tryRelease(int arg) {
//设置状态为0,表示释放锁
setState(0);
return true;
}
}

//加锁方法
public void lock() {
sync.acquire(1);

}

//解锁方法
public void unlock() {
sync.release(1);

}
}

这样我们就实现了一个简单的锁。不过这个锁相比ReentrantLock来说,没有实现可重入性(也没有实现关联条件Condition)。也就是说它会被自己锁死:当某个线程在获取锁后再次尝试获取锁,会导致死锁。不过,实现类似i++的同步倒是可以做到的。

1
2
3
4
5
6
7
8
public void run() {
myLock.lock();
try {
total++;
} finally {
myLock.unlock();
}
}

示例解析

关于以上示例,tryAcquire()tryRelease()两个方法即为子类需实现的模板方法(这是对于独占锁而言,对于共享锁是tryAcquireShared/tryReleaseShared,下文会提到)。其返回值表示锁是否获取、释放成功。

以获取锁为例,acquire是AQS具体获取锁的方法,在其中会调用子类实现的tryAcquire(),并根据返回值进行具体操作。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) && //会调用子类的tryAcquire方法,实现不同的acquire含义
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //锁获取失败,加入同步队列等其他操作
selfInterrupt();
}

AQS关于获取、释放锁方法如下

方法 描述
acquire / acquireInterruptibly 独占式获取同步状态,若获取失败,将进入同步队列。后者与前者的区别在于,后者能在同步队列中响应中断
acquireShared / acquireSharedInterruptibly 共享式获取同步状态,后者能响应中断
release 独占式释放同步状态,成功后将同步队列的第一个线程唤醒
releaseShared 共享式释放同步状态

AQS关于同步状态的方法如下

方法 描述
getState 获取同步状态
setState(state) 设置同步状态
compareAndSetState(except,update) 使用CAS设置同步状态,只有当同步状态值为except时,才将其设置update

需要子类实现的方法如下

方法 实现思路
tryAcquire 独占式获取同步状态,实现该方法需要查询当前状态,并判断状态是否符合预期(根据各子类不同功能判断条件各异),然后再根据CAS设置同步状态
tryRelease 独占式释放同步状态
tryAcquireShared 共享式获取同步状态,若返回值大于等于0,表示获取成功,否则表示失败
tryReleaseShared 共享式释放同步状态
isHeldExclusively 在独占模式下,同步状态是否被占用

了解这些知识后再来看上面的例子,尤其是开始展示的那张流程图,对AQS的实现机制应该有了大致了解。你可以尝试实现ReentrantLock试试,需注意的是,可重入锁要保存持有锁的线程,当加锁时,判断当前线程是否持有锁,若持有,直接进入同步块,同时将state加1,当试图释放锁时,将state减1。若state减到0,释放锁。其他过程与其他一致。

Condition条件变量

synchronized配合wait/notify可实现等待通知模式。同样,AQS及其子类也可实现类似语义。这就是AQS的Condition接口。

Condition使用方式与wait/notify类似,都需要在持有锁的情况下调用,都有等待和超时等待,唤醒和全部唤醒。具体操作流程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

private Lock lock = new ReentrantLock(); //创建锁

Condition condition = lock.newCondition(); //创建条件

//条件等待
lock.lock(); //先加锁
//条件等待操作
condition.await(); //等待
lock.unlock(); //释放锁

// 条件唤醒
lock.lock(); //先加锁
//唤醒操作
condition.signal(); //唤醒
lock.unlock(); //释放锁

以上使用的是ReentrantLock类作为示例,其他类的条件变量操作与之类似。与前面一样,我们可以简单在AQS子类中简单重写即可实现此功能。

还用上面的MyLock类实现Condition,其他部分省略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class MyLock {

private Sync sync = new Sync();

//AQS的子类,由于是独占锁,实现tryAcquire和tryRelease两方法
private static class Sync extends AbstractQueuedSynchronizer {


//判断锁被占有的条件,在ConditionObject的方法中会使用
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}

//ConditionObject类为AQS的成员类,返回ConditionObject实例即可
Condition newCondition() {
return new ConditionObject();
}

//省略其他方法
}

public Condition getCondition(){
return sync.newCondition();
}

//省略其他方法

}

AQS内部有个称为ConditionObject的内部成员类,该类实现了Condition接口,且与AQS的状态相关联。实现Condition功能时只需在子类构建出ConditionObject对象即可。就如MyLock展示的一样。

关于Condition的实现原理,首先需要知道这2点:

  1. 无论调用await或是signal方法,都必须获取到该Condition关联的锁。
  2. Condition持有一个等待队列(不同于上面提到的同步队列,上面的同步队列为AQS类持有,而这个等待队列由Condition(AQS的内部类)持有)

同步队列与等待队列结构图如下所示

我们假设持有锁的线程A调用了await,线程A会进入等待队列,随后释放锁,通知同步队列其他节点去竞争锁。做完这些操作会就等着被其他线程唤醒或超时时间了。

若此时线程B调用了signal,则会从等待队列中取出一个节点加入同步队列并唤醒(也就是将线程A从等待队列移到同步队列)。此时线程A已被B唤醒并处于同步队列中,这时就可以重新竞争锁并执行了。

结语

有关AQS的整体分析就到这了,有时间再来从源码具体实现角度解析。

若本文有不正确之处,还请各位指正。

参考资料

  1. java并发编程的艺术
  2. ReentrantLock原理探究
分享到:
0%