Okio深入分析—源码分析部分

一、前言

Okio深入分析——基础使用部分主要了解了Okio这个库的特性,以及最基本的使用。但正如最后所说,我们还不知道它为什么好,为什么快。但是在分析之前,我们先过一遍其主体类的框架图,以便我们对其整体有一个粗略的认识。

OkioFramework.jpg


这个图看上去是不是有点简单啊,都不好意思说是一个框架。确实如此,所以说它是短小精悍呀。言归正传,我们还是来展开一下。

(1) Sink 和 Source 分别定义了写入和读取。

(2) BufferedSink 和 BufferedSource 分别定义了各种写入方法和读取方法,同时还定义了 buffer() 方法应该返回一个实际的 Buffer 用于操作写入与读取。

(3) Buffer 兼具实现了读和写的功能,Buffer 中又进一步将读写单元划分成了 Segment。各个 Segment 之间通过 prev 节点以及 next 节点构成一个双向链表。SegmentPool 是避免为了过多的分配 Segment 对象,用以循环利用 Segment。

(4) RealBufferSink 和 RealBufferSource 是唯一返回给用户的写入与读取实例。其分别由 Sink + Buffer 和 Source + Buffer 的组合模式而成,用以完成真正的写入与读取。

二、写入

案例分析

在分析源码之前,我们还是得先来个 demo 的案例分析 。这次我们的 demo 得先做一个对比,使其看起来更有意思些。

通过 Okio 的 BufferSink 进行的写入代码

