Java并发框架AQS

前言

AbstractQueuedSynchronizer框架为Doug Lea大神为JDK所写的并发框架,在JDK1.5的时候正式加入了JDK,它是之后的各类锁的实现底层,所以了解它就可以有效了解这些锁的机制。它本质上是通过一个state来实现的。

内部类

首当其冲的是一个Node内部类,我这里只摘取比较重要的属性,首先默认是共享锁:

1
2
3
4
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

然后有一个状态:

1
2
3
4
5
6
7
8
// 当前节点在队列中的状态
volatile int waitStatus;

// 下面这些静态变量用来代表状态
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

并且为这个状态定义了一堆的相关常量,这里省略。

除此之外,由于我们要构造一个链表来保存相关的Node,所以Node节点中还有对前驱节点后后置节点的引用。而且Node节点本身代表了一个线程,所以有一个线程的引用:

1
2
3
4
volatile Node prev;
volatile Node next;

volatile Thread thread;

AQS类

组成

往简单的说,AQS其实就是一个状态+一个链表组成的东西:

1
2
3
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;

对,就这三个属性(其它压根不重要)构成了AQS。其中的state就是用来实现可重入的机制的,每次进入都对其加一,离开锁就减一。

加锁原理

由于如果只是单纯的讲AQS的各种函数,容易让人摸不着头脑,所以还是结合ReentrantLock来进行函数追踪好了。追踪到ReentrantLock中,可以看到实际是调用了sync.acquire这个方法,并且把参数1传入其中。

1
2
3
public void lock() {
sync.acquire(1);
}

其中这个sync是ReentrantLock的一个内部类,这个类就是继承了AQS。然后方法就来到了AQS内部:

1
2
3
4
5
// 注意这里的参数arg = 1
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

然后又看到使用了tryAcquire这个函数。这个函数本身在ReentrantLock内部,它长这样:

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

最后终于来到了这里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

也就是这里就是最终的上锁逻辑了。

可以看到acquires是1,然后默认的state是0,那么此时就会执行compareAndSetState,传入参数是(0,1),即用CAS,如果原来的state是0,那么就修改成1。然后返回true。并且把当前的线程设置为exclusiveOwnerThread。此时上锁完成。

如果有别的线程和它竞争,那么在上面的第五行是返回false的,这样它就会返回false了。那么就回到if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))这里,可以看到由于返回的是false,所以会触发第二个方法,即acquireQueued方法的执行。

首先我们会创建一个节点,可以看到状态是EXCLUSIVE的。然后把这个节点插入到队列的尾部(如果队列是空的话,那么进行初始化操作,即用一个空节点作为head节点)。OK现在这个竞争锁失败的节点已经加到了队列的尾部了,它会判断一下自己是不是第一个有效的节点(真正的第一个是空的Node),如果是的话,就进行tryAcquire再次尝试获取锁。

从上面这些我们也可以很容易的得出结论:线程首先尝试通过CAS操作获取锁,如果能够获取,那么最好;如果不能够获取,就把自己加入到一个等待队列的最末端。

加锁中的细节

之前的例子是排它锁,如果我们希望是一个共享锁,那么我们可以设置state的值为N,N就是代表至多可以有N个线程来共享。每个线程都是通过CAS来把state减一,如果state到0了就说明共享锁已经不能获取了,需要阻塞。

一般来说,我们自己如果要实现一个锁,那么要么是独占锁,要么是共享锁,所以我们只需要去实现tryAcquire-tryRelease或者是tryAcquireShared-tryReleaseShared这两组中的一个就可以了。

而ReentrantLock是一个排他锁,所以实现了tryAcquire-tryRelease这一组操作。ReentrantLock通过调用AQS的acquire方法,然后AQS的acquire方法又会调用tryAcquire方法,而这个tryAcquire则是ReentrantLock自己实现的。

队列详解

从上面可以看到,如果成功的获取了锁,那是轻松简单又惬意。如果失败了,那就需要放到队列的最后。

那么队列中的Node(每个Node包含了一个thread)是如何来竞争锁的呢?

首先如果一个Node是第一个有效节点,那么就尝试去获取一次锁,成功了的话,就要把这个有效节点从当前队列中移除,AQS的做法是将这个有效节点设置成头节点(再次强调,队列中的头结点并不包含任何线程,只是说明这是一个头节点),等同于把队列中的thread删除了。失败的话(非公平锁中,别的节点是完全有可能比第一个有效节点先拿到锁的),那么就需要判断当前的这个节点,要不要被阻塞。

同理如果这个Node甚至不是第一个有效节点,也需要判断这个节点到底要不要被阻塞。

判断当前节点要不要阻塞,其实是看前面的节点。具体做法是从当前节点往前找:

  • 如果前面一个节点的状态是SIGNAL,那么说明前面那个线程已经准备好了,当前的这个节点就应该阻塞。
  • 如果前面一个节点的状态是CANCELLED,意味前面那个节点已经不再需要锁了,此时循环向前搜索,把所有的状态是CANCELLED的都踢出队列。
  • 如果前面一个节点的状态是其他的,那么就把前面的那个节点的状态设置成SIGNAL。

解锁

解锁过程暂时省略了,了解了加锁是CAS+队列的话,相信解锁应该不难理解。

AQS应用

接下来就是AQS的应用了。下表来自美团技术博客

同步工具 同步工具与AQS的关联
ReentrantLock 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
Semaphore 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。
CountDownLatch 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。
ReentrantReadWriteLock 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。
ThreadPoolExecutor Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。

可以看到上面只有Semaphore和ReentrantReadWriteLock用到的是共享锁。

当然我们还可以自己实现一个自己的锁,我这里提供了一个最简单的demo(也是仿照ReentrantLock):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class MyLock {

private static class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}

@Override
protected boolean tryRelease(int arg) {
setState(0);
return true;
}

}

private MySync mySync = new MySync();

public void lock() {
mySync.acquire(1);
}

public void unlock() {
mySync.release(1);
}

private static MyLock myLock = new MyLock();

private static int count = 0;

private static final int RUN_TIME = 100000;

public static void main(String[] args) throws InterruptedException {
Runnable runnable = () -> {
try {
myLock.lock();
for (int i = 0; i < RUN_TIME; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
myLock.unlock();
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}

从上面可以发现,真的是特别的简单,只需要重写tryAcquire-tryRelease这一组方法,并且里面用的还是AQS自己提供的方法。最后再通过acquire-release包装一下就好了。