Nivelle 开拓视野冲破艰险看见世界 身临其境贴近彼此感受生活

常见数据结构(7)--队列

2017-12-02

队列

阻塞队列与非阻塞队列最大区别是:阻塞队列能够阻塞当前试图从队列中获取元素的线程,而非阻塞队列则不会.因此在面对类似生产者-消费者的模型时,使用非阻塞队列必须额外实现同步策略以及唤醒策略.但是有了阻塞队列,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素.当队列中有元素后,被阻塞的线程会自动被唤醒.

jdk中的阻塞队列概况

  • ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockQueue对象时必须定制容量大小.并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的线程最优先能够访问队列.
  • LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE;
  • PriorityBlockingQueue:无界阻塞队列,它会按照元素的优先级对元素排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素.
  • DelayQueue:基于PriorityQueue实现的延迟队列,是一个无界的阻塞队列,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.因此向队列中插入时永远不会阻塞,获取时才可能不阻塞.
  • SynchronousQueue:同步阻塞队列,队列大小为1,一个元素要放到该队列中必须有一个线程在等待获取元素.
  • DelayedWorkQueue:该队列为ScheduledThreadPoolExecutor中的静态内部类,ScheduledThreadPoolExecutor便是便是通过该队列使得队列中的元素按一定顺序排列从而延迟任务和周期性任务得以顺利执行.
  • BlockingDeque:双向阻塞队列的接口.
  • TransferQueue:接口.定义了另一种阻塞情况:生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费,而BlockingQueue只需将元素添加到队列中后生产者便会停止被阻塞。

ArrayBlockingQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
}


put方法实现:

 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

 private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
 
 

JDK1.8 ,首先获取锁,判断当前队列是否满,满则阻塞,被其他线程唤醒时插入元素,插入成功后通过notEmpty.signal()唤醒因队列为空等待取元素的线程.最后释放锁.

 
  public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
 
 

依然是先获取锁,若队列为空则等待,被其他线程唤醒后调用dequeue方法获取元素,获得成功后通过notFull.signal()唤醒因队列满无法放元素的线程.

#### LinkedBlockingQueue

LinkedBlockingQueue使用node结构来存储数据结构:

 static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }
 
 
 

#### PriorityBlockingQueue

该队列为无界队列并且会按照元素的优先级对元素进行排序:

 
 /**
     * Default array capacity.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS.
     */
    private transient volatile int allocationSpinLock;

    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     */
    private PriorityQueue<E> q;
 
 
 
 

四种构造函数:

 
 public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }


    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
                                 ...

    }

    public PriorityBlockingQueue(Collection<? extends E> c) {
    ...

    }
 
 
 
 

#### put()和take()方法

 public void put(E e) {
        offer(e); // never need to block
    }

   public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
 
 
 

put委托给了offer实现,首先是获取锁,判断队列是否还能放入元素,不能则进行扩容:

 private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
 
 
 

扩容时进行了同步处理,在数组小于64时每次扩容2倍,大于64时每次扩容1.5倍.扩容完成后如果传入comparator则调用siftUpConparable插入元素:

 private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
 
 
 

#### take方法

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
 
 

若队列为空则等待,将获取元素委托给了dequeue(),看看它的实现:

   public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
 
 

 

若队列为空则等待,将获取元素委托给了dequeue(),看看它的实现:

   private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
 
 

返回数组头部即堆顶的元素并对堆进行调整实现siftDownComparable方法:

 
   private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }
 


下一篇 随记

评论