SynchronousQueue 源码解析

图片标题


从不浪费时间的人,没有工夫抱怨时间不够。 ——杰弗逊

0 前言

SynchronousQueue 一个阻塞队列,其中每个插入操作必须等待另一个线程进行相应的删除操作,反之亦然。 同步队列没有任何内部容量,甚至没有一个容量。 你无法窥视SynchronousQueue,因为仅当你尝试删除它时,该元素才存在。 你不能插入元素(使用任何方法),除非另一个线程试图将其删除; 你无法进行迭代,因为没有要迭代的内容。 队列的头部是第一个排队的插入线程试图添加到队列中的元素; 如果没有这样的排队线程,则没有元素可用于删除,并且poll()将返回null。 为了其他Collection方法(例如,contains)的目的,SynchronousQueue充当空集合。 此队列不允许空元素.

同步队列类似于CSP和Ada中使用的集合通道。 它们非常适合切换设计,在该设计中,在一个线程中运行的对象必须与在另一个线程中运行的对象同步,以便向其传递一些信息,事件或任务。

此类支持可选的公平性策略,用于订购正在等待的生产者和使用者线程。 默认情况下,不保证此排序。 但是,将公平性设置为true构造的队列将按FIFO顺序授予线程访问权限。

此类及其迭代器实现Collection和Iterator接口的所有可选方法。

此类是Java Collections Framework的成员。

1 继承体系


图片标题



继承 AbstractQueue 抽象类,定义了对队列的基本操作实现 BlockingQueue 阻塞队列接口,其对队列的操作可能会抛出异常实现 Searializable接口,可以被序列化

2 数据结构

由于SynchronousQueue的支持公平策略和非公平策略,所以底层有两种数据结构

队列(实现公平策略),有一个头结点和尾结点栈(实现非公平策略),有一个头结点

队列与栈都是通过链表来实现的。具体的数据结构如下

图片标题


内部类UML 图

Transferer是TransferStack栈和TransferQueue队列的公共类,定义了转移数据的公共操作,由TransferStack和TransferQueue具体实现图片标题WaitQueue、LifoWaitQueue、FifoWaitQueue表示为了兼容JDK1.5版本中的SynchronousQueue的序列化策略所遗留的,这里不做具体的讲解图片标题

3 非公平的堆栈(默认策略)

3.1 栈元素

put 的时候,就往栈中放数据。take 的时候,就从栈中取数据,两者操作都是在栈顶上操作数据.



volatile SNode next 栈顶的下一个节点volatile SNode match匹配,用来判断阻塞栈元素能被唤醒的时机 比如我们先执行 take,此时队列中没有数据,take 被阻塞了,栈元素为 SNode1 当 put 时,会把当前 put 的栈元素赋值给 SNode1 的 match 属性,并唤醒 take 当 take 被唤醒,发现 SNode1 的 match 属性有值时,就能拿到 put 的数据volatile Thread waiter 阻塞的线程Object item 未投递/未消费的消息

3.2 入栈和出栈

入栈 使用 put 等方法,将数据放到栈中图片标题出栈 使用 take 等方法,把数据从栈中拿出来图片标题

操作的对象都是栈顶,底层实现的方法也是同一个:

<code>@SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed // e 为空: take 方法,非空: put 方法 int mode = (e == null) ? REQUEST : DATA; // 自旋 for (;;) { // 头节点情况分类 // 1:为空,说明队列中还没有数据 // 2:非空,并且是 take 类型的,说明头节点线程正等着拿数据 // 3:非空,并且是 put 类型的,说明头节点线程正等着放数据 SNode h = head; // 栈头为空,说明队列中还没有数据。 // 栈头非空且栈头的类型和本次操作一致 // 比如都是 put,那么就把本次 put 操作放到该栈头的前面即可,让本次 put 能够先执行 if (h == null || h.mode == mode) { // empty or same-mode // 设置了超时时间,并且 e 进栈或者出栈要超时了, // 就会丢弃本次操作,返回 null 值。 // 如果栈头此时被取消了,丢弃栈头,取下一个节点继续消费 if (timed && nanos <= 0) { // 无法等待 // 栈头操作被取消 if (h != null && h.isCancelled()) // 丢弃栈头,把栈头的后一个元素作为栈头 casHead(h, h.next); // 将取消的节点弹栈 // 栈头为空,直接返回 null else return null; // 没有超时,直接把 e 作为新的栈头 } else if (casHead(h, s = snode(s, e, h, mode))) { // e 等待出栈,一种是空队列 take,一种是 put SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } // 本来 s 是栈头的,现在 s 不是栈头了,s 后面又来了一个数,把新的数据作为栈头 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } // 栈头正在等待其他线程 put 或 take // 比如栈头正在阻塞,并且是 put 类型,而此次操作正好是 take 类型,走此处 } else if (!isFulfilling(h.mode)) { // try to fulfill // 栈头已经被取消,把下一个元素作为栈头 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // snode 方法第三个参数 h 代表栈头,赋值给 s 的 next 属性 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear // m 就是栈头,通过上面 snode 方法刚刚赋值 SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; // tryMatch 非常重要的方法,两个作用: // 1 唤醒被阻塞的栈头 m,2 把当前节点 s 赋值给 m 的 match 属性 // 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s // 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据 if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } } /<code>

