JDK源码分析——AbstractQueuedSynchronizer类

AbstractQueudSynchronizer是Doug Lea Java并发编程的基础,简称AQS。内部主要包括Node和ConditionObject两个内部类,基于Node节点构建了一个FIFO(先进先出)队列,用来存储等待锁的线程的队列。基于ConditionObject节点也构造了一个FIFO队列,用于存储因为某种原因已经获取到锁而又主动释放锁的线程的队列。在concurrent包下面的大部分的工具类都是以他为基础,包括CountDownLatch,Lock,ReadWriteLock,Semaphare,条件队列……等等。本文将基于JDK1.8分析AQS实现原理。

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;

/**
* The synchronization state.
*/
private volatile int state;

/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;

/**
* Setup to support compareAndSet. We need to natively implement
* this here: For the sake of permitting future enhancements, we
* cannot explicitly subclass AtomicInteger, which would be
* efficient and useful otherwise. So, as the lesser of evils, we
* natively implement using hotspot intrinsics API. And while we
* are at it, we do the same for other CASable fields (which could
* otherwise be done with atomic field updaters).
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
  • head: 等待队列头,延迟初始化。除了初始化时设置值,只能通过setHead()方法修改。注意:如果头已存在,头的waitStatus必须保证不为CANCELLED;
  • tail: 等待队列尾,延迟初始化。只能通过调用enq()方法加入新的等待节点才能修改;
  • state: AQS状态位,通过try*方法维护;
  • spinForTimeoutThreshold: 自旋锁超时阀值;

内部类Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
// 标识等待节点处于共享模式
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
// 标识等待节点处于独占模式
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
// 由于超时或中断,节点已被取消
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 表示下一个节点是通过park堵塞的,需要通过unpark唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 表示线程在等待条件变量(先获取锁,加入到条件等待队列,然后释放锁,等待条件变量满足条件;只有重新获取锁之后才能返回
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
// 表示后续结点会传播唤醒的操作,共享模式下起作用
static final int PROPAGATE = -3;

/**
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
// 等待状态:对于condition节点,初始化为CONDITION;其它情况,默认为0,通过CAS操作原子更新
volatile int waitStatus;

/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
// 前驱节点
volatile Node prev;

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
// 后继节点
volatile Node next;

/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
// 当前node节点对应的线程对象
volatile Thread thread;

/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
// 对于Condtion表示下一个等待条件变量的节点;其它情况下用于区分共享模式和独占模式
Node nextWaiter;

/**
* Returns true if node is waiting in shared mode.
*/
// 判断是否共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
// 返回前驱节点,如果为空,抛出NullPointerException
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

Node中定义了等待的线程对象、节点状态、前驱节点、后继节点,以及标识了节点所处的模式:独占模式和共享模式。

  • 独占模式:每次只能有一个线程能持有资源;
  • 共享模式:允许多个线程同时持有资源。

例如:

  1. CountDownLatch的await方法可以在多个线程中调用,当CountDownLatch的计数器为0后,调用await的方法都会依次返回。 也就是说多个线程可以同时等待await方法返回,因此它适合被设计成共享模式,因为它获取的是一个共享资源,资源在所有调用await方法的线程间共享;
  2. ReentrantLock提供了lock和unlock方法,只允许一个线程获得锁,因此它适合被设计成独占模式,因为它获取的是一个独占资源,资源不能在调用lock方法的线程间共享;
  3. Semaphore维护了一组许可,acquire方法获取许可,如果有可用的许可,方法返回,否则block;可用看到,acquire获取到也是一个共享资源,只不过资源的数量有限制,因此它适合被设计成共享模式;
  4. ReentrantReadWriteLock提供了读写锁,写操作是独占的,读操作是可以彼此共享的,因此它同时使用了独占和共享模式。

模板方法

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 // 独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

独占模式源码分析

构建等待队列有很多的变种,有的加入了中断,有的加入了时间判断,但是根本的原理是一样的。这个例子是以无中断,无时间判断来讲解的。后面查看源代码的时候,会继续提到其他的方法。

acquire()

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

独占模式获取资源:

  • 调用tryAcquire,成功返回true,失败返回false。
    • true,表示获取资源成功,acquire直接执行结束;
    • false,表示获取资源失败,要进行排队获取;
  • 调用addWaiter,创建独占模式Node,并加入到等待队列的尾部;
  • 调用acquireQueued方法,按照线程加入队列的顺序获取资源;
  • 如果acquireQueued返回true,表示发生中断,因此通过selfInterrupt中断当前线程;
    注意:acquire方法会忽略中断,当中断发生时,并不会马上退出;

tryAcquire(int)

此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

什么?直接throw异常?说好的功能呢?好吧,还记得概述里讲的AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现吗?就是这里了!!!AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。

这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底,Doug Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。

addWaiter()

此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 根据传入的模式(独占or共享)创建Node对象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 如果pred不为空,说明有线程在等待
// 尝试使用CAS入列,如果入列失败,则调用enq采用自旋的方式入列
// 该逻辑在无竞争的情况下才会成功,快速入列
if (pred != null) {
// 所谓的入列,就是将节点设置为新的tail节点
// 注意:有可能设置node的前节点成功,但是CAS更新失败
// 这种情况下,由于无法从head或tail找到节点,问题不大
// 但是对于isOnSyncQueue这种方法,则会造成影响,需要特殊处理
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 通过CAS更新tail节点
// 将原tail节点的后节点设置为新tail节点
// 由于CAS和设置next不是原子操作,因此可能出现更新tail节点成功,但是未执行pred.next = node,导致无法从head遍历节点
// 但是由于前面已经设置了prev属性,因此可以从尾部遍历
// 像getSharedQueuedThreads、getExclusiveQueuedThreads都是从尾部开始遍历
pred.next = node;
return node;
}
}
enq(node); // 通过自旋入列
return node;
}

