队列源码解析

设计思想

202002231406

  • 数据结构
  • 入队出队方式
  • 通信机制
    • 强关联:take与put要互相等待
    • 无关联:只要队列容器不满,生产者就能放成功,生产者就可以直接返回,和有无消费者一点关系都没有,生产者和消费者完全解耦

LinkedBlockingQueue

类结构层次

202002220959

架构

  • 使用链表来维护先进先出队列
  • 分成三个部分:链表存储 + 锁 + 迭代器

操作

  • 初始化
// 直接根据大小初始化
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 将链表头尾节点置为同一空节点
    last = head = new Node<E>(null);
}
// 根据给定集合初始化
public LinkedBlockingQueue(Collection<? extends E> c) {
    // 初始化近乎无限的队列
    this(Integer.MAX_VALUE);
    // put锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        // 通过循环以此对集合内的元素入列
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        // 维护队列状态信息
        count.set(n);
    } finally {
        putLock.unlock();
    }
}
  • 阻塞新增
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final int c;
    final Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 可中断锁
    putLock.lockInterruptibly();
    try {
        // 当前队列满,等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 等待结束(此时是队列从满变为没满,被唤醒),入队
        enqueue(node);
        // 获得上一刻队列大小
        c = count.getAndIncrement();
        // 如果当前队列大小仍然小于最大容量,唤醒一个put的等待线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 此时,队列实际大小为1,唤醒一个等待put的线程
    if (c == 0)
        signalNotEmpty();
}
  • 阻塞删除
public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 当队列为空时,进行等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 被唤醒,代表队列有数据了,出队
        x = dequeue();
        // 上一刻的队列大小
        c = count.getAndDecrement();
        // 代表队列还有数据,再唤醒一个等待take的线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 队列还剩一个空位,唤醒一个等待put的线程
    if (c == capacity)
        signalNotFull();
    return x;
}

使用场景

适合对生产的数据大小不定(时高时低),数据量较大的场景

SynchronousQueue

其本身是没有容量大小,比如我放一个数据到队列中,我是不能够立马返回的,我必须等待别人把我放进去的数据消费掉了,才能够返回

架构

// 堆栈和队列共同的接口
// 负责执行 put or take
abstract static class Transferer<E> {
    // e 为空的,会直接返回特殊值,不为空会传递给消费者
    // timed 为 true,说明会有超时时间
    abstract E transfer(E e, boolean timed, long nanos);
}
// 堆栈 后入先出 非公平
// Scherer-Scott 算法
static final class TransferStack<E> extends Transferer<E> {}

// 队列 先入先出 公平
static final class TransferQueue<E> extends Transferer<E> {}
  • transfer

该方法比较复杂,总而言之,如果传进来的e是null,并且当前有一个put线程阻塞,则会返回这个put的e,并且put线程解除阻塞。否则就一直阻塞到有数据为止

反之,如果传进来的e不是null,并且有一个take线程阻塞,则将e通过节点传给take线程

DelayQueue

DelayQueue 中的元素必须是 Delayed 的子类,Delayed 是表达延迟能力的关键接口,其继承了 Comparable 接口,并定义了还剩多久过期的方法

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