执行流程:

判断是 put 方法还是 take 方法判断栈头数据是否为空,如果为空或者栈头的操作和本次操作一致,是的话走 3,否则走 5判断操作有无设置超时时间,如果设置了超时时间并且已经超时,返回 null,否则走 4如果栈头为空,把当前操作设置成栈头,或者栈头不为空,但栈头的操作和本次操作相同,也把当前操作设置成栈头,并看看其它线程能否满足自己,不能满足则阻塞自己。比如当前操作是 take,但队列中没有数据,则阻塞自己如果栈头已经是阻塞住的,需要别人唤醒的,判断当前操作能否唤醒栈头,可以唤醒走 6,否则走 4把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回。

awaitFulfill

节点阻塞的方法

<code>/** * 旋转/阻止,直到节点s通过执行操作匹配。 * @param s 等待的节点 * @param timed true if timed wait * @param nanos 超时时间 * @return 匹配的节点, 或者是 s 如果被取消 */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { // deadline 死亡时间,如果设置了超时时间的话,死亡时间等于当前时间 + 超时时间,否则就是 0 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 自旋的次数,如果设置了超时时间,会自旋 32 次,否则自旋 512 次。 // 比如本次操作是 take 操作,自旋次数后,仍无其他线程 put 数据 // 就会阻塞,有超时时间的,会阻塞固定的时间,否则一致阻塞下去 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 当前线程有无被打断,如果过了超时时间,当前线程就会被打断 if (w.isInterrupted()) s.tryCancel(); SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); // 超时了,取消当前线程的等待操作 if (nanos

<= 0L) { s.tryCancel(); continue; } } // 自选次数减1 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; // 把当前线程设置成 waiter,主要是通过线程来完成阻塞和唤醒 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter else if (!timed) // park 阻塞 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } /<code>

当一个 节点/线程 将要阻塞时,它会设置其 waiter 字段,然后在真正 park 之前至少再检查一次状态,从而涵盖了竞争与实现者的关系,并注意到 waiter 非空,因此应将其唤醒。

当由出现在调用点位于堆栈顶部的节点调用时,对停放的调用之前会进行旋转,以避免在生产者和消费者及时到达时阻塞。 这可能只足以在多处理器上发生。

从主循环返回的检查顺序反映了这样一个事实,即优先级: 中断 > 正常的返回 > 超时。 (因此,在超时时,在放弃之前要进行最后一次匹配检查。)除了来自非定时SynchronousQueue的调用。{poll / offer}不会检查中断,根本不等待,因此陷入了转移方法中 而不是调用awaitFulfill。

而且可以发现其阻塞策略,并不是一上来就阻塞住,而是在自旋一定次数后,仍然没有其它线程来满足自己的要求时,才会真正的阻塞。

3.3 图解非公平模型

线程put1执行 put(1)操作,由于当前无配对的消费线程,所以put1线程入栈,自旋一小会后睡眠等待接着,线程put2再次执行了put(2)操作,put2线程入栈,自旋一小会后睡眠等待这时候,来了一个线程take1,执行 take 操作,这时候发现栈顶为put2线程,匹配成功,但是实现会先把take1线程入栈,然后take1线程循环执行匹配put2线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1线程最后,再来一个线程take2,执行take操作,这跟上一步的逻辑基本一致,take2线程入栈,然后在循环中匹配put1线程,最终全部匹配完毕,栈空

从上面流程看出,虽然put1线程先入栈了,但是却是后匹配,这就是非公平策略.

4 公平队列

4.1 队列元素



volatile QNode next 当前元素的下一个元素volatile Object item // CAS'ed to or from null 当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面volatile Thread waiter // to control park/unpark 阻塞线程final boolean isData true 是 put,false 是 take

4.2 transfer

TransferQueue 内部类的 transfer 方法

