AQS

2022/02/11 Java

AQS

<前一整子做了一个AQS的技术分享,特将内容整理记录如下,^_^>

什么是AQS?

AQS 的全称是 AbstractQueuedSynchronizer,即抽象队列同步器。是Java并发工具的基础,采用乐观锁,通过CAS与自旋轻量级的获取锁。维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。很多JUC包,比如ReentrantLock、Semaphore、CountDownLatch等并发类均是继承AQS,通过AQS的模板方法,来实现的。

原理

AQS的组成结构

AQS = 同步状态(volatile int state) + 同步队列(即等待队列,FIFO的CLH队列) + 条件队列(ConditionObject)

  • state:代表共享资源。volatile 保证并发读,CAS 保证并发写
  • 同步队列(即等待队列,CLH队列):是CLH变体的虚拟双向队列(先进先出FIFO)来等待获取共享资源。当前线程可以通过signal和signalAll将条件队列中的节点转移到同步队列中
  • 条件队列(ConditionObject):当前线程存在于同步队列的头节点,可以通过await从同步队列转移到条件队列中

同步状态

在AQS中维护了一个同步状态变量state,getState函数获取同步状态,setState、compareAndSetState函数修改同步状态,对于AQS来说,线程同步的关键是对state的操作,可以说获取、释放资源是否成功都是由state决定的,比如state>0代表可获取资源,否则无法获取,所以state的具体语义由实现者去定义,现有的ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch定义的state语义都不一样。

  • ReentrantLock的state用来表示是否有锁资源
  • ReentrantReadWriteLock的state高16位代表读锁状态,低16位代表写锁状态
  • Semaphore的state用来表示可用信号的个数
  • CountDownLatch的state用来表示计数器的值

CLH队列

CLH是AQS内部维护的FIFO(先进先出)双端双向队列(方便尾部节点插入),基于链表数据结构,当一个线程竞争资源失败,就会将等待资源的线程封装成一个Node节点,通过CAS原子操作插入队列尾部,最终不同的Node节点连接组成了一个CLH队列,所以说AQS通过CLH队列管理竞争资源的线程,CLH队列具有如下几个优点:

  • 先进先出保证了公平性
  • 非阻塞的队列,通过自旋锁和CAS保证节点插入和移除的原子性,实现无锁快速插入
  • 采用了自旋锁思想,所以CLH也是一种基于链表的可扩展、高性能、公平的自旋锁

Node内部类

NodeAQS的内部类,每个等待资源的线程都会封装成Node节点组成CLH队列、等待队列,所以说Node是非常重要的部分,理解它是理解AQS的第一步。

waitStatus

nextWaiter

  • NodeCLH队列时,nextWaiter表示共享式或独占式标记 SHARED/EXCLUSIVE
  • Node在条件队列时,nextWaiter表示下个Node节点指针

条件队列

Object的wait、notify函数是配合Synchronized锁实现线程间同步协作的功能,A Q S的ConditionObject条件变量也提供这样的功能,通过ConditionObject的await和signal两类函数完成。

ConditionObject内部维护着一个单向条件队列,不同于CLH队列,条件队列只入队执行await的线程节点,并且加入条件队列的节点,不能在CLH队列, 条件队列出队的节点,会入队到CLH队列。

当某个线程执行了ConditionObject的await函数,阻塞当前线程,线程会被封装成Node节点添加到条件队列的末端,其他线程执行ConditionObject的signal函数,会将条件队列头部线程节点转移到C H L队列参与竞争资源,具体流程如下图:

流程说明

线程获取资源失败,封装成Node节点从CLH队列尾部入队并阻塞线程,某线程释放资源时会把CLH队列首部Node节点关联的线程唤醒(此处的首部是指第二个节点,后面会细说),再次获取资源。

入队

获取资源失败的线程需要封装成Node节点,接着尾部入队,在AQS中提供addWaiter函数完成Node节点的创建与入队。

/**
  * @description:  Node节点入队-CLH队列
  * @param mode 标记Node.EXCLUSIVE独占式 or Node.SHARED共享式
  */
private Node addWaiter(Node mode) {
        // 根据当前线程创建节点,等待状态为0
        Node node = new Node(Thread.currentThread(), mode);
        // 获取尾节点
        Node pred = tail;
        if (pred != null) {
            // 如果尾节点不等于null,把当前节点的前驱节点指向尾节点
            node.prev = pred;
            // 通过CAS把尾节点指向当前节点
            if (compareAndSetTail(pred, node)) {
                // 之前尾节点的下个节点指向当前节点
                pred.next = node;
                return node;
            }
        }

        // 如果添加失败或队列不存在,执行end函数
        enq(node);
        return node;
}

添加节点的时候,如果从CLH队列已经存在,通过CAS快速将当前节点添加到队列尾部,如果添加失败或队列不存在,则指向enq函数自旋入队。

/**
  * @description: 自旋cas入队
  * @param node 节点
  */
private Node enq(final Node node) {
        for (;;) { //循环
            //获取尾节点
            Node t = tail;
            if (t == null) {
                //如果尾节点为空,创建哨兵节点,通过cas把头节点指向哨兵节点
                if (compareAndSetHead(new Node()))
                    //cas成功,尾节点指向哨兵节点
                    tail = head;
            } else {
                //当前节点的前驱节点设指向之前尾节点
                node.prev = t;
                //cas设置把尾节点指向当前节点
                if (compareAndSetTail(t, node)) {
                    //cas成功,之前尾节点的下个节点指向当前节点
                    t.next = node;
                    return t;
                }
            }
        }
}

通过自旋CAS尝试往队列尾部插入节点,直到成功,自旋过程如果发现CLH队列不存在时会初始化CLH队列,入队过程流程如下图:

第一次循环

  • 刚开始C L H队列不存在,head与tail都指向null
  • 要初始化C L H队列,会创建一个哨兵节点,head与tail都指向哨兵节点

第二次循环

  • 当前线程节点的前驱节点指向尾部节点(哨兵节点)
  • 设置当前线程节点为尾部,tail指向当前线程节点
  • 前尾部节点的后驱节点指向当前线程节点(当前尾部节点)

最后结合addWaiter与enq函数,整体看一下入队流程图:

出队

CLH队列中的节点都是获取资源失败的线程节点,当持有资源的线程释放资源时,会将head.next指向的线程节点唤醒(CLH队列的第二个节点),如果唤醒的线程节点获取资源成功,线程节点清空信息设置为头部节点(新哨兵节点),原头部节点出队(原哨兵节点acquireQueued函数中的部分代码

//1.获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是首节点,获取资源(子类实现)
if (p == head && tryAcquire(arg)) {
    //2.获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
    setHead(node);
    //3.原来首节点下个节点指向为null
    p.next = null; // help GC
    //4.非异常状态,防止指向finally逻辑
    failed = false;
    //5.返回线程中断状态
    return interrupted;
}

private void setHead(Node node) {
    //节点设置为头部
    head = node;
    //清空线程
    node.thread = null;
    //清空前驱节点
    node.prev = null;
}

只需要关注1~3步骤即可,过程非常简单,假设获取资源成功,更换头部节点,并把头部节点的信息清除变成哨兵节点,注意这个过程是不需要使用CAS来保证,因为只有一个线程能够成功获取到资源。

Search

    欢迎关注我的微信公众号

    钢仔

    Table of Contents