欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > JUC AQS(AbstractQueuedSynchronizer)源码

JUC AQS(AbstractQueuedSynchronizer)源码

2025/2/25 21:00:22 来源:https://blog.csdn.net/jiezheee/article/details/139551933  浏览:    关键词:JUC AQS(AbstractQueuedSynchronizer)源码
以下源码以公平锁为例, 理解公平锁后,对AQS、CAS、CLH队列、独占锁、共享锁都会清晰,再去理解非公平锁就会比较简单。
注:本文是基于对ReentrantLock有基本使用经验,至少知道lock和unlock的使用。 建议在Idea里同步参考AbstractQueuedSynchronizer源码,跳转着看调用关系更直观。
 

几个基本概念

1. AQS:AbstractQueuedSynchronizer类
    AQS是JUC中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现,AQS是独占锁和共享锁的公共父类。

2. AQS锁的类别 -- 分为“独占锁”和“共享锁”两种。
    (01) 独占锁 -- 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。
    (02) 共享锁 -- 能被多个线程同时拥有,能被共享的锁。

3.ReentrantLock与AQS的关系

ReentrantLock与sync是组合关系,ReentrantLock中包含了Sync对象。

Sync是AQS的子类,Sync有两个子类FairSync(公平锁)和NonFairSync(非公平锁)。

ReentrantLock是一个独占锁,至于它到底是公平锁还是非公平锁,就取决于sync对象是FairSync的实例还是NonFairSync的实例。

4. CLH队列

双端队列,每个线程在自己节点自旋,直到获取锁或者取消。

5.AQS中CLH队列节点,每个节点代表一个排队的线程。

private transient volatile Node head;    // CLH队列的队首
private transient volatile Node tail;    // CLH队列的队尾// CLH队列的节点
static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;// 线程已被取消static final int CANCELLED =  1;// 后继线程需要被unpark(唤醒)// 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。static final int SIGNAL  = -1;// 线程(处在Condition休眠状态)在等待Condition唤醒static final int CONDITION = -2;// (共享锁)其它线程获取到“共享锁”static final int PROPAGATE = -3;// 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。volatile int waitStatus;// 前一节点volatile Node prev;// 后一节点volatile Node next;// 节点所对应的线程volatile Thread thread;// nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记”// 若nextWaiter=SHARED,则CLH队列是“独占锁”队列;// 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。Node nextWaiter;// “共享锁”则返回true,“独占锁”则返回false。final boolean isShared() {return nextWaiter == SHARED;}// 返回前一节点final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() {    // Used to establish initial head or SHARED marker}// 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}// 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}

一,获取锁

1,ReentrantLock.lock

获取锁的入口方法,通过调用sync的lock实现。
public void lock() {sync.lock();
}

2,sync.lock

final void lock() {acquire(1);
}

3,acquire

public final void acquire(int arg) {if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();
}

4,tryAcquire

protected final boolean tryAcquire(int acquires) {// 获取“当前线程”final Thread current = Thread.currentThread();// 获取“独占锁”的状态int c = getState();// c=0意味着“锁没有被任何线程锁拥有”,if (c == 0) {// 若“锁没有被任何线程锁拥有”,// 则判断“当前线程”是不是CLH队列中的第一个线程,// 若是的话,则获取该锁,设置锁的状态,并且设置锁的拥有者为“当前线程”。if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {// 如果“独占锁”的拥有者已经为“当前线程”,// 则将更新锁的状态。int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}

5, addWaiter

private Node addWaiter(Node mode) {// 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。Node node = new Node(Thread.currentThread(), mode);Node pred = tail;// 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。enq(node);return node;
}

6,acquireQueued

acquireQueued()的作用就是当前线程在这个方法自旋或阻塞(回忆CLH队列),直到获取锁返回。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// interrupted表示在CLH队列的调度中,// “当前线程”在休眠时,有没有被中断过。boolean interrupted = false;for (;;) {// 获取上一个节点。// node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {// 前继节点释放锁后,本节点获取到锁,并维护CLH队列头节点 setHead(node);p.next = null; // help GCfailed = false;// 当前线程只有前置节点是head,且成功(唤醒)获取锁才返回!return interrupted;}//  当前线程parkAndCheckInterrupt处阻塞,直到唤醒if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
注意: 线程解除阻塞,可能是由于其它线程调用了该线程的unpark()函数,也 可能是由于线程被中断!
即使线程被中断而获取到cpu执行权利,而如果该线程前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 

7,shouldParkAfterFailedAcquire

判断“当前线程”是否需要被阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;// 如果前继节点状态为SIGNAL,表明当前节点需要(将来某个时间)被unpark,此时则返回true,阻塞当前线程!if (ws == Node.SIGNAL)return true;// 如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;// 如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}
当一个线程刚被加入队列尾部,waitstatus是0,前继节点waitstatus也是0;
② shouldParkAfterFailedAcquire会走到compareAndSetWaitStatus,设置前置waitstatus为-1;
③ 再返回acquireQueued去循环一次,再进入shouldParkAfterFailedAcquire的时候就会返回true了,当前线程就会原地阻塞,等待唤醒;
当前节点waitstatus被修改的时机(主要):
  1. 等后继节点加入CLH队列,通过AQS的shouldParkAfterFailedAcquire方法修改为Node.SIGNAL;
  2. 当前节点释放锁,通过AQS的unparkSuccessor方法修改为0;

8, parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {// 通过LockSupport的park()阻塞“当前线程”。LockSupport.park(this);// 返回线程的中断状态。return Thread.interrupted();
}
阻塞当前线程, 直到被唤醒,并且返回“线程被唤醒之后”的中断状态。
线程被阻塞之后,一般有2种情况唤醒:
1. unpark() 唤醒:前继节点使用完锁之后,通过unpark()唤醒当前线程。
2. 中断唤醒:其它线程通过interrupt()中断当前线程。
LockSupport()中的park()、unpark()的作用和Object中的wait()、notify()作用类似,是阻塞/唤醒。它们的用法不同,park(), unpark()是轻量级的,而wait(), notify()是必须先通过Synchronized获取同步锁。
注意:如果线程是阻塞在LockSupport.park()上,给线程中断后,不会抛出异常,线程唤醒继续执行,且线程中断标志为true。可以参考另篇关于中断的博客。

二,释放锁

1, unlock

public void unlock() {sync.release(1);
}

2, release

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
如果锁完全释放,唤醒后继线程后,后继线程在 acquireQueued自旋获得锁,同时把后继线程设置为head!

3, tryRelease

protected final boolean tryRelease(int releases) {// c是本次释放锁之后的状态int c = getState() - releases;// 如果“当前线程”不是“锁的持有者”,则抛出异常!if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。if (c == 0) {free = true;setExclusiveOwnerThread(null);}// 设置当前线程的锁的状态。setState(c);return free;
}

4, unparkSuccessor

private void unparkSuccessor(Node node) {// 获取当前线程的状态int ws = node.waitStatus;// 如果状态<0,则设置状态=0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//获取当前节点的“有效的后继节点(线程状态<=0)”,无效的话,则通过for循环进行获取。Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 唤醒“后继节点对应的线程”if (s != null)LockSupport.unpark(s.thread);
}

三,CLH队列建立、获取、释放锁

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词