<code>private void testSink(File file) { if(!file.exists()) { try { file.createNewFile(); } catch (IOException e) { e.printStackTrace(); } } try { BufferedSink bufferedSink = Okio.buffer(Okio.sink(file)); StringBuilder stringBuilder = new StringBuilder(); for(int k = 0;k < 1000;k++) { stringBuilder.append("Buffer Sink Test Sample"); } int i = 0; System.out.println("Okio sink test start"); long ts = System.currentTimeMillis(); long nts = System.nanoTime(); while(i++ < 100000) { bufferedSink.writeUtf8(stringBuilder.toString()); } bufferedSink.close(); System.out.println("Okio sink ms time " + (System.currentTimeMillis() - ts)); System.out.println("Okio sink nano time " + (System.nanoTime() - nts)); System.out.println("Okio sink test end"); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }/<code>

通过 Java 的 BufferedWriter 进行的写入代码

<code>private void testFileOut(File file) { if(!file.exists()) { try { file.createNewFile(); } catch (IOException e) { e.printStackTrace(); } } try { FileWriter fileWriter = new FileWriter(file); BufferedWriter bufferedWriter = new BufferedWriter(fileWriter); StringBuilder stringBuilder = new StringBuilder(); for(int k = 0;k < 1000;k++) { stringBuilder.append("Buffer Sink Test Sample"); } int i = 0; System.out.println("Java buffered write test start"); long ts = System.currentTimeMillis(); long nts = System.nanoTime(); while(i++ < 100000) { bufferedWriter.write(stringBuilder.toString()); } bufferedWriter.close(); System.out.println("Java buffered write ms time " + (System.currentTimeMillis() - ts)); System.out.println("Java buffered write time " + (System.nanoTime() - nts)); System.out.println("Java buffered write test end"); } catch (IOException e) { e.printStackTrace(); } }/<code>

这段程序运行之后有 2 个数据是需要我们关注的。

数据大小:

通过下面的图,我们可以看到,这个写入数据大小为2.3GB

image.png



写入所花的时间:

如下 BufferSink 与 BufferedWriter 的时间对比,分别进行 10 次读写,应该能很明显的看到 BufferSink 的整体时间分布是要优于 BufferedWriter 的。



上面的结果是PC上跑出来的结果,再来看看手机上的结果。也是基本得到一样的结果。就是 OKIO 的结果基本趋于稳定下降。当然在手机上写10000次太慢了,所以改成了 5000 次。



总结下来就是,Okio 的写入并不总是比 Java 原生 I/O 快。而是在多次反复写入的情况下,Okio 则速度较快,且比较稳定。

源码分析

前面的案例分析,更加增强了我们对 Okio 的认知,接下来就是我们的重点,源码分析与理解了。按照 demo ,画出时序图,并逐层进行分解并深入。所以首先我们还是先来看看时序图。


OkioSink.jpg


构造Sink —— Okio#sink(file)

<code> /** Returns a sink that writes to {@code file}. */ // 当传入的是一个文件时所调用的 Sink 构造方法 public static Sink sink(File file) throws FileNotFoundException { if (file == null) throw new IllegalArgumentException("file == null"); return sink(new FileOutputStream(file)); } /** Returns a sink that writes to {@code out}. */ // 当传入的是一个 OutputStream 时所调用的 Sink 构建方法 public static Sink sink(OutputStream out) { return sink(out, new Timeout()); } // 这个构建方法是私有的,最终的调用都会来到这里 private static Sink sink(final OutputStream out, final Timeout timeout) { if (out == null) throw new IllegalArgumentException("out == null"); if (timeout == null) throw new IllegalArgumentException("timeout == null"); return new Sink() { @Override public void write(Buffer source, long byteCount) throws IOException { checkOffsetAndCount(source.size, 0, byteCount); while (byteCount > 0) { timeout.throwIfReached(); Segment head = source.head; int toCopy = (int) Math.min(byteCount, head.limit - head.pos); out.write(head.data, head.pos, toCopy); head.pos += toCopy; byteCount -= toCopy; source.size -= toCopy; if (head.pos == head.limit) { source.head = head.pop(); SegmentPool.recycle(head); } } } @Override public void flush() throws IOException { out.flush(); } @Override public void close() throws IOException { out.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "sink(" + out + ")"; } }; }/<code>

上面这段代码中,是构建 Sink 的 3 个重载方法,是Okio 的静态方法,而且他们也是依次调用。第一个告诉了我们其底层的写入也是基于 Java 原生的 I/O ,如 FileOutputStream。第二个加入了 Timeout ,这就使其具备了超时机制。第三个方法是最为关键的,这里才真正生成了 Sink 实例。而在这个实例中,也直接定义出了数据如何写入的,如何关闭流,以及返回哪一个 Timeout 实例。这一波调用总的来说确定了实际的 Sink 对象,以及 Sink 是如何写入的和其应该返回的超时机制。以上是 5 个Sink 构造方法中的其中 3 个,还有另外 2 个也一起来看看吧。

<code> /** Returns a sink that writes to {@code path}. */ // 通过文件路径构建 Sink 实例 @IgnoreJRERequirement // Should only be invoked on Java 7+. public static Sink sink(Path path, OpenOption... options) throws IOException { if (path == null) throw new IllegalArgumentException("path == null"); return sink(Files.newOutputStream(path, options)); } /** * Returns a sink that writes to {@code socket}. Prefer this over {@link * #sink(OutputStream)} because this method honors timeouts. When the socket * write times out, the socket is asynchronously closed by a watchdog thread. */ // 通过 Socket 进行构建 public static Sink sink(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); AsyncTimeout timeout = timeout(socket); Sink sink = sink(socket.getOutputStream(), timeout); return timeout.sink(sink); }/<code>

上面这 2 个 Sink 构造方法,第一个是以文件路径来进行构建,而第二个则是以 Socket 进行构建,并且对于 Socket 的构建中,其 Timeout 是 AsyncTimeout。这里说明了 2 个其本特性。第一、除了对文件进行支持,同时还支持 Socket,第二、超时机制有同步的 Timeout 以及异步的 AsyncTimeout。关于超时机制,后面还会详细来讲,这里先有个概念即可。

构造BufferedSink——Okio#buffer(sink)

<code>/** * Returns a new sink that buffers writes to {@code sink}. The returned sink * will batch writes to {@code sink}. Use this wherever you write to a sink to * get an ergonomic and efficient access to data. */ public static BufferedSink buffer(Sink sink) { return new RealBufferedSink(sink); }/<code>

该方法就是构建了一个 RealBufferedSink 实例。那就来看看这个类。

<code>final class RealBufferedSink implements BufferedSink { public final Buffer buffer = new Buffer(); public final Sink sink; boolean closed; RealBufferedSink(Sink sink) { if (sink == null) throw new NullPointerException("sink == null"); this.sink = sink; }......}/<code>

这个类中两个重要的成员变量一个 sink ,由外部传入,一个是 Buffer 由内部直构建。这里的 sink 是前面通过 Okio.sink()方法最终构建的那个匿名类。那么再来看看 Buffer 这个类。

<code>/** * A collection of bytes in memory. * *

Moving data from one buffer to another is fast. Instead * of copying bytes from one place in memory to another, this class just changes * ownership of the underlying byte arrays. * *

This buffer grows with your data. Just like ArrayList, * each buffer starts small. It consumes only the memory it needs to. * *

This buffer pools its byte arrays. When you allocate a * byte array in Java, the runtime must zero-fill the requested array before * returning it to you. Even if you're going to write over that space anyway. * This class avoids zero-fill and GC churn by pooling byte arrays. */public final class Buffer implements BufferedSource, BufferedSink, Cloneable { private static final byte[] DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; static final int REPLACEMENT_CHARACTER = '\\\\ufffd'; @Nullable Segment head; long size; public Buffer() { }

/<code>

Buffer 类中最重要的成员变量 head 是一个 Segment 类型,Buffer 中的数据结构也正是由 Segment 所构成的一个双向链表。除此之外,关于 Buffer 类,这里的解释也极其重要。大概的意思是说:(1) 这是一个在内存中的字节集合。(2)将一个 Buffer 中的数据移到另一个 Buffer 中去是非常快的。原因是它并不是通过复制来实现,而是从底层字节数组去改变与 Buffer 这个类的所属关系。(3)它就是 ArrayList 一样的,容量自动增长的。一开始是很小的,且仅分配它所需要的内存大小。(4)Buffer池化了它的字节数组。如果直接从Java中分配一个字节数组,使用其必须为其填充 0。而 Buffer 通过池化字节数组从而避免了 0 填充和GC 抖动的情况。上面详细的说明了 Buffer 的特性,当然也是 Okio 的特性。在后面的章节中我们还会继续分析其特性的实现细节。而关于 Buffer 就先了解到这里,再进一步看看Segment。

<code>package okio;import javax.annotation.Nullable;/** * A segment of a buffer. * *

Each segment in a buffer is a circularly-linked list node referencing the following and * preceding segments in the buffer. * *

Each segment in the pool is a singly-linked list node referencing the rest of segments in the * pool. * *

The underlying byte arrays of segments may be shared between buffers and byte strings. When a * segment's byte array is shared the segment may not be recycled, nor may its byte data be changed. * The lone exception is that the owner segment is allowed to append to the segment, writing data at * {@code limit} and beyond. There is a single owning segment for each byte array. Positions, * limits, prev, and next references are not shared. */final class Segment { /** The size of all segments in bytes. */// 每个 segment 共 8 K字节 static final int SIZE = 8192; /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */// 是否共享底层数据的阈值 static final int SHARE_MINIMUM = 1024;// 存放字节的数组 final byte[] data; /** The next byte of application data byte to read in this segment. */ // 下一个要读的字节的位置 int pos; /** The first byte of available data ready to be written to. */ // 写入的起始位置 int limit; /** True if other segments or byte strings use the same byte array. */ // 表明此 segment 是共享的。那什么时候情况下会被共享呢? boolean shared; /** True if this segment owns the byte array and can append to it, extending {@code limit}. */// 表明这个段是可以追加数据的,而追回的位置是基于 limit。一般情况下都是为 true 的 boolean owner; /** Next segment in a linked or circularly-linked list. */// 双向环形链表的下一个节点 Segment next; /** Previous segment in a circularly-linked list. */// 双向环形链表的前一个节点 Segment prev; Segment() { this.data = new byte[SIZE]; this.owner = true; this.shared = false; } Segment(Segment shareFrom) { this(shareFrom.data, shareFrom.pos, shareFrom.limit); shareFrom.shared = true; } Segment(byte[] data, int pos, int limit) { this.data = data; this.pos = pos; this.limit = limit; this.owner = false; this.shared = true; } /** * Removes this segment of a circularly-linked list and returns its successor. * Returns null if the list is now empty. */// 将自己弹出链表 public @Nullable Segment pop() { ...... } /** * Appends {@code segment} after this segment in the circularly-linked list. * Returns the pushed segment. */// 压入到当前链表 public Segment push(Segment segment) { ...... } /** * Splits this head of a circularly-linked list into two segments. The first * segment contains the data in {@code [pos..pos+byteCount)}. The second * segment contains the data in {@code [pos+byteCount..limit)}. This can be * useful when moving partial segments from one buffer to another. * *

Returns the new head of the circularly-linked list. */// 分割,把数据以 [pos,pos + byteCount] 和 [pos + byteCount,limit] 这两段来进行分割。分割出去的数据依据共享阈值决定数据是否被共享 public Segment split(int byteCount) { ...... } /** * Call this when the tail and its predecessor may both be less than half * full. This will copy data so that segments can be recycled. */// 压缩存储,当处于尾节点和它的前一节点的数据都少于一半时,把数据进行压缩存储到一个,以便可以回收调一个 Segment。 public void compact() { ...... }// 移动 byteCount 个字节数据到另一个段中 /** Moves {@code byteCount} bytes from this segment to {@code sink}. */ public void writeTo(Segment sink, int byteCount) { ...... }}

/<code>

来解读一下 Segment 的特性:(1) segment 是 Buffer 中的一个组织单元(2) segment 在 Buffer 中以环形双向链表的数据结构来存储(3) segment 在 SegmentPool 中以单链接表数据结构来存储(4) segment 可以在 Buffer 和 ByteStrings 之间进行共享。对于共享的 segment 不可以回收,也不可以修改其中的数据。但例外的是,段的拥有者可以在 limit 之后追回数据。对于这个段来说,底层的字节数组是共享的,但position,limits,prev以及next这些是不共享的。此外,注释里有对每一个属性的详细解释以及个人的理解。为了方便更容易理解,简单画了一个 Segment 的数据模型图来帮助我们。如下所示。


image.png


关于 Segment 就先了解到这里,其中的方法 split(),compact()以及 writeTo() 后面还会详细说明。接下来继续看看 SegmentPool。

<code>package okio;import javax.annotation.Nullable;/** * A collection of unused segments, necessary to avoid GC churn and zero-fill. * This pool is a thread-safe static singleton. */// 主要是用于收集未使用的 Segments,避免了内存抖动以及0填充。同时,这个 Pool 也是一个线程安全的静态单例类。final class SegmentPool { /** The maximum number of bytes to pool. */ // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?// 一个池子最大存储 64 KB 字节 static final long MAX_SIZE = 64 * 1024; // 64 KiB. /** Singly-linked list of segments. */// 表明其是一个单向链表 static @Nullable Segment next; /** Total bytes in this pool. */// 当前的总数据大小 static long byteCount; private SegmentPool() { } static Segment take() { synchronized (SegmentPool.class) { if (next != null) { Segment result = next; next = result.next; result.next = null; byteCount -= Segment.SIZE; return result; } } return new Segment(); // Pool is empty. Don't zero-fill while holding a lock. } static void recycle(Segment segment) { if (segment.next != null || segment.prev != null) throw new IllegalArgumentException(); if (segment.shared) return; // This segment cannot be recycled. synchronized (SegmentPool.class) { if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full. byteCount += Segment.SIZE; segment.next = next; segment.pos = segment.limit = 0; next = segment; } }}/<code>

这个类其实挺简单的,take() 方法中就是获取一个 Segment ,如果池子中有就取一个出来,如果没有就给 new 一个。而 recycle() 方法,如果池子还没涨到 64 KB 的限制,且 Segment 不是共享的,那就把它加到链接里面去。

小结通过对 Sink 以及 BufferedSink 的构造,可以说是对 Okio 整个使用环境的一个初始化。同时,在这一段也详细介绍了其相关的核心概念,Buffer,Segment,SegmentPool 等。有了这些基本的核心概念,我们就可以进一步分析字节的写入了。

写入数据——RealBufferedSink#writeUtf8()

<code> @Override public BufferedSink writeUtf8(String string) throws IOException { if (closed) throw new IllegalStateException("closed"); buffer.writeUtf8(string); return emitCompleteSegments(); }/<code>

进一步调用了 Buffer 的 writeUtf8() 和 emitCompleteSegments(),那么就往下看Buffer#writeUft8() 方法。代码很长,但关键的是 2 个调用,writableSegment(1) 和 writeByte(),其他的都是 utf-8 编码相关。

<code> @Override public Buffer writeUtf8(String string) { return writeUtf8(string, 0, string.length()); } @Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) { ...... // Transcode a UTF-16 Java String to UTF-8 bytes. for (int i = beginIndex; i < endIndex;) { int c = string.charAt(i); if (c < 0x80) { Segment tail = writableSegment(1); byte[] data = tail.data; int segmentOffset = tail.limit - i; int runLimit = Math.min(endIndex, Segment.SIZE - segmentOffset); // Emit a 7-bit character with 1 byte. data[segmentOffset + i++] = (byte) c; // 0xxxxxxx // Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance // improvement over independent calls to writeByte(). while (i < runLimit) { c = string.charAt(i); if (c >= 0x80) break; data[segmentOffset + i++] = (byte) c; // 0xxxxxxx } int runSize = i + segmentOffset - tail.limit; // Equivalent to i - (previous i). tail.limit += runSize; size += runSize; } else if (c < 0x800) { // Emit a 11-bit character with 2 bytes. writeByte(c >> 6 | 0xc0); // 110xxxxx writeByte(c & 0x3f | 0x80); // 10xxxxxx i++; } else if (c < 0xd800 || c > 0xdfff) { // Emit a 16-bit character with 3 bytes. writeByte(c >> 12 | 0xe0); // 1110xxxx writeByte(c >> 6 & 0x3f | 0x80); // 10xxxxxx writeByte(c & 0x3f | 0x80); // 10xxxxxx i++; } else { // c is a surrogate. Make sure it is a high surrogate & that its successor is a low // surrogate. If not, the UTF-16 is invalid, in which case we emit a replacement character. int low = i + 1 < endIndex ? string.charAt(i + 1) : 0; if (c > 0xdbff || low < 0xdc00 || low > 0xdfff) { writeByte('?'); i++; continue; } // UTF-16 high surrogate: 110110xxxxxxxxxx (10 bits) // UTF-16 low surrogate: 110111yyyyyyyyyy (10 bits) // Unicode code point: 00010000000000000000 + xxxxxxxxxxyyyyyyyyyy (21 bits) int codePoint = 0x010000 + ((c & ~0xd800) << 10 | low & ~0xdc00); // Emit a 21-bit character with 4 bytes. writeByte(codePoint >> 18 | 0xf0); // 11110xxx writeByte(codePoint >> 12 & 0x3f | 0x80); // 10xxxxxx writeByte(codePoint >> 6 & 0x3f | 0x80); // 10xxyyyy writeByte(codePoint & 0x3f | 0x80); // 10yyyyyy i += 2; } } return this; }/<code>

Buffer有两个重载版本,第一个调用了第二个,主要就是确定了起始位置为0,而写入的数据为全部数据。而第二个是具体的写入,这段代码比较长,其实不应该贴出来的,但我看到它对 Utf8 的编码十分的优秀,所以还是贴出来,有时间的可以仔细读一读。当然,在这里我们只关注其中最关键的几句即可。如前面所说,这里面有两个进一步调用的方法分别是 writableSegment(1) 和 writeByte(),而 writeByte() 里面也有对 writableSegment() 的调用。那我们先来看看 writableSegment() 方法的实现。

<code>/** * Returns a tail segment that we can write at least {@code minimumCapacity} * bytes to, creating it if necessary. */ Segment writableSegment(int minimumCapacity) { ...... if (head == null) { head = SegmentPool.take(); // Acquire a first segment. return head.next = head.prev = head; } Segment tail = head.prev; if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) { tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up. } return tail; }/<code>

这个方法的主要含义就是如果head为空,则说明当前的链接还是空的,那么就新建一个节点,此时头尾相等。如果不为空就取出尾节点,即头节点的前向节点。而尾节点的空间不能再装下当前要写入的字节数时,就会创建一个新的节点,并把新建的节点作为尾节点,然后继续写入。再来看看 writeByte()。

<code> @Override public Buffer writeByte(int b) { Segment tail = writableSegment(1); tail.data[tail.limit++] = (byte) b; size += 1; return this; }/<code>

这里就是把数据写入到 Segment 的 data[] 中。对于一次 writeUtf8() 的调用来看,目前为止是把数据全部都写入到了 Segment 中,同时也是内存中,还没有发生实际的 I/O 的。接下来继续看 RealBufferedSink#emitCompleteSegments()方法。

<code> @Override public BufferedSink emitCompleteSegments() throws IOException { if (closed) throw new IllegalStateException("closed"); long byteCount = buffer.completeSegmentByteCount(); if (byteCount > 0) sink.write(buffer, byteCount); return this; }/<code>

这里先是调用了 Buffer#completeSegmentByteCount()确定需要从 buffer 中实际取多少字节写入到实际的输出流中。

<code> /** * Returns the number of bytes in segments that are not writable. This is the * number of bytes that can be flushed immediately to an underlying sink * without harming throughput. */ public long completeSegmentByteCount() { long result = size; if (result == 0) return 0; // Omit the tail if it's still writable. Segment tail = head.prev; if (tail.limit < Segment.SIZE && tail.owner) { result -= tail.limit - tail.pos; } return result; }/<code>

上面代码的意思是用当前总的 size 大小 减去 尾节点已写入的字节数,也就是除尾节点,其他的都会被立即写入到底层的输出流中。这说明了不足 8K 的数据写入,在 close() 之前是不会发生实际 I/O 的。接下来,进一步调用了 sink#write()。这个 sink 就是我们在前面通过 Okio#sink() 方法所创建出来的匿名类。 为了连贯性,我们还是把写入相关的代码再贴一下以便于进一步的阅读与分析。

<code>@Override public void write(Buffer source, long byteCount) throws IOException { ...... while (byteCount > 0) { // 循环,直到数据全部写入 timeout.throwIfReached(); // 超时判断 Segment head = source.head; // 总是取头结点 // 确定当次需要写入的字节数,当前 segment 中可写入的数据与剩余字节数,取最小的那个 int toCopy = (int) Math.min(byteCount, head.limit - head.pos); out.write(head.data, head.pos, toCopy);//写入数据到输出流 // 更新各位置 head.pos += toCopy; // 读指针往前移 toCopy 个 byteCount -= toCopy; // 写入字节数减 toCopy 个 source.size -= toCopy; // Buffer 中大小减 toCopy 个 // 如果当前 segment 的数据已经全部写入到输出流,那么将其弹出双向循环链接,并将其回收到 SegmentPool 中 if (head.pos == head.limit) { source.head = head.pop(); SegmentPool.recycle(head); } }/<code>

贴出来的代码中,已经作了非常详细的说明,因为这里的每一句都很重要,都需要我们去理解。当然,总的来说其实也只有 2 个关键点。其一,超时机制。

<code>/** * Throws an {@link InterruptedIOException} if the deadline has been reached or if the current * thread has been interrupted. This method doesn't detect timeouts; that should be implemented to * asynchronously abort an in-progress operation. */ public void throwIfReached() throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException("thread interrupted"); } if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) { throw new InterruptedIOException("deadline reached"); } }/<code>

根据这里的注释,以及分析 hasDeadLine 和 deadlineNanoTime 的赋值情况来看,超时的检测只应该发生在异步的情况下。纳尼!!!也就是说同步超时机制是不检查的。其二,就是数据的写入以及各位位置标记的置位和Segment的回收。

关闭——RealBufferedSink#close()

<code>@Override public void close() throws IOException { if (closed) return; // Emit buffered data to the underlying sink. If this fails, we still need // to close the sink; otherwise we risk leaking resources. Throwable thrown = null; try { if (buffer.size > 0) { sink.write(buffer, buffer.size); } } catch (Throwable e) { thrown = e; } try { sink.close(); } catch (Throwable e) { if (thrown == null) thrown = e; } closed = true; if (thrown != null) Util.sneakyRethrow(thrown); }/<code>

close() 也就是将 buffer 中剩余的数据都写入到底层输出流中,最后再调用 sink#close()关闭底层的输出流。至此,写入数据可以说大致分析完了。因为与 writeUtf8() 同级别的其他 write方法,其基本原理都是类似的。通过分析我们知道,其是先将数据都写入到 Buffer 中,然后才将数据写入到实际的 I/O 中,以此减少实际的 I/O。而分析到这里,我们应该也注意到,前面所说的 Segment 的 splite,compact以及share 这些优化内存的方法都没有被提及过。这就接下来要分析的读取与另一不同级别的写入。

三、读取

案例分析

关于读取,我们同样也从案例分析开始。先来看看读取的代码。通过 Okio 的 BufferSource 进行的读取代码

<code>private void testSource(File file) { try { BufferedSource bufferedSource = Okio.buffer(Okio.source(file)); System.out.println("Okio source test start"); long ts = System.currentTimeMillis(); byte[] buffer = new byte[8 * 1024]; while(bufferedSource.read(buffer) > 0) { } bufferedSource.close(); System.out.println("Okio source test end"); System.out.println(" source test " + (System.currentTimeMillis() - ts)); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }/<code>

通过 Java 的 BufferedReader 进行的读取代码

<code>private void testJava(File file) { try { BufferedReader bufferedReader = new BufferedReader(new FileReader(file)); System.out.println("java test start"); long ts = System.currentTimeMillis(); char buffer[] = new char[8 * 1024]; while(bufferedReader.read(buffer) > 0) { } System.out.println("java test end"); System.out.println("java test " + (System.currentTimeMillis() - ts)); bufferedReader.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }/<code>

同样来看看测试结果,不过这里偷了个懒,只测试了PC端的,没有测试手机端。因为平台不影响结果。文件就是上面写入的文件,大小 2.3 G。



对比结果很明显,Okio#BufferSource 的读取速度明显大于 Java 原生的2 倍多。但是,为什么呢?另外有一个细节,上面的 buffer 大小只设置了 8 * 1024 ,也就是 8KiB,这是因为 Okio 的 segment 为 8 KiB,其一次读取最大也只读取 8 KiB

源码分析

有了前面案例分析的基本认知,接下来就是我们的源码分析与理解了。同样,先画出时序图,再逐层进行分解并深入。


OkioSource.jpg


从时序图上可以看到,前面几个步骤都是相同的,如 Buffer,Segment,Timeout 这些概念和构造都是一样的。就不重复介绍了。主要看看 Source 和 RealBufferSource 的构造吧。

构造Source——Okio#source(file)

<code> /** Returns a source that reads from {@code file}. */ public static Source source(File file) throws FileNotFoundException { if (file == null) throw new IllegalArgumentException("file == null"); return source(new FileInputStream(file)); } /** Returns a source that reads from {@code in}. */ public static Source source(InputStream in) { return source(in, new Timeout()); }private static Source source(final InputStream in, final Timeout timeout) { if (in == null) throw new IllegalArgumentException("in == null"); if (timeout == null) throw new IllegalArgumentException("timeout == null"); return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); if (byteCount == 0) return 0; try { timeout.throwIfReached(); Segment tail = sink.writableSegment(1); int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit); int bytesRead = in.read(tail.data, tail.limit, maxToCopy); if (bytesRead == -1) return -1; tail.limit += bytesRead; sink.size += bytesRead; return bytesRead; } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) throw new IOException(e); throw e; } } @Override public void close() throws IOException { in.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "source(" + in + ")"; } }; }/<code>

上面的代码看着多,但和 Sink 的构造几乎一样。这里的重点也构造了一个匿名的 Source 接口的实例,以完成实际从输入中的读取。同样的,Source 也是支持 Socket 的,顺便也一起来看看吧。

<code>/** * Returns a source that reads from {@code socket}. Prefer this over {@link * #source(InputStream)} because this method honors timeouts. When the socket * read times out, the socket is asynchronously closed by a watchdog thread. */ public static Source source(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); AsyncTimeout timeout = timeout(socket); Source source = source(socket.getInputStream(), timeout); return timeout.source(source); }/<code>

支持 Socket 也是比较简单的,即从 socket 中获取输入流以便读取数据,同时这里也设定了 Timeout 为 AsyncTimeout。关于 AsyncTimeout 就放到后面专门的分节里来讲解。

构造 BufferdSource —— Okio#buffer(Source)

<code> /** * Returns a new source that buffers reads from {@code source}. The returned * source will perform bulk reads into its in-memory buffer. Use this wherever * you read a source to get an ergonomic and efficient access to data. */ public static BufferedSource buffer(Source source) { return new RealBufferedSource(source); }/<code>

构造 RealBufferedSource 实例,继续看。

<code>final class RealBufferedSource implements BufferedSource { public final Buffer buffer = new Buffer(); public final Source source; boolean closed; RealBufferedSource(Source source) { if (source == null) throw new NullPointerException("source == null"); this.source = source; }....../<code>

同样是组合了 Source 和 Buffer 这两个实例。Source 就是前面通过 Okio.source()方法最终构建的那个匿名类。而 Buffer 和其内部聚合的 Segment 都已经介绍过了。所以直接来看 read() 方法吧。

读取数据——RealBufferedSource#read()

<code> @Override public int read(byte[] sink) throws IOException { return read(sink, 0, sink.length); } @Override public int read(byte[] sink, int offset, int byteCount) throws IOException { checkOffsetAndCount(sink.length, offset, byteCount); if (buffer.size == 0) { long read = source.read(buffer, Segment.SIZE); if (read == -1) return -1; } int toRead = (int) Math.min(byteCount, buffer.size); return buffer.read(sink, offset, toRead); }/<code>

read() 是个重载方法,我们调用的是第一个,然后它会进一步调用指定参数 offset 和 byteCount 的 read() 方法。在这个方法里,buffer.size 为 0 说明当前 buffer 中为没有数据,即数据已经被从buffer 中读出来了。如果不为0,说明 buffer 中还有数据那么就进一步读取剩下的或者剩余的数据。这里最初肯定是要走为 0 的情况。这里调用的是 source.read(),也即前面匿名类 Source 的。注意,这里传入的参数中除了 buffer ,还有 Segment.SIZE,即 8KiB。这里为了方便分析,也重复贴一下 read() 方法的相关代码。

<code>@Override public long read(Buffer sink, long byteCount) throws IOException { ..... ...... try { timeout.throwIfReached(); Segment tail = sink.writableSegment(1); int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit); int bytesRead = in.read(tail.data, tail.limit, maxToCopy); if (bytesRead == -1) return -1; tail.limit += bytesRead; sink.size += bytesRead; return bytesRead; } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) throw new IOException(e); throw e; } }/<code>

