Okio深入分析—源码分析部分

一、前言

Okio深入分析——基础使用部分主要了解了Okio这个库的特性,以及最基本的使用。但正如最后所说,我们还不知道它为什么好,为什么快。但是在分析之前,我们先过一遍其主体类的框架图,以便我们对其整体有一个粗略的认识。

Okio深入分析—源码分析部分

OkioFramework.jpg


这个图看上去是不是有点简单啊,都不好意思说是一个框架。确实如此,所以说它是短小精悍呀。言归正传,我们还是来展开一下。

(1) Sink 和 Source 分别定义了写入和读取。

(2) BufferedSink 和 BufferedSource 分别定义了各种写入方法和读取方法,同时还定义了 buffer() 方法应该返回一个实际的 Buffer 用于操作写入与读取。

(3) Buffer 兼具实现了读和写的功能,Buffer 中又进一步将读写单元划分成了 Segment。各个 Segment 之间通过 prev 节点以及 next 节点构成一个双向链表。SegmentPool 是避免为了过多的分配 Segment 对象,用以循环利用 Segment。

(4) RealBufferSink 和 RealBufferSource 是唯一返回给用户的写入与读取实例。其分别由 Sink + Buffer 和 Source + Buffer 的组合模式而成,用以完成真正的写入与读取。

二、写入

案例分析

在分析源码之前,我们还是得先来个 demo 的案例分析 。这次我们的 demo 得先做一个对比,使其看起来更有意思些。

通过 Okio 的 BufferSink 进行的写入代码

<code>private void testSink(File file) {        if(!file.exists()) {            try {                file.createNewFile();            } catch (IOException e) {                e.printStackTrace();            }        }        try {            BufferedSink bufferedSink = Okio.buffer(Okio.sink(file));            StringBuilder stringBuilder = new StringBuilder();            for(int k = 0;k < 1000;k++) {                stringBuilder.append("Buffer Sink Test Sample");            }            int i = 0;            System.out.println("Okio sink test start");            long ts = System.currentTimeMillis();            long nts = System.nanoTime();            while(i++ < 100000) {                bufferedSink.writeUtf8(stringBuilder.toString());            }            bufferedSink.close();            System.out.println("Okio sink ms time " + (System.currentTimeMillis() - ts));            System.out.println("Okio sink nano time " + (System.nanoTime() - nts));            System.out.println("Okio sink test end");        } catch (FileNotFoundException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }    }/<code>

通过 Java 的 BufferedWriter 进行的写入代码

<code>private void testFileOut(File file) {        if(!file.exists()) {            try {                file.createNewFile();            } catch (IOException e) {                e.printStackTrace();            }        }        try {            FileWriter fileWriter = new FileWriter(file);            BufferedWriter bufferedWriter = new BufferedWriter(fileWriter);            StringBuilder stringBuilder = new StringBuilder();            for(int k = 0;k < 1000;k++) {                stringBuilder.append("Buffer Sink Test Sample");            }            int i = 0;            System.out.println("Java buffered write test start");            long ts = System.currentTimeMillis();            long nts = System.nanoTime();            while(i++ < 100000) {                bufferedWriter.write(stringBuilder.toString());            }            bufferedWriter.close();            System.out.println("Java buffered write ms time " + (System.currentTimeMillis() - ts));            System.out.println("Java buffered write time " + (System.nanoTime() - nts));            System.out.println("Java buffered write test end");        } catch (IOException e) {            e.printStackTrace();        }    }/<code>

这段程序运行之后有 2 个数据是需要我们关注的。

数据大小:

通过下面的图,我们可以看到,这个写入数据大小为2.3GB

Okio深入分析—源码分析部分

image.png



写入所花的时间:

如下 BufferSink 与 BufferedWriter 的时间对比,分别进行 10 次读写,应该能很明显的看到 BufferSink 的整体时间分布是要优于 BufferedWriter 的。


Okio深入分析—源码分析部分


上面的结果是PC上跑出来的结果,再来看看手机上的结果。也是基本得到一样的结果。就是 OKIO 的结果基本趋于稳定下降。当然在手机上写10000次太慢了,所以改成了 5000 次。


Okio深入分析—源码分析部分


总结下来就是,Okio 的写入并不总是比 Java 原生 I/O 快。而是在多次反复写入的情况下,Okio 则速度较快,且比较稳定。

源码分析

前面的案例分析,更加增强了我们对 Okio 的认知,接下来就是我们的重点,源码分析与理解了。按照 demo ,画出时序图,并逐层进行分解并深入。所以首先我们还是先来看看时序图。


Okio深入分析—源码分析部分

OkioSink.jpg


构造Sink —— Okio#sink(file)

<code>  /** Returns a sink that writes to {@code file}. */  // 当传入的是一个文件时所调用的 Sink 构造方法  public static Sink sink(File file) throws FileNotFoundException {    if (file == null) throw new IllegalArgumentException("file == null");    return sink(new FileOutputStream(file));  }   /** Returns a sink that writes to {@code out}. */  // 当传入的是一个 OutputStream 时所调用的 Sink 构建方法  public static Sink sink(OutputStream out) {    return sink(out, new Timeout());  }  // 这个构建方法是私有的,最终的调用都会来到这里  private static Sink sink(final OutputStream out, final Timeout timeout) {    if (out == null) throw new IllegalArgumentException("out == null");    if (timeout == null) throw new IllegalArgumentException("timeout == null");    return new Sink() {      @Override public void write(Buffer source, long byteCount) throws IOException {        checkOffsetAndCount(source.size, 0, byteCount);        while (byteCount > 0) {          timeout.throwIfReached();          Segment head = source.head;          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);          out.write(head.data, head.pos, toCopy);          head.pos += toCopy;          byteCount -= toCopy;          source.size -= toCopy;          if (head.pos == head.limit) {            source.head = head.pop();            SegmentPool.recycle(head);          }        }      }      @Override public void flush() throws IOException {        out.flush();      }      @Override public void close() throws IOException {        out.close();      }      @Override public Timeout timeout() {        return timeout;      }      @Override public String toString() {        return "sink(" + out + ")";      }    };  }/<code>

