图片标题
从不浪费时间的人,没有工夫抱怨时间不够。 ——杰弗逊
0 前言
SynchronousQueue 一个阻塞队列,其中每个插入操作必须等待另一个线程进行相应的删除操作,反之亦然。 同步队列没有任何内部容量,甚至没有一个容量。 你无法窥视SynchronousQueue,因为仅当你尝试删除它时,该元素才存在。 你不能插入元素(使用任何方法),除非另一个线程试图将其删除; 你无法进行迭代,因为没有要迭代的内容。 队列的头部是第一个排队的插入线程试图添加到队列中的元素; 如果没有这样的排队线程,则没有元素可用于删除,并且poll()将返回null。 为了其他Collection方法(例如,contains)的目的,SynchronousQueue充当空集合。 此队列不允许空元素.
同步队列类似于CSP和Ada中使用的集合通道。 它们非常适合切换设计,在该设计中,在一个线程中运行的对象必须与在另一个线程中运行的对象同步,以便向其传递一些信息,事件或任务。
此类支持可选的公平性策略,用于订购正在等待的生产者和使用者线程。 默认情况下,不保证此排序。 但是,将公平性设置为true构造的队列将按FIFO顺序授予线程访问权限。
此类及其迭代器实现Collection和Iterator接口的所有可选方法。
此类是Java Collections Framework的成员。
1 继承体系
图片标题
- 继承 AbstractQueue 抽象类,定义了对队列的基本操作
- 实现 BlockingQueue 阻塞队列接口,其对队列的操作可能会抛出异常
- 实现 Searializable接口,可以被序列化
2 数据结构
由于SynchronousQueue的支持公平策略和非公平策略,所以底层有两种数据结构
- 队列(实现公平策略),有一个头结点和尾结点
- 栈(实现非公平策略),有一个头结点
队列与栈都是通过链表来实现的。具体的数据结构如下
图片标题
内部类UML 图
- Transferer是TransferStack栈和TransferQueue队列的公共类,定义了转移数据的公共操作,由TransferStack和TransferQueue具体实现图片标题
- WaitQueue、LifoWaitQueue、FifoWaitQueue表示为了兼容JDK1.5版本中的SynchronousQueue的序列化策略所遗留的,这里不做具体的讲解图片标题
3 非公平的堆栈(默认策略)
3.1 栈元素
put 的时候,就往栈中放数据。take 的时候,就从栈中取数据,两者操作都是在栈顶上操作数据.
- volatile SNode next 栈顶的下一个节点
- volatile SNode match匹配,用来判断阻塞栈元素能被唤醒的时机 比如我们先执行 take,此时队列中没有数据,take 被阻塞了,栈元素为 SNode1 当 put 时,会把当前 put 的栈元素赋值给 SNode1 的 match 属性,并唤醒 take 当 take 被唤醒,发现 SNode1 的 match 属性有值时,就能拿到 put 的数据
- volatile Thread waiter 阻塞的线程
- Object item 未投递/未消费的消息
3.2 入栈和出栈
- 入栈 使用 put 等方法,将数据放到栈中图片标题
- 出栈 使用 take 等方法,把数据从栈中拿出来图片标题
操作的对象都是栈顶,底层实现的方法也是同一个:
<code>@SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed // e 为空: take 方法,非空: put 方法 int mode = (e == null) ? REQUEST : DATA; // 自旋 for (;;) { // 头节点情况分类 // 1:为空,说明队列中还没有数据 // 2:非空,并且是 take 类型的,说明头节点线程正等着拿数据 // 3:非空,并且是 put 类型的,说明头节点线程正等着放数据 SNode h = head; // 栈头为空,说明队列中还没有数据。 // 栈头非空且栈头的类型和本次操作一致 // 比如都是 put,那么就把本次 put 操作放到该栈头的前面即可,让本次 put 能够先执行 if (h == null || h.mode == mode) { // empty or same-mode // 设置了超时时间,并且 e 进栈或者出栈要超时了, // 就会丢弃本次操作,返回 null 值。 // 如果栈头此时被取消了,丢弃栈头,取下一个节点继续消费 if (timed && nanos <= 0) { // 无法等待 // 栈头操作被取消 if (h != null && h.isCancelled()) // 丢弃栈头,把栈头的后一个元素作为栈头 casHead(h, h.next); // 将取消的节点弹栈 // 栈头为空,直接返回 null else return null; // 没有超时,直接把 e 作为新的栈头 } else if (casHead(h, s = snode(s, e, h, mode))) { // e 等待出栈,一种是空队列 take,一种是 put SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } // 本来 s 是栈头的,现在 s 不是栈头了,s 后面又来了一个数,把新的数据作为栈头 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } // 栈头正在等待其他线程 put 或 take // 比如栈头正在阻塞,并且是 put 类型,而此次操作正好是 take 类型,走此处 } else if (!isFulfilling(h.mode)) { // try to fulfill // 栈头已经被取消,把下一个元素作为栈头 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // snode 方法第三个参数 h 代表栈头,赋值给 s 的 next 属性 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear // m 就是栈头,通过上面 snode 方法刚刚赋值 SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; // tryMatch 非常重要的方法,两个作用: // 1 唤醒被阻塞的栈头 m,2 把当前节点 s 赋值给 m 的 match 属性 // 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s // 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据 if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } } /<code>
执行流程:
- 判断是 put 方法还是 take 方法
- 判断栈头数据是否为空,如果为空或者栈头的操作和本次操作一致,是的话走 3,否则走 5
- 判断操作有无设置超时时间,如果设置了超时时间并且已经超时,返回 null,否则走 4
- 如果栈头为空,把当前操作设置成栈头,或者栈头不为空,但栈头的操作和本次操作相同,也把当前操作设置成栈头,并看看其它线程能否满足自己,不能满足则阻塞自己。比如当前操作是 take,但队列中没有数据,则阻塞自己
- 如果栈头已经是阻塞住的,需要别人唤醒的,判断当前操作能否唤醒栈头,可以唤醒走 6,否则走 4
- 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点
- 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回。
awaitFulfill
节点阻塞的方法
<code>/** * 旋转/阻止,直到节点s通过执行操作匹配。 * @param s 等待的节点 * @param timed true if timed wait * @param nanos 超时时间 * @return 匹配的节点, 或者是 s 如果被取消 */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { // deadline 死亡时间,如果设置了超时时间的话,死亡时间等于当前时间 + 超时时间,否则就是 0 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 自旋的次数,如果设置了超时时间,会自旋 32 次,否则自旋 512 次。 // 比如本次操作是 take 操作,自旋次数后,仍无其他线程 put 数据 // 就会阻塞,有超时时间的,会阻塞固定的时间,否则一致阻塞下去 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 当前线程有无被打断,如果过了超时时间,当前线程就会被打断 if (w.isInterrupted()) s.tryCancel(); SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); // 超时了,取消当前线程的等待操作 if (nanos <= 0L) { s.tryCancel(); continue; } } // 自选次数减1 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; // 把当前线程设置成 waiter,主要是通过线程来完成阻塞和唤醒 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter else if (!timed) // park 阻塞 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } /<code>
当一个 节点/线程 将要阻塞时,它会设置其 waiter 字段,然后在真正 park 之前至少再检查一次状态,从而涵盖了竞争与实现者的关系,并注意到 waiter 非空,因此应将其唤醒。
当由出现在调用点位于堆栈顶部的节点调用时,对停放的调用之前会进行旋转,以避免在生产者和消费者及时到达时阻塞。 这可能只足以在多处理器上发生。
从主循环返回的检查顺序反映了这样一个事实,即优先级: 中断 > 正常的返回 > 超时。 (因此,在超时时,在放弃之前要进行最后一次匹配检查。)除了来自非定时SynchronousQueue的调用。{poll / offer}不会检查中断,根本不等待,因此陷入了转移方法中 而不是调用awaitFulfill。
而且可以发现其阻塞策略,并不是一上来就阻塞住,而是在自旋一定次数后,仍然没有其它线程来满足自己的要求时,才会真正的阻塞。
3.3 图解非公平模型
- 线程put1执行 put(1)操作,由于当前无配对的消费线程,所以put1线程入栈,自旋一小会后睡眠等待
- 接着,线程put2再次执行了put(2)操作,put2线程入栈,自旋一小会后睡眠等待
- 这时候,来了一个线程take1,执行 take 操作,这时候发现栈顶为put2线程,匹配成功,但是实现会先把take1线程入栈,然后take1线程循环执行匹配put2线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1线程
- 最后,再来一个线程take2,执行take操作,这跟上一步的逻辑基本一致,take2线程入栈,然后在循环中匹配put1线程,最终全部匹配完毕,栈空
从上面流程看出,虽然put1线程先入栈了,但是却是后匹配,这就是非公平策略.
4 公平队列
4.1 队列元素
- volatile QNode next 当前元素的下一个元素
- volatile Object item // CAS'ed to or from null 当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面
- volatile Thread waiter // to control park/unpark 阻塞线程
- final boolean isData true 是 put,false 是 take
4.2 transfer
TransferQueue 内部类的 transfer 方法
<code>E transfer(E e, boolean timed, long nanos) { /** * * 这个基本方法, 主要分为两种情况 * * 1. 若队列为空 / 队列中的尾节点和自己的 类型相同, 则添加 node * 到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配 * timeout/interrupt awaitFulfill方法返回的是 node 本身 * 匹配成功的话, 要么返回 null (producer返回的), 或正真的传递值 (consumer 返回的) * * 2. 队列不为空, 且队列的 head.next 节点是当前节点匹配的节点, * 进行数据的传递匹配, 并且通过 advanceHead 方法帮助 先前 block 的节点 dequeue */ QNode s = null; // 根据需要构造/重用 // true:put false:get boolean isData = (e != null); for (;;) { // 队列首尾的临时变量,队列空时,t=h QNode t = tail; QNode h = head; if (t == null || h == null) // 看到未初始化的值 continue; // 自旋 // 首尾节点相同,队列空 // 或队尾节点的操作和当前节点操作相同 if (h == t || t.isData == isData) { QNode tn = t.next; // tail 被修改,重试 if (t != tail) continue; // 队尾后面的值还不为空,说明其他线程添加了 tail.next,t 还不是队尾,直接把 tn 赋值给 t if (tn != null) { advanceTail(t, tn); // 自旋 continue; } // 超时直接返回 null if (timed && nanos <= 0) // 等不及了 return null; // 创建节点 if (s == null) s = new QNode(e, isData); // 如果把 s 放到队尾失败,继续递归放进去 if (!t.casNext(null, s)) // 链接失败 continue; advanceTail(t, s); // 推进 tail 节点并等待 // 阻塞住自己,直到有其他线程与之匹配, 或它自己进行线程的中断 Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); // 对接点 s 进行清除, 若 s 不是链表的最后一个节点, 则直接 CAS 进行 节点的删除, 若 s 是链表的最后一个节点, 则 要么清除以前的 cleamMe 节点(cleamMe != null), 然后将 s.prev 设置为 cleanMe 节点, 下次进行删除 或直接将 s.prev 设置为cleanMe return null; } if (!s.isOffList()) { // 尚未取消链接 advanceHead(t, s); // unlink if head 推进head 节点, 下次就调用 s.next 节点进行匹配(这里调用的是 advanceHead, 因为代码能执行到这边说明s已经是 head.next 节点了) if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; // 队列不为空,并且当前操作和队尾不一致 // 也就是说当前操作是队尾是对应的操作 // 比如说队尾是因为 take 被阻塞的,那么当前操作必然是 put } el***plementary-mode // 如果是第一次执行,此处的 m 代表就是 tail // 也就是这行代码体现出队列的公平,每次操作时,从头开始按照顺序进行操作 QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled // m 代表栈头 // 这里把当前的操作值赋值给阻塞住的 m 的 item 属性 // 这样 m 被释放时,就可得到此次操作的值 !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } // 当前操作放到队头 advanceHead(h, m); // successfully fulfilled // 释放队头阻塞节点 LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } } /<code>
线程被阻塞住后,当前线程是如何把自己的数据传给阻塞线程的。 假设线程 1 从队列中 take 数据 ,被阻塞,变成阻塞线程 A 然后线程 2 开始往队列中 put 数据 B,大致的流程如下:
- 线程 1 从队列 take 数据,发现队列内无数据,于是被阻塞,成为 A
- 线程 2 往队尾 put 数据,会从队尾往前找到第一个被阻塞的节点,假设此时能找到的就是节点 A,然后线程 B 把将 put 的数据放到节点 A 的 item 属性里面,并唤醒线程 1
- 线程 1 被唤醒后,就能从 A.item 里面拿到线程 2 put 的数据了,线程 1 成功返回。
在这个过程中,公平主要体现在,每次 put 数据的时候,都 put 到队尾上,而每次拿数据时,并不是直接从堆头拿数据,而是从队尾往前寻找第一个被阻塞的线程,这样就会按照顺序释放被阻塞的线程。
avanceTail
- 尝试 cas 将 nt 作为新的tail图片标题
4.3 图解公平队列模型
公平模式下,底层实现使用的是 TransferQueue 队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。
- 初始化时的 TransferQueue
- 线程 put1 执行 put(1) ,由于当前没有配对的消费线程,所以 put1 线程入队,自旋一小会后睡眠等待
- 接着,线程 put2 执行 put(2),put2线程入队,自旋一小会后睡眠等待
- 这时来了一个线程 take1,执行了 take,由于 tail 指向 put2 线程,put2 线程跟 take1 线程匹配,这时take1 线程不需要入队 注意了!这时要唤醒的线程并不是 put2,而是put1. 因为现在是公平策略,谁先入队,谁优先被唤醒,这里显然 put1 应优先被唤醒. 公平策略总结一句话就是:队尾匹配队头出队
- 执行后 put1 线程被唤醒,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信
- 最后,再来一个线程take2,执行take操作,这时候只有put2线程在等候,而且两个线程匹配上了,线程put2被唤醒,take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点
5 总结
SynchronousQueue 内没有容器为什么还能够存储一个元素呢?因为内部没有容器指的是没有像数组那样的内存空间存多个元素,但是是有单地址内存空间,用于交换数据. SynchronousQueue 凭借其独有的线程配对通信机制,在大部分平常开发中,可能都不太会用到,但线程池技术中会有所使用,由于内部没有使用AQS,而是直接使用CAS,所以代码理解起来会比较困难,但这并不妨碍我们理解底层的实现模型,在理解了模型的基础上,再翻阅源码,就会有方向感,看起来也会比较容易!