AQS与Condition

jdk的锁实现

Posted by ALID on March 23, 2020

AQS与Condition实现了并发包的核心, 他们实现了Java自己的管程,其中 AQS 用于解决互斥问题,Condition 用于解决同步问题。

jdk为什么要实现lock, synchronized不香吗?

首先提出一个问题, 1.6 版本之后synchronized性能其实已经很好了, 为什么要重复造轮子呢?其实很简单有些地方sunchronized做的并不好, 比如加锁之后不能超时等. 所以Lock增强了以下几点.

  1. 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
  2. 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  3. 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

这三种方案可以全面弥补 synchronized 的问题。到这里相信你应该也能理解了,这三个方案就是“重复造轮子”的主要原因,体现在 API 上,就是 Lock 接口的三个方法。详情如下:

1
2
3
4
5
6
// 支持中断的API
void lockInterruptibly() throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();

AQS

AbstractQueuedSynchronizer 队列式的同步器, 存储了线程以及同步状态.

img 其实结构很简单, 其中使用链表实现了一个队列. 每次队首节点执行完后唤醒下一个节点, 新加入的节点会尝试一下获取锁, 如果没有获取到就设置到队尾.

基础变量

结构其实很简单由一个status表示状态, 以及一个队列来完成线程资源的排队工作, 主要有以下成员变量.

1
2
3
4
private volatile int state; // 同步状态
private transient volatile Node head;
private transient volatile Node tail;
static final long spinForTimeoutThreshold = 1000L; // 自旋锁超时阀值

