用Atomic实现可以等待的锁

一直想写ReentrantLock,就得先介绍AbstractQueueSynchronizer,可是我觉得这样写,不过瘾,我把代码贴一遍,懂的人自己就能找到这些代码看,不懂的人还是不懂。直到昨天灵机一动,不如自己从简到难重新写一遍,带着读者跳几个坑,可能就好了。java.util.concurrent.lock下的几个锁以及synchronized锁其实背后都要使用atomic操作,那我们不妨就使用atomic操作把锁实现一遍。

咱们先从最简单的开始。

互斥

并发模型中,最简单的问题,就是互斥。一个公共资源同一时刻只能被一个进程或线程使用,多个进程或线程不能同时使用公共资源,这个公共资源,我们通常会称之为关键区。如何保护这个关键区就是互斥的问题。

这个其实比较简单,我只需要用一个atomic变量,让它为 0,不管有多少线程过来,谁先抢到这个变量把它置为1,谁就相当于拿到了关键区的使用权,而其他没抢到的就不能进入关键区。来看这样一个例子:

public class Mutex { public static void main(String[] args) { TestMutex test = new TestMutex(); int THREAD_NUM = 10; Thread[] threads = new Thread[THREAD_NUM]; // 创建10个线程,让它们不断地去增加test.sum的值 for (int i = 0; i < THREAD_NUM; i++) { Thread t= new Thread() { public void run() { for (int j = 0; j < 10000; j++) { test.add(); } } }; t.start(); threads[i] = t; } for (Thread t : threads) { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(test.sum); }}class TestMutex { public int sum = 0; public void add() { if (sum < 30_000) { try { // 这里sleep一下,增加线程切换的概率 Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } sum += 50; } }}

运行这个程序,可以看到结果是随机的,大概率会是落在30300到30450这个区间。这个是典型的因为并发引起的。那么想改正它,我们就可以把add用一个atomic变量保护起来。一个线程只有获得了这个许可,才能继续执行 add 操作。如果没有许可,就直接放弃,修改过后的Mutex变成这样:

class TestMutex { public int sum = 0; AtomicInteger mutex = new AtomicInteger(0); public void add() { if (!mutex.compareAndSet(0, 1)) return; if (sum < 30_000) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } sum += 50; } mutex.set(0); }}

经过这样的修改,运行的结果就一定会是30000了。

同步

在互斥的例子里,如果一个线程拿不到许可(即mutex变量),那就直接放弃什么都不做了。但是如果我们希望它继续完成加法操作呢?那么线程之间就必须有一定的手段相互可以知道有没有线程在关键区里,如果有,那么我就不再进入关键区了,如果没有,我就尝试进去。

举个例子,有一条仅容一人通过的小巷子,有两个人相对而行,那么当左端的人进入了这个巷子以后,右端的人就不能再进去了,他得先等待。必须等到左端的人到达右端,然后告诉右端等待的人可以进了,右端的人才可以进去。这个小巷子就不光是互斥的关键区了,还得有两个人相互通知的机制。

还是拿之前的课程里的例子来说吧:

class LockTest { public int total = 0; public void testTwoThreads() throws InterruptedException { Thread t1 = new Thread() { public void run() { for (int i = 0; i < 5_000; i++) { incTotal(); } } }; Thread t2 = new Thread() { public void run() { for (int i = 0; i < 5_000; i++) { incTotal(); } } }; t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(total); } public void incTotal() { total += 1; }}

这个例子,启动了两个线程,每个线程都执行5000次加法,但如果真正运行的话,大概率不会是10000。这就又是并发的问题了,具体的原因,咱们之前的课程里分析过,这里就不再重复了。如果我们使用Atomic保护关键区的思路来改写,应该怎么做呢?

自旋锁

今天介绍一种自旋锁的思想。举个实际的例子,去公司的卫生间蹲坑,一直没位置,这时候由于没有任何的通知机制,所以我只能每隔一会去看看有没有空位,有空位就赶紧抢,然后把门锁上,如果没有,就只能一直在门口等。

这就是最典型的自旋锁。它不需要任何的通知机制,一个线程去抢许可变量,抢到了就进关键区,抢不到就死循环一直抢。好,我们来实现一个自旋锁:

public class SpinLock implements Lock{ AtomicInteger state = new AtomicInteger(0); public void lock() { for(;;) { if (state.get() == 1) continue; else if (state.compareAndSet(0, 1)) { break; } } } public void unlock() { state.set(0); }}

原理很简单,就是一直CAS抢锁,如果抢不到,就一直死循环,直到抢到了才退出这个循环。然后,我们使用这个工具改写一下incTotal:

 private Lock lock = new SpinLock(); //..... public void incTotal() { lock.lock(); total += 1; lock.unlock(); }

这次再运行,结果就是10_000了。我们使用一个Atomic变量把整个关键区保护起来了。

自旋锁实现起来非常简单,如果关键区的执行时间很短,往往自旋等待会是一种比较高效的做法,它可以避免线程的频繁切换和调度。但如果关键区的执行时间很长,那这种做法就会大量地浪费CPU资源。

针对关键区执行时间长的情况,该怎么办呢?下篇文章再说吧。


分享到:


相關文章: