简介
AQS
:AbstractQueuedSynchronizer
是java并发包中最基础最重要的类,很多锁和并发工具都是在它的基础上构建的。
提供了以下功能:
CLH
同步队列(获取不到资源的线程会进入到这个队列等待)- 同步状态(共享资源)
- 资源的获取和释放模板方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable
字段
// 同步队列头结点
private transient volatile Node head;
// 同步队列尾结点
private transient volatile Node tail;
// 同步状态
private volatile int state;
// 当前持有锁的线程,从AbstractOwnableSynchronizer继承的
private transient Thread exclusiveOwnerThread;
常量
static final long spinForTimeoutThreshold = 1000L;
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
内部类
- Node
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 节点状态
volatile int waitStatus;
// 节点状态:线程被取消
static final int CANCELLED = 1;
// 节点状态:释放锁时唤醒后继节点
static final int SIGNAL = -1;
// 节点状态:线程在 condition 队列中
static final int CONDITION = -2;
// 节点状态:传播
static final int PROPAGATE = -3;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 节点对应的线程
volatile Thread thread;
// 用于条件队列或者共享锁
Node nextWaiter;
// 是不是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱节点,为空则报异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
等待状态waitStatus
可取值如下:
值 | 常量 | 说明 |
---|---|---|
1 | CANCELLED | 线程被取消 |
-1 | SIGNAL | 唤醒后继节点 |
-2 | CONDITION | 线程在 condition 队列中 |
-3 | PROPAGATE | 传播 |
0 | 无状态,初始化时为0 |
主要 API
获取独占锁
acquire
源码如下:
public final void acquire(int arg) {
// 先尝试获取锁,获取到直接返回;否则线程加入等待队列并设置为独占模式
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 在等待的过程中,对中断不处理;获取成功后,在这里补上
selfInterrupt();
}
// 尝试获取锁,具体逻辑由自定义的同步器去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 将当前线程加入等待队列
private Node addWaiter(Node mode) {
// 创建节点,封装当前线程
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 将当前节点通过CAS方式放在队尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
// 快速入队成功,直接返回节点
return node;
}
}
// 将节点入队
enq(node);
return node;
}
// 将节点放入队尾
private Node enq(final Node node) {
// 通过循环保证节点一定能入队
for (;;) {
Node t = tail;
// 队列是延迟到使用时才初始化的,所以这里可能为空
// 队列为空时,则初始化
if (t == null) {
// 头结点和尾节点都指向一个新建的空节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 将当前节点通过CAS方式放在队尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 队列中的节点不断尝试获取锁直到成功,最后返回是否中断过标记
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 标记是否被中断过
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 若前驱节点是头结点,则尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取成功时,将当前节点置为头结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前驱节点状态为SIGNAL时阻塞当前线程,否则一直循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 若发生了异常,取消当前线程对资源的获取
if (failed)
cancelAcquire(node);
}
}
// 若前驱节点状态为SIGNAL时,直接返回 true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点状态为SIGNAL时,会在释放锁时唤醒后面的节点,也就是当前节点,所以当前节点可以阻塞
if (ws == Node.SIGNAL)
return true;
// 前驱节点状态为CANCELLED(取消)时,向前遍历,直到找到最近的非取消状态的节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 若为其他状态,则将前驱节点状态设置为SIGNAL
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 将当前线程阻塞
LockSupport.park(this);
// 当前线程被唤醒后,查看是否被中断过
return Thread.interrupted();
}
释放独占锁
release
源码如下:
public final boolean release(int arg) {
// 先尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 尝试获取锁,具体逻辑由自定义的同步器去实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// status置为0并唤醒后驱节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 当前节点有效时,CAS设置节点状态为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 后驱节点失效时,清除节点,并通过遍历找到下一个有效节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后驱节点
if (s != null)
LockSupport.unpark(s.thread);
}
获取共享锁
acquireShared
源码如下:
public final void acquireShared(int arg) {
// 先尝试获取锁
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 尝试获取锁,具体逻辑由自定义的同步器去实现
// 返回负数表示失败;0 表示成功,没有剩余资源;正数表示成功,还有剩余资源
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 当前线程入队等待,直到拿到资源为止
private void doAcquireShared(int arg) {
// 创建共享模式的节点并加入队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 是否被中断过
boolean interrupted = false;
// 循环直到拿到资源
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 前驱节点若为头结点
if (p == head) {
// 尝试获取锁
int r = tryAcquireShared(arg);
// 大于0表示获取成功
if (r >= 0) {
// 当前节点设置为头结点,如果还有剩余资源,则继续唤醒下一个节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 若等待的过程中被中断过,这里将处理补上
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 前驱节点状态为SIGNAL时阻塞当前线程,否则一直循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 当前节点设置为头结点
setHead(node);
// 如果还有剩余资源,则继续唤醒下一个节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获取后继节点
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
// 队列不为空
if (h != null && h != tail) {
int ws = h.waitStatus;
// 节点状态为SIGNAL时,唤醒后继节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
// 节点状态为 0 时,状态改为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
释放共享锁
releaseShared
源码如下:
public final boolean releaseShared(int arg) {
// 先尝试释放锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 尝试释放共享锁,具体逻辑由自定义的同步器去实现
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
private void doReleaseShared() {
for (;;) {
Node h = head;
// 队列不为空
if (h != null && h != tail) {
int ws = h.waitStatus;
// 节点状态为SIGNAL时,唤醒后继节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
// 节点状态为 0 时,状态改为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
// status置为0并唤醒后驱节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 当前节点有效时,CAS设置节点状态为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 后驱节点失效时,清除节点,并通过遍历找到下一个有效节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后驱节点
if (s != null)
LockSupport.unpark(s.thread);
}
备注
使用JDK
版本为1.8