同步 Timeout.throwIfReached() 前面已经分析过了,是不起作用的,其实现在来看看也能明白过来,不管是在读还是写的时候,它都没有设置起始时间,那它又如何能计算出所消耗的时间呢?而 Buffer#writeableSegment() 在前面也分析过了,主要是返回一个 Segment。然后就是实际从文件读出数据并只保存在 Segment 里面。

在 RealBufferedSource#read() 中,走了不为 0 的情况后,就是将Buffer中的数据按需要读到内存中返回给调用者。

有了前面写入的分析,再来看读取显然要简单的多了。至此,读取和写入的最基本流程都分析完了。Okio 为打包了很多的读取和写入的方法,这里只分析了其中最基本的,当然有了这些最基本的认知后,再来理解基本的就容易许多了。

三、读取到写入,写入自读取

除了基本的读取与写入,还可以将读取与写入直接串起来。也即读取出来数据可以直接送进写入,而写入的内容也可以是来自 Source 。先来看看写入自读取吧。

<code>@Override public long writeAll(Source source) throws IOException { if (source == null) throw new IllegalArgumentException("source == null"); long totalBytesRead = 0; for (long readCount; (readCount = source.read(buffer, Segment.SIZE)) != -1; ) { totalBytesRead += readCount; emitCompleteSegments(); } return totalBytesRead; } @Override public BufferedSink write(Source source, long byteCount) throws IOException { while (byteCount > 0) { long read = source.read(buffer, byteCount); if (read == -1) throw new EOFException(); byteCount -= read; emitCompleteSegments(); } return this; }/<code>