上面这段代码中,是构建 Sink 的 3 个重载方法,是Okio 的静态方法,而且他们也是依次调用。第一个告诉了我们其底层的写入也是基于 Java 原生的 I/O ,如 FileOutputStream。第二个加入了 Timeout ,这就使其具备了超时机制。第三个方法是最为关键的,这里才真正生成了 Sink 实例。而在这个实例中,也直接定义出了数据如何写入的,如何关闭流,以及返回哪一个 Timeout 实例。这一波调用总的来说确定了实际的 Sink 对象,以及 Sink 是如何写入的和其应该返回的超时机制。以上是 5 个Sink 构造方法中的其中 3 个,还有另外 2 个也一起来看看吧。

<code>  /** Returns a sink that writes to {@code path}. */  // 通过文件路径构建 Sink 实例  @IgnoreJRERequirement // Should only be invoked on Java 7+.  public static Sink sink(Path path, OpenOption... options) throws IOException {    if (path == null) throw new IllegalArgumentException("path == null");    return sink(Files.newOutputStream(path, options));  }  /**   * Returns a sink that writes to {@code socket}. Prefer this over {@link   * #sink(OutputStream)} because this method honors timeouts. When the socket   * write times out, the socket is asynchronously closed by a watchdog thread.   */  // 通过  Socket 进行构建  public static Sink sink(Socket socket) throws IOException {    if (socket == null) throw new IllegalArgumentException("socket == null");    AsyncTimeout timeout = timeout(socket);    Sink sink = sink(socket.getOutputStream(), timeout);    return timeout.sink(sink);  }/<code>

上面这 2 个 Sink 构造方法,第一个是以文件路径来进行构建,而第二个则是以 Socket 进行构建,并且对于 Socket 的构建中,其 Timeout 是 AsyncTimeout。这里说明了 2 个其本特性。第一、除了对文件进行支持,同时还支持 Socket,第二、超时机制有同步的 Timeout 以及异步的 AsyncTimeout。关于超时机制,后面还会详细来讲,这里先有个概念即可。

构造BufferedSink——Okio#buffer(sink)

<code>/**   * Returns a new sink that buffers writes to {@code sink}. The returned sink   * will batch writes to {@code sink}. Use this wherever you write to a sink to   * get an ergonomic and efficient access to data.   */  public static BufferedSink buffer(Sink sink) {    return new RealBufferedSink(sink);  }/<code>

该方法就是构建了一个 RealBufferedSink 实例。那就来看看这个类。

<code>final class RealBufferedSink implements BufferedSink {  public final Buffer buffer = new Buffer();  public final Sink sink;  boolean closed;  RealBufferedSink(Sink sink) {    if (sink == null) throw new NullPointerException("sink == null");    this.sink = sink;  }......}/<code>

这个类中两个重要的成员变量一个 sink ,由外部传入,一个是 Buffer 由内部直构建。这里的 sink 是前面通过 Okio.sink()方法最终构建的那个匿名类。那么再来看看 Buffer 这个类。

