JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue

2023-06-20,,

JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue

目的:本文通过分析JDK源码来对比ArrayBlockingQueue 和LinkedBlockingQueue,以便日后灵活使用。

1. 在Java的Concurrent包中,添加了阻塞队列BlockingQueue,用于多线程编程。BlockingQueue的核心方法有:

boolean add(E e) ,把 e 添加到BlockingQueue里。如果BlockingQueue可以容纳,则返回true,否则抛出异常。
boolean offer(E e),表示如果可能的话,将 e 加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
void put(E e),把 e 添加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续。
E poll(long timeout, TimeUnit unit) ,取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
E take() ,取走BlockingQueue里排在首位的对象,若BlockingQueue为空,则调用此方法的线程被阻塞直到BlockingQueue有新的数据被加入。
int drainTo(Collection<? super E> c) 和 int drainTo(Collection<?
super E> c, int maxElements)
,一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取 数据的个数),通过该方法,可以提升获取数据效率,不需要多次分批加锁或
释放锁。

注意:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值。

2. BlockingQueue常用的四个实现类

ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.
2)
LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue
有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先
出)顺序排序的
3) PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.
4) SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

本文将从JDK源码层面分析对比ArrayBlockingQueue和LinkedBlockingQueue

3. ArrayBlockingQueue源码分析

ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部
是在队列中存在时间最长的元素,队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。
     这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。

ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容)。其中一个构造方法为:

    public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
    throw new IllegalArgumentException();
    this.items = (E[]) new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
    }

ArrayBlockingQueue类中定义的变量有:

    /** The queued items  */
    private final E[] items;
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;
    /*
    * Concurrency control uses the classic two-condition algorithm
    * found in any textbook.
    */
    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

使
用数组items来存储元素,由于是循环队列,使用takeIndex和putIndex来标记put和take的位置。可以看到,该类中只定义了一个锁
ReentrantLock,定义两个Condition对象:notEmputy和notFull,分别用来对take和put操作进行所控制。注:本
文主要讲解put()和take()操作,其他方法类似。

put(E
e)方法的源码如下。进行put操作之前,必须获得锁并进行加锁操作,以保证线程安全性。加锁后,若发现队列已满,则调用notFull.await()
方法,如当前线程陷入等待。直到其他线程take走某个元素后,会调用notFull.signal()方法来激活该线程。激活之后,继续下面的插入操
作。

    /**
    * Inserts the specified element at the tail of this queue, waiting
    * for space to become available if the queue is full.
    *
    */
    public void put(E e) throws InterruptedException {
    //不能存放 null  元素
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;   //数组队列
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lockInterruptibly();
    try {
    try {
    //当队列满时,调用notFull.await()方法,使该线程阻塞。
    //直到take掉某个元素后,调用notFull.signal()方法激活该线程。
    while (count == items.length)
    notFull.await();
    } catch (InterruptedException ie) {
    notFull.signal(); // propagate to non-interrupted thread
    throw ie;
    }
    //把元素 e 插入到队尾
    insert(e);
    } finally {
    //解锁
    lock.unlock();
    }
    }

insert(E e) 方法如下:

    /**
    * Inserts element at current put position, advances, and signals.
    * Call only when holding lock.
    */
    private void insert(E x) {
    items[putIndex] = x;
    //下标加1或者等于0
    putIndex = inc(putIndex);
    ++count;  //计数加1
    //若有take()线程陷入阻塞,则该操作激活take()线程,继续进行取元素操作。
    //若没有take()线程陷入阻塞,则该操作无意义。
    notEmpty.signal();
    }
    **
    * Circularly increment i.
    */
    final int inc(int i) {
    //此处可以看到使用了循环队列
    return (++i == items.length)? 0 : i;
    }

take()方法代码如下。take操作和put操作相反,故不作详细介绍。

    public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();  //加锁
    try {
    try {
    //当队列空时,调用notEmpty.await()方法,使该线程阻塞。
    //直到take掉某个元素后,调用notEmpty.signal()方法激活该线程。
    while (count == 0)
    notEmpty.await();
    } catch (InterruptedException ie) {
    notEmpty.signal(); // propagate to non-interrupted thread
    throw ie;
    }
    //取出队头元素
    E x = extract();
    return x;
    } finally {
    lock.unlock();  //解锁
    }
    }

extract() 方法如下:

    /**
    * Extracts element at current take position, advances, and signals.
    * Call only when holding lock.
    */
    private E extract() {
    final E[] items = this.items;
    E x = items[takeIndex];
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
    }

小结:进行put和take操作,共用同一个锁对象。也即是说,put和take无法并行执行!
4. LinkedBlockingQueue 源码分析

基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓
冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区
达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生
产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生
产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性
能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大
小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于
消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

LinkedBlockingQueue 类中定义的变量有:

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);
    /** Head of linked list */
    private transient Node<E> head;
    /** Tail of linked list */
    private transient Node<E> last;
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

该类中定义了两个ReentrantLock锁:putLock和takeLock,分别用于put端和take端。也就是说,生成端和消费端各自独立拥有一把锁,避免了读(take)写(put)时互相竞争锁的情况。

    /**
    * Inserts the specified element at the tail of this queue, waiting if
    * necessary for space to become available.
    */
    public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); //加 putLock 锁
    try {
    /*
    * Note that count is used in wait guard even though it is
    * not protected by lock. This works because count can
    * only decrease at this point (all other puts are shut
    * out by lock), and we (or some other waiting put) are
    * signalled if it ever changes from
    * capacity. Similarly for all other uses of count in
    * other wait guards.
    */
    //当队列满时,调用notFull.await()方法释放锁,陷入等待状态。
    //有两种情况会激活该线程
    //第一、 某个put线程添加元素后,发现队列有空余,就调用notFull.signal()方法激活阻塞线程
    //第二、 take线程取元素时,发现队列已满。则其取出元素后,也会调用notFull.signal()方法激活阻塞线程
    while (count.get() == capacity) {
    notFull.await();
    }
    // 把元素 e 添加到队列中(队尾)
    enqueue(e);
    c = count.getAndIncrement();
    //发现队列未满,调用notFull.signal()激活阻塞的put线程(可能存在)
    if (c + 1 < capacity)
    notFull.signal();
    } finally {
    putLock.unlock();
    }
    if (c == 0)
    //队列空,说明已经有take线程陷入阻塞,故调用signalNotEmpty激活阻塞的take线程
    signalNotEmpty();
    }

enqueue(E e)方法如下:

    /**
    * Creates a node and links it at end of queue.
    * @param x the item
    */
    private void enqueue(E x) {
    // assert putLock.isHeldByCurrentThread();
    last = last.next = new Node<E>(x);
    }

take()方法代码如下。take操作和put操作相反,故不作详细介绍。

    public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
    while (count.get() == 0) {
    notEmpty.await();
    }
    x = dequeue();
    c = count.getAndDecrement();
    if (c > 1)
    notEmpty.signal();
    } finally {
    takeLock.unlock();
    }
    if (c == capacity)
    signalNotFull();
    return x;
    }

dequeue()方法如下:

    /**
    * Removes a node from head of queue.
    * @return the node
    */
    private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
    }

小结:take和put操作各有一把锁,可并行读取。

参考地址:

1). Java多线程-工具篇-BlockingQueue:http://blog.csdn.net/xiaoliang_xie/article/details/6887115

2). Java多线程(五)之BlockingQueue深入分析:http://blog.csdn.net/vernonzheng/article/details/8247564

JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue的相关教程结束。

《JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue.doc》

下载本文的Word格式文档,以方便收藏与打印。