Skip to Content
DocumentationJavaJava并发编程AQS

AQS

AQS是一个基本框架,用于实现各类同步操作,例如同步锁,信号量等。

AQS基于一个int变量作为锁定字段(state),包含一个先进先出的等待队列,用户可以通过改变state的值来控制当前同步状态

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.

This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.

CLH队列

CLH队列是一个先进先出的双向队列,用于保存等待线程队列

节点

队列中的每个等待线程由java.util.concurrent.locks.AbstractQueuedSynchronizer.Node描述,类结构如下。

同时,每个节点支持两种模式,即共享(SHARED)和独占(EXCLUSIVE)

  • 独占表示当前state只能被一个线程持有
  • 共享表示当前state可以被多个线程持有
/** Marker to indicate a node is waiting in shared mode */ /** * 标记当前节点是一个处于共享模式下的节点 */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** * 当前节点状态 */ volatile int waitStatus; /** * 前继节点 */ volatile Node prev; /** * 后继节点 */ volatile Node next; /** * 节点绑定的线程 */ volatile Thread thread; /** * 等待队列下一个节点,如果等于SHARED,则表示当前节点处于共享模式中 */ Node nextWaiter;

节点状态描述

状态描述
-1SIGNAL, 当前线程被park住了,需要等待unpark操作
1CANCELLED,表示当前线程取消了等待,比如在等待过程中发生了interrupt操作或者超时了
-2CONDITION,表示当前队列暂时挂起进入了Conditional队列
-3PROPAGATE,共享节点状态,如果当前节点的下一个节点也是共享的,此状态将传递下去

初始化

CLH采用懒加载的机制,在第一个等待队列加入时,初始化队列头节点

private final void initializeSyncQueue() { Node h; if (HEAD.compareAndSet(this, null, (h = new Node()))) tail = h; }

入队

添加节点

private Node addWaiter(Node mode) { //创建一个指定独占/共享模式的节点 Node node = new Node(mode); for (;;) { Node oldTail = tail; //如果尾节点为null,表示队列需要进行初始化 if (oldTail != null) { //当前节点前继节点就是之前的尾节点 node.setPrevRelaxed(oldTail); //将当前节点作为尾节点(CAS操作) if (compareAndSetTail(oldTail, node)) { CAS操作成功后,原尾节点设置成当前节点 oldTail.next = node; return node; } } else { //初始化操作 initializeSyncQueue(); } } }

独占模式下入队

public final void acquire(int arg) { if (!tryAcquire(arg) && //try方法需要用户自行实现,判断当前状态是否支持当前线程持有 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //入队 selfInterrupt(); //如果发生了打断,则需要传递下去 } /** * 此时node已经加入过等待队列 */ final boolean acquireQueued(final Node node, int arg) { //记录当前线程是否被打断过 boolean interrupted = false; try { for (;;) { //获取当前节点的前继节点 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//如果前置节点是head,说明已经到了第一个,那么需要尝试获取状态 //将当前节点作为头节点,当前节点的thread,prev清空 setHead(node); //前置节点取消引用 p.next = null; // help GC //返回等待过程中是否被中断过 return interrupted; } //前节点不是头节点或者获取状态失败,此时需要判断是否需要park //需要park的条件 前节点状态==SIGNAL(前节点被已park了) //如果检查出前节点状态为CANCELED,那么需要往前遍历,找到第一个状态不为CANCELED的节点,将其作为node的前继节点 //如果前节点状态为其他,则需要进一步判断,这里就不需要进行park操作了,继续循环到下一步 //这里主要作用 if (shouldParkAfterFailedAcquire(p, node)) //写入是否被中断过 interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { //当前节点等待过程中发生了任何异常,取消等待,待后续有节点加入时被移除 cancelAcquire(node); if (interrupted) //保留打断状态 selfInterrupt(); //异常传播 throw t; } }

共享模式下入队

public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//尝试共享获取状态,用户自行判断 doAcquireShared(arg); //共享模式下入队操作 } private void doAcquireShared(int arg) { // 添加第一个共享模式的节点 final Node node = addWaiter(Node.SHARED); // 判断线程是否被打断过 boolean interrupted = false; try { for (;;) { //找到前继节点 final Node p = node.predecessor(); if (p == head) {//前节点是头节点,此时再次尝试获取状态 int r = tryAcquireShared(arg); if (r >= 0) {//表示允许获取 //将当前节点设置成头节点,并将waitingState状态传播下去 setHeadAndPropagate(node, r); 原先头节点取消关联 p.next = null; // help GC return; } } //同独占模式一样,判断是否需要进行park,避免空循环 if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { //出现异常,取消获取 cancelAcquire(node); throw t; } finally { if (interrupted) //传递打断状态 selfInterrupt(); } }

共享模式下状态传播

private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //当前节点设置成head setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //检查下一个节点是否是共享模式,如果是则唤醒下一个节点,因为共享模式下允许多个节点同时持有状态 if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //如果头节点被park住了,尝试unpark操作,因为头节点如果是刚放上去的,此时状态还是被park,此时需要解除状态,执行到这里说明head的设置已经完成了 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } //头节点unpark后,状态设置成PROPAGATE else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } //在这个过程中head此时有变化了,重新执行流程 if (h == head) // loop if head changed break; } } private void unparkSuccessor(Node node) { //重置状态,如果是>0说明可能被取消了 int ws = node.waitStatus; if (ws < 0) node.compareAndSetWaitStatus(ws, 0); //遍历当前节点的下一个节点,找到第一个非取消状态的节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; //从尾往前遍历,这是因为取消操作会改变每个节点的next值,但是不会改变节点的prev for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } //唤醒第一个非取消状态的节点线程,参与竞争 if (s != null) LockSupport.unpark(s.thread); }

取消

取消操作一般发生在请求状态超时或者等待竞争中出现异常,代码实现

private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; //既然取消,先取消节点上的线程绑定 node.thread = null; //从所有前节点中,找到第一个不为取消状态的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary, although with // a possibility that a cancelled node may transiently remain // reachable. //往前第一个不为取消状态的节点开始 //predNext即为连续取消节点的第一个 Node predNext = pred.next; //当前节点状态变更 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) {//将pred设置成尾节点 //本身就是尾节点了,取消next pred.compareAndSetNext(predNext, null); } else { //如果当前节点不是尾节点 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) { //pred不是head,把pred的next指向当前节点的next Node next = node.next; if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next); } else { //pred是head,说明当前节点的前面所有节点都是取消状态,直接唤醒下一个节点 unparkSuccessor(node); } //当前节点的下一个引用取消 node.next = node; // help GC } //注意此时只取消了next的关联,prev的关联会在后续shouldParkAfterFailedAcquire方法中取消 } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //这里会改变prev的状态,清理状态为取消的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; }
Last updated on