<code>/** * A collection of bytes in memory. * * 

Moving data from one buffer to another is fast. Instead * of copying bytes from one place in memory to another, this class just changes * ownership of the underlying byte arrays. * *

This buffer grows with your data. Just like ArrayList, * each buffer starts small. It consumes only the memory it needs to. * *

This buffer pools its byte arrays. When you allocate a * byte array in Java, the runtime must zero-fill the requested array before * returning it to you. Even if you're going to write over that space anyway. * This class avoids zero-fill and GC churn by pooling byte arrays. */public final class Buffer implements BufferedSource, BufferedSink, Cloneable { private static final byte[] DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; static final int REPLACEMENT_CHARACTER = '\\\\ufffd'; @Nullable Segment head; long size; public Buffer() { }

/<code>

Buffer 类中最重要的成员变量 head 是一个 Segment 类型,Buffer 中的数据结构也正是由 Segment 所构成的一个双向链表。除此之外,关于 Buffer 类,这里的解释也极其重要。大概的意思是说:(1) 这是一个在内存中的字节集合。(2)将一个 Buffer 中的数据移到另一个 Buffer 中去是非常快的。原因是它并不是通过复制来实现,而是从底层字节数组去改变与 Buffer 这个类的所属关系。(3)它就是 ArrayList 一样的,容量自动增长的。一开始是很小的,且仅分配它所需要的内存大小。(4)Buffer池化了它的字节数组。如果直接从Java中分配一个字节数组,使用其必须为其填充 0。而 Buffer 通过池化字节数组从而避免了 0 填充和GC 抖动的情况。上面详细的说明了 Buffer 的特性,当然也是 Okio 的特性。在后面的章节中我们还会继续分析其特性的实现细节。而关于 Buffer 就先了解到这里,再进一步看看Segment。

<code>package okio;import javax.annotation.Nullable;/** * A segment of a buffer. * * 

Each segment in a buffer is a circularly-linked list node referencing the following and * preceding segments in the buffer. * *

Each segment in the pool is a singly-linked list node referencing the rest of segments in the * pool. * *

The underlying byte arrays of segments may be shared between buffers and byte strings. When a * segment's byte array is shared the segment may not be recycled, nor may its byte data be changed. * The lone exception is that the owner segment is allowed to append to the segment, writing data at * {@code limit} and beyond. There is a single owning segment for each byte array. Positions, * limits, prev, and next references are not shared. */final class Segment { /** The size of all segments in bytes. */// 每个 segment 共 8 K字节 static final int SIZE = 8192; /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */// 是否共享底层数据的阈值 static final int SHARE_MINIMUM = 1024;// 存放字节的数组 final byte[] data; /** The next byte of application data byte to read in this segment. */ // 下一个要读的字节的位置 int pos; /** The first byte of available data ready to be written to. */ // 写入的起始位置 int limit; /** True if other segments or byte strings use the same byte array. */ // 表明此 segment 是共享的。那什么时候情况下会被共享呢? boolean shared; /** True if this segment owns the byte array and can append to it, extending {@code limit}. */// 表明这个段是可以追加数据的,而追回的位置是基于 limit。一般情况下都是为 true 的 boolean owner; /** Next segment in a linked or circularly-linked list. */// 双向环形链表的下一个节点 Segment next; /** Previous segment in a circularly-linked list. */// 双向环形链表的前一个节点 Segment prev; Segment() { this.data = new byte[SIZE]; this.owner = true; this.shared = false; } Segment(Segment shareFrom) { this(shareFrom.data, shareFrom.pos, shareFrom.limit); shareFrom.shared = true; } Segment(byte[] data, int pos, int limit) { this.data = data; this.pos = pos; this.limit = limit; this.owner = false; this.shared = true; } /** * Removes this segment of a circularly-linked list and returns its successor. * Returns null if the list is now empty. */// 将自己弹出链表 public @Nullable Segment pop() { ...... } /** * Appends {@code segment} after this segment in the circularly-linked list. * Returns the pushed segment. */// 压入到当前链表 public Segment push(Segment segment) { ...... } /** * Splits this head of a circularly-linked list into two segments. The first * segment contains the data in {@code [pos..pos+byteCount)}. The second * segment contains the data in {@code [pos+byteCount..limit)}. This can be * useful when moving partial segments from one buffer to another. * *

Returns the new head of the circularly-linked list. */// 分割,把数据以 [pos,pos + byteCount] 和 [pos + byteCount,limit] 这两段来进行分割。分割出去的数据依据共享阈值决定数据是否被共享 public Segment split(int byteCount) { ...... } /** * Call this when the tail and its predecessor may both be less than half * full. This will copy data so that segments can be recycled. */// 压缩存储,当处于尾节点和它的前一节点的数据都少于一半时,把数据进行压缩存储到一个,以便可以回收调一个 Segment。 public void compact() { ...... }// 移动 byteCount 个字节数据到另一个段中 /** Moves {@code byteCount} bytes from this segment to {@code sink}. */ public void writeTo(Segment sink, int byteCount) { ...... }}

/<code>

来解读一下 Segment 的特性:(1) segment 是 Buffer 中的一个组织单元(2) segment 在 Buffer 中以环形双向链表的数据结构来存储(3) segment 在 SegmentPool 中以单链接表数据结构来存储(4) segment 可以在 Buffer 和 ByteStrings 之间进行共享。对于共享的 segment 不可以回收,也不可以修改其中的数据。但例外的是,段的拥有者可以在 limit 之后追回数据。对于这个段来说,底层的字节数组是共享的,但position,limits,prev以及next这些是不共享的。此外,注释里有对每一个属性的详细解释以及个人的理解。为了方便更容易理解,简单画了一个 Segment 的数据模型图来帮助我们。如下所示。


Okio深入分析—源码分析部分

image.png


关于 Segment 就先了解到这里,其中的方法 split(),compact()以及 writeTo() 后面还会详细说明。接下来继续看看 SegmentPool。

<code>package okio;import javax.annotation.Nullable;/** * A collection of unused segments, necessary to avoid GC churn and zero-fill. * This pool is a thread-safe static singleton. */// 主要是用于收集未使用的 Segments,避免了内存抖动以及0填充。同时,这个 Pool 也是一个线程安全的静态单例类。final class SegmentPool {  /** The maximum number of bytes to pool. */  // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?// 一个池子最大存储 64 KB 字节  static final long MAX_SIZE = 64 * 1024; // 64 KiB.  /** Singly-linked list of segments. */// 表明其是一个单向链表  static @Nullable Segment next;  /** Total bytes in this pool. */// 当前的总数据大小  static long byteCount;  private SegmentPool() {  }  static Segment take() {    synchronized (SegmentPool.class) {      if (next != null) {        Segment result = next;        next = result.next;        result.next = null;        byteCount -= Segment.SIZE;        return result;      }    }    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.  }  static void recycle(Segment segment) {    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();    if (segment.shared) return; // This segment cannot be recycled.    synchronized (SegmentPool.class) {      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.      byteCount += Segment.SIZE;      segment.next = next;      segment.pos = segment.limit = 0;      next = segment;    }  }}/<code> 

这个类其实挺简单的,take() 方法中就是获取一个 Segment ,如果池子中有就取一个出来,如果没有就给 new 一个。而 recycle() 方法,如果池子还没涨到 64 KB 的限制,且 Segment 不是共享的,那就把它加到链接里面去。

小结通过对 Sink 以及 BufferedSink 的构造,可以说是对 Okio 整个使用环境的一个初始化。同时,在这一段也详细介绍了其相关的核心概念,Buffer,Segment,SegmentPool 等。有了这些基本的核心概念,我们就可以进一步分析字节的写入了。

写入数据——RealBufferedSink#writeUtf8()

<code>  @Override public BufferedSink writeUtf8(String string) throws IOException {    if (closed) throw new IllegalStateException("closed");    buffer.writeUtf8(string);    return emitCompleteSegments();  }/<code>

进一步调用了 Buffer 的 writeUtf8() 和 emitCompleteSegments(),那么就往下看Buffer#writeUft8() 方法。代码很长,但关键的是 2 个调用,writableSegment(1) 和 writeByte(),其他的都是 utf-8 编码相关。

<code>  @Override public Buffer writeUtf8(String string) {    return writeUtf8(string, 0, string.length());  }  @Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {    ......    // Transcode a UTF-16 Java String to UTF-8 bytes.    for (int i = beginIndex; i < endIndex;) {      int c = string.charAt(i);      if (c < 0x80) {        Segment tail = writableSegment(1);        byte[] data = tail.data;        int segmentOffset = tail.limit - i;        int runLimit = Math.min(endIndex, Segment.SIZE - segmentOffset);        // Emit a 7-bit character with 1 byte.        data[segmentOffset + i++] = (byte) c; // 0xxxxxxx        // Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance        // improvement over independent calls to writeByte().        while (i < runLimit) {          c = string.charAt(i);          if (c >= 0x80) break;          data[segmentOffset + i++] = (byte) c; // 0xxxxxxx        }        int runSize = i + segmentOffset - tail.limit; // Equivalent to i - (previous i).        tail.limit += runSize;        size += runSize;      } else if (c < 0x800) {        // Emit a 11-bit character with 2 bytes.        writeByte(c >>  6        | 0xc0); // 110xxxxx        writeByte(c       & 0x3f | 0x80); // 10xxxxxx        i++;      } else if (c < 0xd800 || c > 0xdfff) {        // Emit a 16-bit character with 3 bytes.        writeByte(c >> 12        | 0xe0); // 1110xxxx        writeByte(c >>  6 & 0x3f | 0x80); // 10xxxxxx        writeByte(c       & 0x3f | 0x80); // 10xxxxxx        i++;      } else {        // c is a surrogate. Make sure it is a high surrogate & that its successor is a low        // surrogate. If not, the UTF-16 is invalid, in which case we emit a replacement character.        int low = i + 1 < endIndex ? string.charAt(i + 1) : 0;        if (c > 0xdbff || low < 0xdc00 || low > 0xdfff) {          writeByte('?');          i++;          continue;        }        // UTF-16 high surrogate: 110110xxxxxxxxxx (10 bits)        // UTF-16 low surrogate:  110111yyyyyyyyyy (10 bits)        // Unicode code point:    00010000000000000000 + xxxxxxxxxxyyyyyyyyyy (21 bits)        int codePoint = 0x010000 + ((c & ~0xd800) << 10 | low & ~0xdc00);        // Emit a 21-bit character with 4 bytes.        writeByte(codePoint >> 18        | 0xf0); // 11110xxx        writeByte(codePoint >> 12 & 0x3f | 0x80); // 10xxxxxx        writeByte(codePoint >>  6 & 0x3f | 0x80); // 10xxyyyy        writeByte(codePoint       & 0x3f | 0x80); // 10yyyyyy        i += 2;      }    }    return this;  }/<code>

Buffer有两个重载版本,第一个调用了第二个,主要就是确定了起始位置为0,而写入的数据为全部数据。而第二个是具体的写入,这段代码比较长,其实不应该贴出来的,但我看到它对 Utf8 的编码十分的优秀,所以还是贴出来,有时间的可以仔细读一读。当然,在这里我们只关注其中最关键的几句即可。如前面所说,这里面有两个进一步调用的方法分别是 writableSegment(1) 和 writeByte(),而 writeByte() 里面也有对 writableSegment() 的调用。那我们先来看看 writableSegment() 方法的实现。

<code>/**   * Returns a tail segment that we can write at least {@code minimumCapacity}   * bytes to, creating it if necessary.   */  Segment writableSegment(int minimumCapacity) {    ......    if (head == null) {      head = SegmentPool.take(); // Acquire a first segment.      return head.next = head.prev = head;    }    Segment tail = head.prev;    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.    }    return tail;  }/<code>

这个方法的主要含义就是如果head为空,则说明当前的链接还是空的,那么就新建一个节点,此时头尾相等。如果不为空就取出尾节点,即头节点的前向节点。而尾节点的空间不能再装下当前要写入的字节数时,就会创建一个新的节点,并把新建的节点作为尾节点,然后继续写入。再来看看 writeByte()。

<code>  @Override public Buffer writeByte(int b) {    Segment tail = writableSegment(1);    tail.data[tail.limit++] = (byte) b;    size += 1;    return this;  }/<code>

这里就是把数据写入到 Segment 的 data[] 中。对于一次 writeUtf8() 的调用来看,目前为止是把数据全部都写入到了 Segment 中,同时也是内存中,还没有发生实际的 I/O 的。接下来继续看 RealBufferedSink#emitCompleteSegments()方法。

<code>  @Override public BufferedSink emitCompleteSegments() throws IOException {    if (closed) throw new IllegalStateException("closed");    long byteCount = buffer.completeSegmentByteCount();    if (byteCount > 0) sink.write(buffer, byteCount);    return this;  }/<code>

这里先是调用了 Buffer#completeSegmentByteCount()确定需要从 buffer 中实际取多少字节写入到实际的输出流中。

<code>  /**   * Returns the number of bytes in segments that are not writable. This is the   * number of bytes that can be flushed immediately to an underlying sink   * without harming throughput.   */  public long completeSegmentByteCount() {    long result = size;    if (result == 0) return 0;    // Omit the tail if it's still writable.    Segment tail = head.prev;    if (tail.limit < Segment.SIZE && tail.owner) {      result -= tail.limit - tail.pos;    }    return result;  }/<code>

上面代码的意思是用当前总的 size 大小 减去 尾节点已写入的字节数,也就是除尾节点,其他的都会被立即写入到底层的输出流中。这说明了不足 8K 的数据写入,在 close() 之前是不会发生实际 I/O 的。接下来,进一步调用了 sink#write()。这个 sink 就是我们在前面通过 Okio#sink() 方法所创建出来的匿名类。 为了连贯性,我们还是把写入相关的代码再贴一下以便于进一步的阅读与分析。

<code>@Override public void write(Buffer source, long byteCount) throws IOException {        ......        while (byteCount > 0) { // 循环,直到数据全部写入          timeout.throwIfReached(); // 超时判断          Segment head = source.head; // 总是取头结点         // 确定当次需要写入的字节数,当前 segment 中可写入的数据与剩余字节数,取最小的那个          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);          out.write(head.data, head.pos, toCopy);//写入数据到输出流                  // 更新各位置          head.pos += toCopy; // 读指针往前移 toCopy 个          byteCount -= toCopy; // 写入字节数减 toCopy 个          source.size -= toCopy; // Buffer 中大小减 toCopy 个          // 如果当前 segment 的数据已经全部写入到输出流,那么将其弹出双向循环链接,并将其回收到 SegmentPool 中          if (head.pos == head.limit) {            source.head = head.pop();            SegmentPool.recycle(head);          }        }/<code>

贴出来的代码中,已经作了非常详细的说明,因为这里的每一句都很重要,都需要我们去理解。当然,总的来说其实也只有 2 个关键点。其一,超时机制。

<code>/**   * Throws an {@link InterruptedIOException} if the deadline has been reached or if the current   * thread has been interrupted. This method doesn't detect timeouts; that should be implemented to   * asynchronously abort an in-progress operation.   */  public void throwIfReached() throws IOException {    if (Thread.interrupted()) {      throw new InterruptedIOException("thread interrupted");    }    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {      throw new InterruptedIOException("deadline reached");    }  }/<code>

根据这里的注释,以及分析 hasDeadLine 和 deadlineNanoTime 的赋值情况来看,超时的检测只应该发生在异步的情况下。纳尼!!!也就是说同步超时机制是不检查的。其二,就是数据的写入以及各位位置标记的置位和Segment的回收。

关闭——RealBufferedSink#close()

<code>@Override public void close() throws IOException {    if (closed) return;    // Emit buffered data to the underlying sink. If this fails, we still need    // to close the sink; otherwise we risk leaking resources.    Throwable thrown = null;    try {      if (buffer.size > 0) {        sink.write(buffer, buffer.size);      }    } catch (Throwable e) {      thrown = e;    }    try {      sink.close();    } catch (Throwable e) {      if (thrown == null) thrown = e;    }    closed = true;    if (thrown != null) Util.sneakyRethrow(thrown);  }/<code>

close() 也就是将 buffer 中剩余的数据都写入到底层输出流中,最后再调用 sink#close()关闭底层的输出流。至此,写入数据可以说大致分析完了。因为与 writeUtf8() 同级别的其他 write方法,其基本原理都是类似的。通过分析我们知道,其是先将数据都写入到 Buffer 中,然后才将数据写入到实际的 I/O 中,以此减少实际的 I/O。而分析到这里,我们应该也注意到,前面所说的 Segment 的 splite,compact以及share 这些优化内存的方法都没有被提及过。这就接下来要分析的读取与另一不同级别的写入。

三、读取

案例分析

关于读取,我们同样也从案例分析开始。先来看看读取的代码。通过 Okio 的 BufferSource 进行的读取代码

<code>private void testSource(File file) {        try {            BufferedSource bufferedSource = Okio.buffer(Okio.source(file));            System.out.println("Okio source test start");            long ts = System.currentTimeMillis();            byte[] buffer = new byte[8 * 1024];            while(bufferedSource.read(buffer) > 0) {            }            bufferedSource.close();            System.out.println("Okio source test end");            System.out.println(" source test " + (System.currentTimeMillis() - ts));        } catch (FileNotFoundException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }    }/<code>

通过 Java 的 BufferedReader 进行的读取代码

<code>private void testJava(File file) {        try {            BufferedReader bufferedReader = new BufferedReader(new FileReader(file));            System.out.println("java test start");            long ts = System.currentTimeMillis();            char buffer[] = new char[8 * 1024];            while(bufferedReader.read(buffer) > 0) {            }            System.out.println("java test end");            System.out.println("java test " + (System.currentTimeMillis() - ts));            bufferedReader.close();        } catch (FileNotFoundException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }    }/<code>

同样来看看测试结果,不过这里偷了个懒,只测试了PC端的,没有测试手机端。因为平台不影响结果。文件就是上面写入的文件,大小 2.3 G。


Okio深入分析—源码分析部分


对比结果很明显,Okio#BufferSource 的读取速度明显大于 Java 原生的2 倍多。但是,为什么呢?另外有一个细节,上面的 buffer 大小只设置了 8 * 1024 ,也就是 8KiB,这是因为 Okio 的 segment 为 8 KiB,其一次读取最大也只读取 8 KiB

源码分析

有了前面案例分析的基本认知,接下来就是我们的源码分析与理解了。同样,先画出时序图,再逐层进行分解并深入。


Okio深入分析—源码分析部分

OkioSource.jpg


从时序图上可以看到,前面几个步骤都是相同的,如 Buffer,Segment,Timeout 这些概念和构造都是一样的。就不重复介绍了。主要看看 Source 和 RealBufferSource 的构造吧。

构造Source——Okio#source(file)

<code>  /** Returns a source that reads from {@code file}. */  public static Source source(File file) throws FileNotFoundException {    if (file == null) throw new IllegalArgumentException("file == null");    return source(new FileInputStream(file));  }  /** Returns a source that reads from {@code in}. */  public static Source source(InputStream in) {    return source(in, new Timeout());  }private static Source source(final InputStream in, final Timeout timeout) {    if (in == null) throw new IllegalArgumentException("in == null");    if (timeout == null) throw new IllegalArgumentException("timeout == null");    return new Source() {      @Override public long read(Buffer sink, long byteCount) throws IOException {        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);        if (byteCount == 0) return 0;        try {          timeout.throwIfReached();          Segment tail = sink.writableSegment(1);          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);          if (bytesRead == -1) return -1;          tail.limit += bytesRead;          sink.size += bytesRead;          return bytesRead;        } catch (AssertionError e) {          if (isAndroidGetsocknameError(e)) throw new IOException(e);          throw e;        }      }      @Override public void close() throws IOException {        in.close();      }      @Override public Timeout timeout() {        return timeout;      }      @Override public String toString() {        return "source(" + in + ")";      }    };  }/<code>

上面的代码看着多,但和 Sink 的构造几乎一样。这里的重点也构造了一个匿名的 Source 接口的实例,以完成实际从输入中的读取。同样的,Source 也是支持 Socket 的,顺便也一起来看看吧。

<code>/**   * Returns a source that reads from {@code socket}. Prefer this over {@link   * #source(InputStream)} because this method honors timeouts. When the socket   * read times out, the socket is asynchronously closed by a watchdog thread.   */  public static Source source(Socket socket) throws IOException {    if (socket == null) throw new IllegalArgumentException("socket == null");    AsyncTimeout timeout = timeout(socket);    Source source = source(socket.getInputStream(), timeout);    return timeout.source(source);  }/<code>

支持 Socket 也是比较简单的,即从 socket 中获取输入流以便读取数据,同时这里也设定了 Timeout 为 AsyncTimeout。关于 AsyncTimeout 就放到后面专门的分节里来讲解。

构造 BufferdSource —— Okio#buffer(Source)

<code>  /**   * Returns a new source that buffers reads from {@code source}. The returned   * source will perform bulk reads into its in-memory buffer. Use this wherever   * you read a source to get an ergonomic and efficient access to data.   */  public static BufferedSource buffer(Source source) {    return new RealBufferedSource(source);  }/<code>

构造 RealBufferedSource 实例,继续看。

<code>final class RealBufferedSource implements BufferedSource {  public final Buffer buffer = new Buffer();  public final Source source;  boolean closed;  RealBufferedSource(Source source) {    if (source == null) throw new NullPointerException("source == null");    this.source = source;  }....../<code>

同样是组合了 Source 和 Buffer 这两个实例。Source 就是前面通过 Okio.source()方法最终构建的那个匿名类。而 Buffer 和其内部聚合的 Segment 都已经介绍过了。所以直接来看 read() 方法吧。

读取数据——RealBufferedSource#read()

<code>  @Override public int read(byte[] sink) throws IOException {    return read(sink, 0, sink.length);  } @Override public int read(byte[] sink, int offset, int byteCount) throws IOException {    checkOffsetAndCount(sink.length, offset, byteCount);    if (buffer.size == 0) {      long read = source.read(buffer, Segment.SIZE);      if (read == -1) return -1;    }    int toRead = (int) Math.min(byteCount, buffer.size);    return buffer.read(sink, offset, toRead);  }/<code>

read() 是个重载方法,我们调用的是第一个,然后它会进一步调用指定参数 offset 和 byteCount 的 read() 方法。在这个方法里,buffer.size 为 0 说明当前 buffer 中为没有数据,即数据已经被从buffer 中读出来了。如果不为0,说明 buffer 中还有数据那么就进一步读取剩下的或者剩余的数据。这里最初肯定是要走为 0 的情况。这里调用的是 source.read(),也即前面匿名类 Source 的。注意,这里传入的参数中除了 buffer ,还有 Segment.SIZE,即 8KiB。这里为了方便分析,也重复贴一下 read() 方法的相关代码。

<code>@Override public long read(Buffer sink, long byteCount) throws IOException {       .....       ......        try {          timeout.throwIfReached();          Segment tail = sink.writableSegment(1);          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);          if (bytesRead == -1) return -1;          tail.limit += bytesRead;          sink.size += bytesRead;          return bytesRead;        } catch (AssertionError e) {          if (isAndroidGetsocknameError(e)) throw new IOException(e);          throw e;        }      }/<code>

同步 Timeout.throwIfReached() 前面已经分析过了,是不起作用的,其实现在来看看也能明白过来,不管是在读还是写的时候,它都没有设置起始时间,那它又如何能计算出所消耗的时间呢?而 Buffer#writeableSegment() 在前面也分析过了,主要是返回一个 Segment。然后就是实际从文件读出数据并只保存在 Segment 里面。

在 RealBufferedSource#read() 中,走了不为 0 的情况后,就是将Buffer中的数据按需要读到内存中返回给调用者。

有了前面写入的分析,再来看读取显然要简单的多了。至此,读取和写入的最基本流程都分析完了。Okio 为打包了很多的读取和写入的方法,这里只分析了其中最基本的,当然有了这些最基本的认知后,再来理解基本的就容易许多了。

三、读取到写入,写入自读取

除了基本的读取与写入,还可以将读取与写入直接串起来。也即读取出来数据可以直接送进写入,而写入的内容也可以是来自 Source 。先来看看写入自读取吧。

<code>@Override public long writeAll(Source source) throws IOException {    if (source == null) throw new IllegalArgumentException("source == null");    long totalBytesRead = 0;    for (long readCount; (readCount = source.read(buffer, Segment.SIZE)) != -1; ) {      totalBytesRead += readCount;      emitCompleteSegments();    }    return totalBytesRead;  }  @Override public BufferedSink write(Source source, long byteCount) throws IOException {    while (byteCount > 0) {      long read = source.read(buffer, byteCount);      if (read == -1) throw new EOFException();      byteCount -= read;      emitCompleteSegments();    }    return this;  }/<code>

上面两段代码就是将数据从 Source 读出,再写入到Buffer 中去。这里需要关注的有两个事情:source实例 :这里的 Source 是 RealBufferdSource 实例。关键调用 : 这里的关键调用是 RealBufferdSource#read(Buffer,byteCount)。来看看其实现。

<code>@Override public long read(Buffer sink, long byteCount) throws IOException {    ......    return buffer.read(sink, toRead);  }/<code>

调用了 Buffer#read(Buffer,toRead)

<code>  @Override public long read(Buffer sink, long byteCount) {   ......    sink.write(this, byteCount);    return byteCount;  }/<code>

看到这里可能会有点晕,所以强调一下,这里的 sink 是 RealBufferdSink 中的 Buffer,而 this 是 RealBufferedSource 中的 buffer。意思就是将 RealBufferdSource 中的 buffer 的数据写入到 RealBufferdSink 中的 buffer 中去。 下面来看看具体的实现,其实代码并不长,注释就占了一大半,为了完整性,我并没有将其删除去。

<code>@Override public void write(Buffer source, long byteCount) {    // Move bytes from the head of the source buffer to the tail of this buffer    // while balancing two conflicting goals: don't waste CPU and don't waste    // memory.    //    //    // Don't waste CPU (ie. don't copy data around).    //    // Copying large amounts of data is expensive. Instead, we prefer to    // reassign entire segments from one buffer to the other.    //    //    // Don't waste memory.    //    // As an invariant, adjacent pairs of segments in a buffer should be at    // least 50% full, except for the head segment and the tail segment.    //    // The head segment cannot maintain the invariant because the application is    // consuming bytes from this segment, decreasing its level.    //    // The tail segment cannot maintain the invariant because the application is    // producing bytes, which may require new nearly-empty tail segments to be    // appended.    //    //    // Moving segments between buffers    //    // When writing one buffer to another, we prefer to reassign entire segments    // over copying bytes into their most compact form. Suppose we have a buffer    // with these segment levels [91%, 61%]. If we append a buffer with a    // single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied.    //    // Or suppose we have a buffer with these segment levels: [100%, 2%], and we    // want to append it to a buffer with these segment levels [99%, 3%]. This    // operation will yield the following segments: [100%, 2%, 99%, 3%]. That    // is, we do not spend time copying bytes around to achieve more efficient    // memory use like [100%, 100%, 4%].    //    // When combining buffers, we will compact adjacent buffers when their    // combined level doesn't exceed 100%. For example, when we start with    // [100%, 40%] and append [30%, 80%], the result is [100%, 70%, 80%].    //    //    // Splitting segments    //    // Occasionally we write only part of a source buffer to a sink buffer. For    // example, given a sink [51%, 91%], we may want to write the first 30% of    // a source [92%, 82%] to it. To simplify, we first transform the source to    // an equivalent buffer [30%, 62%, 82%] and then move the head segment,    // yielding sink [51%, 91%, 30%] and source [62%, 82%].    if (source == null) throw new IllegalArgumentException("source == null");    if (source == this) throw new IllegalArgumentException("source == this");    checkOffsetAndCount(source.size, 0, byteCount);    while (byteCount > 0) {      // Is a prefix of the source's head segment all that we need to move?      if (byteCount < (source.head.limit - source.head.pos)) {        Segment tail = head != null ? head.prev : null;        if (tail != null && tail.owner            && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {          // Our existing segments are sufficient. Move bytes from source's head to our tail.          source.head.writeTo(tail, (int) byteCount);          source.size -= byteCount;          size += byteCount;          return;        } else {          // We're going to need another segment. Split the source's head          // segment in two, then move the first of those two to this buffer.          source.head = source.head.split((int) byteCount);        }      }      // Remove the source's head segment and append it to our tail.      Segment segmentToMove = source.head;      long movedByteCount = segmentToMove.limit - segmentToMove.pos;      source.head = segmentToMove.pop();      if (head == null) {        head = segmentToMove;        head.next = head.prev = head;      } else {        Segment tail = head.prev;        tail = tail.push(segmentToMove);        tail.compact();      }      source.size -= movedByteCount;      size += movedByteCount;      byteCount -= movedByteCount;    }  }/<code>

通过注释,可以明白,这个方法的核心是在讲:(1) 这个方法主要完成的是从 Source 的首部将数据移入到 Sink 的尾部(2) 移动过程中需要平衡两个重要的指标,优化 CPU 资源,优化内存资源。(3) 移动数据是通过改变引用指向,而不是数据的复制(4) Segment 合并,是说前一个 Segment 与当前的 Segment 数据大小之和如果没有超过 100%,那么就会将当前的 Segment 合并到前一个中去。看看下面的代码,你可能会更加的清楚。

<code>/**   * Call this when the tail and its predecessor may both be less than half   * full. This will copy data so that segments can be recycled.   */  public void compact() {    if (prev == this) throw new IllegalStateException();    if (!prev.owner) return; // Cannot compact: prev isn't writable.    int byteCount = limit - pos;    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.    writeTo(prev, byteCount);    pop();    SegmentPool.recycle(this);  }/<code>

(5) 分割 Segment,对于有一些 Segment ,如果只是部分被读取出来,那么可以通过将其分割成 2 个 Segment ,然后取需要的那个 Segment 加入到当前的 Buffer 中。

<code>/**   * Splits this head of a circularly-linked list into two segments. The first   * segment contains the data in {@code [pos..pos+byteCount)}. The second   * segment contains the data in {@code [pos+byteCount..limit)}. This can be   * useful when moving partial segments from one buffer to another.   *   * 

Returns the new head of the circularly-linked list. */ public Segment split(int byteCount) { if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException(); Segment prefix; // We have two competing performance goals: // - Avoid copying data. We accomplish this by sharing segments. // - Avoid short shared segments. These are bad for performance because they are readonly and // may lead to long chains of short segments. // To balance these goals we only share segments when the copy will be large. if (byteCount >= SHARE_MINIMUM) { prefix = new Segment(this); } else { prefix = SegmentPool.take(); System.arraycopy(data, pos, prefix.data, 0, byteCount); } prefix.limit = prefix.pos + byteCount; pos += byteCount; prev.push(prefix); return prefix; }

/<code>

明白了这个方法的主要意图,再来分析具体的代码。分两类情况:其一,当要读取的数据大小小于 source.head 中的字节数时,如果当前 sink.head 不为 null,则将 source.head 的数据都写入到 sink.tail 中去,当然也是其最后的要读取的数据了,即 soruce.head.writeTo();而当 sink.head 为空时,则先将 source.head 再 byteCount 大小进行数据的分割,再进入到第二种情况。而在分割时,如果需要的数据大于分割的阈值,那么被分割出来的 Segment 就会被共享成同一个 Segment。其二,当要读取的数据大小大于等于 source.head 字节时,如果当前 sink.head 为 null ,那么就让 source.head 从 source.buffer 中取出,并让 sink.head 直接指向 source.head 即可。而如果当前 sink.head 不为空,则将取出来的 segment 并到当前 sink.buffer 的尾部。根据前面的分析,如果尾部的数据与其前一个数据大小之和没有达到 100% ,那就可以进行合并。

以上便是 Segment 的合并与分割了。Okio 在这里同时考虑到了 CPU 和内存的优化,并且实现的非常微妙。

同时上面有部分也提到了 Segment 的共享,从整个源码以及分析来看,共享主要的场景就是发生在复制时以及可能的分割时。当然,最重要的是分割时了。而关于复制时的共享,其实也是非常符合逻辑了。即使被标记成分享的 Segment 不能再写入,但是对于内存的利用来上讲也是非常优化的。

四、异步的超时机制

异步超时机制主要只是用于 Socket 通信,这里也重复贴一下 Source 的代码,Sink 差不多。

<code>  /**   * Returns a source that reads from {@code socket}. Prefer this over {@link   * #source(InputStream)} because this method honors timeouts. When the socket   * read times out, the socket is asynchronously closed by a watchdog thread.   */  public static Source source(Socket socket) throws IOException {    if (socket == null) throw new IllegalArgumentException("socket == null");    AsyncTimeout timeout = timeout(socket);    Source source = source(socket.getInputStream(), timeout);    return timeout.source(source);  }/<code>

如前面所说,Socket 通信时,主要是拿出了其 InputStream,以便于读取数据。而这里的 Timeout 是其子类 AsyncTimeout。当通过重载方法 source() 构造出了 Source 之后,进一步送进了 AsyncTimeout#source() 方法中

<code>/**   * Returns a new source that delegates to {@code source}, using this to implement timeouts. This   * works best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation.   */  public final Source source(final Source source) {    return new Source() {      @Override public long read(Buffer sink, long byteCount) throws IOException {        boolean throwOnTimeout = false;        enter();        try {          long result = source.read(sink, byteCount);          throwOnTimeout = true;          return result;        } catch (IOException e) {          throw exit(e);        } finally {          exit(throwOnTimeout);        }      }      @Override public void close() throws IOException {        boolean throwOnTimeout = false;        try {          source.close();          throwOnTimeout = true;        } catch (IOException e) {          throw exit(e);        } finally {          exit(throwOnTimeout);        }      }      @Override public Timeout timeout() {        return AsyncTimeout.this;      }      @Override public String toString() {        return "AsyncTimeout.source(" + source + ")";      }    };  }/<code>

Okio#source()主要是定义了如何读取数据,而AsyncTimeout#source()定义了读取数据的超时机制。在 read() 方法中,主要体现在 enter() 以及 exit() 这两个方法上。

思考一下当前的场景,当前线程正在读取数据,那管理超时必然需要另一个线程。在 AsyncTimeout 中就定义了 WatchDog 线程,来监测超时的读写。

WatchDog 线程,俗名 “看门狗”,其在嵌入式方面应用非常多。而Android系统中的SystemServer是否存在耗时操作的监测也是由一个 WatchDog 线程来实现的。此处的 WatchDog 线程是由第一次 enter() 进入所触发的,起用后便被设置成守护线程运行,并且线程以链表的形式来管理。与此同时, enter() 也会添加一个新的 AsyncTimeout 节点到 WatchDog 线程中。而 exit() 则会检查节点是否还在链表中,否则时间到了不在链表中就会被认为是超时状态了,从而抛出超时的异常。具体我们来看看代码的实现。

<code>public final void enter() {    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");    long timeoutNanos = timeoutNanos();    boolean hasDeadline = hasDeadline();    if (timeoutNanos == 0 && !hasDeadline) {      return; // No timeout and no deadline? Don't bother with the queue.    }    inQueue = true;    scheduleTimeout(this, timeoutNanos, hasDeadline);  }/<code>

同 enter() 调用 scheduleTimeout() ,调度起一个 Timeout。

<code>private static synchronized void scheduleTimeout(      AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {    // Start the watchdog thread and create the head node when the first timeout is scheduled.// 链表为空说明 Watchdog 还没有启动起来,这里添加了头结点,并且启动了 watchdog 线程。    if (head == null) {      head = new AsyncTimeout();      new Watchdog().start();    }// 分不同的条件讲计算终止时间    long now = System.nanoTime();    if (timeoutNanos != 0 && hasDeadline) {      // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,      // Math.min() is undefined for absolute values, but meaningful for relative ones.      node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);    } else if (timeoutNanos != 0) {      node.timeoutAt = now + timeoutNanos;    } else if (hasDeadline) {      node.timeoutAt = node.deadlineNanoTime();    } else {      throw new AssertionError();    }// 按剩余时间升序排序    // Insert the node in sorted order.    long remainingNanos = node.remainingNanos(now);    for (AsyncTimeout prev = head; true; prev = prev.next) {      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {        node.next = prev.next;        prev.next = node;        if (prev == head) {          AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.        }        break;      }    }  }/<code>

以上便是插入了一个 AsyncTimeout 的节点。再来看看 Watchdog 线程是如何监测超时的。

<code>private static final class Watchdog extends Thread {    Watchdog() {      super("Okio Watchdog");      setDaemon(true);    }    public void run() {      while (true) {        try {          AsyncTimeout timedOut;          synchronized (AsyncTimeout.class) {            timedOut = awaitTimeout();            // Didn't find a node to interrupt. Try again.            if (timedOut == null) continue;            // The queue is completely empty. Let this thread exit and let another watchdog thread            // get created on the next call to scheduleTimeout().            if (timedOut == head) {              head = null;              return;            }          }          // Close the timed out node.          timedOut.timedOut();        } catch (InterruptedException ignored) {        }      }    }  }/<code>

重点在 awaitTimeout (),即等待一下,看看是否有 AsyncTimeout 超时了。

<code>/**   * Removes and returns the node at the head of the list, waiting for it to time out if necessary.   * This returns {@link #head} if there was no node at the head of the list when starting, and   * there continues to be no node after waiting {@code IDLE_TIMEOUT_NANOS}. It returns null if a   * new node was inserted while waiting. Otherwise this returns the node being waited on that has   * been removed.   */  static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {    // Get the next eligible node.    AsyncTimeout node = head.next;    // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.    if (node == null) {      long startNanos = System.nanoTime();      AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);      return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS          ? head  // The idle timeout elapsed.          : null; // The situation has changed.    }    long waitNanos = node.remainingNanos(System.nanoTime());    // The head of the queue hasn't timed out yet. Await that.    if (waitNanos > 0) {      // Waiting is made complicated by the fact that we work in nanoseconds,      // but the API wants (millis, nanos) in two arguments.      long waitMillis = waitNanos / 1000000L;      waitNanos -= (waitMillis * 1000000L);      AsyncTimeout.class.wait(waitMillis, (int) waitNanos);      return null;    }    // The head of the queue has timed out. Remove it.    head.next = node.next;    node.next = null;    return node;  }/<code>

首先找到剩余时间最小的那个节点,按照前面升序排序,也就是 head.next 节点。然后再检查剩余时间是否大于 0 ,说明还没有超时,这时便将等待到其剩余时间,并且返回空,表示并未超时。这里提一下的是 class.wait(time) 方法,如果时间没有到,收到对应 class 的 notify()/notifyAll() 也是会提前被唤醒的。假设这里并没有超时,最后再来看看 exit()。

<code>/**   * Throws an IOException if {@code throwOnTimeout} is {@code true} and a timeout occurred. See   * {@link #newTimeoutException(java.io.IOException)} for the type of exception thrown.   */  final void exit(boolean throwOnTimeout) throws IOException {    boolean timedOut = exit();    if (timedOut && throwOnTimeout) throw newTimeoutException(null);  }  /**   * Returns either {@code cause} or an IOException that's caused by {@code cause} if a timeout   * occurred. See {@link #newTimeoutException(java.io.IOException)} for the type of exception   * returned.   */  final IOException exit(IOException cause) throws IOException {    if (!exit()) return cause;    return newTimeoutException(cause);  }  /** Returns true if the timeout occurred. */  public final boolean exit() {    if (!inQueue) return false;    inQueue = false;    return cancelScheduledTimeout(this);  }/<code>

exit() 方法又重载了 3 个,很显然,前面 2 个主要是调用第 3 个来判断是否超时以便进一步的抛出超异常。这里主要来看看第 3 个。其又进一步调用了 cancelScheduledTimeout() 方法,这是与 scheduledTimeout()相对应的方法。

<code>/** Returns true if the timeout occurred. */  private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {    // Remove the node from the linked list.    for (AsyncTimeout prev = head; prev != null; prev = prev.next) {      if (prev.next == node) {        prev.next = node.next;        node.next = null;        return false;      }    }    // The node wasn't found in the linked list: it must have timed out!    return true;  }/<code>

从链表中寻找当前结点,如果找到则从链接中移除并表示没有超时。如果没有找到,则表示已经超时了。

至此,异步超时机制分析完毕。如果看前面的分析还有点晕,不防来看看总结:

(1) 管理超时机制的数据结构是一个升序链表,而管理这个链表的是守护线程 Watchdog ,它会每次选择一个节点,并通过剩余时间的等待来等待当前 AsyncTimeout 的超时与否。

(2) 通过 enter() 方法,再经过 scheduleTimeout() 方法添加一个 AsyncTimeout节点

(3) 通过 exit() 方法,再经过 cancleScheduleTimeut() 方法将未超时的 AsyncTimeout 从链表中移除。移除后,当 Watchdog 进入下一次遍历时就不会再遍历到被它等待超时的那个节点了。

五、总结

感谢你能读到并读完这篇文章,对于 Okio 的全部分析到此为止了。文章确实贴了不少的源码,但我仍然觉得是有必要的,它能使我们读起来更加的通畅以及完整。当然,也许还有更好的表述方式。本文并没有分析到了 Okio 所有的方方面面,但我相信能够认真读完并且理解其中大部分的意思,再去分析其他部分应该也非常简单了。

最后,由于本人水平有限,在分析过程中难免会有错误,表述上也会存在不尽人意的地方。对于一切问题,欢迎下方留言讨论,不甚感激。


分享到:


相關文章: