再谈生产者消费者模式与阻塞队列
前言
在Wait/Notify通知机制解析文章中,介绍了生产者消费者模式及其应用,而阻塞队列的自身特点也适合生产者消费者。本文即探讨如何一步步用阻塞队列构建生产者、消费者模式。
使用普通队列
使用普通队列构建生产者消费者最需要考虑的问题是,如何保证队列在添加、移除操作时的线程安全。我们本例使用Lock/Condition机制确保。
从实现来说,原生
synchronized
+wait\notify
也能实现相同的功能,不过Lock机制具有更大灵活性,更推荐使用。
1 |
|
当生产者超过cap(任务队列最大值)时,阻塞以等待消费者消费;当消费者消费完任务后,阻塞以等待生产者生产。受篇幅限制,全部代码放于github上。
构建阻塞队列
使用普通队列+Lock/Condition机制已初步实现了要求。为简洁,可以将加锁、解锁等同步机制移到队列里实现,即构成了阻塞队列。上述示例即是一个简单的阻塞队列。
另外,仔细思考上面示例,会发现生产者、消费者在调用await阻塞时等待着同一个condition条件。理论上不会出现生产者、消费者同在等待队列的情况,但为结构清晰,一般(对于数组结构的队列)使用两个等待队列实现。
我们知道,synchronized的对象锁一个对象只能关联一个等待队列,而Lock机制则可以关联多个。可以分别为生产者和消费者分别关联各自的等待队列,
ArrayBlockingQueue
就是这么做的。
ArrayBlockingQueue 有关锁的声明
1 |
|
这样整体构造如下图所示
.png)
下面就用ArrayBlockingQueue
来构建生产者消费者
1 | private int cap = 100; |
ArrayBlockingQueue 实现简析
ArrayBlockingQueue
实现原理上文已经提及,即与上面的普通队列类似,不同之处在于ArrayBlockingQueue
使用的是一个锁和其关联的两个等待条件。一个为notEmpty
,表示消费的等待条件(队列没元素可消费了),一个为notFull
,表示生产的等待条件(没空位可生产了)。这里以take()
方法为例简单了解下。
take()
方法可类比消费者消费。含义与前面类似,不同的只是其生产或消费阻塞时用了各自的等待条件。
1 | public E take() throws InterruptedException { |
使用链表式的阻塞队列
上面我们实现了生产者、消费者模式,这样实现的一大硬伤在于:同一时刻只能有一个生产者或消费者操作队列,而生产和消费本就是不相关的操作。两者能各自操作吗?
对于数组来说显然是不能的,本身即一个整体无法同时线程安全的插入和删除。不过可以使用链表:对于添加只在尾指针操作;对于删除则在头指针操作。这样即可以同时添加和删除,互不影响。
链表式阻塞队列的简要实现(代码见github),具体说明见注释
1 |
|
参考资源
- java8 JDK ArrayBlockingQueue、LinkedBlockingQueue 源码
- Wait/Notify通知机制解析
- Java 实现生产者 – 消费者模型