上面两段代码就是将数据从 Source 读出,再写入到Buffer 中去。这里需要关注的有两个事情:source实例 :这里的 Source 是 RealBufferdSource 实例。关键调用 : 这里的关键调用是 RealBufferdSource#read(Buffer,byteCount)。来看看其实现。

<code>@Override public long read(Buffer sink, long byteCount) throws IOException { ...... return buffer.read(sink, toRead); }/<code>

调用了 Buffer#read(Buffer,toRead)

<code> @Override public long read(Buffer sink, long byteCount) { ...... sink.write(this, byteCount); return byteCount; }/<code>

看到这里可能会有点晕,所以强调一下,这里的 sink 是 RealBufferdSink 中的 Buffer,而 this 是 RealBufferedSource 中的 buffer。意思就是将 RealBufferdSource 中的 buffer 的数据写入到 RealBufferdSink 中的 buffer 中去。 下面来看看具体的实现,其实代码并不长,注释就占了一大半,为了完整性,我并没有将其删除去。

<code>@Override public void write(Buffer source, long byteCount) { // Move bytes from the head of the source buffer to the tail of this buffer // while balancing two conflicting goals: don't waste CPU and don't waste // memory. // // // Don't waste CPU (ie. don't copy data around). // // Copying large amounts of data is expensive. Instead, we prefer to // reassign entire segments from one buffer to the other. // // // Don't waste memory. // // As an invariant, adjacent pairs of segments in a buffer should be at // least 50% full, except for the head segment and the tail segment. // // The head segment cannot maintain the invariant because the application is // consuming bytes from this segment, decreasing its level. // // The tail segment cannot maintain the invariant because the application is // producing bytes, which may require new nearly-empty tail segments to be // appended. // // // Moving segments between buffers // // When writing one buffer to another, we prefer to reassign entire segments // over copying bytes into their most compact form. Suppose we have a buffer // with these segment levels [91%, 61%]. If we append a buffer with a // single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied. // // Or suppose we have a buffer with these segment levels: [100%, 2%], and we // want to append it to a buffer with these segment levels [99%, 3%]. This // operation will yield the following segments: [100%, 2%, 99%, 3%]. That // is, we do not spend time copying bytes around to achieve more efficient // memory use like [100%, 100%, 4%]. // // When combining buffers, we will compact adjacent buffers when their // combined level doesn't exceed 100%. For example, when we start with // [100%, 40%] and append [30%, 80%], the result is [100%, 70%, 80%]. // // // Splitting segments // // Occasionally we write only part of a source buffer to a sink buffer. For // example, given a sink [51%, 91%], we may want to write the first 30% of // a source [92%, 82%] to it. To simplify, we first transform the source to // an equivalent buffer [30%, 62%, 82%] and then move the head segment, // yielding sink [51%, 91%, 30%] and source [62%, 82%]. if (source == null) throw new IllegalArgumentException("source == null"); if (source == this) throw new IllegalArgumentException("source == this"); checkOffsetAndCount(source.size, 0, byteCount); while (byteCount > 0) { // Is a prefix of the source's head segment all that we need to move? if (byteCount < (source.head.limit - source.head.pos)) { Segment tail = head != null ? head.prev : null; if (tail != null && tail.owner && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) { // Our existing segments are sufficient. Move bytes from source's head to our tail. source.head.writeTo(tail, (int) byteCount); source.size -= byteCount; size += byteCount; return; } else { // We're going to need another segment. Split the source's head // segment in two, then move the first of those two to this buffer. source.head = source.head.split((int) byteCount); } } // Remove the source's head segment and append it to our tail. Segment segmentToMove = source.head; long movedByteCount = segmentToMove.limit - segmentToMove.pos; source.head = segmentToMove.pop(); if (head == null) { head = segmentToMove; head.next = head.prev = head; } else { Segment tail = head.prev; tail = tail.push(segmentToMove); tail.compact(); } source.size -= movedByteCount; size += movedByteCount; byteCount -= movedByteCount; } }/<code>

通过注释,可以明白,这个方法的核心是在讲:(1) 这个方法主要完成的是从 Source 的首部将数据移入到 Sink 的尾部(2) 移动过程中需要平衡两个重要的指标,优化 CPU 资源,优化内存资源。(3) 移动数据是通过改变引用指向,而不是数据的复制(4) Segment 合并,是说前一个 Segment 与当前的 Segment 数据大小之和如果没有超过 100%,那么就会将当前的 Segment 合并到前一个中去。看看下面的代码,你可能会更加的清楚。

<code>/** * Call this when the tail and its predecessor may both be less than half * full. This will copy data so that segments can be recycled. */ public void compact() { if (prev == this) throw new IllegalStateException(); if (!prev.owner) return; // Cannot compact: prev isn't writable. int byteCount = limit - pos; int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos); if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space. writeTo(prev, byteCount); pop(); SegmentPool.recycle(this); }/<code>

(5) 分割 Segment,对于有一些 Segment ,如果只是部分被读取出来,那么可以通过将其分割成 2 个 Segment ,然后取需要的那个 Segment 加入到当前的 Buffer 中。

<code>/** * Splits this head of a circularly-linked list into two segments. The first * segment contains the data in {@code [pos..pos+byteCount)}. The second * segment contains the data in {@code [pos+byteCount..limit)}. This can be * useful when moving partial segments from one buffer to another. * *

Returns the new head of the circularly-linked list. */ public Segment split(int byteCount) { if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException(); Segment prefix; // We have two competing performance goals: // - Avoid copying data. We accomplish this by sharing segments. // - Avoid short shared segments. These are bad for performance because they are readonly and // may lead to long chains of short segments. // To balance these goals we only share segments when the copy will be large. if (byteCount >= SHARE_MINIMUM) { prefix = new Segment(this); } else { prefix = SegmentPool.take(); System.arraycopy(data, pos, prefix.data, 0, byteCount); } prefix.limit = prefix.pos + byteCount; pos += byteCount; prev.push(prefix); return prefix; }

/<code>

明白了这个方法的主要意图,再来分析具体的代码。分两类情况:其一,当要读取的数据大小小于 source.head 中的字节数时,如果当前 sink.head 不为 null,则将 source.head 的数据都写入到 sink.tail 中去,当然也是其最后的要读取的数据了,即 soruce.head.writeTo();而当 sink.head 为空时,则先将 source.head 再 byteCount 大小进行数据的分割,再进入到第二种情况。而在分割时,如果需要的数据大于分割的阈值,那么被分割出来的 Segment 就会被共享成同一个 Segment。其二,当要读取的数据大小大于等于 source.head 字节时,如果当前 sink.head 为 null ,那么就让 source.head 从 source.buffer 中取出,并让 sink.head 直接指向 source.head 即可。而如果当前 sink.head 不为空,则将取出来的 segment 并到当前 sink.buffer 的尾部。根据前面的分析,如果尾部的数据与其前一个数据大小之和没有达到 100% ,那就可以进行合并。

以上便是 Segment 的合并与分割了。Okio 在这里同时考虑到了 CPU 和内存的优化,并且实现的非常微妙。

同时上面有部分也提到了 Segment 的共享,从整个源码以及分析来看,共享主要的场景就是发生在复制时以及可能的分割时。当然,最重要的是分割时了。而关于复制时的共享,其实也是非常符合逻辑了。即使被标记成分享的 Segment 不能再写入,但是对于内存的利用来上讲也是非常优化的。

四、异步的超时机制

异步超时机制主要只是用于 Socket 通信,这里也重复贴一下 Source 的代码,Sink 差不多。

<code> /** * Returns a source that reads from {@code socket}. Prefer this over {@link * #source(InputStream)} because this method honors timeouts. When the socket * read times out, the socket is asynchronously closed by a watchdog thread. */ public static Source source(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); AsyncTimeout timeout = timeout(socket); Source source = source(socket.getInputStream(), timeout); return timeout.source(source); }/<code>

如前面所说,Socket 通信时,主要是拿出了其 InputStream,以便于读取数据。而这里的 Timeout 是其子类 AsyncTimeout。当通过重载方法 source() 构造出了 Source 之后,进一步送进了 AsyncTimeout#source() 方法中

<code>/** * Returns a new source that delegates to {@code source}, using this to implement timeouts. This * works best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation. */ public final Source source(final Source source) { return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { boolean throwOnTimeout = false; enter(); try { long result = source.read(sink, byteCount); throwOnTimeout = true; return result; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public void close() throws IOException { boolean throwOnTimeout = false; try { source.close(); throwOnTimeout = true; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public Timeout timeout() { return AsyncTimeout.this; } @Override public String toString() { return "AsyncTimeout.source(" + source + ")"; } }; }/<code>

Okio#source()主要是定义了如何读取数据,而AsyncTimeout#source()定义了读取数据的超时机制。在 read() 方法中,主要体现在 enter() 以及 exit() 这两个方法上。

思考一下当前的场景,当前线程正在读取数据,那管理超时必然需要另一个线程。在 AsyncTimeout 中就定义了 WatchDog 线程,来监测超时的读写。

WatchDog 线程,俗名 “看门狗”,其在嵌入式方面应用非常多。而Android系统中的SystemServer是否存在耗时操作的监测也是由一个 WatchDog 线程来实现的。此处的 WatchDog 线程是由第一次 enter() 进入所触发的,起用后便被设置成守护线程运行,并且线程以链表的形式来管理。与此同时, enter() 也会添加一个新的 AsyncTimeout 节点到 WatchDog 线程中。而 exit() 则会检查节点是否还在链表中,否则时间到了不在链表中就会被认为是超时状态了,从而抛出超时的异常。具体我们来看看代码的实现。

<code>public final void enter() { if (inQueue) throw new IllegalStateException("Unbalanced enter/exit"); long timeoutNanos = timeoutNanos(); boolean hasDeadline = hasDeadline(); if (timeoutNanos == 0 && !hasDeadline) { return; // No timeout and no deadline? Don't bother with the queue. } inQueue = true; scheduleTimeout(this, timeoutNanos, hasDeadline); }/<code>

同 enter() 调用 scheduleTimeout() ,调度起一个 Timeout。

<code>private static synchronized void scheduleTimeout( AsyncTimeout node, long timeoutNanos, boolean hasDeadline) { // Start the watchdog thread and create the head node when the first timeout is scheduled.// 链表为空说明 Watchdog 还没有启动起来,这里添加了头结点,并且启动了 watchdog 线程。 if (head == null) { head = new AsyncTimeout(); new Watchdog().start(); }// 分不同的条件讲计算终止时间 long now = System.nanoTime(); if (timeoutNanos != 0 && hasDeadline) { // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around, // Math.min() is undefined for absolute values, but meaningful for relative ones. node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now); } else if (timeoutNanos != 0) { node.timeoutAt = now + timeoutNanos; } else if (hasDeadline) { node.timeoutAt = node.deadlineNanoTime(); } else { throw new AssertionError(); }// 按剩余时间升序排序 // Insert the node in sorted order. long remainingNanos = node.remainingNanos(now); for (AsyncTimeout prev = head; true; prev = prev.next) { if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) { node.next = prev.next; prev.next = node; if (prev == head) { AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front. } break; } } }/<code>

以上便是插入了一个 AsyncTimeout 的节点。再来看看 Watchdog 线程是如何监测超时的。

<code>private static final class Watchdog extends Thread { Watchdog() { super("Okio Watchdog"); setDaemon(true); } public void run() { while (true) { try { AsyncTimeout timedOut; synchronized (AsyncTimeout.class) { timedOut = awaitTimeout(); // Didn't find a node to interrupt. Try again. if (timedOut == null) continue; // The queue is completely empty. Let this thread exit and let another watchdog thread // get created on the next call to scheduleTimeout(). if (timedOut == head) { head = null; return; } } // Close the timed out node. timedOut.timedOut(); } catch (InterruptedException ignored) { } } } }/<code>

重点在 awaitTimeout (),即等待一下,看看是否有 AsyncTimeout 超时了。

<code>/** * Removes and returns the node at the head of the list, waiting for it to time out if necessary. * This returns {@link #head} if there was no node at the head of the list when starting, and * there continues to be no node after waiting {@code IDLE_TIMEOUT_NANOS}. It returns null if a * new node was inserted while waiting. Otherwise this returns the node being waited on that has * been removed. */ static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException { // Get the next eligible node. AsyncTimeout node = head.next; // The queue is empty. Wait until either something is enqueued or the idle timeout elapses. if (node == null) { long startNanos = System.nanoTime(); AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS); return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head // The idle timeout elapsed. : null; // The situation has changed. } long waitNanos = node.remainingNanos(System.nanoTime()); // The head of the queue hasn't timed out yet. Await that. if (waitNanos > 0) { // Waiting is made complicated by the fact that we work in nanoseconds, // but the API wants (millis, nanos) in two arguments. long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); AsyncTimeout.class.wait(waitMillis, (int) waitNanos); return null; } // The head of the queue has timed out. Remove it. head.next = node.next; node.next = null; return node; }/<code>

首先找到剩余时间最小的那个节点,按照前面升序排序,也就是 head.next 节点。然后再检查剩余时间是否大于 0 ,说明还没有超时,这时便将等待到其剩余时间,并且返回空,表示并未超时。这里提一下的是 class.wait(time) 方法,如果时间没有到,收到对应 class 的 notify()/notifyAll() 也是会提前被唤醒的。假设这里并没有超时,最后再来看看 exit()。

<code>/** * Throws an IOException if {@code throwOnTimeout} is {@code true} and a timeout occurred. See * {@link #newTimeoutException(java.io.IOException)} for the type of exception thrown. */ final void exit(boolean throwOnTimeout) throws IOException { boolean timedOut = exit(); if (timedOut && throwOnTimeout) throw newTimeoutException(null); } /** * Returns either {@code cause} or an IOException that's caused by {@code cause} if a timeout * occurred. See {@link #newTimeoutException(java.io.IOException)} for the type of exception * returned. */ final IOException exit(IOException cause) throws IOException { if (!exit()) return cause; return newTimeoutException(cause); } /** Returns true if the timeout occurred. */ public final boolean exit() { if (!inQueue) return false; inQueue = false; return cancelScheduledTimeout(this); }/<code>

exit() 方法又重载了 3 个,很显然,前面 2 个主要是调用第 3 个来判断是否超时以便进一步的抛出超异常。这里主要来看看第 3 个。其又进一步调用了 cancelScheduledTimeout() 方法,这是与 scheduledTimeout()相对应的方法。

<code>/** Returns true if the timeout occurred. */ private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) { // Remove the node from the linked list. for (AsyncTimeout prev = head; prev != null; prev = prev.next) { if (prev.next == node) { prev.next = node.next; node.next = null; return false; } } // The node wasn't found in the linked list: it must have timed out! return true; }/<code>

从链表中寻找当前结点,如果找到则从链接中移除并表示没有超时。如果没有找到,则表示已经超时了。

至此,异步超时机制分析完毕。如果看前面的分析还有点晕,不防来看看总结:

(1) 管理超时机制的数据结构是一个升序链表,而管理这个链表的是守护线程 Watchdog ,它会每次选择一个节点,并通过剩余时间的等待来等待当前 AsyncTimeout 的超时与否。

(2) 通过 enter() 方法,再经过 scheduleTimeout() 方法添加一个 AsyncTimeout节点

(3) 通过 exit() 方法,再经过 cancleScheduleTimeut() 方法将未超时的 AsyncTimeout 从链表中移除。移除后,当 Watchdog 进入下一次遍历时就不会再遍历到被它等待超时的那个节点了。

五、总结

感谢你能读到并读完这篇文章,对于 Okio 的全部分析到此为止了。文章确实贴了不少的源码,但我仍然觉得是有必要的,它能使我们读起来更加的通畅以及完整。当然,也许还有更好的表述方式。本文并没有分析到了 Okio 所有的方方面面,但我相信能够认真读完并且理解其中大部分的意思,再去分析其他部分应该也非常简单了。

最后,由于本人水平有限,在分析过程中难免会有错误,表述上也会存在不尽人意的地方。对于一切问题,欢迎下方留言讨论,不甚感激。