再谈生产者消费者模式与阻塞队列
前言
在Wait/Notify通知机制解析文章中,介绍了生产者消费者模式及其应用,而阻塞队列的自身特点也适合生产者消费者。本文即探讨如何一步步用阻塞队列构建生产者、消费者模式。
使用普通队列
使用普通队列构建生产者消费者最需要考虑的问题是,如何保证队列在添加、移除操作时的线程安全。我们本例使用Lock/Condition机制确保。
从实现来说,原生
synchronized
+wait\notify
也能实现相同的功能,不过Lock机制具有更大灵活性,更推荐使用。
static final Lock lock = new ReentrantLock(); //锁
static final Condition condition = lock.newCondition(); //等待条件
//使用ArrayDeque作为任务队列,你也可以自定义一个队列
static final Queue<Task> queue = new ArrayDeque<>();
// 其他变量略
//消费者线程
static class Consume implements Runnable {
@Override
public void run() {
lock.lock(); //加锁
try {
while (queue.size() == 0) { //若任务队列为空则等待
condition.await();
}
Task task = queue.poll(); //取出任务消费
System.out.println("模拟消费:" + task.no);
condition.signal(); //通知生产者已消费
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
try {
TimeUnit.MILLISECONDS.sleep(200); //暂停200ms休息
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 生产者线程
static class Produce implements Runnable {
@Override
public void run() {
lock.lock(); //加锁
try {
while (queue.size() == cap) { //若达到边界值则等待
condition.await();
}
Task task = new Task(number.incrementAndGet()); //生产任务
queue.add(task);
condition.signal(); //通知消费者已生产
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); //解锁
}
try {
TimeUnit.MILLISECONDS.sleep(500); //模拟生产流程,等待200毫秒生产一个
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
当生产者超过cap(任务队列最大值)时,阻塞以等待消费者消费;当消费者消费完任务后,阻塞以等待生产者生产。受篇幅限制,全部代码放于github上。
构建阻塞队列
使用普通队列+Lock/Condition机制已初步实现了要求。为简洁,可以将加锁、解锁等同步机制移到队列里实现,即构成了阻塞队列。上述示例即是一个简单的阻塞队列。
另外,仔细思考上面示例,会发现生产者、消费者在调用await阻塞时等待着同一个condition条件。理论上不会出现生产者、消费者同在等待队列的情况,但为结构清晰,一般(对于数组结构的队列)使用两个等待队列实现。
我们知道,synchronized的对象锁一个对象只能关联一个等待队列,而Lock机制则可以关联多个。可以分别为生产者和消费者分别关联各自的等待队列,
ArrayBlockingQueue
就是这么做的。
ArrayBlockingQueue 有关锁的声明
/** 锁对象 */
final ReentrantLock lock;
/** 等待take的等待条件对象 */
private final Condition notEmpty;
/** 等待put操作的等待条件对象 */
private final Condition notFull;
//由同一锁关联的等待条件
notEmpty = lock.newCondition();
notFull = lock.newCondition();
这样整体构造如下图所示
下面就用ArrayBlockingQueue
来构建生产者消费者
private int cap = 100;
//使用ArrayBlockingQueue作为阻塞队列
private BlockingQueue<Task> queue = new ArrayBlockingQueue<>(cap);
private AtomicInteger taskNo = new AtomicInteger(0);
//消费者线程
class Consume implements Runnable {
@Override
public void run() {
try {
Task task = queue.take(); //消费出队,阻塞队列本身就可确保线程安全
System.out.println(task.no); //模拟消费
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 生产者线程
class Produce implements Runnable {
@Override
public void run() {
Task task = new Task(taskNo.getAndIncrement());
try {
queue.put(task); //生产入队,阻塞队列确保线程安全
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ArrayBlockingQueue 实现简析
ArrayBlockingQueue
实现原理上文已经提及,即与上面的普通队列类似,不同之处在于ArrayBlockingQueue
使用的是一个锁和其关联的两个等待条件。一个为notEmpty
,表示消费的等待条件(队列没元素可消费了),一个为notFull
,表示生产的等待条件(没空位可生产了)。这里以take()
方法为例简单了解下。
take()
方法可类比消费者消费。含义与前面类似,不同的只是其生产或消费阻塞时用了各自的等待条件。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //锁对象
lock.lockInterruptibly(); //加锁,可中断
try {
while (count == 0)
notEmpty.await(); //若队列为空,take操作等待
return dequeue();
} finally {
lock.unlock();
}
}
// 出队
private E dequeue() {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 唤醒可能阻塞的生产者
return e;
}
使用链表式的阻塞队列
上面我们实现了生产者、消费者模式,这样实现的一大硬伤在于:同一时刻只能有一个生产者或消费者操作队列,而生产和消费本就是不相关的操作。两者能各自操作吗?
对于数组来说显然是不能的,本身即一个整体无法同时线程安全的插入和删除。不过可以使用链表:对于添加只在尾指针操作;对于删除则在头指针操作。这样即可以同时添加和删除,互不影响。
链表式阻塞队列的简要实现(代码见github),具体说明见注释
private Lock takeLock = new ReentrantLock();
private Condition takeCondition = takeLock.newCondition();
private Lock putLock = new ReentrantLock();
private Condition putCondition = putLock.newCondition();
/**
* 入队,若队列满则等待
*
* @param e 入队元素
*/
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
Node<E> node = new Node<>(e);
int c = -1;
takeLock.lockInterruptibly(); //takeLock,添加元素的锁
try {
while (count.get() == capacity) { //若队列满,阻塞以等待
takeCondition.await();
}
enqueue(node);
c = count.incrementAndGet(); //更新队列元素数
if (c < capacity) {
takeCondition.signal(); //若入队后发现还有空位,通知其他阻塞的入队线程(若有)
}
} finally {
takeLock.unlock();
}
if (c == 1) { //若入队前队列为空,则通知被阻塞的出队线程,现在可以出队了
putLock.lockInterruptibly();
try {
putCondition.signal();
} finally {
putLock.unlock();
}
}
}
/**
* 出队,若无元素一直等待
*
* @return 出队元素
*/
public E take() throws InterruptedException {
takeLock.lock(); //takeLock,移除元素的锁
E e = null;
int c = -1;
try {
while (count.get() == 0) { //队列为空,移除操作阻塞
takeCondition.await();
}
e = dequeue();
c = count.decrementAndGet(); //更新队列元素数
if (c > 0) { //若出队后仍有元素,通知其他被阻塞的出队线程(若有)
takeCondition.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity - 1) { //若出队前队列已满,通知阻塞的入队线程,现在可以入队了
putLock.lockInterruptibly();
try {
putCondition.signal();
} finally {
putLock.unlock();
}
}
return e;
}
参考资源
- java8 JDK ArrayBlockingQueue、LinkedBlockingQueue 源码
- Wait/Notify通知机制解析
- Java 实现生产者 – 消费者模型