再谈生产者消费者模式与阻塞队列

再谈生产者消费者模式与阻塞队列

前言

Wait/Notify通知机制解析文章中,介绍了生产者消费者模式及其应用,而阻塞队列的自身特点也适合生产者消费者。本文即探讨如何一步步用阻塞队列构建生产者、消费者模式。

使用普通队列

使用普通队列构建生产者消费者最需要考虑的问题是,如何保证队列在添加、移除操作时的线程安全。我们本例使用Lock/Condition机制确保。

从实现来说,原生synchronized+wait\notify也能实现相同的功能,不过Lock机制具有更大灵活性,更推荐使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

static final Lock lock = new ReentrantLock(); //锁

static final Condition condition = lock.newCondition(); //等待条件

//使用ArrayDeque作为任务队列,你也可以自定义一个队列
static final Queue<Task> queue = new ArrayDeque<>();

// 其他变量略

//消费者线程
static class Consume implements Runnable {

@Override
public void run() {
lock.lock(); //加锁
try {
while (queue.size() == 0) { //若任务队列为空则等待
condition.await();
}
Task task = queue.poll(); //取出任务消费
System.out.println("模拟消费:" + task.no);
condition.signal(); //通知生产者已消费

} catch (Exception e) {
e.printStackTrace();

} finally {
lock.unlock();
}
try {
TimeUnit.MILLISECONDS.sleep(200); //暂停200ms休息
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// 生产者线程
static class Produce implements Runnable {

@Override
public void run() {
lock.lock(); //加锁
try {
while (queue.size() == cap) { //若达到边界值则等待
condition.await();
}
Task task = new Task(number.incrementAndGet()); //生产任务
queue.add(task);
condition.signal(); //通知消费者已生产

} catch (Exception e) {
e.printStackTrace();

} finally {
lock.unlock(); //解锁
}
try {
TimeUnit.MILLISECONDS.sleep(500); //模拟生产流程,等待200毫秒生产一个
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

当生产者超过cap(任务队列最大值)时,阻塞以等待消费者消费;当消费者消费完任务后,阻塞以等待生产者生产。受篇幅限制,全部代码放于github上。

构建阻塞队列

使用普通队列+Lock/Condition机制已初步实现了要求。为简洁,可以将加锁、解锁等同步机制移到队列里实现,即构成了阻塞队列。上述示例即是一个简单的阻塞队列。

另外,仔细思考上面示例,会发现生产者、消费者在调用await阻塞时等待着同一个condition条件。理论上不会出现生产者、消费者同在等待队列的情况,但为结构清晰,一般(对于数组结构的队列)使用两个等待队列实现。

我们知道,synchronized的对象锁一个对象只能关联一个等待队列,而Lock机制则可以关联多个。可以分别为生产者和消费者分别关联各自的等待队列,ArrayBlockingQueue就是这么做的。

ArrayBlockingQueue 有关锁的声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  

/** 锁对象 */
final ReentrantLock lock;

/** 等待take的等待条件对象 */
private final Condition notEmpty;

/** 等待put操作的等待条件对象 */
private final Condition notFull;

//由同一锁关联的等待条件
notEmpty = lock.newCondition();
notFull = lock.newCondition();

这样整体构造如下图所示

.png)

下面就用ArrayBlockingQueue来构建生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private int cap = 100;

//使用ArrayBlockingQueue作为阻塞队列
private BlockingQueue<Task> queue = new ArrayBlockingQueue<>(cap);

private AtomicInteger taskNo = new AtomicInteger(0);

//消费者线程
class Consume implements Runnable {

@Override
public void run() {
try {
Task task = queue.take(); //消费出队,阻塞队列本身就可确保线程安全
System.out.println(task.no); //模拟消费
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// 生产者线程
class Produce implements Runnable {

@Override
public void run() {
Task task = new Task(taskNo.getAndIncrement());
try {
queue.put(task); //生产入队,阻塞队列确保线程安全
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

ArrayBlockingQueue 实现简析

ArrayBlockingQueue实现原理上文已经提及,即与上面的普通队列类似,不同之处在于ArrayBlockingQueue使用的是一个锁和其关联的两个等待条件。一个为notEmpty,表示消费的等待条件(队列没元素可消费了),一个为notFull,表示生产的等待条件(没空位可生产了)。这里以take()方法为例简单了解下。

take()方法可类比消费者消费。含义与前面类似,不同的只是其生产或消费阻塞时用了各自的等待条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //锁对象
lock.lockInterruptibly(); //加锁,可中断
try {
while (count == 0)
notEmpty.await(); //若队列为空,take操作等待
return dequeue();
} finally {
lock.unlock();
}
}
// 出队
private E dequeue() {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 唤醒可能阻塞的生产者
return e;
}

使用链表式的阻塞队列

上面我们实现了生产者、消费者模式,这样实现的一大硬伤在于:同一时刻只能有一个生产者或消费者操作队列,而生产和消费本就是不相关的操作。两者能各自操作吗?

对于数组来说显然是不能的,本身即一个整体无法同时线程安全的插入和删除。不过可以使用链表:对于添加只在尾指针操作;对于删除则在头指针操作。这样即可以同时添加和删除,互不影响。

链表式阻塞队列的简要实现(代码见github),具体说明见注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

private Lock takeLock = new ReentrantLock();

private Condition takeCondition = takeLock.newCondition();

private Lock putLock = new ReentrantLock();

private Condition putCondition = putLock.newCondition();


/**
* 入队,若队列满则等待
*
* @param e 入队元素
*/
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
Node<E> node = new Node<>(e);
int c = -1;
takeLock.lockInterruptibly(); //takeLock,添加元素的锁
try {
while (count.get() == capacity) { //若队列满,阻塞以等待
takeCondition.await();
}
enqueue(node);
c = count.incrementAndGet(); //更新队列元素数
if (c < capacity) {
takeCondition.signal(); //若入队后发现还有空位,通知其他阻塞的入队线程(若有)
}
} finally {
takeLock.unlock();
}
if (c == 1) { //若入队前队列为空,则通知被阻塞的出队线程,现在可以出队了
putLock.lockInterruptibly();
try {
putCondition.signal();
} finally {
putLock.unlock();
}
}
}


/**
* 出队,若无元素一直等待
*
* @return 出队元素
*/
public E take() throws InterruptedException {
takeLock.lock(); //takeLock,移除元素的锁
E e = null;
int c = -1;
try {
while (count.get() == 0) { //队列为空,移除操作阻塞
takeCondition.await();
}
e = dequeue();
c = count.decrementAndGet(); //更新队列元素数
if (c > 0) { //若出队后仍有元素,通知其他被阻塞的出队线程(若有)
takeCondition.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity - 1) { //若出队前队列已满,通知阻塞的入队线程,现在可以入队了
putLock.lockInterruptibly();
try {
putCondition.signal();
} finally {
putLock.unlock();
}
}
return e;
}

参考资源

  1. java8 JDK ArrayBlockingQueue、LinkedBlockingQueue 源码
  2. Wait/Notify通知机制解析
  3. Java 实现生产者 – 消费者模型
分享到:
0%