I/O Review Series (2): Okio
# Preface
The previous article reviewed the read/write optimization strategies of buffered I/O. This one digs into Okio's source code to see what problems Okio solves and how it solves them.
# Segment
[Segment-> Field]
final class Segment {
  // Maximum number of bytes a segment can hold
  static final int SIZE = 8192;
  // Minimum number of bytes to share rather than copy when splitting
  static final int SHARE_MINIMUM = 1024;
  // The array holding the actual data
  final byte[] data;
  // Index of the first readable byte (start of the valid data)
  int pos;
  // Index one past the last readable byte (end of the valid data)
  int limit;
  // Whether this segment is in shared state
  boolean shared;
  // Whether this segment owns the byte array (owners may write; non-owners are read-only)
  boolean owner;
  // Next segment in the list
  Segment next;
  // Previous segment in the list
  Segment prev;
}
Besides the byte[] array that stores the actual data, the range pos ~ limit marks the valid data: 0 ~ pos is the already-read part, pos ~ limit is the written-but-not-yet-read part, and limit ~ Segment.SIZE is the unused part. Segment also supports splitting and merging. When a segment is split into two, the data can be shared: both segments use the same byte[] array, just with different pos ~ limit markers. Merging goes the other way: compact() folds a segment's readable bytes into its predecessor (both are covered below).
The relationship between owner and shared:
- owner = true && shared = true: the range [0, limit) is read-only; [limit, SIZE) is writable, giving a writable capacity of SIZE - limit
- owner = true && shared = false: the whole data array is readable and writable, and the readable bytes may be shifted around inside the array, giving a writable capacity of SIZE - (limit - pos)
- owner = false implies shared = true: the segment's data must not be modified in any way, it is strictly read-only
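The region arithmetic above can be summarized in a standalone sketch (illustrative only, not the real okio.Segment; the cursor values are made up):
// Simplified model of Segment's readable/writable regions.
final class SegmentRegions {
  static final int SIZE = 8192;

  public static void main(String[] args) {
    int pos = 100, limit = 600;            // hypothetical cursors
    boolean shared = false, owner = true;

    int readable = limit - pos;            // bytes in [pos, limit)
    // An owner of an unshared segment may reclaim the already-read region
    // [0, pos) by shifting data to the front, hence the larger capacity.
    int writable = !owner ? 0 : (shared ? SIZE - limit : SIZE - (limit - pos));

    System.out.println("readable=" + readable + ", writable=" + writable);
  }
}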
[Segment-> split]
public Segment split(int byteCount) {
  if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
  // 1
  Segment prefix;
  if (byteCount >= SHARE_MINIMUM) {
    prefix = sharedCopy();
  } else {
    prefix = SegmentPool.take();
    System.arraycopy(data, pos, prefix.data, 0, byteCount);
  }
  // 2
  prefix.limit = prefix.pos + byteCount;
  pos += byteCount;
  prev.push(prefix);
  return prefix;
}
Step 1: if the requested prefix is at least SHARE_MINIMUM (1024) bytes, share the data; otherwise take an empty segment from the pool and copy the bytes into it
Step 2: update the new segment's limit and the current segment's pos (the split-off bytes are now consumed from the current segment), then insert the new node in front of the current one (prev.push(prefix))
Summary:
- split is how Okio divides one segment into two; the parameter specifies how many unread bytes the first segment will contain
- after the split, the first segment covers [pos, pos + byteCount) and the second covers [pos + byteCount, limit)
Optimization goals (a sketch of the share-versus-copy decision follows):
- avoid the performance cost of copying data
- the SHARE_MINIMUM threshold exists to keep short pieces of data from building up into a long chain of tiny shared segments
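A standalone sketch of that decision, with simplified data handling (only SHARE_MINIMUM and the two branches mirror the source; all names and values here are illustrative):
import java.util.Arrays;

// split(): share the backing array for large prefixes, copy for small ones.
final class SplitDemo {
  static final int SHARE_MINIMUM = 1024;

  public static void main(String[] args) {
    byte[] data = new byte[8192];
    int pos = 0;

    int byteCount = 2048; // >= SHARE_MINIMUM, so the share branch is taken
    byte[] prefixData = byteCount >= SHARE_MINIMUM
        ? data // shared: same array, the prefix just gets its own pos/limit window
        : Arrays.copyOfRange(data, pos, pos + byteCount); // copied into a fresh array

    System.out.println("shares backing array: " + (prefixData == data)); // true
  }
}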
[Segment-> compact]
public void compact() {
  // 1
  if (prev == this) throw new IllegalStateException();
  if (!prev.owner) return; // Cannot compact: prev isn't writable.
  // 2
  int byteCount = limit - pos;
  int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
  if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
  // 3
  writeTo(prev, byteCount);
  pop();
  SegmentPool.recycle(this);
}
Step 1: validate the previous node: the list must contain more than just this segment, and prev must be writable (prev.owner)
Step 2: byteCount is the current segment's readable length; availableByteCount is the previous segment's free capacity. A small detail in this computation: the already-read region of an unshared segment can be rewritten, so its size (prev.pos) counts as free space too
Step 3: copy the current segment's readable bytes into the previous segment, remove the current segment from the list, and recycle it into SegmentPool
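The "does it fit?" test from step 2 in isolation (a sketch with made-up cursor values):
// compact(): can the current segment's readable bytes fit into prev?
final class CompactCheck {
  static final int SIZE = 8192;

  public static void main(String[] args) {
    int pos = 7000, limit = 7500;         // current segment: 500 readable bytes
    int prevPos = 3000, prevLimit = 8000; // prev: 192 bytes of tail space
    boolean prevShared = false;

    int byteCount = limit - pos;
    // Tail space after prev.limit, plus prev's already-read region [0, prev.pos)
    // when prev is unshared (it may shift its data to the front to reclaim it).
    int available = SIZE - prevLimit + (prevShared ? 0 : prevPos);
    System.out.println("can compact: " + (byteCount <= available)); // true
  }
}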
[Segment-> sharedCopy]
final Segment sharedCopy() {
  shared = true;
  return new Segment(data, pos, limit, true, false);
}
This method marks the current segment as shared and returns a new segment sharing the same byte array
[Segment-> unsharedCopy]
Segment unsharedCopy() {
  return new Segment(data.clone(), pos, limit, false, true);
}
The unshared copy simply clones the byte array into a brand-new, independently owned segment
[Segment-> writeTo]
public final void writeTo(Segment sink, int byteCount) {
  if (!sink.owner) throw new IllegalArgumentException();
  // 1
  if (sink.limit + byteCount > SIZE) {
    if (sink.shared) throw new IllegalArgumentException();
    if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
    System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
    sink.limit -= sink.pos;
    sink.pos = 0;
  }
  // 2
  System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
  sink.limit += byteCount;
  pos += byteCount;
}
Step 1: this branch handles the case where the space after limit is too small for the bytes to copy: shift the readable region pos ~ limit to the front of the array (dropping the already-read part), then reset pos and limit accordingly
Step 2: copy the current segment's readable bytes into the target segment (sink) and advance both cursors (a sketch of the shift in step 1 follows)
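The shift in step 1, extracted into a runnable sketch (the byte values are made up):
// writeTo() step 1: move the readable region to the front to make room.
final class ShiftDemo {
  public static void main(String[] args) {
    byte[] data = "....hello...".getBytes();
    int pos = 4, limit = 9; // the readable region holds "hello"

    System.arraycopy(data, pos, data, 0, limit - pos); // shift to index 0
    limit -= pos;
    pos = 0;
    System.out.println(new String(data, pos, limit)); // hello
  }
}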
# SegmentPool
[SegmentPool-> Field]
static final long MAX_SIZE = 64 * 1024; // 64 KiB.
static long byteCount; // total bytes currently held by the pool
static @Nullable Segment next; // head of a singly linked list of free segments
SegmentPool exists to reduce the cost of creating and destroying Segment objects. The implementation is simple: idle segments are chained into a singly linked list whose total capacity is capped at 64 KiB (eight segments). A minimal sketch of the pattern follows.
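A self-contained free-list pool in the same style (Node stands in for Segment; this illustrates the pattern, it is not the real SegmentPool):
// Minimal free-list object pool: head-insert on recycle, head-remove on take.
final class NodePool {
  static final int NODE_SIZE = 8192;
  static final long MAX_SIZE = 64 * 1024;

  static final class Node {
    Node next;
    final byte[] data = new byte[NODE_SIZE];
  }

  private static long byteCount;
  private static Node next;

  static synchronized Node take() {
    if (next != null) {
      Node result = next;
      next = result.next;
      result.next = null;
      byteCount -= NODE_SIZE;
      return result;
    }
    return new Node(); // pool empty: allocate a new node
  }

  static synchronized void recycle(Node node) {
    if (byteCount + NODE_SIZE > MAX_SIZE) return; // pool full: drop the node
    byteCount += NODE_SIZE;
    node.next = next; // head insertion
    next = node;
  }

  public static void main(String[] args) {
    Node a = take();                 // pool empty: allocates
    recycle(a);                      // back onto the free list
    System.out.println(take() == a); // true: reused from the pool
  }
}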
[SegmentPool-> take]
static Segment take() {
  synchronized (SegmentPool.class) {
    if (next != null) {
      Segment result = next;
      next = result.next;
      result.next = null;
      byteCount -= Segment.SIZE;
      return result;
    }
  }
  return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
This method pops the first free node off the list and returns it; if the list is empty it simply allocates a new Segment (outside the lock, so zero-filling the 8 KiB array doesn't block other threads)
[SegmentPool-> recycle]
static void recycle(Segment segment) {
  if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
  if (segment.shared) return; // This segment cannot be recycled.
  synchronized (SegmentPool.class) {
    if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
    byteCount += Segment.SIZE;
    segment.next = next;
    segment.pos = segment.limit = 0;
    next = segment;
  }
}
This method first validates the segment being recycled (it must already be unlinked from any list); if the segment is shared or the pool is full, nothing happens; otherwise pos and limit are reset and the node is inserted at the head of the singly linked list
# Source
public interface Source extends Closeable {
  long read(Buffer sink, long byteCount) throws IOException;
  Timeout timeout();
  @Override void close() throws IOException;
}
[Okio-> source]
private static Source source(final InputStream in, final Timeout timeout) {
  // 1
  if (in == null) throw new IllegalArgumentException("in == null");
  if (timeout == null) throw new IllegalArgumentException("timeout == null");
  // 2
  return new Source() {
    @Override public long read(Buffer sink, long byteCount) throws IOException {
      if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
      if (byteCount == 0) return 0;
      try {
        // 2.1
        timeout.throwIfReached();
        // 2.2
        Segment tail = sink.writableSegment(1);
        // 2.3
        int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
        int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
        // 2.4
        if (bytesRead == -1) return -1;
        tail.limit += bytesRead;
        sink.size += bytesRead;
        return bytesRead;
      } catch (AssertionError e) {
        if (isAndroidGetsocknameError(e)) throw new IOException(e);
        throw e;
      }
    }
    // .......
  };
}
Step 1: null checks: neither the input stream nor the timeout may be null
Step 2: implement an anonymous Source; the read flow is:
- 2.1: check whether the deadline has been reached
- 2.2: get a segment from the buffer that has room to be written into
- 2.3: read from in into the tail segment; at most Segment.SIZE - tail.limit bytes fit, so a single read never crosses a segment boundary
- 2.4: post-read bookkeeping: return -1 on EOF, otherwise advance the segment's limit and the buffer's size (a usage sketch follows)
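Putting the wrapped Source to work through the public API (the file name data.txt is an assumption for the example):
import java.io.File;
import java.io.IOException;
import okio.BufferedSource;
import okio.Okio;

public final class ReadDemo {
  public static void main(String[] args) throws IOException {
    // Okio.source(File) wraps a FileInputStream in the anonymous Source above.
    try (BufferedSource source = Okio.buffer(Okio.source(new File("data.txt")))) {
      String line = source.readUtf8Line(); // pulled through the segment chain
      System.out.println(line);
    }
  }
}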
# Sink
public interface Sink extends Closeable, Flushable {
  void write(Buffer source, long byteCount) throws IOException;
  @Override void flush() throws IOException;
  Timeout timeout();
  @Override void close() throws IOException;
}
[Okio-> sink]
private static Sink sink(final OutputStream out, final Timeout timeout) {
  if (out == null) throw new IllegalArgumentException("out == null");
  if (timeout == null) throw new IllegalArgumentException("timeout == null");
  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      // 1
      checkOffsetAndCount(source.size, 0, byteCount);
      while (byteCount > 0) {
        // 2
        timeout.throwIfReached();
        // 3
        Segment head = source.head;
        int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
        out.write(head.data, head.pos, toCopy);
        // 4
        head.pos += toCopy;
        byteCount -= toCopy;
        source.size -= toCopy;
        // 5
        if (head.pos == head.limit) {
          source.head = head.pop();
          SegmentPool.recycle(head);
        }
      }
    }
    // ......
  };
}
Step 1: validate the arguments: byteCount must lie within the source buffer's bounds
Step 2: check for timeout on every iteration of the while loop
Step 3: unlike source, sink works from the head of the buffer's doubly linked list and delegates the actual writing to out; each iteration writes at most head.limit - head.pos bytes, i.e. one segment's readable data
Step 4: advance the segment's pos and shrink both byteCount and the buffer's size
Step 5: once the head segment is fully consumed, unlink it from the list and recycle it (a usage sketch follows)
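The mirror-image usage on the write side (out.txt is an assumption for the example):
import java.io.File;
import java.io.IOException;
import okio.BufferedSink;
import okio.Okio;

public final class WriteDemo {
  public static void main(String[] args) throws IOException {
    // Okio.sink(File) wraps a FileOutputStream in the anonymous Sink above.
    try (BufferedSink sink = Okio.buffer(Okio.sink(new File("out.txt")))) {
      sink.writeUtf8("hello okio");
    } // close() flushes whatever is still buffered
  }
}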
# Buffer
- acts as the medium that Sink and Source operate through
- implements both the BufferedSource and BufferedSink interfaces
- wraps SegmentPool's segment acquisition and adds validation, which reduces the coupling between Buffer and Segment: Buffer only uses segments, while the actual construction is handled by SegmentPool
- the data itself lives in Segments, which lets Buffer move data at segment granularity rather than byte by byte (see the example below)
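Because Buffer implements both interfaces, one object can be written and then read back (a minimal demonstration):
import java.io.IOException;
import okio.Buffer;

public final class BufferDemo {
  public static void main(String[] args) throws IOException {
    Buffer buffer = new Buffer();
    buffer.writeUtf8("hello");             // Buffer as a BufferedSink...
    System.out.println(buffer.readUtf8()); // ...and as a BufferedSource: hello
  }
}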
[Buffer-> copyTo]
public Buffer copyTo(Buffer out, long offset, long byteCount) {
  // 1
  if (out == null) throw new IllegalArgumentException("out == null");
  checkOffsetAndCount(size, offset, byteCount);
  if (byteCount == 0) return this;
  out.size += byteCount;
  // 2
  Segment s = head;
  for (; offset >= (s.limit - s.pos); s = s.next) {
    offset -= (s.limit - s.pos);
  }
  // 3
  for (; byteCount > 0; s = s.next) {
    Segment copy = s.sharedCopy();
    copy.pos += offset;
    copy.limit = Math.min(copy.pos + (int) byteCount, copy.limit);
    if (out.head == null) {
      out.head = copy.next = copy.prev = copy;
    } else {
      out.head.prev.push(copy);
    }
    byteCount -= copy.limit - copy.pos;
    offset = 0;
  }
  return this;
}
Step 1: validate the arguments (out must not be null, the range must be in bounds, byteCount of 0 is a no-op) and add byteCount to the target buffer's size
Step 2: skip over the segments that lie entirely before offset, without copying them
Step 3: copy the remaining range via sharedCopy, appending each shared segment to out's circular list to form the new chain; sharing the backing arrays instead of copying bytes is what makes this efficient (see the usage example below)
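copyTo in action through the public API; note the source buffer is left untouched:
import java.io.IOException;
import okio.Buffer;

public final class CopyToDemo {
  public static void main(String[] args) throws IOException {
    Buffer source = new Buffer().writeUtf8("hello world");
    Buffer target = new Buffer();

    source.copyTo(target, 6, 5);            // copy "world" without consuming source
    System.out.println(source.size());      // 11, nothing was removed
    System.out.println(target.readUtf8(5)); // world
  }
}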
[Okio-> buffer]
public static BufferedSink buffer(Sink sink) {
  return new RealBufferedSink(sink);
}

public static BufferedSource buffer(Source source) {
  return new RealBufferedSource(source);
}
[RealBufferedSource-> read]
@Override public int read(byte[] sink, int offset, int byteCount) throws IOException {
  // 1
  checkOffsetAndCount(sink.length, offset, byteCount);
  // 2
  if (buffer.size == 0) {
    long read = source.read(buffer, Segment.SIZE);
    if (read == -1) return -1;
  }
  // 3
  int toRead = (int) Math.min(byteCount, buffer.size);
  return buffer.read(sink, offset, toRead);
}
Step 1: bounds check on the destination array
Step 2: if the buffer holds no data, refill it from the underlying source, requesting up to Segment.SIZE bytes
Step 3: the actual amount to read is the smaller of byteCount and what the buffer currently holds; read that many bytes out of the Buffer
[Buffer-> read]
@Override public int read(byte[] sink, int offset, int byteCount) {
  checkOffsetAndCount(sink.length, offset, byteCount);
  // 1
  Segment s = head;
  if (s == null) return -1;
  int toCopy = Math.min(byteCount, s.limit - s.pos);
  System.arraycopy(s.data, s.pos, sink, offset, toCopy);
  // 2
  s.pos += toCopy;
  size -= toCopy;
  // 3
  if (s.pos == s.limit) {
    head = s.pop();
    SegmentPool.recycle(s);
  }
  return toCopy;
}
Step 1: take the first segment of the list; if there is none, return -1; otherwise copy at most one segment's readable bytes into sink
Step 2: advance the segment's pos and shrink the buffer's size
Step 3: if the segment's written bytes have all been read, unlink it from the list and recycle it into the pool
[RealBufferedSink-> write]
@Override public BufferedSink write(byte[] source) throws IOException {
  if (closed) throw new IllegalStateException("closed");
  buffer.write(source);
  return emitCompleteSegments();
}
[Buffer-> write]
@Override public Buffer write(byte[] source, int offset, int byteCount) {
  if (source == null) throw new IllegalArgumentException("source == null");
  checkOffsetAndCount(source.length, offset, byteCount);
  int limit = offset + byteCount;
  while (offset < limit) {
    Segment tail = writableSegment(1);
    // 1
    int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
    System.arraycopy(source, offset, tail.data, tail.limit, toCopy);
    // 2
    offset += toCopy;
    tail.limit += toCopy;
  }
  size += byteCount;
  return this;
}
Step 1: after the basic validation, copy the source array into the buffer's tail segment, one segment's worth at a time
Step 2: advance offset and the segment's limit; once the loop finishes, grow the buffer's size
[RealBufferedSink-> emitCompleteSegments]
@Override public BufferedSink emitCompleteSegments() throws IOException {
  if (closed) throw new IllegalStateException("closed");
  long byteCount = buffer.completeSegmentByteCount();
  if (byteCount > 0) sink.write(buffer, byteCount);
  return this;
}
This is where bytes actually reach the underlying sink, and only complete segments are written: completeSegmentByteCount() counts full segments, so a partially filled tail segment stays buffered until flush() or close(). BufferedSink's buffer therefore does less buffering than a classic BufferedOutputStream; full 8 KiB segments are emitted eagerly rather than held back. Within Okio's overall design this is deliberate: Buffer is the universal staging area where all the read/write optimizations live, so routing BufferedSink through a Buffer keeps the design uniform, and as a bonus the same data can serve both reads and writes. The example below illustrates the complete-segment rule.
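A small demonstration of which bytes emitCompleteSegments() would hand to the sink (the values are chosen to straddle one segment):
import java.io.IOException;
import okio.Buffer;

public final class EmitDemo {
  public static void main(String[] args) throws IOException {
    Buffer buffer = new Buffer();

    buffer.writeUtf8("short"); // far less than SIZE: no complete segment yet
    System.out.println(buffer.completeSegmentByteCount()); // 0, stays buffered

    buffer.write(new byte[10_000]); // first segment (8192 bytes) is now full
    System.out.println(buffer.completeSegmentByteCount()); // 8192
  }
}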
# Summary
With traditional buffered streams, reading from an input stream and writing the data to an output stream moves it like this:
-> input stream into the input stream's buffer
-> input stream's buffer copied into b[]
-> b[] copied into the output stream's buffer
-> output stream's buffer written out to the output stream
Okio removes these redundant copies: by passing Buffers around directly, and by sharing byte arrays through Segment.sharedCopy, it cuts down both the number of copies and the granularity at which they happen. The sketch below contrasts the two styles.
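A side-by-side sketch of the two copy styles (ByteArrayInputStream/ByteArrayOutputStream are just stand-ins for real streams):
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import okio.BufferedSink;
import okio.BufferedSource;
import okio.Okio;

public final class CopyComparison {
  // Classic stream copy: every byte passes through the intermediate b[].
  static void classicCopy(InputStream in, OutputStream out) throws IOException {
    byte[] b = new byte[8192];
    for (int n; (n = in.read(b)) != -1; ) {
      out.write(b, 0, n);
    }
  }

  // Okio: writeAll() moves data between the two Buffers segment by segment,
  // with no intermediate byte[] in user code.
  static void okioCopy(InputStream in, OutputStream out) throws IOException {
    BufferedSource source = Okio.buffer(Okio.source(in));
    BufferedSink sink = Okio.buffer(Okio.sink(out));
    sink.writeAll(source);
    sink.flush();
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    okioCopy(new ByteArrayInputStream("payload".getBytes("UTF-8")), out);
    System.out.println(out.toString("UTF-8")); // payload
  }
}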
# Timeout
Okio also provides timeout support: I/O is expected to finish within a certain time, otherwise it is treated as an error. There are two flavors, synchronous and asynchronous:
- synchronous timeout: the timeout condition is checked inside each read/write call; since the check runs on the calling thread, a blocked I/O call cannot be interrupted promptly
- asynchronous timeout: a dedicated thread monitors the timeout condition; if the I/O blocks past the deadline, an IOException is raised and the blocked call is terminated
- streams wrapped from InputStream/OutputStream use the plain Timeout, whereas sockets use AsyncTimeout (a configuration sketch follows)
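Configuring a deadline through the public API (data.txt is an assumption; for a stream source like this, throwIfReached() is what enforces the deadline, while the per-operation timeout() value only takes effect with AsyncTimeout-backed sources such as sockets):
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import okio.BufferedSource;
import okio.Okio;
import okio.Source;

public final class TimeoutDemo {
  public static void main(String[] args) throws IOException {
    Source raw = Okio.source(new File("data.txt"));
    // Absolute cutoff, checked by throwIfReached() on every read.
    raw.timeout().deadline(5, TimeUnit.SECONDS);
    try (BufferedSource source = Okio.buffer(raw)) {
      System.out.println(source.readUtf8());
    }
  }
}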
[Timeout-> throwIfReached]
public void throwIfReached() throws IOException {
  if (Thread.interrupted()) {
    throw new InterruptedIOException("thread interrupted");
  }
  if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
    throw new InterruptedIOException("deadline reached");
  }
}
An exception is thrown in two cases: the thread has been interrupted, or the configured deadline has passed
# AsyncTimeout
[AsyncTimeout-> source]
public final Source source(final Source source) {
  return new Source() {
    @Override public long read(Buffer sink, long byteCount) throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        long result = source.read(sink, byteCount);
        throwOnTimeout = true;
        return result;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }

    @Override public void close() throws IOException {
      boolean throwOnTimeout = false;
      try {
        source.close();
        throwOnTimeout = true;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }

    @Override public Timeout timeout() {
      return AsyncTimeout.this;
    }

    @Override public String toString() {
      return "AsyncTimeout.source(" + source + ")";
    }
  };
}
Taking AsyncTimeout's source method as the example (the sink method follows the same logic): enter() fires before the read, exit() fires when the read finishes, and close() triggers exit() as well. In effect this is a hook implemented as a static proxy around the wrapped Source. A typical way to plug in the timeout action is sketched below.
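Roughly what Okio's own socket support does with this class: the watchdog thread calls timedOut(), and closing the socket forces the blocked read to fail (a sketch, not the exact internal code):
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import okio.AsyncTimeout;
import okio.Source;

public final class AsyncTimeoutDemo {
  static Source withTimeout(final Socket socket, Source raw) {
    AsyncTimeout timeout = new AsyncTimeout() {
      // Runs on the watchdog thread once the deadline passes.
      @Override protected void timedOut() {
        try {
          socket.close(); // unblocks the pending read with an exception
        } catch (IOException ignored) {
        }
      }
    };
    timeout.timeout(10, TimeUnit.SECONDS);
    return timeout.source(raw); // wrap raw in the enter()/exit() proxy above
  }
}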
[AsyncTimeout-> enter]
public final void enter() {
  if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
  long timeoutNanos = timeoutNanos();
  boolean hasDeadline = hasDeadline();
  if (timeoutNanos == 0 && !hasDeadline) {
    return; // No timeout and no deadline? Don't bother with the queue.
  }
  inQueue = true;
  scheduleTimeout(this, timeoutNanos, hasDeadline);
}
This method merely checks whether a timeout or a deadline has been configured; if neither is set there is nothing to do, otherwise it marks the node as queued and hands it to scheduleTimeout
[AsyncTimeout-> scheduleTimeout]
private static synchronized void scheduleTimeout(
    AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
  // 1
  if (head == null) {
    head = new AsyncTimeout();
    new Watchdog().start();
  }
  long now = System.nanoTime();
  // 2
  if (timeoutNanos != 0 && hasDeadline) {
    // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
    // Math.min() is undefined for absolute values, but meaningful for relative ones.
    node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
  } else if (timeoutNanos != 0) {
    node.timeoutAt = now + timeoutNanos;
  } else if (hasDeadline) {
    node.timeoutAt = node.deadlineNanoTime();
  } else {
    throw new AssertionError();
  }
  // 3
  long remainingNanos = node.remainingNanos(now);
  for (AsyncTimeout prev = head; true; prev = prev.next) {
    if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
      node.next = prev.next;
      prev.next = node;
      if (prev == head) {
        AsyncTimeout.class.notify();
      }
      break;
    }
  }
}
Step 1: pending AsyncTimeouts are managed as a linked list; this branch is the initial state, where the sentinel head node is created and the Watchdog thread is started
Step 2: compute the concrete expiry time: if both a timeout and a deadline are set, take the earlier of the two; if only one exists, use it
Step 3: insert the node into the list ordered by remaining time; if it ends up at the front, it is now the earliest expiry, so notify the Watchdog to re-evaluate how long it should sleep against this new node
[WatchDog-> run]
public void run() {
  while (true) {
    try {
      AsyncTimeout timedOut;
      synchronized (AsyncTimeout.class) {
        // 1
        timedOut = awaitTimeout();
        if (timedOut == null) continue;
        // 2
        if (timedOut == head) {
          head = null;
          return;
        }
      }
      // 3
      timedOut.timedOut();
    } catch (InterruptedException ignored) {
    }
  }
}
Step 1: look for a timed-out node; if none is ready yet, loop around and wait again
Step 2: if the list contains nothing but the sentinel head and the watchdog has been idle for the maximum wait time, clear the list and exit the thread
Step 3: a node has timed out: invoke its timedOut() callback outside the lock
[AsyncTimeout-> awaitTimeout]
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
  AsyncTimeout node = head.next;
  // 1
  if (node == null) {
    long startNanos = System.nanoTime();
    AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
    return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
        ? head // The idle timeout elapsed.
        : null; // The situation has changed.
  }
  long waitNanos = node.remainingNanos(System.nanoTime());
  // 2
  if (waitNanos > 0) {
    long waitMillis = waitNanos / 1000000L;
    waitNanos -= (waitMillis * 1000000L);
    AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
    return null;
  }
  // 3
  head.next = node.next;
  node.next = null;
  return node;
}
Step 1: if the queue is empty, wait until a node is enqueued; returning head here signals that the idle timeout elapsed, while returning null means the thread was notified early or a new node arrived
Step 2: waitNanos > 0 means the front node has not expired yet, so wait out the remaining time and return null
Step 3: the front node has timed out: remove it from the list and return it
# Summary
Putting it together, enter() arranges for the Watchdog thread to monitor the operation:
- spin until a timed-out node is found
- if the queue is empty, wait until a node is enqueued
- if the front node has not expired, wait out its remaining time
- once it expires, remove the node from the list
- if the list stays empty past the idle limit, exit the Watchdog thread
- invoke timedOut() to notify the caller of the timeout
[AsyncTimeout-> exit]
public final boolean exit() {
  if (!inQueue) return false;
  inQueue = false;
  return cancelScheduledTimeout(this);
}
[AsyncTimeout-> cancelScheduledTimeout]
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
  // Remove the node from the linked list.
  for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
    if (prev.next == node) {
      prev.next = node.next;
      node.next = null;
      return false;
    }
  }
  // The node wasn't found in the linked list: it must have timed out!
  return true;
}
Exiting is comparatively simple: remove the corresponding node from the linked list. If the node is no longer in the list, it must already have timed out, and exit() reports that by returning true.