锁源码解析

AbstractQueuedSynchronizer

中文翻译叫做同步器,简称 AQS,是各种各样锁的基础,比如说 ReentrantLock、CountDownLatch 等等

整体架构

20202251401

  • 提供了一种框架,自定义了先进先出的同步队列,让获取不到锁的线程能进入同步队列中排队
  • 同步器有个状态字段,子类需要实现一些方法,通过判断状态字段来判断能否得到锁

属性

// 这个状态用来判断是否可以获得锁,每次获得锁时+1,释放锁-1
// 当子类继承AQS来实现锁时,要根据这个状态判断能否获得锁(为0才能获得)跟释放锁(为1才能释放)
private volatile int state;

// 同步队列的头与尾,底层是一个双向链表,用来阻塞获取不到锁的线程,并在适当时机释放这些线程
private transient volatile Node head;
private transient volatile Node tail;

// 条件队列,管理获取不到锁的线程,但条件队列不直接和锁打交道,但常常和锁配合使用
public class ConditionObject implements Condition, java.io.Serializable {
    // Condition 可以用来代替 Object 中相应的监控方法
    // 它提供了一种线程协作方式,并且都有明确语义

    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

static final class Node {
    ...
    volatile int waitStatus;
    // 在共享锁中用来表示下一个等待线程,排它锁则用来表示当前节点是共享还是排它模式
    Node nextWaiter;
}

获取锁

  • 排它锁
// 排它模式下,尝试获得锁
public final void acquire(int arg) {
    // tryAcquire让子类实现的,思路一般是根据state的值决定是否能获取到锁
    if (!tryAcquire(arg) &&
        // 如果获取不到就调用addWaiter让线程进入同步队列,然后acquireQueued这个方法代表进入之后会阻塞,直到被唤醒获得锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 获取锁失败,打断自身
        selfInterrupt();
}
// 追加到同步队列的队尾
private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        // 如果队尾不为空,就将node插入到队尾
        if (oldTail != null) {
            // 将原来队尾的node设置为新加入node的prev
            node.setPrevRelaxed(oldTail);
            // 原子方式将node设置为队尾
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        // 队尾为空,需要初始化同步队列
        } else {
            initializeSyncQueue();
        }
    }
}
// 阻塞当前线程
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 如果这个当前线程节点的前置节点是头节点,并且自己已经能获得锁了
            if (p == head && tryAcquire(arg)) {
                // 将自己设置为头节点
                setHead(node);
                p.next = null; // help GC
                // 然后返回
                return interrupted;
            }
            // 前一个节点状态为SIGNAL了,那么就阻塞自己(park)
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
  • 共享锁
public final void acquireShared(int arg) {
    // 同样由子类实现
    if (tryAcquireShared(arg) < 0)
        // 里面的这个方法主要做的是不断自旋直到获取到锁,当获取到锁之后,会通知排在它后面的节点
        doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 当前节点的前置节点如果是头节点
            if (p == head) {
                // 尝试获取锁
                int r = tryAcquireShared(arg);
                // 获取锁成功
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

释放锁

  • 排它锁
public final boolean release(int arg) {
    // 同样留给子类实现判断是否能释放锁
    if (tryRelease(arg)) {
        Node h = head;
        // 后面有一些等待唤醒的节点
        if (h != null && h.waitStatus != 0)
            // 从头开始唤醒等待锁的节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • 共享锁
public final boolean releaseShared(int arg) {
    // 基本跟上面一样
    if (tryReleaseShared(arg)) {
        // 唤醒后面的线程
        doReleaseShared();
        return true;
    }
    return false;
}

条件队列

  • 入队列等待 await

获得锁的线程,如果在碰到队列满或空的时候,就会阻塞住,这个阻塞就是用条件队列实现的,这个动作我们叫做入条件队列

  • 单个唤醒 signal

之前队列满了,有了一些线程因为 take 操作而被阻塞进条件队列中,突然队列中的元素被线程 A 消费了,线程 A 就会调用 signal 方法,唤醒之前阻塞的线程

ReentrantLock

  • 语义同 synchronized 锁,可重入互斥锁
  • 构造器接受 fairness 的参数,fairness 是 ture 时,保证获得锁时的顺序,false 不保证

类层次结构

2020225152237

同步器

  • 非公平地获取锁
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 代表锁还没被获取
    if (c == 0) {
        // 设置状态标记获取锁
        if (compareAndSetState(0, acquires)) {
            // 标记获取锁的线程是当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 锁已经被获取了,并且获取锁的线程是当前线程
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 设置锁的状态+1
        setState(nextc);
        return true;
    }
    // 加入等待队列
    return false;
}
  • 尝试释放锁
protected final boolean tryRelease(int releases) {
    // 释放锁后线程持有的锁数
    int c = getState() - releases;
    // 当前的线程没有持有锁
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 锁释放完了
    if (c == 0) {
        // 可以让其他线程获取锁
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 如果锁没有释放完,设置state为当前线程持有的锁数
    setState(c);
    return free;
}

FairSync公平锁

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 锁没有被持有
    if (c == 0) {
        // 如果当前线程处于同步队列的头节点,则获取锁成功,否则等待
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 锁被当前线程持有,重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 锁被其他线程持有,等待
    return false;
}

NonfairSync非公平锁

这里的非公平锁tryAcquire实现就是上面同步器sync中的实现

CountDownLatch

其最大的作用不是为了加锁,而是通过计数达到等待的功能,主要有两种形式的等待:

  • 让一组线程在全部启动完成之后,再一起执行
  • 主线程等待另外一组线程都执行完成之后,再继续执行

await

// await方法的实现是获取共享锁,如果获得后就返回,否则就等待
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
// 这里sync判断能否获得锁的标志是state是否=0
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

countDown

// countDown的实现就是释放一个锁
public void countDown() {
    sync.releaseShared(1);
}
// sync判断能否释放锁的标志是 释放这次锁之后,锁的个数为0
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        // 已经没有锁了
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

问题

对 AQS 的理解

AQS 是一个锁框架,它定义了锁的实现机制,并开放出扩展的地方,让子类去实现,比如我们在 lock 的时候,AQS 开放出 state 字段,让子类可以根据 state 字段来决定是否能够获得锁,对于获取不到锁的线程 AQS 会自动进行管理,无需子类锁关心

使用场景

synchronized

与ReentrantLock :可重入+排它锁

但是有几点不同:ReentrantLock功能更加丰富,ReentrantLock有公平锁和非公平锁之分synchronized都是非公平锁,synchronized使用起来更加简单

  • 共享资源初始化

对于一些诸如配置信息等的共享资源,我们希望在项目初始化后加载,但是不希望它被重复加载

public class Service{
    public void init(){
        // 双重判断
        if (loaded){
            synchronized(this){
                if (loaded){
                    // load resources
                }

            }
        }
    }
}

CountDownLatch

  • 批量任务

我们有时会使用线程来并发地进行某些操作,等这些操作全部完成之后,再进行下一步操作

int n = 10;
CountDownLatch latch = new CountDownLatch(10);
for (int i =0;i<n;i++){
    executorService.submit(()->{
        // task i
        latch.countDown();
    })
}
latch.await();
// 后续操作

results matching " "

No results matching " "

results matching " "

No results matching " "