再谈生产者消费者模式与阻塞队列

Posted by 梧桐和风的博客 on January 21, 2018

再谈生产者消费者模式与阻塞队列

前言

Wait/Notify通知机制解析文章中,介绍了生产者消费者模式及其应用,而阻塞队列的自身特点也适合生产者消费者。本文即探讨如何一步步用阻塞队列构建生产者、消费者模式。

使用普通队列

使用普通队列构建生产者消费者最需要考虑的问题是,如何保证队列在添加、移除操作时的线程安全。我们本例使用Lock/Condition机制确保。

从实现来说,原生synchronized+wait\notify也能实现相同的功能,不过Lock机制具有更大灵活性,更推荐使用。


   static final Lock lock = new ReentrantLock(); //锁

   static final Condition condition = lock.newCondition(); //等待条件

   //使用ArrayDeque作为任务队列,你也可以自定义一个队列
   static final Queue<Task> queue = new ArrayDeque<>(); 

   // 其他变量略

   //消费者线程
   static class Consume implements Runnable {

        @Override
        public void run() {
            lock.lock();  //加锁
            try {
                while (queue.size() == 0) {  //若任务队列为空则等待
                    condition.await();
                }
                Task task = queue.poll();   //取出任务消费
                System.out.println("模拟消费:" + task.no);
                condition.signal();  //通知生产者已消费

            } catch (Exception e) {
                e.printStackTrace();

            } finally {
                lock.unlock();
            }
            try {
                TimeUnit.MILLISECONDS.sleep(200); //暂停200ms休息
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    } 
       
    // 生产者线程
    static class Produce implements Runnable {

        @Override
        public void run() {
            lock.lock();  //加锁
            try {
                while (queue.size() == cap) {  //若达到边界值则等待
                    condition.await();
                }
                Task task = new Task(number.incrementAndGet());  //生产任务
                queue.add(task);
                condition.signal();  //通知消费者已生产
              
            } catch (Exception e) {
                e.printStackTrace();

            } finally {
                lock.unlock();      //解锁
            }
            try {
                TimeUnit.MILLISECONDS.sleep(500);  //模拟生产流程,等待200毫秒生产一个
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

当生产者超过cap(任务队列最大值)时,阻塞以等待消费者消费;当消费者消费完任务后,阻塞以等待生产者生产。受篇幅限制,全部代码放于github上。

构建阻塞队列

使用普通队列+Lock/Condition机制已初步实现了要求。为简洁,可以将加锁、解锁等同步机制移到队列里实现,即构成了阻塞队列。上述示例即是一个简单的阻塞队列。

另外,仔细思考上面示例,会发现生产者、消费者在调用await阻塞时等待着同一个condition条件。理论上不会出现生产者、消费者同在等待队列的情况,但为结构清晰,一般(对于数组结构的队列)使用两个等待队列实现。

我们知道,synchronized的对象锁一个对象只能关联一个等待队列,而Lock机制则可以关联多个。可以分别为生产者和消费者分别关联各自的等待队列,ArrayBlockingQueue就是这么做的。

ArrayBlockingQueue 有关锁的声明

  
 
   /** 锁对象 */
    final ReentrantLock lock;

    /** 等待take的等待条件对象 */
    private final Condition notEmpty;

    /** 等待put操作的等待条件对象 */
    private final Condition notFull;
    
    //由同一锁关联的等待条件
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();

这样整体构造如下图所示

下面就用ArrayBlockingQueue来构建生产者消费者

    private int cap = 100;

    //使用ArrayBlockingQueue作为阻塞队列
    private BlockingQueue<Task> queue = new ArrayBlockingQueue<>(cap);  

    private AtomicInteger taskNo = new AtomicInteger(0);

     //消费者线程
    class Consume implements Runnable {

        @Override
        public void run() {
            try {
                Task task = queue.take();  //消费出队,阻塞队列本身就可确保线程安全
                System.out.println(task.no);  //模拟消费
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 生产者线程
    class Produce implements Runnable {

        @Override
        public void run() {
            Task task = new Task(taskNo.getAndIncrement());
            try {
                queue.put(task);  //生产入队,阻塞队列确保线程安全
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

ArrayBlockingQueue 实现简析

ArrayBlockingQueue实现原理上文已经提及,即与上面的普通队列类似,不同之处在于ArrayBlockingQueue使用的是一个锁和其关联的两个等待条件。一个为notEmpty,表示消费的等待条件(队列没元素可消费了),一个为notFull,表示生产的等待条件(没空位可生产了)。这里以take()方法为例简单了解下。

take()方法可类比消费者消费。含义与前面类似,不同的只是其生产或消费阻塞时用了各自的等待条件。

   public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock; //锁对象
        lock.lockInterruptibly(); //加锁,可中断
        try {
            while (count == 0)
                notEmpty.await();  //若队列为空,take操作等待
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    // 出队
    private E dequeue() {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E e = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();   // 唤醒可能阻塞的生产者
        return e;
    }

使用链表式的阻塞队列

上面我们实现了生产者、消费者模式,这样实现的一大硬伤在于:同一时刻只能有一个生产者或消费者操作队列,而生产和消费本就是不相关的操作。两者能各自操作吗?

对于数组来说显然是不能的,本身即一个整体无法同时线程安全的插入和删除。不过可以使用链表:对于添加只在尾指针操作;对于删除则在头指针操作。这样即可以同时添加和删除,互不影响。

链表式阻塞队列的简要实现(代码见github),具体说明见注释


    private Lock takeLock = new ReentrantLock();

    private Condition takeCondition = takeLock.newCondition();

    private Lock putLock = new ReentrantLock();

    private Condition putCondition = putLock.newCondition();


   /**
     * 入队,若队列满则等待
     *
     * @param e 入队元素
     */
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        Node<E> node = new Node<>(e);
        int c = -1;
        takeLock.lockInterruptibly(); //takeLock,添加元素的锁
        try {
            while (count.get() == capacity) {  //若队列满,阻塞以等待
                takeCondition.await();
            }
            enqueue(node);
            c = count.incrementAndGet();  //更新队列元素数
            if (c < capacity) {
                takeCondition.signal();  //若入队后发现还有空位,通知其他阻塞的入队线程(若有)
            }
        } finally {
            takeLock.unlock();
        }
        if (c == 1) {  //若入队前队列为空,则通知被阻塞的出队线程,现在可以出队了
            putLock.lockInterruptibly();
            try {
                putCondition.signal();
            } finally {
                putLock.unlock();
            }
        }
    }
    
    
     /**
     * 出队,若无元素一直等待
     *
     * @return 出队元素
     */
    public E take() throws InterruptedException {
        takeLock.lock();   //takeLock,移除元素的锁
        E e = null;
        int c = -1;
        try {
            while (count.get() == 0) { //队列为空,移除操作阻塞
                takeCondition.await();
            }
            e = dequeue();
            c = count.decrementAndGet(); //更新队列元素数
            if (c > 0) { //若出队后仍有元素,通知其他被阻塞的出队线程(若有)
                takeCondition.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity - 1) {  //若出队前队列已满,通知阻塞的入队线程,现在可以入队了
            putLock.lockInterruptibly();
            try {
                putCondition.signal();
            } finally {
                putLock.unlock();
            }
        }
        return e;
    }

参考资源

  1. java8 JDK ArrayBlockingQueue、LinkedBlockingQueue 源码
  2. Wait/Notify通知机制解析
  3. Java 实现生产者 – 消费者模型