并发集合

CopyOnWriteArrayList

通过锁 + 数组拷贝 + volatile 关键字保证了线程安全,每次数组操作,都会把数组拷贝一份出来,在新数组上进行操作,操作成功之后再赋值回去

架构

在对数组进行操作的时候,基本会分四步走:

  • 加锁;
  • 从原数组中拷贝出新数组;
  • 在新数组上进行操作,并把新数组赋值给数组容器;
  • 解锁。

操作

  • 新增
public boolean add(E e) {
    synchronized (lock) {
        // 获取真实数组
        Object[] es = getArray();
        int len = es.length;
        // 根据真实数组复制出一个数组
        es = Arrays.copyOf(es, len + 1);
        // 在新数组上做操作
        es[len] = e;
        // 把新数组作为真实数组
        setArray(es);
        return true;
    }
}

都已经加锁了,为什么需要拷贝数组?

  • volatile 关键字修饰的是数组,如果我们简单的在原来数组上修改其中某几个元素的值,是无法触发可见性的,我们必须通过修改数组的内存地址才行
  • 第二个原因是get并没有加锁,通过复制一个新数组进行操作,可以防止get访问到一些数据的中间组合

  • 在指定位置新增

public void add(int index, E element) {
    synchronized (lock) {
        Object[] es = getArray();
        int len = es.length;
        if (index > len || index < 0)
            throw new IndexOutOfBoundsException(outOfBounds(index, len));
        Object[] newElements;
        // numMoved指定是后半部分数组的长度
        int numMoved = len - index;
        // 如果删除的是最后一个,直接创建一个比原来数组小1的数组复制数据
        if (numMoved == 0)
            newElements = Arrays.copyOf(es, len + 1);
        else {
            // 这里会根据计算,通过两次复制分别将原数组的前半部分以及后半部分复制到新数组里
            newElements = new Object[len + 1];
            System.arraycopy(es, 0, newElements, 0, index);
            System.arraycopy(es, index, newElements, index + 1,
                             numMoved);
        }
        newElements[index] = element;
        setArray(newElements);
    }
}
  • 删除
public E remove(int index) {
    synchronized (lock) {
        Object[] es = getArray();
        int len = es.length;
        E oldValue = elementAt(es, index);
        int numMoved = len - index - 1;
        Object[] newElements;
        if (numMoved == 0)
            newElements = Arrays.copyOf(es, len - 1);
        else {
            // 复制前半部分数组同上
            newElements = new Object[len - 1];
            System.arraycopy(es, 0, newElements, 0, index);
            // 复制后半部分数组需要位置向左偏移1个单位
            System.arraycopy(es, index + 1, newElements, index,
                             numMoved);
        }
        setArray(newElements);
        return oldValue;
    }
}
  • indexOf
private static int indexOfRange(Object o, Object[] es, int from, int to) {
    if (o == null) {
        // 返回第一个为null的下标
        for (int i = from; i < to; i++)
            if (es[i] == null)
                return i;
    } else {
        // 返回equals的对象
        for (int i = from; i < to; i++)
            if (o.equals(es[i]))
                return i;
    }
    return -1;
}
  • 迭代

由于CopyOnWriteArrayList的迭代器需要持有一个数组引用,所以即使在迭代的过程中对CopyOnWriteArrayList进行修改也不会抛异常

202002211513

ConcurrentHashMap

操作

  • put
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        // table为空,初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 当前索引位置为空,直接存放到这里
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;                   // no lock when adding to empty bin
        }
        // 当前节点是在扩容,等待扩容完成
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else if (onlyIfAbsent // check first node without acquiring lock
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
            return fv;
        else {
            V oldVal = null;
            // 锁住当前节点
            synchronized (f) {
                // 其他的跟HashMap差不多
                if (tabAt(tab, i) == f) {
                    // 链表添加
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    // 红黑树添加
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            // 新增成功
            if (binCount != 0) {
                // 是否需要转为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

数组初始化,如何保证只有一个线程初始化并且能初始化成功?

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // 自旋
    while ((tab = table) == null || tab.length == 0) {
        // 代表当前有线程在初始化,让线程调度器重新调度线程
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // 保证只有一个线程能初始化
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
            try {
                // 双重检查
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

新增槽点,通过自旋死循环保证一定可以新增成功。

如果当前槽点为空,通过CAS新增

槽点如果有值,会锁住当前槽点

红黑树旋转时,锁住红黑树的根节点,保证同一时刻,当前红黑树只能被一个线程旋转

  • 扩容

  • 首先需要把老数组的值全部拷贝到扩容之后的新数组上,先从数组的队尾开始拷贝;

  • 拷贝数组的槽点时,先把原数组槽点锁住,保证原数组槽点不能操作,成功拷贝到新数组时,把原数组槽点赋值为转移节点;
  • 这时如果有新数据正好需要 put 到此槽点时,发现槽点为转移节点,就会一直等待,所以在扩容完成之前,该槽点对应的数据是不会发生变化的;
  • 从数组的尾部拷贝到头部,每拷贝成功一次,就把原数组中的节点设置成转移节点;
  • 直到所有数组数据都拷贝到新数组时,直接把新数组整个赋值给数组容器,拷贝完成

  • get

get跟HashMap很像

应用

流程引擎

其主要功能就是对我们需要完成的事情,进行编排和组装

通过使用List与Map来定义事件的标志与流程,这样就可以通过一个标识来获取事件的行为,可以直接调用事件行为来产生效果

results matching " "

No results matching " "

results matching " "

No results matching " "