Node就是一个双向链表的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static final class Node {
    static final Node SHARED = new Node();//标识等待节点处于共享模式
    static final Node EXCLUSIVE = null;//标识等待节点处于独占模式

    static final int CANCELLED =  1; // 由于超时或中断,节点已被取消
    static final int SIGNAL    = -1; // 表示下一个节点是通过park堵塞的,需要通过unpark唤醒, 后继结点入队时,会将前继结点的状态更新为SIGNAL。
    Static final int CONDITION = -2; // 表示线程在等待在condition上
    static final int PROPAGATE = -3; // 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。

    // 等待状态:对于condition节点,初始化为CONDITION;
    // 其它情况,默认为0,通过CAS操作原子更新
    volatile int waitStatus;
    // 前节点
    volatile Node prev;
    // 后节点
    volatile Node next;
    // 线程对象
    volatile Thread thread;
    // 对于Condtion表示下一个等待条件变量的节点;其它情况下用于区分共享模式和独占模式;
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;//判断是否共享模式
    }
    //获取前节点,如果为null,抛出异常
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     //addWaiter方法使用
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { //Condition使用
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

以上代码中我们可以看到, 其中记录了节点状态/前后指针/线程对象. 重要信息都标记在注释上了.

acquire 加锁方法

独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。

1
2
3
4
5
6
7
8
public final void acquire(int arg) {
	   // 尝试获取资源, 获取成功直接返回
    if (!tryAcquire(arg) &&
			// 没有获取到资源加入等待队列
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
			// acquireQueued返回true,表示发生过中断,因此通过selfInterrupt中断当前线程
        selfInterrupt();
}
  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

tryAcquire

1
2
3
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现, AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了. 至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了. 当然,自定义同步器在进行资源访问时要考虑线程安全的影响。

addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node addWaiter(Node mode) {
	  // 这里调用了Node的方法创建节点
    Node node = new Node(Thread.currentThread(), mode);
    // 先尝试一次直接放到队尾
    Node pred = tail;
    if (pred != null) {
			// 现在设置前指针
        node.prev = pred;
			// 用CAS将新节点加入链表
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
	   // 如果添加失败, 这里自旋添加
    enq(node);
    return node;
}
  1. 首先 new Node() 传入的是 Node.EXCLUSIVE 代表独占锁.
  2. 之后直接尝试一次添加节点, 这里可以看到是非公平锁
  3. 调用 enq 方法自旋保证添加节点成功

以下就是 enq 方法的内容, 标准的自旋+CAS操作.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node enq(final Node node) {
    //CAS"自旋",直到成功加入队尾
    for (;;) {
        Node t = tail;
        if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//正常流程,放入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued

最后就是acquireQueued方法, 这里线程已经入队, 接下来就需要让其等待直到让出资源.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//标记是否成功拿到资源
    try {
        boolean interrupted = false;//标记等待过程中是否被中断过
        for (;;) {
            final Node p = node.predecessor();//拿到前驱
            // 如果前驱是head,即该结点已成是第一个了,那么便有资格去尝试获取资源
    			// 可能是之前的线程释放完资源唤醒自己的,当然也可能被interrupt了
            if (p == head && tryAcquire(arg)) {
                setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
                p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
                failed = false; // 成功获取资源
                return interrupted;//返回等待过程中是否被中断过
            }
            
            //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
        }
    } finally {
        if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
            cancelAcquire(node);
    }
}

interrupt操作会将线程的中断标志位置位,至于线程作何动作那要看线程了。

如果线程sleep()、wait()、join()等处于阻塞状态,那么线程会定时检查中断状态位如果发现中断状态位为true,则会在这些阻塞方法调用处抛出InterruptedException异常,并且在抛出异常后立即将线程的中断状态位清除,即重新设置为false。抛出异常是为了线程从阻塞状态醒过来,并在结束线程前让程序员有足够的时间来处理中断请求。

如果线程正在运行、争用synchronized、lock()等,那么是不可中断的,他们会忽略。

先来看一下其中几个方法的具体内容

shouldParkAfterFailedAcquire

此方法主要用于检查状态,看看自己是否真的需要将线程 waiting

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驱节点的状态已经是在执行后会唤醒下一个节点的状态了
        return true;
    if (ws > 0) {
        // 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
 			// 那些放弃的结点,这里替换了他们的引用, 之后会被GC回收
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前驱正常,那就把前驱的状态设置成SIGNAL
			// CAS操作有失败的可能
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

这个方法执行后不一定能设置好前一个节点的状态, 但是外部acquireQueued方法是自旋的, 这里失败了还会进来重试. 而如果这里的校验发现前驱节点已经放弃, 也会回到自旋中尝试拿锁.

1
2
3
4
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 阻塞线程
    return Thread.interrupted(); // 查看是否是被中断的
}

这个方法就很简单了, 阻塞并在被唤醒的时候看自己是正常唤醒还是被中断的. 这里的interrupted方法会检查当前线程状态是否被中断, 如果是则返回true并清空清除中断状态.

可以看到AQS对线程的中断并不是立刻响应的, 而是在被唤醒的时候才会检查其是否被中断. 并清除线程中的中断状态, 自己记录. 之后在最外层会调用 Thread.currentThread().interrupt() 来中断线程.

但是在带有超时时间的AQS方法中, 就不会记录中断状态之后再补上了. 而是如果发现被中断直接抛出异常. lock方法是不提供超时时间的, 如果想使用超时, 可以使用带有超时时间的 tryAcquireSharedNanos 方法. 如下是其 doAcquireSharedNanos 方法的实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return true;
    }
    nanosTimeout = deadline - System.nanoTime();
    if (nanosTimeout <= 0L)
        return false;
	   // 设置阻塞超时时间 并阻塞
    if (shouldParkAfterFailedAcquire(p, node) &&
        nanosTimeout > spinForTimeoutThreshold)
        LockSupport.parkNanos(this, nanosTimeout);
	   // 如果发现线程被中断, 则直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
}

acquireQueued 方法尝试获取资源, 如果不能获取资源会尝试在前一个节点上注册唤醒后续节点的状态. 并且在次之前还会尝试检查一下前驱节点的状态. 最后如果修改前驱节点状态成功则会被阻塞.而这里被唤醒的时候会区分是正常唤醒还是被中断的.

cancelAcquire

如果没有能获取资源, 则会取消掉该节点, 根据情况觉得是否唤醒后续线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void cancelAcquire(Node node) {
    if (node == null)
        return;

    node.thread = null;
   //获取node的前向节点
    Node pred = node.prev;
   //如果发现前向节点状态为CANCELLED,则继续向前找,直到找到状态正常的节点
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;//节点状态设为CANCELLED
    //如果node为tail节点,则将pred更新为tail节点
    if (node == tail && compareAndSetTail(node, pred)) {
       //由于pred为新的尾节点,因此将其next设为null
        compareAndSetNext(pred, predNext, null);
    } else {//如果node不是尾节点
        int ws;
        //当满足下面三个条件,将pred的next指向node的下一节点:
        //1.pred不是head节点:如果pred为头节点,而node又被cancel,则node.next为等待队列中的第一个节点,需要unpark唤醒
        //2.pred节点状态为SIGNAL或能更新为SIGNAL
        //3.pred的thread变量不能为null
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            //更新pred的next,指向node的下一节点
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);//如果pred为头节点,则唤醒node的后节点
        }
        node.next = node; // help GC
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
   
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
   //如果节点为空或者被取消了,则从队列尾部开始查找,找到离node最近的非null且状态正常的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //取出找到节点的线程对象,通过unpark,颁发许可;
    if (s != null)
        LockSupport.unpark(s.thread);
}

release 解锁方法

这里就是独占加锁对应的解锁方法

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到头结点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了, 所以自定义同步器在设计tryRelease()的时候要明确这一点

1
2
3
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

tryRelease方法和刚刚的tryAcquire方法一样也是没有默认实现的. 正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。

unparkSuccessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void unparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一个需要唤醒的结点s
    if (s == null || s.waitStatus > 0) {// 如果为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
            if (t.waitStatus <= 0)// 从这里可以看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

这里逻辑很简单, 用unpark()唤醒等待队列中最前边的那个未放弃线程.

自己实现一个锁

到这里已经看完了独占模式的实现, 既然AQS是锁的组件, 那我们是不是可以使用刚刚看过的方法自己实现一个锁呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class MyLock extends AbstractQueuedSynchronizer {

    public void lock() {
        acquire(1);
    }

    public boolean tryLock() {
        return tryAcquire(1);
    }

    public void unlock() {
        release(0);
    }

    public boolean isLocked() {
        return isHeldExclusively();
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int uesd) {
        // 就是用cas操作将状态设置为1
        return compareAndSetState(0, uesd);
    }

    @Override
    protected boolean tryRelease(int unUsed) {
        // 修改状态
        setState(unUsed);
        return true;
    }
}

可以看到其实很简单, 继承了 AbstractQueuedSynchronizer 类, 并实现了 tryAcquire 方法. 之后只要调用我们刚刚分析过的 acquire 方法就可以加锁. tryAcquire 的实现就是用了CAS的方法将状态改为我们希望的状态.

而对应的解锁方法也是调用了 release 方法, 并实现了 tryRelease 方法. 刚刚也提到了解锁并不需要考虑线程安全, 只要修改状态后返回 true 即可.

这里要注意一点, 如果release方法异常无法唤醒后续节点, 会让后续节点一直阻塞. 而正常情况是不会被中断的, 除非tryRelease方法的实现有bug. 这样就好导致后续的节点无限阻塞了.

以上独占锁的部分就结束了, 之后来分析独占锁.

acquireShared 共享加锁

1
2
3
4
public final void acquireShared(long arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

有之前的铺垫再看这个方法就很简单了, tryAcquireShared 方法是需要自己实现的. 继续看一下 doAcquireShared 方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void doAcquireShared(int arg) {
		// 注意这里创建的是共享模式的 Node.SHARED
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
				 // 前驱节点
            final Node p = node.predecessor();
				// 如果前驱就是头指针, 当前节点已经再队首了
            if (p == head) {
  				 // 尝试获取资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {
						  // 将head指向自己,还有剩余资源可以再唤醒之后的线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
						  // 如果等待过程中被打断过,此时将中断补上
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
  			 // 判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这里其他方法都已经看过了, 来看一下 setHeadAndPropagate 方法.

1
2
3
4
5
6
7
8
9
10
11
private void setHeadAndPropagate(Node node, long propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
	   // 如果还有剩余量,继续唤醒下一个线程	
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

这里唯一的区别其实就是会尝试获取资源, 而不是能不能加锁. 如果有多余资源会唤醒后续节点.

releaseShared 共享解锁

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(long arg) {
		// 尝试释放资源 
    if (tryReleaseShared(arg)) {
			// 唤醒后继结点
        doReleaseShared();
        return true;
    }
    return false;
}

独占模式下的tryRelease()在完全释放掉资源(state=0)后, 才会返回true去唤醒其他线程. 而这里并不需要释放全部资源后才能继续执行, 而是拿到一部分资源就可以调用后续节点了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);// 唤醒后继
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head发生变化
            break;
    }
}

这里每次唤醒下一个节点, 下一个节点运行的时候会再次获取剩余资源, 只要有资源就会一直唤醒

condition

我们已经可以通过AQS实现锁, 但是对于Synchronized就会发现还缺少了阻塞和唤醒线程的方法. 那为什么不能用Object的wait/notify方法呢? 因为这个方法是依赖于Synchronized的, 只能在Synchronized锁的范围内使用. 那为什么不直接用 LockSupport 呢? 因为一次阻塞的线程不止一个, 阻塞的线程我们需要用线程安全的方式存下来, 并且可以提供唤醒单个和唤醒全部阻塞线程的方法. Contidion 接口提供了和Lock配和的方法.

实现分析

ContidionObject是AQS的内部类, Contidion操作需要关联锁, 并且也和AQS一样需要一个队列来保存阻塞的线程. 每个condition都包含一个等待队列. 而队列的Node就是AQS中的Node.

await

await 方法, 会将线程和AQS中阻塞一样在队列尾部插入节点. 和AQS不一样的是, 这里插入节点并不需要CAS, 因为调用condition的线程肯定是拿到锁的. 而一个AQS可以对于多个condition, 每个condition都维护一个等待队列. 如下图: img

如果从AQS的角度看, 其实是将Node从AQS的队列头移动到了condition的队列尾部. 在加入之后就会释放锁. 并唤醒AQS上之后的等待线程执行. 这里的队首节点其实不会直接移动, 而是通过addConditionWaiter方法构建成一个新节点再加入contidion的等待队列中. img

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
	   // 当前线程加入condition的等待队列
    Node node = addConditionWaiter();
		// 释放锁, 并唤醒AQS队列下一个线程
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
			// 阻塞线程
        LockSupport.park(this);	
			// 判断是否是被中断了
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
		// 线程醒来会尝试加锁
		// THROW_IE 表示这里尝试失败, 但signal调用enq方法成功入队
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

这里就是将节点加入condition队列, 并释放锁(移出AQS队列), 开始阻塞. 唤醒后会尝试加锁, 如果失败可能是signal方法加锁成功了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
			// 清理失效节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
		// 包装了一个新节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

这里可以看到是创建了一个新节点. 没有直接用之前的节点.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取 state 变量
        int savedState = getState();
        // 如果释放成功, 则返回 state
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 如果释放失败,抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        //释放失败
        if (failed)
            // 将这个节点是指成取消状态.随后将从队列中移除.
            node.waitStatus = Node.CANCELLED;
    }
}

这里可以看到就是调用了 release 方法释放锁.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        // 如果将 node 放入 AQS 队列失败,就返回 REINTERRUPT, 成功则返回 THROW_IE
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

final boolean transferAfterCancelledWait(Node node) {
    // 将 node 的状态设置成 0 成功后,将这个 node 放进 AQS 队列中.
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    // 如果 CAS 失败, 返回 false
    // 当 node 不在 AQS 节点上, 就自旋
    // 自旋是在等待在 signal 中执行 enq 方法让 node 入队.
    while (!isOnSyncQueue(node))
			// 每次自旋都让出持有的时间片
        Thread.yield();
    return false;
}

这里可以看到尝试调用一次enq方法, 如果失败则等待 signal 方法中调用成功.

调用yield方法会让当前线程交出CPU权限,让CPU去执行其他的线程。它跟sleep方法类似,同样不会释放锁。但是yield不能控制具体的交出CPU的时间,另外,yield方法只能让拥有相同优先级的线程有获取CPU执行时间的机会。
注意,调用yield方法并不会让线程进入阻塞状态,而是让线程重回就绪状态,它只需要等待重新获取CPU执行时间,这一点是和sleep方法不一样的。

signal

signal 解锁方法也要求调用的线程必须已经获得了锁, 会进行检查. 接着获取等待队列的头节点, 将其移动到 AQS 的队列并通过 LockSupport 唤醒线程. 被唤醒后会从 await 方法中的while循环退出, 进而加入获取锁的竞争中. 获取锁之后就可以继续执行之前调用 await 方法后的代码.

signalAll 方法就是对于每个节点执行 signal 方法, 效果就是将等待队列中的所有节点移到的 AQS 的同步队列中, 并唤醒每个节点的线程. img

源码如下:

1
2
3
4
5
6
7
8
public final void signal() {
	   // 先校验是否获取了锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void doSignal(Node first) {
    do {
			// 修改订单队列头指针
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
			// 转移节点
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
	  // 调用enq方法将node放到安全位置, 也就是修改前一个节点的状态成功
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
			// 唤醒线程
        LockSupport.unpark(node.thread);
    return true;
}