enq()

有竞争的情况下通过自旋方式入列,如果队列未初始化则初始化,然后再插入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 记录尾节点
// 由于采用lazy initialize,当队列为空时,需要进行初始化
if (t == null) { // Must initialize
// 通过CAS设置head和tail节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; // 将node的前节点设置为原tail节点
if (compareAndSetTail(t, node)) {
// CAS更新tail节点,更新成功则将原tail节点的后节点设置为node,返回原tail节点,入列成功
t.next = node;
return t;
}
}
}
}

可以看到,CAS原子性操作可以解决多线程竞争临界资源的问题。例如线程1通过compareAndSetHead初始化了head和tail节点,线程2此时运行到if (t == null),发现判断成立,通过CAS更新head节点,此时会更新失败,继续下一循环;直到线程1执行完tail=head,线程2才会进入else逻辑,节点入列;可以看到:

  • head节点实际上是个空节点;
  • head节点是通过new Node()创建,因此waitStatus=0;
  • 新入列的节点是通过Node(Thread thread, Node mode)创建,waitStatus=0。

acquireQueued()

acquireQueued主要是处理正在排队等待的线程。自旋、阻塞重试获取。如果获取成功则替换当前节点为链表头,然后返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 标记是否成功拿到资源
try {
boolean interrupted = false; // 标记等待过程中是否被中断过
// 仍然通过自旋,根据前面的逻辑,此处传入的为新入列的节点
for (;;) {
final Node p = node.predecessor(); // 获取前驱节点,即prev指向的节点
// 如果前驱是head,说明node是等待队列里排在最前面的节点(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
// 获取资源成功,将node设置为头节点,setHead清空节点属性thread,prev
setHead(node);
// 将原头节点的next设为null,帮助GC
p.next = null; // help GC
failed = false;
return interrupted; // 返回没有发生中断
}
// 如果node不是头结点或者acquire失败,说明自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
if (failed) // 只有循环中出现异常,才会进入该逻辑
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire()

此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态)。整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点状态
int ws = pred.waitStatus;
// 已经设置了状态,由于SIGNAL表示要通过unpark唤醒后一节点,因此当获取失败时,是要调用park堵塞的,返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// ws > 0,表示前一节点已取消,则往前找,直到找到一个状态正常的节点,其实就是从队列删除取消状态的节点
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node; // 更新next指针,去掉中间取消状态的节点
} else { // 更新pred节点的waitStatus为SIGNAL
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; // 返回false,表示不需要调用park
}

parkAndCheckInterrupt()

如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。

1
2
3
4
5
6
7
8
9
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 调用park()使线程进入waiting状态
return Thread.interrupted(); // 如果被唤醒,查看自己是不是被中断的。注意:该方法会清除线程的中断状态
}

park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:

  1. 被unpark();
  2. 被interrupt()。

需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。

可以看到对于等待队列中的节点,shouldParkAfterFailedAcquire会将前节点的状态改为Node.SIGNAL;接着在下一次循环中调用parkAndCheckInterrupt堵塞线程。

cancelAcquire()

将节点的前驱有效节点,和后继有效节点连接起来,取消当前节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

// Skip cancelled predecessors
// 获取node的前向节点
Node pred = node.prev;
// 如果发现前向节点状态为CANCELLED,则继续向前找,直到找到状态正常的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED; // 节点状态设为CANCELLED

// If we are the tail, remove ourselves.
// 如果node为tail节点,则将pred更新为tail节点
if (node == tail && compareAndSetTail(node, pred)) {
// 由于pred为新的尾节点,因此将其next设为null
compareAndSetNext(pred, predNext, null);
} else { // 如果node不是尾节点
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 当满足下面三个条件,将pred的next指向node的下一节点:
// 1.pred不是head节点:如果pred为头节点,而node又被cancel,则node.next为等待队列中的第一个节点,需要unpark唤醒
// 2.pred节点状态为SIGNAL或能更新为SIGNAL
// 3.pred的thread变量不能为null
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 更新pred的next,指向node的下一节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node); // 如果pred为头节点,则唤醒node的后节点
}

node.next = node; // help GC
}
}

unparkSuccessor()

激活node节点的后继节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 如果节点的状态不是已取消,就讲节点的状态设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 如果节点为空或者被取消了,则从队列尾部开始查找,找到离node最近的非null且状态正常的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 激活s节点,其实就是唤醒这个node节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}

acquireInterruptibly()

该方法和acquire类似,只不过发生中断时,会抛出InterruptedException;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

release(int)

激活并移除等待节点的过程,和加入等待节点的过程正好相反。首先调用子类的tryRelease方法,如果失败,就返回,如果tryRelease方法释放锁成功。就拿到队列的头结点。然后激活头结点的后继节点,激活的过程是,首先找到头结点的第一个后继有效节点,将其从队列中移除,然后激活这个节点对应的线程。

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

共享模式源码分析

共享模式构建等待队列的实现的流程和独占模式构建等待队列的实现是一样的,唯一的不一样的地方是“tryAcquireShared”这个由子类实现的方法。他的过程是:首先尝试获取共享锁(注意这里返回的是整数,这是实现共享模式的关键。)如果失败(小于0),就构建一个共享节点添加到等待队列。并将当前线程挂起。

acquireShared()

此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

doAcquireShared()

此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

setHeadAndPropagate()

此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

doReleaseShared()

激活头结点的后继有效节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

releaseShared()

共享模式释放节点的流程和独占模式释放节点的流程基本一致。首先尝试更新释放状态tryReleaseShared方法,由具体的子类实现,如果成功就激活头节点的后继节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

参考博客