操作

  • put
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 利用优先队列进行排序
        q.offer(e);
        // 如果刚放进去的元素在队头
        if (q.peek() == e) {
            leader = null;
            // 则会唤醒在等待可用元素的线程
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
  • take
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 自旋
        for (;;) {
            // 获取队头数据
            E first = q.peek();
            // 队头没数据,进行等待
            if (first == null)
                available.await();
            else {
                // 获取队头数据的过期时间
                long delay = first.getDelay(NANOSECONDS);
                // 以及过期了
                if (delay <= 0L)
                    // 直接返回
                    return q.poll();
                first = null; // don't retain ref while waiting
                // leader不为null,代表当前元素以及被设置阻塞时间了
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    // 对当前线程阻塞队头元素的阻塞时间
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

使用场景

用于任务不想立马执行,想等待一段时间才执行的场景

ArrayBlockingQueue

这个队列一个重要的特点是有界的阻塞数组,容量一旦创建,后续大小无法修改

操作

  • 初始化
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    //第二个参数是否公平,主要用于读写锁是否公平,如果是公平锁,那么在锁竞争时,就会按照先来先到的顺序,如果是非公平锁,锁竞争时随机的
    lock = new ReentrantLock(fair);
    // 在put成功之后通知其他等待take的线程
    notEmpty = lock.newCondition();
    // 在take成功之后通知其他等待put的线程
    notFull =  lock.newCondition();
}
  • put
public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 当队列满时,进行等待
        while (count == items.length)
            notFull.await();
        // 被唤醒,入队
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E e) {
    final Object[] items = this.items;
    // 从这里可以看出,putIndex就是当入队的时候,元素放置的位置
    items[putIndex] = e;
    // 当下次的putInex超过数组大小时,则下次放置的位置就是0,也就说队头
    if (++putIndex == items.length) putIndex = 0;
    count++;
    // 唤醒等待take的线程
    notEmpty.signal();
}
  • take
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 队列为空,进行等待
        while (count == 0)
            notEmpty.await();
        // 被唤醒,出队
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // // 从这里可以看出,takeIndex就是当出队的时候,元素的位置
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    // 同样,当下一次takeIndex超过数组容量时,就从头开始
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒等待put的线程
    notFull.signal();
    return e;
}
  • remove
void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    // 删除的位置等于下一次take的位置
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        // takeIndex往后移动一位
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // 删除的位置非takeIndex
        // 从删除的位置开始遍历
        for (int i = removeIndex, putIndex = this.putIndex;;) {
            int pred = i;
            if (++i == items.length) i = 0;
            // 如果遍历到putIndex的位置,删除这个位置的元素
            if (i == putIndex) {
                items[pred] = null;
                this.putIndex = pred;
                break;
            }
            // 将removeIndex后的元素全部往前移动一位
            items[pred] = items[i];
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    // 通知等待put的线程
    notFull.signal();
}

使用场景

一般用于生产数据固定的场景

问题

对队列的理解

首先队列本身也是个容器,底层也会有不同的数据结构,列把生产数据的一方和消费数据的一方进行解耦,生产者只管生产,消费者只管消费,队列还可以对消费者和生产者进行管理,当队列满时或者空时,会阻塞住生产者或者消费者

队列和集合的区别

队列(部分例外)和集合都提供了数据存储的功能,底层的储存数据结构是有些相似的

但两者为了完成不同的事情,提供的 API 和其底层的操作实现是不同的, 队列提供了阻塞的功能,解耦了生产者和消费者

哪些队列具有阻塞的功能,大概是如何阻塞的

LinkedBlockingQueue 链表阻塞队列和 ArrayBlockingQueue 数组阻塞队列是一类,两个阻塞队列都可以指定容量大小,当队列满时,如果有线程 put 数据,线程会阻塞住,直到有其他线程进行消费数据后,才会唤醒阻塞线程继续 put,当队列空时,如果有线程 take 数据,线程会阻塞到队列不空时,继续 take

SynchronousQueue 同步队列,当线程 put 时,必须有对应线程把数据消费掉,put 线程才能返回,当线程 take 时,需要有对应线程进行 put 数据时,take 才能返回

底层是如何实现阻塞的

利用 Condition 的等待唤醒机制

经常使用队列的 put、take 方法有什么危害,如何避免

两个方法都是无限(永远、没有超时时间的意思)阻塞的方法

使用 offer 和 poll 方法来代替两者,可以设置超时阻塞时间

队列在JDK中的其他运用

线程池

202002231353

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

参考JAVA编程规范中的不要用Executors创建线程池,而要手动创建

同步队列

202002231404

results matching " "

No results matching " "

results matching " "

No results matching " "