什么是AQS?AQS有什么作用?常见的基于AQS的组件有哪些?

时间: 2023-07-09 admin 互联网

什么是AQS?AQS有什么作用?常见的基于AQS的组件有哪些?

什么是AQS?AQS有什么作用?常见的基于AQS的组件有哪些?

1.什么是AQS

AQS队列同步器(AbstractQueuedSynchronizer),是用来构建锁或者其他同步组件的基础框架。
它使用了一个int的成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类通过基础同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了对同步状态进行更改,这时需要使用同步器提供的三个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update)来操作,因为他们能够保证状态的改变是安全的。

2.AQS的原理

AQS队列同步器是如何完成线程同步的?
主要包括:同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等同步器的核心数据结构与模板方法。

2.1同步队列

同步器依赖内部的同步队列(一个FIFO)双向队列完成状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次获取同步状态。

同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。
如果一个线程成功获取了同步状态,其他线程无法获取到同步状态,转而被构造成节点并加入同步队列中,而加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程认为的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态将会在获取同步状态成功时将自己设置为首节点。该过程见5-3。
因为设首节点是通过获取同步状态完成的线程来完成的,由于只有一个线程能够获取到同步状态,因此设置头节点的方法并不需要CAS来保证。只需要将首节点设置为原首节点的后继节点并断开原首节点的next引用即可。

2.2独占式同步状态的获取与释放

通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。
public final void acquire(int arg){if(!tryAcquire(arg)&&acquireQueued(addWaited(Node.EXCLUSIVE),arg){selfInterrupt();}
}

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以死循环的方式获取同步状态,如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

//同步器的addWaiter和enq方法
private Node addWaiter(Node node){Node node = new Node(Thread.currentThread(),node);//快速尝试在尾部添加Node pred = tail;if(pred!=null){node.prev = pred;if(compareAndSetTail(pred,node){pred.next = node;return node;}}enq(node);return node;
}
private Node enq(final Node node){for(;;){Node t = tail;if(t==null){if(compareAndSetHead(new Node()){tail = head;}else{node.prev = t;if(compareAndSetTail(t,node)){t.next = node;return t;}}}}
}

上述代码通过compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。
在enq(final Node node)方法中,同步器通过死循环来保证节点的正确添加,在死循环中只有通过CAS将节点设置为尾节点之后,当前先才能从该方法中返回。否则,当前线程不断尝试设置,可以看出enq(final Node node)方法将并发添加节点的请求通过CAS变得串行化了。
节点进入同步队列之后,就进入了一个自旋的过程。每个节点都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程(并会阻塞节点的线程)

//同步器的acquireQueued方法
final boolean acquireQueued(final Node node,int arg){boolean failed = true;try{boolean interrupted = false;for(;;){final Node p = node.predecessor();if(p==head&&tryAcquire(arg){setHead(node);p.next = null;failde = false;return interuupted;}if(shouldParkAfterFailedAcquire(p,node)&&pardAndCheckInterrpt()){interrupted = true;}}}finally{if(falied){cannelAcquire(node);}}}

在acquireQueued(final Node node,int arg)方法中,当前线程在死循环总获取同步状态,而只有前驱的是头节点才能尝试获取同步状态。
第一:头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒后继节点,后继节点的线程被唤醒需要检查自己的前驱节点是否头节点。
第二:维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态的行为如下。

在图5.4中,由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自己的前驱是否头节点,如果是则尝试获取同步状态。

独占式同步状态获取流程。
前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获取同步状态的自旋过程。当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,如果对于锁这种并发组件而言,代表着当前线程获取了锁。
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。

public final boolean release(int arg){if(tryRelease(arg){Node h = head;if(h!=null&&h.waitStatus !=0){unparkSuccessor(h);}return true;}return false;
}

该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)方法使用LockSuppor来唤醒处于等待状态的线程。
总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋‘;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

2.3共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取同步状态。

当共享式访问资源时,其他共享式的访问均被允许,而独占式访问被阻塞,右半部分是独占式访问资源时,同一时刻其他访问均被阻塞。
通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态。

public final void acquireShared(int arg){if(tryAcquireShared(arg)<0){doAcquireShared(arg);}
}
private void doAcquireShared(int arg){final Node node = addWaited(Node.SHARED);boolean failed = true;try{boolean interrupted = false;for(;;){final Node p = node.predecessor();if(p==head){int r = tryAcquireShared(arg);if(r>=0){setHeadAndPropagate(node,r);p.next = null;if(interrupted){selfInterrupt();}failed = false;return ;}}if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt()){interrupted = true;}}finally{if(fail){cancelAcquire(node);}}}
}

在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquiredShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShard(int arg)方法返回值大于等于0。

public final boolean releaseShared(int arg){if(tryReleaseShared(arg)){doReleaseShared();return true;}return false;
}

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于tryReleaseShard(int arg)方法必须确保同步状态(资源)线程安全释放,一般通过循环和CAS来保证,因为释放同步状态的操作会同时来自多个线程。

2.4独占式超时获取同步状态

通过调用同步器的doAcquireNanos(int arg,long nanos Timeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则返回false。

private boolean doAcquireNanos(int arg,long nanoTimeout)throws InterruptedException{long lastTime = System.nanoTime();final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try{for(;;){final Node p = node.predecessor();if(p==head&&tryAcquire(arg)){setHead(node);p.next = null;failed = false;return true;}if(nanoTimeout<=0){return false;}if(sholdParkAfterFailedAcquire(p,node)&&nanosTimeout>spinForTimeoutThreshold){LockSupport.parkNanos(this,nanosTimeout);}long now = System.nanoTime();//计算时间,当前时间now减去之前的时间Lasttime得到已经睡眠的时间,然后减去nanoTimeout,得到还应该睡眠的时间nanoTimeout -=now - lastTime;lastTime = now;if(Thread.interrupted()){throw new InterruptedException();}}}finally{if(falied){cancelAcquire(node);}}
}

该方法在自旋过程只能,当前的前驱点为头节点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似。不同之处在于获取失败的处理,如果当前线程获取同步状态失败,则判断是否超时,如果没有超时,重新计算超时间隔nanosTimeou,然后使当前线程等待nanosTimeout。
如果nanosTimeout小于等于自旋时间,将不会使该线程进行超时等待,而是进入快速的自旋过程。

3.AQS的组件有哪些

1.Exclusive(独占式):只有一个线程能够获取该资源
如ReentrantLock。
又可分为公平锁和非公平锁。
公平锁:以排队队列中的线程先后顺序为准,先到先得资源的锁。
非公平锁:无视排队顺序,谁抢占了资源就是谁的。
2.Share(共享式):可以有多个线程获取到该资源,如Semaphore、CountDownLatch、CyclicBarrier和ReadWriteLock。

  • Semaphore(信号量)允许多个线程同时访问。
    synchronized和ReentrantLock都是一次只允许一个线程访问某个资源,Semaphore可以指定多个线程同时访问某个资源。
  • CountDownLatch(倒计时器)。
    CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
  • CyclicBarrier(循环栅栏)
    CyclicBarrier和CountLatch非常相似,他可以实现线程间的技术等待,但是它的功能比CountDownLatch更加复杂和强大。主要应用场景和CountDownLatch类似。CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被拦截的线程才会继续干活。CycliBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

6.用过CountDownLatch吗

CountLatch的作用就是运行count个线程阻塞在一个地方,直至所有线程的任务都执行完毕。