1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
|
/**
* 首先尝试获取一次锁,如果成功,则返回;
* 否则会把当前线程包装成Node插入到队列中,在队列中会检测是否为head的直接后继,并尝试获取锁,
* 如果获取失败,则阻塞当前线程,直至被 "释放锁的线程" 唤醒或者被中断,随后再次尝试获取锁,如此反复
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 在队列中新增一个节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 快速尝试
if (pred != null) {
node.prev = pred;
// 通过CAS在队尾插入当前节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 初始情况或者在快速尝试失败后插入节点
enq(node);
return node;
}
/**
* 通过循环+CAS在队列中成功插入一个节点后返回
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 初始化head和tail
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*
* AQS的精妙在于很多细节代码,比如需要用CAS往队尾里增加一个元素
* 此处的else分支是先在CAS的if前设置node.prev = t,而不是在CAS成功之后再设置。
* 一方面是基于CAS的双向链表插入目前没有完美的解决方案,另一方面这样子做的好处是:
* 保证每时每刻tail.prev都不会是一个null值,否则如果node.prev = t
* 放在下面if的里面,会导致一个瞬间tail.prev = null,这样会使得队列不完整
*/
node.prev = t;
// CAS设置tail为node,成功后把老的tail也就是t连接到node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 在队列中的节点通过此方法获取锁
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
* 检测当前节点的前驱节点是否为head,这是试获取锁的资格。
* 如果是的话,则调用tryAcquire尝试获取锁,成功,则将head置为当前节点
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
* 如果未成功获取锁,则根据前驱节点判断是否要阻塞。
* 如果阻塞过程中被中断,则置interrupted标志位为true。
* shouldParkAfterFailedAcquire方法在前驱状态不为SIGNAL的情况下都会循环重试获取锁
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 根据前驱节点中的waitStatus来判断是否需要阻塞当前线程
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驱节点设置为SIGNAL状态,在释放锁的时候会唤醒后继节点,
* 所以后继节点(也就是当前节点)现在可以阻塞自己
*/
return true;
if (ws > 0) {
/*
* 前驱节点状态为取消,向前遍历,更新当前节点的前驱为往前第一个非取消节点。
* 当前线程会之后会再次回到循环并尝试获取锁
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/**
* 等待状态为0或者PROPAGATE(-3),设置前驱的等待状态为SIGNAL,
* 并且之后会回到循环再次重试获取锁
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 该方法实现某个node取消获取锁
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 遍历并更新节点前驱,把node的prev指向前部第一个非取消节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 记录pred节点的后继为predNext,后续CAS会用到
Node predNext = pred.next;
// 直接把当前节点的等待状态置为取消,后继节点即便也在cancel可以跨越node节点
node.waitStatus = Node.CANCELLED;
/*
* 如果CAS将tail从node置为pred节点了
* 则剩下要做的事情就是尝试用CAS将pred节点的next更新为null以彻底切断pred和node的联系。
* 这样一来就断开了pred与pred的所有后继节点,这些节点由于变得不可达,最终会被回收掉。
* 由于node没有后继节点,所以这种情况到这里整个cancel就算是处理完毕了。
*
* 这里的CAS更新pred的next即使失败了也没关系,说明有其它新入队线程或者其它取消线程更新掉了。
*/
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果node还有后继节点,这种情况要做的事情是把pred和后继非取消节点拼起来
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
/*
* 如果node的后继节点next非取消状态的话,则用CAS尝试把pred的后继置为node的后继节点
* 这里if条件为false或者CAS失败都没关系,这说明可能有多个线程在取消,总归会有一个能成功的
*/
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
/*
* 这时说明pred == head或者pred状态取消或者pred.thread == null
* 在这些情况下为了保证队列的活跃性,需要去唤醒一次后继线程。
* 举例来说pred == head完全有可能实际上目前已经没有线程持有锁了,
* 自然就不会有释放锁唤醒后继的动作。如果不唤醒后继,队列就挂掉了。
*
* 这种情况下看似由于没有更新pred的next的操作,队列中可能会留有一大把的取消节点。
* 实际上不要紧,因为后继线程唤醒之后会走一次试获取锁的过程,
* 失败的话会走到shouldParkAfterFailedAcquire的逻辑。
* 那里面的if中有处理前驱节点如果为取消则维护pred/next,踢掉这些取消节点的逻辑。
*/
unparkSuccessor(node);
}
/*
* 取消节点的next之所以设置为自己本身而不是null,
* 是为了方便AQS中Condition部分的isOnSyncQueue方法,
* 判断一个原先属于条件队列的节点是否转移到了同步队列。
*
* 因为同步队列中会用到节点的next域,取消节点的next也有值的话,
* 可以断言next域有值的节点一定在同步队列上。
*
* 在GC层面,和设置为null具有相同的效果
*/
node.next = node;
}
}
/**
* 唤醒后继线程
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 尝试将node的等待状态置为0,这样的话,后继争用线程可以有机会再尝试获取一次锁
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
/*
* 这里的逻辑就是如果node.next存在并且状态不为取消,则直接唤醒s即可
* 否则需要从tail开始向前找到node之后最近的非取消节点。
*
* 这里为什么要从tail开始向前查找也是值得琢磨的:
* 如果读到s == null,不代表node就为tail,参考addWaiter以及enq函数中的我的注释。
* 不妨考虑到如下场景:
* 1. node某时刻为tail
* 2. 有新线程通过addWaiter中的if分支或者enq方法添加自己
* 3. compareAndSetTail成功
* 4. 此时这里的Node s = node.next读出来s == null,但事实上node已经不是tail,它有后继了!
*/
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
|