JUC 之 AQS

JUC 之 AQS

五月 12, 2020

AQS

简介

AQS 全称 AbstractQueuedSynchronizer, 它底层有一个先进先出(FIFO)队列,
用来实现同步锁, CountDownLatch, Semaphore, ReentrantLock, Mutex,
ReentrantReadWriteLock, ThreadPoolExecutor 等内部实现都用到了AQS.
AQS是一个抽象类,继承自AbstractOwnableSynchronizer, 主要是通过继承的方式
来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的
方法来提供自定义的同步组件。

同步状态

AQS维护一个全局的状态码 state ,线程通过修改码(加/减指定的数量)是否成功
来决定当前线程是否成功获取到同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* The synchronization state.
*/
private volatile int state;

/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}

/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}

AQS 独占模式(EXCLUSIVE)和共享模式(SHARED)

AQS支持两种获取同步状态的模式既独占模式和共享模式.
独占模式: 同一时刻只允许一个线程获取同步状态.
共享模式: 允许多个线程同时获取同步状态.

数据结构

AQS使用同步队列(一个FIFO双向队列)来完成同步状态的管理,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
static final class Node {
// 声明node在同步队列中处于共享模式
static final Node SHARED = new Node();
// 声明node在同步队列中处于独占模式
static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

/**
* Status field, taking on only the values:
* CANCELLED: 表示当前结点已取消调度.当timeout或被中断
* (响应中断的情况下),会触发变更为此状态,
* 进入该状态后的结点将不会再变化.
*
* SIGNAL: 表示后继结点在等待当前结点唤醒.后继结点入队时,
* 会将前继结点的状态更新为SIGNAL.
*
* CONDITION: 表示结点等待在Condition上,当其他线程调用了
* Condition的signal()方法后,CONDITION状态的结点将
* 从等待队列转移到同步队列中,等待获取同步锁.
*
* PROPAGATE: 共享模式下,前继结点不仅会唤醒其后继结点,
* 同时也可能会唤醒后继的后继结点.
*
* 0: 新结点入队时的默认状态。
*
*
* 负值表示结点处于有效等待状态,而正值表示结点已被取消.
* 源码中很多地方用>0、<0来判断结点的状态是否正常.
*/
volatile int waitStatus;

volatile Node prev;
volatile Node next;
volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

独占模式

当前线程获取同步状态失败时,AQS就把当前线程的相关状态消息构建
一个node节点加入到同步队列, 以“死循环”的方式(自旋)获取同步状态.
如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠
前驱节点的出队或阻塞线程被中断来实现。

获取失败的线程,加入到同步队列的队尾;
加入到队列中后,如果当前节点的前驱节点为头节点再次尝试获取同步状态

如果头节点的下一个节点尝试获取同步状态失败后,会进入等待状态;
其他节点则继续自旋。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 获取同步状态
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中断
selfInterrupt();
}

// 请求队列 循环获取同步状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}

// 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}