<code>E transfer(E e, boolean timed, long nanos) { /** * * 这个基本方法, 主要分为两种情况 * * 1. 若队列为空 / 队列中的尾节点和自己的 类型相同, 则添加 node * 到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配 * timeout/interrupt awaitFulfill方法返回的是 node 本身 * 匹配成功的话, 要么返回 null (producer返回的), 或正真的传递值 (consumer 返回的) * * 2. 队列不为空, 且队列的 head.next 节点是当前节点匹配的节点, * 进行数据的传递匹配, 并且通过 advanceHead 方法帮助 先前 block 的节点 dequeue */ QNode s = null; // 根据需要构造/重用 // true:put false:get boolean isData = (e != null); for (;;) { // 队列首尾的临时变量,队列空时,t=h QNode t = tail; QNode h = head; if (t == null || h == null) // 看到未初始化的值 continue; // 自旋 // 首尾节点相同,队列空 // 或队尾节点的操作和当前节点操作相同 if (h == t || t.isData == isData) { QNode tn = t.next; // tail 被修改,重试 if (t != tail) continue; // 队尾后面的值还不为空,说明其他线程添加了 tail.next,t 还不是队尾,直接把 tn 赋值给 t if (tn != null) { advanceTail(t, tn); // 自旋 continue; } // 超时直接返回 null if (timed && nanos

<= 0) // 等不及了 return null; // 创建节点 if (s == null) s = new QNode(e, isData); // 如果把 s 放到队尾失败,继续递归放进去 if (!t.casNext(null, s)) // 链接失败 continue; advanceTail(t, s); // 推进 tail 节点并等待 // 阻塞住自己,直到有其他线程与之匹配, 或它自己进行线程的中断 Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); // 对接点 s 进行清除, 若 s 不是链表的最后一个节点, 则直接 CAS 进行 节点的删除, 若 s 是链表的最后一个节点, 则 要么清除以前的 cleamMe 节点(cleamMe != null), 然后将 s.prev 设置为 cleanMe 节点, 下次进行删除 或直接将 s.prev 设置为cleanMe return null; } if (!s.isOffList()) { // 尚未取消链接 advanceHead(t, s); // unlink if head 推进head 节点, 下次就调用 s.next 节点进行匹配(这里调用的是 advanceHead, 因为代码能执行到这边说明s已经是 head.next 节点了) if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; // 队列不为空,并且当前操作和队尾不一致 // 也就是说当前操作是队尾是对应的操作 // 比如说队尾是因为 take 被阻塞的,那么当前操作必然是 put } el***plementary-mode // 如果是第一次执行,此处的 m 代表就是 tail // 也就是这行代码体现出队列的公平,每次操作时,从头开始按照顺序进行操作 QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled // m 代表栈头 // 这里把当前的操作值赋值给阻塞住的 m 的 item 属性 // 这样 m 被释放时,就可得到此次操作的值 !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } // 当前操作放到队头 advanceHead(h, m); // successfully fulfilled // 释放队头阻塞节点 LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } } /<code>

线程被阻塞住后,当前线程是如何把自己的数据传给阻塞线程的。 假设线程 1 从队列中 take 数据 ,被阻塞,变成阻塞线程 A 然后线程 2 开始往队列中 put 数据 B,大致的流程如下:

线程 1 从队列 take 数据,发现队列内无数据,于是被阻塞,成为 A线程 2 往队尾 put 数据,会从队尾往前找到第一个被阻塞的节点,假设此时能找到的就是节点 A,然后线程 B 把将 put 的数据放到节点 A 的 item 属性里面,并唤醒线程 1线程 1 被唤醒后,就能从 A.item 里面拿到线程 2 put 的数据了,线程 1 成功返回。

在这个过程中,公平主要体现在,每次 put 数据的时候,都 put 到队尾上,而每次拿数据时,并不是直接从堆头拿数据,而是从队尾往前寻找第一个被阻塞的线程,这样就会按照顺序释放被阻塞的线程。

avanceTail

尝试 cas 将 nt 作为新的tail图片标题

4.3 图解公平队列模型

公平模式下,底层实现使用的是 TransferQueue 队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。

初始化时的 TransferQueue线程 put1 执行 put(1) ,由于当前没有配对的消费线程,所以 put1 线程入队,自旋一小会后睡眠等待接着,线程 put2 执行 put(2),put2线程入队,自旋一小会后睡眠等待这时来了一个线程 take1,执行了 take,由于 tail 指向 put2 线程,put2 线程跟 take1 线程匹配,这时take1 线程不需要入队 注意了!这时要唤醒的线程并不是 put2,而是put1. 因为现在是公平策略,谁先入队,谁优先被唤醒,这里显然 put1 应优先被唤醒. 公平策略总结一句话就是:队尾匹配队头出队执行后 put1 线程被唤醒,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信最后,再来一个线程take2,执行take操作,这时候只有put2线程在等候,而且两个线程匹配上了,线程put2被唤醒,take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点

5 总结

SynchronousQueue 内没有容器为什么还能够存储一个元素呢?因为内部没有容器指的是没有像数组那样的内存空间存多个元素,但是是有单地址内存空间,用于交换数据. SynchronousQueue 凭借其独有的线程配对通信机制,在大部分平常开发中,可能都不太会用到,但线程池技术中会有所使用,由于内部没有使用AQS,而是直接使用CAS,所以代码理解起来会比较困难,但这并不妨碍我们理解底层的实现模型,在理解了模型的基础上,再翻阅源码,就会有方向感,看起来也会比较容易!