AbstractQueudSynchronizer是Doug Lea Java并发编程的基础,简称AQS。内部主要包括Node和ConditionObject两个内部类,基于Node节点构建了一个FIFO(先进先出)队列,用来存储等待锁的线程的队列。基于ConditionObject节点也构造了一个FIFO队列,用于存储因为某种原因已经获取到锁而又主动释放锁的线程的队列。在concurrent包下面的大部分的工具类都是以他为基础,包括CountDownLatch,Lock,ReadWriteLock,Semaphare,条件队列……等等。本文将基于JDK1.8分析AQS实现原理。
成员变量
1 | /** |
- head: 等待队列头,延迟初始化。除了初始化时设置值,只能通过setHead()方法修改。注意:如果头已存在,头的waitStatus必须保证不为CANCELLED;
- tail: 等待队列尾,延迟初始化。只能通过调用enq()方法加入新的等待节点才能修改;
- state: AQS状态位,通过try*方法维护;
- spinForTimeoutThreshold: 自旋锁超时阀值;
内部类Node
1 | static final class Node { |
Node中定义了等待的线程对象、节点状态、前驱节点、后继节点,以及标识了节点所处的模式:独占模式和共享模式。
- 独占模式:每次只能有一个线程能持有资源;
- 共享模式:允许多个线程同时持有资源。
例如:
- CountDownLatch的await方法可以在多个线程中调用,当CountDownLatch的计数器为0后,调用await的方法都会依次返回。 也就是说多个线程可以同时等待await方法返回,因此它适合被设计成共享模式,因为它获取的是一个共享资源,资源在所有调用await方法的线程间共享;
- ReentrantLock提供了lock和unlock方法,只允许一个线程获得锁,因此它适合被设计成独占模式,因为它获取的是一个独占资源,资源不能在调用lock方法的线程间共享;
- Semaphore维护了一组许可,acquire方法获取许可,如果有可用的许可,方法返回,否则block;可用看到,acquire获取到也是一个共享资源,只不过资源的数量有限制,因此它适合被设计成共享模式;
- ReentrantReadWriteLock提供了读写锁,写操作是独占的,读操作是可以彼此共享的,因此它同时使用了独占和共享模式。
模板方法
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
1 | // 独占方式。尝试获取资源,成功则返回true,失败则返回false。 |
独占模式源码分析
构建等待队列有很多的变种,有的加入了中断,有的加入了时间判断,但是根本的原理是一样的。这个例子是以无中断,无时间判断来讲解的。后面查看源代码的时候,会继续提到其他的方法。
acquire()
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:
1 | /** |
独占模式获取资源:
- 调用tryAcquire,成功返回true,失败返回false。
- true,表示获取资源成功,acquire直接执行结束;
- false,表示获取资源失败,要进行排队获取;
- 调用addWaiter,创建独占模式Node,并加入到等待队列的尾部;
- 调用acquireQueued方法,按照线程加入队列的顺序获取资源;
- 如果acquireQueued返回true,表示发生中断,因此通过selfInterrupt中断当前线程;
注意:acquire方法会忽略中断,当中断发生时,并不会马上退出;
tryAcquire(int)
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:
1 | protected boolean tryAcquire(int arg) { |
什么?直接throw异常?说好的功能呢?好吧,还记得概述里讲的AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现吗?就是这里了!!!AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。
这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底,Doug Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。
addWaiter()
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
1 | /** |
enq()
有竞争的情况下通过自旋方式入列,如果队列未初始化则初始化,然后再插入。
1 | /** |
可以看到,CAS原子性操作可以解决多线程竞争临界资源的问题。例如线程1通过compareAndSetHead初始化了head和tail节点,线程2此时运行到if (t == null),发现判断成立,通过CAS更新head节点,此时会更新失败,继续下一循环;直到线程1执行完tail=head,线程2才会进入else逻辑,节点入列;可以看到:
- head节点实际上是个空节点;
- head节点是通过new Node()创建,因此waitStatus=0;
- 新入列的节点是通过Node(Thread thread, Node mode)创建,waitStatus=0。
acquireQueued()
acquireQueued主要是处理正在排队等待的线程。自旋、阻塞重试获取。如果获取成功则替换当前节点为链表头,然后返回。
1 | /** |
shouldParkAfterFailedAcquire()
此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态)。整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
1 | /** |
parkAndCheckInterrupt()
如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。
1 | /** |
park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
- 被unpark();
- 被interrupt()。
需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。
可以看到对于等待队列中的节点,shouldParkAfterFailedAcquire会将前节点的状态改为Node.SIGNAL;接着在下一次循环中调用parkAndCheckInterrupt堵塞线程。
cancelAcquire()
将节点的前驱有效节点,和后继有效节点连接起来,取消当前节点。
1 | /** |
unparkSuccessor()
激活node节点的后继节点。
1 | /** |
acquireInterruptibly()
该方法和acquire类似,只不过发生中断时,会抛出InterruptedException;
1 | /** |
release(int)
激活并移除等待节点的过程,和加入等待节点的过程正好相反。首先调用子类的tryRelease方法,如果失败,就返回,如果tryRelease方法释放锁成功。就拿到队列的头结点。然后激活头结点的后继节点,激活的过程是,首先找到头结点的第一个后继有效节点,将其从队列中移除,然后激活这个节点对应的线程。
1 | public final boolean release(int arg) { |
共享模式源码分析
共享模式构建等待队列的实现的流程和独占模式构建等待队列的实现是一样的,唯一的不一样的地方是“tryAcquireShared”这个由子类实现的方法。他的过程是:首先尝试获取共享锁(注意这里返回的是整数,这是实现共享模式的关键。)如果失败(小于0),就构建一个共享节点添加到等待队列。并将当前线程挂起。
acquireShared()
此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。
1 | /** |
doAcquireShared()
此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。
1 | /** |
setHeadAndPropagate()
此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式。
1 | /** |
doReleaseShared()
激活头结点的后继有效节点。
1 | /** |
releaseShared()
共享模式释放节点的流程和独占模式释放节点的流程基本一致。首先尝试更新释放状态tryReleaseShared方法,由具体的子类实现,如果成功就激活头节点的后继节点。
1 | /** |
此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。