OkHttp源码解析(七)请求写入与响应读取
CallServerInterceptor-> intercept
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
// 获取上一层传下来的Exchange
Exchange exchange = realChain.exchange();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
// 使用Okio写入请求头
exchange.writeRequestHeaders(request);
boolean responseHeadersStarted = false;
Response.Builder responseBuilder = null;
// GET和HEAD不需要
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// 如果客户端将在发送请求体之前等待100状态码的响应,则它必须发送一个Expect请求头,值为“100-continue”
// 如果客户端不打算发送请求体,就禁止发送值为“100-continue”的Expect请求头。
// 由于早期实现的原因,协议允许一些有歧义的情况,例如客户端可以发送“Expect: 100-continue”
// 而不接收417或100状态码。所以,客户端发送这个头部时,不应该无限期的等待100状态码返回而不发送请求体
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
// 请求刷新,Okio处理
exchange.flushRequest();
responseHeadersStarted = true;
exchange.responseHeadersStart();
// 构建Response.Builder,当response状态为100时,返回null
responseBuilder = exchange.readResponseHeaders(true);
}
if (responseBuilder == null) {
if (request.body().isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
// header响应成功的情况下
exchange.flushRequest();
// Okio构建请求体的输出流
BufferedSink bufferedRequestBody = Okio.buffer(
// 使用Exchange拿到请求体,但是当前情况下参数为true
exchange.createRequestBody(request, true));
// 写入操作
request.body().writeTo(bufferedRequestBody);
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
// 依旧是使用Exchange拿到请求体,但是针对Expect: 100-continue这种情况参数为false
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, false));
// 写入操作
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
} else {
exchange.noRequestBody();
if (!exchange.connection().isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection();
}
}
} else {
exchange.noRequestBody();
}
if (request.body() == null || !request.body().isDuplex()) {
// 结束请求处理
exchange.finishRequest();
}
if (!responseHeadersStarted) {
// 开始响应的处理
exchange.responseHeadersStart();
}
if (responseBuilder == null) {
// 读取响应头
responseBuilder = exchange.readResponseHeaders(false);
}
// 构建一个响应
Response response = responseBuilder
// 原请求
.request(request)
// 握手信息
.handshake(exchange.connection().handshake())
// 发送时间
.sentRequestAtMillis(sentRequestMillis)
// 响应
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
response = exchange.readResponseHeaders(false)
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
// 响应头结束
exchange.responseHeadersEnd(response);
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
// 收到101,表示切换协议,构建一个响应体为空的response
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
// 正常情况下
response = response.newBuilder()
// 使用exchange的openResponseBody方法获取一个请求体
.body(exchange.openResponseBody(response))
.build();
}
// 关闭连接
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
exchange.noNewExchangesOnConnection();
}
// 响应码要求无内容,而response有实体,直接抛出异常
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
流程中有不少代码是针对特殊的请求头和100的响应码,梳理一下和Exchange有关的流程,因为Exchange中封装了连接以及针对Http1及Http2的编码解码
- 写入请求头 Exchange-> writeRequestHeaders
- 根据Request创建请求体,然后写入 Exchange-> createRequestBody
- 结束对请求的处理 Exchange-> finishRequest
- 开始对响应头进行处理 Exchange-> responseHeadersStart
- 读取响应头 Exchange-> readResponseHeaders
- 结束对响应头进行处理 Exchange-> responseHeadersEnd
- 获取响应体 Exchange-> openResponseBody
- 关闭连接 Exchange-> noNewExchangesOnConnection
Exchange
Exchange中的主要作用就是会对上层注册到Tranmsitter中eventListener进行事件回调
然后用Exchange中的ExchangeCodec执行实际的操作
这是面向接口的设计,针对Http1和Http2实现实现分派,先走一遍Http1的所有实现,再走一遍Http2的实现
Http1ExchangeCodec-> writeRequestHeaders
@Override public void writeRequestHeaders(Request request) throws IOException {
// 得到请求行的字符串,包含了请求方法、url、版本号
String requestLine = RequestLine.get(
request, realConnection.route().proxy().type());
// 写入请求头,请求行
writeRequest(request.headers(), requestLine);
}
Http1ExchangeCodec-> writeRequest
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
// 写入请求行
sink.writeUtf8(requestLine).writeUtf8("\r\n");
// 把Headers以kv形式进行写入
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
写入头部的逻辑就结束了,主要还是把Headers这个Map中的信息以kv的形式通过Okio进行写入,然后在前面加上请求行
Http1ExchangeCodec-> createRequestBody
@Override public Sink createRequestBody(Request request, long contentLength) throws IOException {
if (request.body() != null && request.body().isDuplex()) {
throw new ProtocolException("Duplex connections are not supported for HTTP/1");
}
if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
// Stream a request body of unknown length.
return newChunkedSink();
}
if (contentLength != -1L) {
// Stream a request body of a known length.
return newKnownLengthSink();
}
throw new IllegalStateException(
"Cannot stream a request body without chunked encoding or a known content length!");
}
针对请求头中有Transfer-Encoding为chunked的情况,说明请求体的长度是未知的,转调newChunkedSink方法获取一个Sink,如果contentLength是有的,而且是一个有效值,那就转调newKnownLengthSink方法获取一个Sink
Http1ExchangeCodec-> newChunkedSink
private Sink newChunkedSink() {
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
state = STATE_WRITING_REQUEST_BODY;
return new ChunkedSink();
}
返回了一个ChunkedSink,它是Sink的子类,而最后也会用这个Sink的write方法进行写入操作
ChunkedSink-> write
@Override public void write(Buffer source, long byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
if (byteCount == 0) return;
// 16进制的形式写入数据的长度
sink.writeHexadecimalUnsignedLong(byteCount);
// 换行
sink.writeUtf8("\r\n");
// 写入source中读出的内容
sink.write(source, byteCount);
// 再写一个空行
sink.writeUtf8("\r\n");
}
Http1ExchangeCodec-> newKnownLengthSink
private Sink newKnownLengthSink() {
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
state = STATE_WRITING_REQUEST_BODY;
return new KnownLengthSink();
}
依然是转到KownLengthSink的write方法
KownLengthSink-> write
@Override public void write(Buffer source, long byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
// 判断byteCount是否越界
checkOffsetAndCount(source.size(), 0, byteCount);
// 写入操作
sink.write(source, byteCount);
}
可以看到KownLengthSink和ChunkedSink的区别就是,在头部Transfer-Encoding为chunked的时候会忽略contentLength,然后根据Http协议的语法,需要用16进制的方式写入一个长度,然后再进行数据的写入
Http1ExchangeCodec-> finishRequest
@Override public void finishRequest() throws IOException {
sink.flush();
}
用的是BufferSink,这里只是把sink中的内容flush出来
Exchange-> responseHeadersStart
public void responseHeadersStart() {
eventListener.responseHeadersStart(call);
}
Http1ExchangeCodec-> readResponseHeaders
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
// ......
try {
// 读响应行的内容
StatusLine statusLine = StatusLine.parse(readHeaderLine());
Response.Builder responseBuilder = new Response.Builder()
// 协议
.protocol(statusLine.protocol)
// 响应码
.code(statusLine.code)
.message(statusLine.message)
// 读取响应头的内容
.headers(readHeaders());
if (expectContinue && statusLine.code == HTTP_CONTINUE) {
return null;
} else if (statusLine.code == HTTP_CONTINUE) {
state = STATE_READ_RESPONSE_HEADERS;
return responseBuilder;
}
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
} catch (EOFException e) {
// ......
}
Http1ExchangeCodec-> readHeaderLine
private long headerLimit = HEADER_LIMIT;
private String readHeaderLine() throws IOException {
String line = source.readUtf8LineStrict(headerLimit);
headerLimit -= line.length();
return line;
}
StatusLine-> parse
public static StatusLine parse(String statusLine) throws IOException {
// H T T P / 1 . 1 2 0 0 T e m p o r a r y R e d i r e c t
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0
// Parse protocol like "HTTP/1.1" followed by a space.
int codeStart;
Protocol protocol;
if (statusLine.startsWith("HTTP/1.")) {
if (statusLine.length() < 9 || statusLine.charAt(8) != ' ') {
throw new ProtocolException("Unexpected status line: " + statusLine);
}
int httpMinorVersion = statusLine.charAt(7) - '0';
codeStart = 9;
if (httpMinorVersion == 0) {
protocol = Protocol.HTTP_1_0;
} else if (httpMinorVersion == 1) {
protocol = Protocol.HTTP_1_1;
} else {
throw new ProtocolException("Unexpected status line: " + statusLine);
}
} else if (statusLine.startsWith("ICY ")) {
// Shoutcast uses ICY instead of "HTTP/1.0".
protocol = Protocol.HTTP_1_0;
codeStart = 4;
} else {
throw new ProtocolException("Unexpected status line: " + statusLine);
}
// Parse response code like "200". Always 3 digits.
if (statusLine.length() < codeStart + 3) {
throw new ProtocolException("Unexpected status line: " + statusLine);
}
int code;
try {
code = Integer.parseInt(statusLine.substring(codeStart, codeStart + 3));
} catch (NumberFormatException e) {
throw new ProtocolException("Unexpected status line: " + statusLine);
}
// Parse an optional response message like "OK" or "Not Modified". If it
// exists, it is separated from the response code by a space.
String message = "";
if (statusLine.length() > codeStart + 3) {
if (statusLine.charAt(codeStart + 3) != ' ') {
throw new ProtocolException("Unexpected status line: " + statusLine);
}
message = statusLine.substring(codeStart + 4);
}
return new StatusLine(protocol, code, message);
}
网络通信过程Http协议采用的ASCII码,通过Socket去读取会拿到一个字符串,然后还是要把这个字符串解析成一个StatusLine对象,主要要从请求行中拿到协议版本,响应码以及message
Http1ExchangeCodec-> readHeaders
private Headers readHeaders() throws IOException {
Headers.Builder headers = new Headers.Builder();
// parse the result headers until the first blank line
for (String line; (line = readHeaderLine()).length() != 0; ) {
// 进行字符串处理,解析成kv形式存储到headers的builder中
Internal.instance.addLenient(headers, line);
}
// 构建一个Headers返回
return headers.build();
}
把请求头一行一行的读进来,然后以kv的形式保存做成一个Headers对象
Exchange-> responseHeadersEnd
public void responseHeadersEnd(Response response) {
eventListener.responseHeadersEnd(call, response);
}
依旧是回调eventListener的接口
Http1ExchangeCodec-> openResponseBodySource
@Override public Source openResponseBodySource(Response response) {
// 没有响应体
if (!HttpHeaders.hasBody(response)) {
return newFixedLengthSource(0);
}
// 针对Transfer-Encoding为chunked的情况特殊处理
if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
return newChunkedSource(response.request().url());
}
// 从响应头中获取contentLength字段
long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
return newFixedLengthSource(contentLength);
}
return newUnknownLengthSource();
}
和请求的写入是一种套路,就是根据长度信息的不同创建了不同的Source,用Okio执行读取操作
FixedLengthSource-> read
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
if (bytesRemaining == 0) return -1;
long read = super.read(sink, Math.min(bytesRemaining, byteCount));
if (read == -1) {
realConnection.noNewExchanges(); // The server didn't supply the promised content length.
ProtocolException e = new ProtocolException("unexpected end of stream");
responseBodyComplete();
throw e;
}
bytesRemaining -= read;
if (bytesRemaining == 0) {
responseBodyComplete();
}
return read;
}
bytesRemainging是构造函数中传入的参数,在这个读取的操作中,其实还是用了父类的read方法,只不过在FixedLengthSource会对外界传入的大小以及读到的长度进行确认,取最小值
ChunkedSource-> read
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
if (!hasMoreChunks) return -1;
if (bytesRemainingInChunk == 0 || bytesRemainingInChunk == NO_CHUNK_YET) {
// 读取块大小,赋值给bytesRemainingInChunk
readChunkSize();
if (!hasMoreChunks) return -1;
}
long read = super.read(sink, Math.min(byteCount, bytesRemainingInChunk));
if (read == -1) {
realConnection.noNewExchanges(); // The server didn't supply the promised chunk length.
ProtocolException e = new ProtocolException("unexpected end of stream");
responseBodyComplete();
throw e;
}
bytesRemainingInChunk -= read;
return read;
}
和FixedLengthSource的读取操作类似,都是要对读取的长度以及指定的长度进行确认取最小值,只不过这次这个指定的长度不再是外界传入的,而是在响应体中读取那个16进制的数
小结
到这里其实整个Http1的请求写入和响应读取的流程就走完,主要都是通过Okio进行数据的读写操作,针对Transfer-Encoding=Chunked这种情况需要特殊处理,要在请求体中写入一个16进制的数标识长度,要在响应体中读取一个16进制的数标明这段数据的长度
Http2ExchangeCodec-> writeRequestHeaders
@Override public void writeRequestHeaders(Request request) throws IOException {
if (stream != null) return;
boolean hasRequestBody = request.body() != null;
List<Header> requestHeaders = http2HeadersList(request);
stream = connection.newStream(requestHeaders, hasRequestBody);
// We may have been asked to cancel while creating the new stream and sending the request
// headers, but there was still no stream to close.
if (canceled) {
stream.closeLater(ErrorCode.CANCEL);
throw new IOException("Canceled");
}
stream.readTimeout().timeout(chain.readTimeoutMillis(), TimeUnit.MILLISECONDS);
stream.writeTimeout().timeout(chain.writeTimeoutMillis(), TimeUnit.MILLISECONDS);
}
转调了http2HeadersList方法获取一个Header的集合
然后创建了一个Http2Stream
Http2ExchangeCodec-> http2HeadersList
public static List<Header> http2HeadersList(Request request) {
Headers headers = request.headers();
List<Header> result = new ArrayList<>(headers.size() + 4);
result.add(new Header(":method", request.method()));
result.add(new Header(":path", RequestLine.requestPath(request.url())));
String host = request.header("Host");
if (host != null) {
result.add(new Header(":authority", host)); // Optional.
}
result.add(new Header(":scheme", request.url().scheme()));
for (int i = 0, size = headers.size(); i < size; i++) {
// header names must be lowercase.
String name = headers.name(i).toLowerCase(Locale.US);
if (!HTTP_2_SKIPPED_REQUEST_HEADERS.contains(name)
|| name.equals(TE) && headers.value(i).equals("trailers")) {
result.add(new Header(name, headers.value(i)));
}
}
return result;
}
把request中头部转换成一个列表,当然这里还包含了请求行中携带的数据
Http2Connection-> newStream
public Http2Stream newStream(List<Header> requestHeaders, boolean out) throws IOException {
return newStream(0, requestHeaders, out);
}
Http2Connection-> newStream
private Http2Stream newStream(
int associatedStreamId, List<Header> requestHeaders, boolean out) throws IOException {
boolean outFinished = !out;
boolean inFinished = false;
boolean flushHeaders;
Http2Stream stream;
int streamId;
synchronized (writer) {
synchronized (this) {
// 判断streamId是否越界
if (nextStreamId > Integer.MAX_VALUE / 2) {
shutdown(REFUSED_STREAM);
}
if (shutdown) {
throw new ConnectionShutdownException();
}
// 获取streamId
streamId = nextStreamId;
nextStreamId += 2;
// 创建一个Http2Stream对象
stream = new Http2Stream(streamId, this, outFinished, inFinished, null);
flushHeaders = !out || bytesLeftInWriteWindow == 0L || stream.bytesLeftInWriteWindow == 0L;
if (stream.isOpen()) {
// 缓存这个Http2Stream对象
streams.put(streamId, stream);
}
}
if (associatedStreamId == 0) {
writer.headers(outFinished, streamId, requestHeaders);
} else if (client) {
throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
} else { // HTTP/2 has a PUSH_PROMISE frame.
writer.pushPromise(associatedStreamId, streamId, requestHeaders);
}
}
if (flushHeaders) {
writer.flush();
}
return stream;
}
主要的流程就是给这个stream分配一个id,然后创建一个Http2Stream,然后缓存到map中,然后,通过Http2Writer执行写入
这里还有一个push_promise帧的处理逻辑,用于表明服务器向客户端推送所述资源的意图,先于请求推送资源的响应数据传输
Http2Writer->headers
public synchronized void headers(
boolean outFinished, int streamId, List<Header> headerBlock) throws IOException {
if (closed) throw new IOException("closed");
// 对请求头进行HPACK压缩
hpackWriter.writeHeaders(headerBlock);
long byteCount = hpackBuffer.size();
int length = (int) Math.min(maxFrameSize, byteCount);
byte type = TYPE_HEADERS;
byte flags = byteCount == length ? FLAG_END_HEADERS : 0;
if (outFinished) flags |= FLAG_END_STREAM;
// 写入帧头,包含帧类型,帧长度,帧的状态信息,流的id
frameHeader(streamId, length, type, flags);
// 写入经过HPACK压缩后的头部信息
sink.write(hpackBuffer, length);
if (byteCount > length) writeContinuationFrames(streamId, byteCount - length);
}
Http2的写入操作到这里也就走完了,可以对比出Http2的头部写入与Http1的区别:
- Http2是以帧为单位进行数据传输,会把原本Http1中请求行的内容,添加到头部帧中进行发送
- Http2通过引入stream实现多路复用,每个stream有一个streamId作为标识
- Http2中会做头部压缩,压缩方式HPACK,通过静态字典与动态字典以及哈夫曼编码进行头部压缩
Http2ExchangeCodec-> createRequestBody
@Override public Sink createRequestBody(Request request, long contentLength) {
return stream.getSink();
}
获取在请求头写入过程中创建的Http2Stream中的sink,这个sink的类型是FrameSink,然后就可以通过这个sink,在流中进行请求体的写入
FrameSink-> write
@Override public void write(Buffer source, long byteCount) throws IOException {
assert (!Thread.holdsLock(Http2Stream.this));
sendBuffer.write(source, byteCount);
while (sendBuffer.size() >= 16834) {
emitFrame(false);
}
}
把source中的数据写入到sendBuffer中,然后转调emitFrame进行帧的发送,对于data帧会积攒到一定大小之后再发送出去,而没有到这个大小也不用太担心,因为在flush中的时候会对sendBuffer进行判断,如果大于0,依然还是会调emitFrame方法进行数据发送
FrameSink-> emitFrame
/**
* 向连接发出单个数据帧。 帧的大小受此流的限制
* 写窗口。 该方法将一直阻塞,直到写入窗口为非空。
*/
private void emitFrame(boolean outFinishedOnLastFrame) throws IOException {
long toWrite;
synchronized (Http2Stream.this) {
writeTimeout.enter();
try {
while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) {
// 等待接受在这个流中的一个WINDOW_UDPATE帧
waitForIo(); // Wait until we receive a WINDOW_UPDATE for this stream.
}
} finally {
writeTimeout.exitAndThrowIfTimedOut();
}
checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
// 写入的长度不能超过窗口大小
toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size());
bytesLeftInWriteWindow -= toWrite;
}
writeTimeout.enter();
try {
boolean outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size();
// 在连接的流中写入数据
connection.writeData(id, outFinished, sendBuffer, toWrite);
} finally {
writeTimeout.exitAndThrowIfTimedOut();
}
}
首先会判断窗口大小是否足够,如果不足够就会等待,等待接受到WINDOW_UPDATE的帧
然后对写入内容进行限制,不可以超过窗口大小,最后便是在连接中的流中进行写入操作
Http2Connection-> writeData
public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
throws IOException {
if (byteCount == 0) { // Empty data frames are not flow-controlled.
writer.data(outFinished, streamId, buffer, 0);
return;
}
while (byteCount > 0) {
int toWrite;
synchronized (Http2Connection.this) {
try {
while (bytesLeftInWriteWindow <= 0) {
// Before blocking, confirm that the stream we're writing is still open. It's possible
// that the stream has since been closed (such as if this write timed out.)
if (!streams.containsKey(streamId)) {
throw new IOException("stream closed");
}
Http2Connection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Retain interrupted status.
throw new InterruptedIOException();
}
toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
toWrite = Math.min(toWrite, writer.maxDataLength());
bytesLeftInWriteWindow -= toWrite;
}
byteCount -= toWrite;
writer.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
}
}
依旧会判断当前窗口是否有可用的大小,没有可用大小就会等待接收一个WINDOW_UPDATE帧去扩大窗口大小
并且在这里其实体现了对stream进行缓存的意义,主要作用是判断这个stream是否被关闭掉了
然后对写入大小进行限制,不能超过窗口大小,然后计算完长度之后需要从窗口大小中减去这个写入大小
然后转调Http2Writer的data方法执行data帧的写入
Http2Writer-> data
public synchronized void data(boolean outFinished, int streamId, Buffer source, int byteCount)
throws IOException {
if (closed) throw new IOException("closed");
byte flags = FLAG_NONE;
if (outFinished) flags |= FLAG_END_STREAM;
dataFrame(streamId, flags, source, byteCount);
}
Http2Writer-> dataFrame
void dataFrame(int streamId, byte flags, Buffer buffer, int byteCount) throws IOException {
byte type = TYPE_DATA;
frameHeader(streamId, byteCount, type, flags);
if (byteCount > 0) {
sink.write(buffer, byteCount);
}
}
写入帧头,然后写入相应的数据帧
Http2ExchangeCodec-> readResponseHeaders
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
Headers headers = stream.takeHeaders();
Response.Builder responseBuilder = readHttp2HeadersList(headers, protocol);
if (expectContinue && Internal.instance.code(responseBuilder) == HTTP_CONTINUE /*100*/) {
return null;
}
return responseBuilder;
}
通过stream.takeHeaders方法拿到一个Headers,把响应头的内容换成Response.Builder,然后如果响应码是100就继续执行读取操作
Http2Stream-> takeHeaders
public synchronized Headers takeHeaders() throws IOException {
readTimeout.enter();
try {
while (headersQueue.isEmpty() && errorCode == null) {
waitForIo();
}
} finally {
readTimeout.exitAndThrowIfTimedOut();
}
if (!headersQueue.isEmpty()) {
return headersQueue.removeFirst();
}
throw errorException != null ? errorException : new StreamResetException(errorCode);
}
主要的逻辑就是等待headersQueue有新的header信息填入
Http2ExchangeCodec-> readHttp2HeadersList
/** Returns headers for a name value block containing an HTTP/2 response. */
public static Response.Builder readHttp2HeadersList(Headers headerBlock,
Protocol protocol) throws IOException {
StatusLine statusLine = null;
Headers.Builder headersBuilder = new Headers.Builder();
for (int i = 0, size = headerBlock.size(); i < size; i++) {
String name = headerBlock.name(i);
String value = headerBlock.value(i);
if (name.equals(":status")) {
statusLine = StatusLine.parse("HTTP/1.1 " + value);
} else if (!HTTP_2_SKIPPED_RESPONSE_HEADERS.contains(name)) {
Internal.instance.addLenient(headersBuilder, name, value);
}
}
if (statusLine == null) throw new ProtocolException("Expected ':status' header not present");
return new Response.Builder()
.protocol(protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(headersBuilder.build());
}
把请求头的内容解析成一个Response.Builder对象,可见其实Http2的逻辑应该是后面再添加,因为请求行的解析的工作也是复用Http1的解析处理
Http2ExchangeCodec-> openResponseBody
@Override public Source openResponseBodySource(Response response) {
return stream.getSource();
}
直接返回当前流的source提供读取操作,依然是FrameSource,一样的套路了
FrameSource-> read
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
while (true) {
long readBytesDelivered = -1;
IOException errorExceptionToDeliver = null;
// 1. Decide what to do in a synchronized block.
synchronized (Http2Stream.this) {
// ......
if (closed) {
throw new IOException("stream closed");
} else if (readBuffer.size() > 0) {
// Prepare to read bytes. Start by moving them to the caller's buffer.
readBytesDelivered = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
unacknowledgedBytesRead += readBytesDelivered;
if (errorExceptionToDeliver == null
&& unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
// Flow control: notify the peer that we're ready for more data! Only send a
// WINDOW_UPDATE if the stream isn't in error.
// 流程控制:通知对端我们已经准备好接收更多数据! 仅发送一个
// WINDOW_UPDATE(如果流没有错误)
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
unacknowledgedBytesRead = 0;
}
} else if (!finished && errorExceptionToDeliver == null) {
// Nothing to do. Wait until that changes then try again.
waitForIo();
continue;
}
} finally {
readTimeout.exitAndThrowIfTimedOut();
}
}
// 2. Do it outside of the synchronized block and timeout.
if (readBytesDelivered != -1) {
// Update connection.unacknowledgedBytesRead outside the synchronized block.
updateConnectionFlowControl(readBytesDelivered);
return readBytesDelivered;
}
if (errorExceptionToDeliver != null) {
// We defer throwing the exception until now so that we can refill the connection
// flow-control window. This is necessary because we don't transmit window updates until
// the application reads the data. If we throw this prior to updating the connection
// flow-control window, we risk having it go to 0 preventing the server from sending data.
throw errorExceptionToDeliver;
}
return -1; // This source is exhausted.
}
}
在这里主要的操作还是添加流量控制,等需要接受更多数据的时候,接收方主动向发送方发送WINDOW_UPDATE帧通知它扩大窗口大小
总结
Ok中对Http2的处理与Http1差别比较大,在Http2构建连接的时候就会创建一个线程不断读取接收的帧,然后分发到对应的Stream,除此之外,相比Http1,Http2还添加了一套流量控制的机制,在发送数据之后会把窗口大小减少,而窗口的大小完全取决WINDOW_UPDATE帧,也就是接收方,由接收方决定发送方的发送速度,而发送方需要信任接收方,而当发送方的窗口大小不足时,则阻塞等待WINDOW_UPDATE帧的到来,而从Ok的逻辑来看,只有data帧会受到流量控制的限制,其他类型的帧不会影响窗口的大小