AQS实现探究——Java 锁的基架 AQS(二)

AQS实现探究——Java 锁的基架 AQS(二)

键盘很好看

上篇介绍了AQS的概念,AQS 所支持的两种模式:独占模式和共享模式,如何去使用AQS 实现独占锁。这篇就介绍下AQS 是怎么是将线程锁住的?以及多个线程争夺时AQS 做了哪些事情?

知识点简介

volatile

volatile 是Java 中的关键字,主要有两个特性:可见性和防止重排序。在AQS主要用到第一个特性就是可见性;什么是可见性呢?学过操作系统的都知道,主内存是存放程序运行时数据的地方,但是为了速度每个CPU 都有自己的高速缓存,缓存自己从主内存加载后的数据,一个线程是运行在CPU上的,当这个线程想加载某个数据时优先从自己的高速缓存里找,如果没有找到再从主内存里加载,操作完就放到高速缓存中,别的线程工作时也是优先从自己的高速缓存里读,这样就会读到旧的数据,最后导致计算出错,为了避免这种问题,保证多个CPU 的高速缓存是一致的,OS 内部有个缓存一致性协议(eg:MESI), 而volatile 就是使用这种协议来使修饰的变量在多个线程间可见。

CAS

CAS 全称compareAndSwap,在Java中就是一个函数, 它有三个参数:修改值的内存地址,旧値,期望修改后的値;该函数大致流程如下:

  • 按照内存地址取出旧的値,
  • 与传进来的旧値比较是否相同:不相同则失败,否则执行步骤3
  • 将内存地址的値设置为期望值

上述三个动作是具有原子性的,即不可拆分的,对应着处理器的一个原子指令(CMPXCHG),处理器实现原子操作有两种第一种就是通过对内存总线加锁,第二种就缓存锁定只对某个缓存行进行锁定,第二种对资源的消耗低。

CLH 介绍

官方解释

CLH锁是Craig, Landin和Hagersten (CLH)锁,CLH锁是旋转锁,可以确保没有饥饿,提供公平先到先得的服务。

CLH锁是一种可伸缩的、高性能的、公平的自旋锁,基于列表,应用程序线程仅在本地变量上自旋,它不断轮询前驱状态,如果发现前驱状态释放锁则结束自旋。

简述某个线程获取锁的过程

先将自己加入到队列中,并将自己的状态(true/false)设置为true,然后循环判断队列中前一个节点(线程)中的状态是否为false ,这里的循环即模拟阻塞,直到队列的前一个节点释放锁,将它的状态设置为false ,则当前节点才获取锁。

实操(实现CLH)

简介:和AQS相同只有一个状态表示锁是否空闲,而不是队列中的每个节点都有一个状态,此状态有两个值1或者0: 1表示锁已经被抢占,0表示锁空闲。

原生的CLH 锁 存在一个弊端:当前节点的线程会不断轮询前一个节点的状态,它会造成CPU 使用率100%。对该弊端AQS采用阻塞和通知的手段,如果发现线程获取锁失败则将当前线程阻塞住,等前一个线程释放锁时,将它的后继线程阻塞状态解除即可。思路:

AQS实现探究——Java 锁的基架 AQS(二)

代码如下:

**
* CLH锁:一个种自旋锁,通过先进先出确保无饥饿的和公平的锁
*/
public class ClhLock {
// java 的一个不安全的帮助类,支持CAS 操作
private static Unsafe unsafe = UnsafeUtil.getUnsafe().orElse(null);
/**
* 0表示未锁住
* 1表示锁住
*/
private volatile int state = 0;
/**
* 独占锁拥有者线程
*/
private Thread exclusiveOwnerThread;
/**
* 获得锁的节点
* 开始为空
*/
private transient volatile Node head;
/**
* 尾部节点
* 开始为空
*/
private transient volatile Node tail;

/**
* 为了CAS 操作而设置的变量.
* 用下面这几个参数获取对应实体上对应字段的地址,充当CAS 第一个参数
*/
private final static long headOffset;
private final static long tailOffset;
private final static long stateOffset;

static {
try {
if (Objects.isNull(unsafe)) {
throw new IllegalStateException("Unsafe instance has not initialized");
}
headOffset = unsafe.objectFieldOffset(ClhLock.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(ClhLock.class.getDeclaredField("tail"));
stateOffset = unsafe.objectFieldOffset(ClhLock.class.getDeclaredField("state"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
public ClhLock() {
}
/**
* 尝试获取锁,如果没有获取则加入等待队列
*/
public void lock() {
acquire(1);
}
private void acquire(int arg) {
//加入等待队列并返回当前节点
Node node = addWaiter(Thread.currentThread());
//轮询
for (; ; ) {
Node h = head;
//判断自己的前驱是否是占有锁的节点,若是则尝试获取锁
if (node.prev == h && tryAcquire(arg)) {
System.out.println("acquire lock thread:" + Thread.currentThread().getName());
// 获取锁成功将自己的节点设置为head
setHead(node);
return;
}
//若获取锁失败则阻塞当前线程
LockSupport.park(node.thread);
}
}
private void setHead(Node node) {
Node h = head;
// 通过cas 函数将 node 设置为head
if (compareAndSetHeadOrTail(headOffset, h, node)) {

//为了gc
node.prev = null;
node.thread = null;
}
}
/**
* 将线程入队
* @param currentThread 将要入队的线程
* @return
*/
private Node addWaiter(Thread currentThread) {
//新建一个节点代表当前线程
Node node = new Node(currentThread);
Node t = tail;
//判断尾部节点是否为空,开始时尾部节点为空
if (t != null) {
//尾部节点不为空则将尾部节点赋给当前节点的前驱
node.prev = t;
//将自己设置为尾部节点,可能不成功,会被其它线程先一步设置,若设置不了则会进入下面的enq
if (compareAndSetHeadOrTail(tailOffset, t, node)) {
t.next = node;
return node;
}
}
//若尾部节点为空(第一个线程进来),或者将当前节点设置为尾部节点失败
return enq(node);
}
protected boolean tryAcquire(int arg) {
assert arg == 1;
int tempState = getState();
if (tempState == 0 && compareAndSetState(tempState, arg)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean compareAndSetState(int oldValue, int expectValue) {
return unsafe.compareAndSwapInt(this, stateOffset, oldValue, expectValue);
}
private boolean compareAndSetHeadOrTail(long offset, Node oldValue, Node expectValue) {
return unsafe.compareAndSwapObject(this, offset, oldValue, expectValue);
}
/**
* 释放锁
*/
public void unLock() {
release();
}
protected void release() {
//尝试释放锁
if (tryRelease(1)) {
Node h = head;
if (h != null) {
//唤醒后继线程
unParkSuccessor();
}
}
}
private void unParkSuccessor() {
//唤醒后继线程
Node n = head.next;
//下面逻辑暂时不管,不会出现这种情况
if (n == null) {
for (Node prev = tail.prev; prev != null; prev = prev.prev) {
if (prev != head) {
n = prev;
}
}
}

if (Objects.nonNull(n)) {
LockSupport.unpark(n.thread);
}
}
private boolean tryRelease(int arg) {
assert arg == 1;
int tempState = getState();
if (tempState == 1 && compareAndSetState(tempState, 0)) {
setExclusiveOwnerThread(null);
return true;
}
return false;
}

/**
* 队列的一个元素
*/
class Node {
/**
* 包含哪个线程,创建时实例化
*/
private Thread thread;
/**
* 前驱
*/
private Node prev;
/**
* 后继
*/
private Node next;
public Node(Thread thread) {
this.thread = thread;
}
public Thread getThread() {
return thread;
}
public Node getPrev() {
return prev;
}
public void setPrev(Node prev) {
this.prev = prev;
}
public Node() {
}
}
public int getState() {
return state;
}
public Node getHead() {
return head;
}
public Node getTail() {
return tail;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}

个人建议:大家在看的时候,在草稿纸上比划下。因为链表比数组要抽象的。

public class ClhLockTest {
private static int i = 1;
public static void incr() {
i++;
}
@Test
public void testLock() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(100);
ClhLock clhLock = new ClhLock();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
clhLock.lock();
incr();
clhLock.unLock();
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
Assertions.assertEquals(101,i);
}
}

AQS 中的CLH

简述

AQS使用了CLH 先入先出的思想,但是并不是公平的,而是抢占式的,但这取决于子类的实现方式。上文也说到AQS 只维护一个状态state,而不像原生CLH 那样每个节点都有自己的状态。

Node 类的 构造

Node 类表示队列中的某个元素

属性

AQS实现探究——Java 锁的基架 AQS(二)

Node 类waitStatus 的几种状态介绍

AQS实现探究——Java 锁的基架 AQS(二)

获取独占锁过程

源码分析 注释

  • 尝试获取锁,由子类实现
public final void acquire(int arg) {
//调用tryAcquire函数,尝试修改state,该函数有子类实现,返回true 或者false
if (!tryAcquire(arg) &&
//若获取失败 则开始入队等待获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// acquireQueued 会判断该线程在获取完锁的时候是否被中断,
//若中断则继续获取锁,获取完再中断自己
selfInterrupt();
}
  • 添加到队列尾部
/**
* 为当前线程和给定的模式(独占模式或者共享模式)创建节点并将节点入队
* @param mode Node.EXCLUSIVE 或者 Node.SHARED ——独占或者共享
* @return
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);

// 尝试最快的方式入队,失败就就进入enq,不断地轮询直到成功入队
//入队就是设置tail 节点,因为会存在竞争的情况所以会设置失败
Node pred = tail;
if (pred != null) {
//将当前节点的前驱设置为尾部节点,然后尝试将当前节点设置为尾部节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
//入队成功,将其前驱的后继设置为当前节点
pred.next = node;
return node;
}
}
//以轮询的方式入队
enq(node);
// 入队成功返回当前节点
return node;
}
/**
* 将节点插入到队列中,如果需要的话初始化tail节点和head 节点
* @param node 要插入到队列中的节点
* @return 节点的后继
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//如果尾部节点为空,则需要将head 和tail 初始化,延迟加载有不有
if (compareAndSetHead(new Node()))
tail = head;
} else {
//若尾部节点不为空则

// 将当前节点的前驱设置为尾部节点,
// 然后尝试将当前节点设置为尾部节点
node.prev = t;
if (compareAndSetTail(t, node)) {
//入队成功,将其前驱的后继设置为当前节点
t.next = node;
return t;
}
}
}
}
  • 已在队列中的线程以独占模式获取锁
 /**
*
*已在队列中的线程以独占模式获取锁并且不可中断(不是重点)
* @param node 当前线程对应的节点
* @param arg 获取参数 与子类定义有关不用管
* @return 如果在等待的时候被中断则返回true
*/
final boolean acquireQueued(final Node node, int arg) {
//表示获取锁是否失败,若最后还是未成功则取消该节点
boolean failed = true;
try {
//标识当前线程(节点)是否中断
boolean interrupted = false;
//轮训
for (;;) {
//获取前驱

final Node p = node.predecessor();
//判断前驱是否是head 节点,若是则尝试获取锁
if (p == head && tryAcquire(arg)) {
//若获取锁成功,则将当前节点设置为head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果获取锁失败,则设置前驱的waitStatus为SIGNAL :下一个循环若还是失败则阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
//若前驱的waitStatus 为SIGNAL 则将当前线程阻塞,被唤醒的时候返回该线程是否被中断
parkAndCheckInterrupt())
// 若被中断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

流程

  1. 调用tryAcquire方法尝试获取锁,成功就代表已获取锁
  2. 第一步失败则入队
  3. 入队后,然后开始轮询尝试获取锁
  4. 第三步轮询两次失败就将自己阻塞住,等待前驱释放时解除自己的阻塞状态

释放独占锁过程

源码分析 注释

 public final boolean release(int arg) {
//尝试释放锁由子类实现,正常返回true
if (tryRelease(arg)) {
Node h = head;
//如果head 不为空,且head的waitStatus 不为0(其后继等待时将其设置为SIGNAL)
if (h != null && h.waitStatus != 0)
//唤醒后继
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 唤醒给定节点的后继
* @param node 给定的节点
*/
private void unparkSuccessor(Node node) {
/**
* 若果waitStatus 小于零则需要被唤醒
* 这里只考虑为SIGNAL
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 唤醒后继
* 通常其后继就是下一个,但是因为可能后继取消或者为空
* 需要从尾部向前遍历寻找一个距离head最近的不为空且没有被取消的节点

*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒后继线程
LockSupport.unpark(s.thread);
}

流程

  1. 调用tryRelease 释放锁
  2. 若head 节点不为空且waitStatus 不等于0,则唤醒后继线程

获取共享锁过程

源码分析 注释

 public final void acquireShared(int arg) {
/**
* 尝试获取共享锁
* tryAcquired 返回 一个整数,为负数则获取失败,不同的子类实现方式不一样
* Semaphore 类返回的是剩余多少许可证
*/
if (tryAcquireShared(arg) < 0)
//获取失败 则入队等待获取

doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 以共享模式入队,如何入队上文已介绍
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//判断当前节点的前驱是否是头节点
if (p == head) {
//若是头节点,则尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取锁成功
/**
* 将当前节点设置为头
* 判断其后继节点是否为shared
* 若是则释放自己,因为共享模式表示多个线程都可以同时占有锁
* 注意第二个参数 是tryAcquireShared 的返回值,方法里会用到
*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//和独占锁相同 具体可以上面
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);

}
}
/**
* 设置队列的头,
* 并检查如果后继以共享模式等待且propagate(tryAcquireShared 返回值) >0
* 或者waiStatus 被设置为PROPAGATE
* @param node 当前节点
* @param propagate 一个tryAcquireShared 的返回值
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 当propagate 大于 零
* 或者h.waitStatus 小于0
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//释放共享锁,下面详解
doReleaseShared();
}
}

流程

  1. 尝试获取共享锁成功即返回
  2. 第一步尝试获取锁失败,添加到队列尾部,若需要初始化则先初始化head和tail 节点
  3. 然后再次尝试获取锁,若失败则将waitStatus 设置为SIGNAL,下一个循环获取锁失败,则挂起当前线程。

释放共享锁过程

源码分析 注释

 /**
* 释放共享锁,唤醒它的后继并保证传播
*/
private void doReleaseShared() {
/*
* 保证一个释放会传播下去,即使有正在获取或释放。
* 如果需要,以一个通常的方式去唤醒head的后继,
* 如果不需要,就把head 的status 设置为PROPAGATE,
* 保证后面的释放传播可以继续
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果h 的waitStatus 是SIGNAL,则先尝试将其设置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继
unparkSuccessor(h);
}
//若waitStatus 为0,则将其设置为PROPAGATE,为后续的释放保证传播这个行为
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed

break;
}
}

流程

  1. 解除对共享锁的占用
  2. 唤醒head的后继并保证传播

后记

AQS 作为Java 锁的基架,其重要性不言而喻。本文只是从获取锁释放锁的角度去分析,没有涉及到waitStatus 为条件的情况以及取消状态进行分析,下一节将从这个角度去分析下AQS,大家有啥疑问以及以及指正咱们评论区见。


分享到:


相關文章: