OkHttp Source Code Analysis (7): Request Writing and Response Reading

Android Framework Study

Posted by Cc1over on January 13, 2020


CallServerInterceptor-> intercept

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    // Get the Exchange handed down by the previous interceptor
    Exchange exchange = realChain.exchange();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
    // Write the request headers via Okio
    exchange.writeRequestHeaders(request);
    
    boolean responseHeadersStarted = false;
    Response.Builder responseBuilder = null;
    // GET and HEAD carry no request body
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If a client will wait for a 100 (Continue) response before sending the request body,
      // it must send an Expect header with the value "100-continue".
      // A client that does not intend to send a body must not send "Expect: 100-continue".
      // For historical reasons the protocol tolerates ambiguous cases: a client may send
      // "Expect: 100-continue" and never receive a 417 or 100. So a client that sends this
      // header should not wait indefinitely for a 100 before transmitting the body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        // Flush the request so far (handled by Okio)
        exchange.flushRequest();
        responseHeadersStarted = true;
        exchange.responseHeadersStart();
        // Build a Response.Builder; returns null when the response code is 100
        responseBuilder = exchange.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        if (request.body().isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          // Reached when the 100-continue handshake succeeded (or no Expect header was sent)
          exchange.flushRequest();
          // Build a buffered sink for the request body via Okio
          BufferedSink bufferedRequestBody = Okio.buffer(
              // Obtain the request body sink from Exchange; duplex is true here
              exchange.createRequestBody(request, true));
          // Write the body
          request.body().writeTo(bufferedRequestBody);
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          // Again obtain the request body sink from Exchange, this time with duplex = false
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, false));
          // Write the body
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
        }
      } else {
        exchange.noRequestBody();
        if (!exchange.connection().isMultiplexed()) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection();
        }
      }
    } else {
      exchange.noRequestBody();
    }

    if (request.body() == null || !request.body().isDuplex()) {
      // Finish writing the request
      exchange.finishRequest();
    }

    if (!responseHeadersStarted) {
      // Start processing the response
      exchange.responseHeadersStart();
    }

    if (responseBuilder == null) {
      // Read the response headers
      responseBuilder = exchange.readResponseHeaders(false);
    }
    // Build a Response
    Response response = responseBuilder
        // The original request
        .request(request)
        // TLS handshake info
        .handshake(exchange.connection().handshake())
        // When the request was sent
        .sentRequestAtMillis(sentRequestMillis)
        // When the response was received
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      response = exchange.readResponseHeaders(false)
          .request(request)
          .handshake(exchange.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

      code = response.code();
    }
    // Response headers complete
    exchange.responseHeadersEnd(response);

    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      // A 101 means the protocol is being switched; build a response with an empty body
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      // Normal case
      response = response.newBuilder()
          // Obtain the response body via exchange.openResponseBody
          .body(exchange.openResponseBody(response))
          .build();
    }
    // If either side requested "Connection: close", stop reusing this connection
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      exchange.noNewExchangesOnConnection();
    }
    // These response codes must not carry a body; if the response has one, throw
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

Much of this flow deals with special request headers and the 100 response code. Let's trace the Exchange-related steps, since Exchange wraps the connection together with the HTTP/1 and HTTP/2 codecs:

  • Write the request headers: Exchange-> writeRequestHeaders
  • Create the request body from the Request and write it: Exchange-> createRequestBody
  • Finish writing the request: Exchange-> finishRequest
  • Start processing the response headers: Exchange-> responseHeadersStart
  • Read the response headers: Exchange-> readResponseHeaders
  • Finish processing the response headers: Exchange-> responseHeadersEnd
  • Open the response body: Exchange-> openResponseBody
  • Prevent further exchanges on the connection: Exchange-> noNewExchangesOnConnection

Exchange

Exchange's main job is to fire event callbacks on the eventListener that the upper layer registered with the Transmitter.

The actual work is delegated to the ExchangeCodec held by the Exchange.

This is interface-oriented design: the codec dispatches to either the HTTP/1 or the HTTP/2 implementation. Let's walk through all of the HTTP/1 implementation first, then the HTTP/2 one.

Http1ExchangeCodec-> writeRequestHeaders

@Override public void writeRequestHeaders(Request request) throws IOException {
    // Build the request-line string: method, URL, and protocol version
    String requestLine = RequestLine.get(
        request, realConnection.route().proxy().type());
    // Write the request line and the headers
    writeRequest(request.headers(), requestLine);
  }

Http1ExchangeCodec-> writeRequest

public void writeRequest(Headers headers, String requestLine) throws IOException {
    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
    // Write the request line
    sink.writeUtf8(requestLine).writeUtf8("\r\n");
    // Write each header as a key-value line
    for (int i = 0, size = headers.size(); i < size; i++) {
        sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n");
    state = STATE_OPEN_REQUEST_BODY;
  }

That's all there is to writing the head of the request: the entries of the Headers map are written as key-value lines through Okio, preceded by the request line. A sketch of the resulting bytes follows.
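
To make that concrete, here is a minimal, self-contained sketch (not OkHttp internals) of what ends up on the wire for a hypothetical GET request; the URL and header values are made up for illustration:

import okio.Buffer;

public class RequestHeadDemo {
  public static void main(String[] args) {
    Buffer sink = new Buffer(); // stands in for the connection's BufferedSink
    // Request line first, then one "name: value" line per header, then a blank line
    sink.writeUtf8("GET /index.html HTTP/1.1").writeUtf8("\r\n");
    String[][] headers = {{"Host", "example.com"}, {"Accept-Encoding", "gzip"}};
    for (String[] header : headers) {
      sink.writeUtf8(header[0]).writeUtf8(": ").writeUtf8(header[1]).writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n"); // the blank line terminates the request head
    System.out.print(sink.readUtf8());
    // GET /index.html HTTP/1.1
    // Host: example.com
    // Accept-Encoding: gzip
  }
}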

Http1ExchangeCodec-> createRequestBody

@Override public Sink createRequestBody(Request request, long contentLength) throws IOException {
    if (request.body() != null && request.body().isDuplex()) {
      throw new ProtocolException("Duplex connections are not supported for HTTP/1");
    }

    if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
      // Stream a request body of unknown length.
      return newChunkedSink();
    }

    if (contentLength != -1L) {
      // Stream a request body of a known length.
      return newKnownLengthSink();
    }

    throw new IllegalStateException(
        "Cannot stream a request body without chunked encoding or a known content length!");
  }

When the Transfer-Encoding header is chunked, the length of the request body is unknown, so newChunkedSink is called to obtain a Sink. If contentLength is present and valid, newKnownLengthSink is used instead.

Http1ExchangeCodec-> newChunkedSink

private Sink newChunkedSink() {
    if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
    state = STATE_WRITING_REQUEST_BODY;
    return new ChunkedSink();
  }

This returns a ChunkedSink, an implementation of Sink, and its write method is what ultimately performs the writes.

ChunkedSink-> write

@Override public void write(Buffer source, long byteCount) throws IOException {
      if (closed) throw new IllegalStateException("closed");
      if (byteCount == 0) return;
      // Write the chunk length in hexadecimal
      sink.writeHexadecimalUnsignedLong(byteCount);
      // CRLF
      sink.writeUtf8("\r\n");
      // Write the bytes read from source
      sink.write(source, byteCount);
      // Trailing CRLF after the chunk
      sink.writeUtf8("\r\n");
    }

Http1ExchangeCodec-> newKnownLengthSink

private Sink newKnownLengthSink() {
    if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
    state = STATE_WRITING_REQUEST_BODY;
    return new KnownLengthSink();
}

Again, the writes end up in KnownLengthSink's write method.

KnownLengthSink-> write

@Override public void write(Buffer source, long byteCount) throws IOException {
      if (closed) throw new IllegalStateException("closed");
      // Check that byteCount is within bounds
      checkOffsetAndCount(source.size(), 0, byteCount);
      // Write the bytes
      sink.write(source, byteCount);
    }

So the difference between KnownLengthSink and ChunkedSink is that when Transfer-Encoding is chunked the contentLength is ignored and, following the HTTP chunked syntax, each chunk is prefixed with its length written in hexadecimal before the data itself. The sketch below shows what this looks like on the wire.
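
As a hedged illustration (again not OkHttp code): each chunk is its size in hex, CRLF, the data, CRLF, and closing the body is expected to append a zero-sized chunk. The payload below is made up:

import okio.Buffer;

public class ChunkedBodyDemo {
  public static void main(String[] args) {
    Buffer sink = new Buffer();
    String chunk = "hello, chunked world";            // 20 bytes
    sink.writeHexadecimalUnsignedLong(chunk.length()) // "14" (20 in hex)
        .writeUtf8("\r\n")
        .writeUtf8(chunk)
        .writeUtf8("\r\n");
    sink.writeUtf8("0\r\n\r\n");                      // zero-sized chunk marks the end of the body
    System.out.print(sink.readUtf8());
    // 14
    // hello, chunked world
    // 0
  }
}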

Http1ExchangeCodec-> finishRequest

@Override public void finishRequest() throws IOException {
    sink.flush();
}

sink here is a BufferedSink; this simply flushes whatever it has buffered.

Exchange-> responseHeadersStart

public void responseHeadersStart() {
    eventListener.responseHeadersStart(call);
  }

Http1ExchangeCodec-> readResponseHeaders

@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
    // ......

    try {
      // Read and parse the status line
      StatusLine statusLine = StatusLine.parse(readHeaderLine());

      Response.Builder responseBuilder = new Response.Builder()
          // Protocol version
          .protocol(statusLine.protocol)
          // Response code
          .code(statusLine.code)
          .message(statusLine.message)
          // Read the response headers
          .headers(readHeaders());

      if (expectContinue && statusLine.code == HTTP_CONTINUE) {
        return null;
      } else if (statusLine.code == HTTP_CONTINUE) {
        state = STATE_READ_RESPONSE_HEADERS;
        return responseBuilder;
      }

      state = STATE_OPEN_RESPONSE_BODY;
      return responseBuilder;
    } catch (EOFException e) {
      // ......
  }

Http1ExchangeCodec-> readHeaderLine

private long headerLimit = HEADER_LIMIT;

private String readHeaderLine() throws IOException {
    String line = source.readUtf8LineStrict(headerLimit);
    headerLimit -= line.length();
    return line;
  }

StatusLine-> parse

public static StatusLine parse(String statusLine) throws IOException {
    // H T T P / 1 . 1   2 0 0   T e m p o r a r y   R e d i r e c t
    // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0

    // Parse protocol like "HTTP/1.1" followed by a space.
    int codeStart;
    Protocol protocol;
    if (statusLine.startsWith("HTTP/1.")) {
      if (statusLine.length() < 9 || statusLine.charAt(8) != ' ') {
        throw new ProtocolException("Unexpected status line: " + statusLine);
      }
      int httpMinorVersion = statusLine.charAt(7) - '0';
      codeStart = 9;
      if (httpMinorVersion == 0) {
        protocol = Protocol.HTTP_1_0;
      } else if (httpMinorVersion == 1) {
        protocol = Protocol.HTTP_1_1;
      } else {
        throw new ProtocolException("Unexpected status line: " + statusLine);
      }
    } else if (statusLine.startsWith("ICY ")) {
      // Shoutcast uses ICY instead of "HTTP/1.0".
      protocol = Protocol.HTTP_1_0;
      codeStart = 4;
    } else {
      throw new ProtocolException("Unexpected status line: " + statusLine);
    }

    // Parse response code like "200". Always 3 digits.
    if (statusLine.length() < codeStart + 3) {
      throw new ProtocolException("Unexpected status line: " + statusLine);
    }
    int code;
    try {
      code = Integer.parseInt(statusLine.substring(codeStart, codeStart + 3));
    } catch (NumberFormatException e) {
      throw new ProtocolException("Unexpected status line: " + statusLine);
    }

    // Parse an optional response message like "OK" or "Not Modified". If it
    // exists, it is separated from the response code by a space.
    String message = "";
    if (statusLine.length() > codeStart + 3) {
      if (statusLine.charAt(codeStart + 3) != ' ') {
        throw new ProtocolException("Unexpected status line: " + statusLine);
      }
      message = statusLine.substring(codeStart + 4);
    }

    return new StatusLine(protocol, code, message);
  }

HTTP/1 is an ASCII protocol, so reading from the socket yields plain strings; the status line string then has to be parsed into a StatusLine object, extracting the protocol version, the response code, and the message. A simplified sketch of that parsing follows.
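
The index arithmetic can be made concrete with a simplified re-implementation (not the OkHttp class itself) that parses "HTTP/1.1 200 OK":

public class StatusLineDemo {
  public static void main(String[] args) {
    String statusLine = "HTTP/1.1 200 OK";
    // "HTTP/1.x " occupies indices 0..8, so the three-digit code starts at index 9
    int httpMinorVersion = statusLine.charAt(7) - '0';                           // 1 -> HTTP_1_1
    int code = Integer.parseInt(statusLine.substring(9, 12));                    // 200
    String message = statusLine.length() > 12 ? statusLine.substring(13) : "";   // "OK"
    System.out.println("HTTP/1." + httpMinorVersion + " " + code + " " + message);
  }
}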

Http1ExchangeCodec-> readHeaders

private Headers readHeaders() throws IOException {
    Headers.Builder headers = new Headers.Builder();
    // parse the result headers until the first blank line
    for (String line; (line = readHeaderLine()).length() != 0; ) {
      // Parse the line into a key-value pair and add it to the builder
      Internal.instance.addLenient(headers, line);
    }
    // Build and return the Headers
    return headers.build();
  }

The response headers are read line by line and stored as key-value pairs in a Headers object; a rough sketch of the per-line split is shown below.
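
Roughly speaking, each line has to be split on its first ':' into a name/value pair. A toy version of that split (the real addLenient is more tolerant of malformed lines) might look like:

public class HeaderLineDemo {
  public static void main(String[] args) {
    String line = "Content-Type: text/html; charset=utf-8"; // a made-up header line
    int colon = line.indexOf(':');
    String name = line.substring(0, colon).trim();          // "Content-Type"
    String value = line.substring(colon + 1).trim();        // "text/html; charset=utf-8"
    System.out.println(name + " = " + value);
  }
}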

Exchange-> responseHeadersEnd

 public void responseHeadersEnd(Response response) {
    eventListener.responseHeadersEnd(call, response);
  }

Again, this just calls back into the eventListener.

Http1ExchangeCodec-> openResponseBodySource

@Override public Source openResponseBodySource(Response response) {
    // No response body
    if (!HttpHeaders.hasBody(response)) {
      return newFixedLengthSource(0);
    }
    // Special handling when Transfer-Encoding is chunked
    if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
      return newChunkedSource(response.request().url());
    }
    // Read the Content-Length from the response headers
    long contentLength = HttpHeaders.contentLength(response);
    if (contentLength != -1) {
      return newFixedLengthSource(contentLength);
    }

    return newUnknownLengthSource();
  }

The same pattern as request writing: depending on the available length information, a different Source is created, and Okio performs the reads.

FixedLengthSource-> read

@Override public long read(Buffer sink, long byteCount) throws IOException {
      if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
      if (closed) throw new IllegalStateException("closed");
      if (bytesRemaining == 0) return -1;

      long read = super.read(sink, Math.min(bytesRemaining, byteCount));
      if (read == -1) {
        realConnection.noNewExchanges(); // The server didn't supply the promised content length.
        ProtocolException e = new ProtocolException("unexpected end of stream");
        responseBodyComplete();
        throw e;
      }

      bytesRemaining -= read;
      if (bytesRemaining == 0) {
        responseBodyComplete();
      }
      return read;
    }

bytesRemaining is supplied through the constructor. The read still delegates to the superclass's read, but FixedLengthSource caps each read at the minimum of the caller's byteCount and the bytes remaining.

ChunkedSource-> read

@Override public long read(Buffer sink, long byteCount) throws IOException {
      if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
      if (closed) throw new IllegalStateException("closed");
      if (!hasMoreChunks) return -1;

      if (bytesRemainingInChunk == 0 || bytesRemainingInChunk == NO_CHUNK_YET) {
         // Read the chunk size and store it in bytesRemainingInChunk
         readChunkSize();
        if (!hasMoreChunks) return -1;
      }

      long read = super.read(sink, Math.min(byteCount, bytesRemainingInChunk));
      if (read == -1) {
        realConnection.noNewExchanges(); // The server didn't supply the promised chunk length.
        ProtocolException e = new ProtocolException("unexpected end of stream");
        responseBodyComplete();
        throw e;
      }
      bytesRemainingInChunk -= read;
      return read;
    }

Much like FixedLengthSource's read: the number of bytes read is capped at the minimum of the requested count and a limit, except that here the limit is not provided by the caller but read from the response body as the hexadecimal chunk size. A sketch of decoding a chunked body follows.
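
A hedged sketch of the decoding idea (mirroring what readChunkSize does, not OkHttp's exact code): read the hexadecimal size line, then exactly that many payload bytes, then the CRLF, until a zero-sized chunk is seen:

import okio.Buffer;

public class ChunkedReadDemo {
  public static void main(String[] args) throws Exception {
    Buffer source = new Buffer()
        .writeUtf8("5\r\nhello\r\n")
        .writeUtf8("6\r\n world\r\n")
        .writeUtf8("0\r\n\r\n");
    StringBuilder body = new StringBuilder();
    while (true) {
      long chunkSize = source.readHexadecimalUnsignedLong(); // the hex size prefix
      source.readUtf8LineStrict();                           // consume the rest of the size line
      if (chunkSize == 0) break;                             // zero-sized chunk: body is complete
      body.append(source.readUtf8(chunkSize));               // the chunk payload
      source.readUtf8LineStrict();                           // CRLF that follows the payload
    }
    System.out.println(body);                                // "hello world"
  }
}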

Interim summary

That completes the HTTP/1 request-writing and response-reading flow. Everything is read and written through Okio, with special handling for Transfer-Encoding: chunked, where a hexadecimal length has to be written before each chunk of the request body and read before each chunk of the response body.

Http2ExchangeCodec-> writeRequestHeaders

  @Override public void writeRequestHeaders(Request request) throws IOException {
    if (stream != null) return;

    boolean hasRequestBody = request.body() != null;
    List<Header> requestHeaders = http2HeadersList(request);
    stream = connection.newStream(requestHeaders, hasRequestBody);
    // We may have been asked to cancel while creating the new stream and sending the request
    // headers, but there was still no stream to close.
    if (canceled) {
      stream.closeLater(ErrorCode.CANCEL);
      throw new IOException("Canceled");
    }
    stream.readTimeout().timeout(chain.readTimeoutMillis(), TimeUnit.MILLISECONDS);
    stream.writeTimeout().timeout(chain.writeTimeoutMillis(), TimeUnit.MILLISECONDS);
  }

This delegates to http2HeadersList to obtain a list of Header entries,

then creates an Http2Stream on the connection.

Http2ExchangeCodec-> http2HeadersList

public static List<Header> http2HeadersList(Request request) {
    Headers headers = request.headers();
    List<Header> result = new ArrayList<>(headers.size() + 4);
    result.add(new Header(":method", request.method()));
    result.add(new Header(":path", RequestLine.requestPath(request.url())));
    String host = request.header("Host");
    if (host != null) {
      result.add(new Header(":authority", host)); // Optional.
    }
    result.add(new Header(":scheme", request.url().scheme()));

    for (int i = 0, size = headers.size(); i < size; i++) {
      // header names must be lowercase.
      String name = headers.name(i).toLowerCase(Locale.US);
      if (!HTTP_2_SKIPPED_REQUEST_HEADERS.contains(name)
          || name.equals(TE) && headers.value(i).equals("trailers")) {
        result.add(new Header(name, headers.value(i)));
      }
    }
    return result;
  }

The request's headers are converted into a list, which also carries the data that HTTP/1 would put in the request line, encoded as the :method, :path and :scheme pseudo-headers (plus :authority for the Host header). A hand-rolled illustration follows.
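
As a rough, hand-written illustration of that conversion using only public OkHttp API (simplified: the query string and the skipped-header filtering are ignored, and the URL and header are made up):

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import okhttp3.Headers;
import okhttp3.Request;

public class PseudoHeaderDemo {
  public static void main(String[] args) {
    Request request = new Request.Builder()
        .url("https://example.com/robots.txt")
        .header("Accept-Encoding", "gzip")
        .build();
    List<String[]> result = new ArrayList<>();
    // The HTTP/1 request line becomes pseudo-headers
    result.add(new String[] {":method", request.method()});
    result.add(new String[] {":path", request.url().encodedPath()});
    result.add(new String[] {":authority", request.url().host()}); // the real code reads the Host header when one is set
    result.add(new String[] {":scheme", request.url().scheme()});
    // Regular headers follow, with lowercase names as HTTP/2 requires
    Headers headers = request.headers();
    for (int i = 0; i < headers.size(); i++) {
      result.add(new String[] {headers.name(i).toLowerCase(Locale.US), headers.value(i)});
    }
    for (String[] header : result) {
      System.out.println(header[0] + " = " + header[1]);
    }
  }
}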

Http2Connection-> newStream

public Http2Stream newStream(List<Header> requestHeaders, boolean out) throws IOException {
    return newStream(0, requestHeaders, out);
  }

Http2Connection-> newStream

 private Http2Stream newStream(
      int associatedStreamId, List<Header> requestHeaders, boolean out) throws IOException {
    boolean outFinished = !out;
    boolean inFinished = false;
    boolean flushHeaders;
    Http2Stream stream;
    int streamId;

    synchronized (writer) {
      synchronized (this) {
        // Guard against streamId overflow
        if (nextStreamId > Integer.MAX_VALUE / 2) {
          shutdown(REFUSED_STREAM);
        }
        if (shutdown) {
          throw new ConnectionShutdownException();
        }
        // Allocate the stream id
        streamId = nextStreamId;
        nextStreamId += 2;
        // Create the Http2Stream
        stream = new Http2Stream(streamId, this, outFinished, inFinished, null);
        flushHeaders = !out || bytesLeftInWriteWindow == 0L || stream.bytesLeftInWriteWindow == 0L;
        if (stream.isOpen()) {
          // Cache the Http2Stream
          streams.put(streamId, stream);
        }
      }
      if (associatedStreamId == 0) {
        writer.headers(outFinished, streamId, requestHeaders);
      } else if (client) {
        throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
      } else { // HTTP/2 has a PUSH_PROMISE frame.
        writer.pushPromise(associatedStreamId, streamId, requestHeaders);
      }
    }

    if (flushHeaders) {
      writer.flush();
    }

    return stream;
  }

The main steps: allocate an id for the stream (incrementing by 2, since client-initiated streams use odd ids in HTTP/2), create an Http2Stream, cache it in the streams map, and then write the headers through the Http2Writer.

There is also handling for the PUSH_PROMISE frame, which a server uses to announce its intent to push a resource to the client, ahead of transmitting the response data for that pushed resource.

Http2Writer-> headers

public synchronized void headers(
      boolean outFinished, int streamId, List<Header> headerBlock) throws IOException {
    if (closed) throw new IOException("closed");
    // HPACK-compress the headers
    hpackWriter.writeHeaders(headerBlock);

    long byteCount = hpackBuffer.size();
    int length = (int) Math.min(maxFrameSize, byteCount);
    byte type = TYPE_HEADERS;
    byte flags = byteCount == length ? FLAG_END_HEADERS : 0;
    if (outFinished) flags |= FLAG_END_STREAM;
    // Write the frame header: length, type, flags, and stream id
    frameHeader(streamId, length, type, flags);
    // Write the HPACK-compressed header block
    sink.write(hpackBuffer, length);

    if (byteCount > length) writeContinuationFrames(streamId, byteCount - length);
  }

That completes the HTTP/2 header-writing path. Compared with HTTP/1, the differences stand out (a sketch of the frame header follows the list):

  • HTTP/2 transfers data in frames; what HTTP/1 puts in the request line is carried inside the HEADERS frame
  • HTTP/2 multiplexes through streams, each identified by a streamId
  • HTTP/2 compresses headers using HPACK, which combines a static table, a dynamic table, and Huffman coding
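
Here is a sketch of the 9-byte frame header that frameHeader writes, per RFC 7540 §4.1: a 24-bit payload length, an 8-bit type, an 8-bit flags byte, and a 31-bit stream id (the top bit is reserved). The values below are illustrative:

import okio.Buffer;

public class FrameHeaderDemo {
  static void frameHeader(Buffer sink, int streamId, int length, byte type, byte flags) {
    sink.writeByte((length >>> 16) & 0xff); // 24-bit length, big-endian
    sink.writeByte((length >>> 8) & 0xff);
    sink.writeByte(length & 0xff);
    sink.writeByte(type & 0xff);            // e.g. 0x1 = HEADERS, 0x0 = DATA
    sink.writeByte(flags & 0xff);           // e.g. 0x4 = END_HEADERS, 0x1 = END_STREAM
    sink.writeInt(streamId & 0x7fffffff);   // 31-bit stream id, high bit reserved
  }

  public static void main(String[] args) {
    Buffer sink = new Buffer();
    frameHeader(sink, /* streamId */ 3, /* length */ 42, (byte) 0x1, (byte) 0x4);
    System.out.println(sink.readByteString().hex()); // 00002a010400000003
  }
}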

Http2ExchangeCodec-> createRequestBody

@Override public Sink createRequestBody(Request request, long contentLength) {
    return stream.getSink();
  }

This returns the sink of the Http2Stream created while the request headers were written. Its concrete type is FrameSink, and the request body is written into the stream through it.

FrameSink-> write

@Override public void write(Buffer source, long byteCount) throws IOException {
      assert (!Thread.holdsLock(Http2Stream.this));
      sendBuffer.write(source, byteCount);
      while (sendBuffer.size() >= EMIT_BUFFER_SIZE) { // 16 KiB
        emitFrame(false);
      }
    }

The data from source is written into sendBuffer, and emitFrame is called to send it out. DATA frames are only emitted once a certain amount (16 KiB) has accumulated; smaller remainders are not lost, because flush checks sendBuffer and, if it is non-empty, still calls emitFrame to send the data.

FrameSink-> emitFrame

/**
 * Emits a single data frame to the connection. The frame size is limited by this stream's
 * write window. This method blocks until the write window is non-empty.
 */
private void emitFrame(boolean outFinishedOnLastFrame) throws IOException {
      long toWrite;
      synchronized (Http2Stream.this) {
        writeTimeout.enter();
        try {
          while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) {
            waitForIo(); // Wait until we receive a WINDOW_UPDATE for this stream.
          }
        } finally {
          writeTimeout.exitAndThrowIfTimedOut();
        }

        checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
        // The amount written cannot exceed the window size
        toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size());
        bytesLeftInWriteWindow -= toWrite;
      }

      writeTimeout.enter();
      try {
        boolean outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size();
        // Write the data on the connection for this stream
        connection.writeData(id, outFinished, sendBuffer, toWrite);
      } finally {
        writeTimeout.exitAndThrowIfTimedOut();
      }
    }

First it checks whether the write window has room; if not, it waits until a WINDOW_UPDATE frame arrives for this stream.

The amount to write is then capped so that it does not exceed the window, and finally the data is written on the connection's stream.

Http2Connection-> writeData

public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
      throws IOException {
    if (byteCount == 0) { // Empty data frames are not flow-controlled.
      writer.data(outFinished, streamId, buffer, 0);
      return;
    }

    while (byteCount > 0) {
      int toWrite;
      synchronized (Http2Connection.this) {
        try {
          while (bytesLeftInWriteWindow <= 0) {
            // Before blocking, confirm that the stream we're writing is still open. It's possible
            // that the stream has since been closed (such as if this write timed out.)
            if (!streams.containsKey(streamId)) {
              throw new IOException("stream closed");
            }
            Http2Connection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // Retain interrupted status.
          throw new InterruptedIOException();
        }

        toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
        toWrite = Math.min(toWrite, writer.maxDataLength());
        bytesLeftInWriteWindow -= toWrite;
      }

      byteCount -= toWrite;
      writer.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
    }
  }

Again it checks whether the current window has room; if not, it waits for a WINDOW_UPDATE frame to enlarge the window.

This is also where caching the streams pays off: the map is used to check whether the stream has been closed in the meantime.

The write size is then limited so that it never exceeds the window (or the maximum frame size), and the amount written is subtracted from the remaining window; a small worked example follows.
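
A toy model of that write loop (the numbers are illustrative, not OkHttp defaults): the payload goes out in pieces, each bounded by both the remaining window and the maximum frame size, and whatever does not fit has to wait for a WINDOW_UPDATE:

public class WriteWindowDemo {
  public static void main(String[] args) {
    long byteCount = 40_000;    // bytes still to send
    long windowLeft = 30_000;   // bytesLeftInWriteWindow
    long maxFrameSize = 16_384; // writer.maxDataLength()
    while (byteCount > 0 && windowLeft > 0) {
      long toWrite = Math.min(byteCount, Math.min(windowLeft, maxFrameSize));
      System.out.println("DATA frame carrying " + toWrite + " bytes");
      byteCount -= toWrite;
      windowLeft -= toWrite;
    }
    if (byteCount > 0) {
      System.out.println(byteCount + " bytes left: block until a WINDOW_UPDATE arrives");
    }
    // Prints frames of 16384 and 13616 bytes, then waits with 10000 bytes remaining
  }
}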

Finally, Http2Writer's data method is called to write the DATA frame.

Http2Writer-> data

public synchronized void data(boolean outFinished, int streamId, Buffer source, int byteCount)
      throws IOException {
    if (closed) throw new IOException("closed");
    byte flags = FLAG_NONE;
    if (outFinished) flags |= FLAG_END_STREAM;
    dataFrame(streamId, flags, source, byteCount);
  }

Http2Writer-> dataFrame

void dataFrame(int streamId, byte flags, Buffer buffer, int byteCount) throws IOException {
    byte type = TYPE_DATA;
    frameHeader(streamId, byteCount, type, flags);
    if (byteCount > 0) {
      sink.write(buffer, byteCount);
    }
  }

Write the frame header, then the data frame's payload.

Http2ExchangeCodec-> readResponseHeaders

@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
    Headers headers = stream.takeHeaders();
    Response.Builder responseBuilder = readHttp2HeadersList(headers, protocol);
    if (expectContinue && Internal.instance.code(responseBuilder) == HTTP_CONTINUE /*100*/) {
      return null;
    }
    return responseBuilder;
  }

stream.takeHeaders blocks until a Headers object is available, and readHttp2HeadersList converts it into a Response.Builder. If we were waiting on an "Expect: 100-continue" and the code is 100, null is returned so that the caller knows to continue and read the real response afterwards.

Http2Stream-> takeHeaders

 public synchronized Headers takeHeaders() throws IOException {
    readTimeout.enter();
    try {
      while (headersQueue.isEmpty() && errorCode == null) {
        waitForIo();
      }
    } finally {
      readTimeout.exitAndThrowIfTimedOut();
    }
    if (!headersQueue.isEmpty()) {
      return headersQueue.removeFirst();
    }
    throw errorException != null ? errorException : new StreamResetException(errorCode);
  }

The main logic is simply to wait until headersQueue has a new set of headers to hand out.

Http2ExchangeCodec-> readHttp2HeadersList

/** Returns headers for a name value block containing an HTTP/2 response. */
  public static Response.Builder readHttp2HeadersList(Headers headerBlock,
      Protocol protocol) throws IOException {
    StatusLine statusLine = null;
    Headers.Builder headersBuilder = new Headers.Builder();
    for (int i = 0, size = headerBlock.size(); i < size; i++) {
      String name = headerBlock.name(i);
      String value = headerBlock.value(i);
      if (name.equals(":status")) {
        statusLine = StatusLine.parse("HTTP/1.1 " + value);
      } else if (!HTTP_2_SKIPPED_RESPONSE_HEADERS.contains(name)) {
        Internal.instance.addLenient(headersBuilder, name, value);
      }
    }
    if (statusLine == null) throw new ProtocolException("Expected ':status' header not present");

    return new Response.Builder()
        .protocol(protocol)
        .code(statusLine.code)
        .message(statusLine.message)
        .headers(headersBuilder.build());
  }

The header list is parsed into a Response.Builder. Notably, the :status pseudo-header is turned into a StatusLine by prefixing it with "HTTP/1.1 ", reusing the HTTP/1 status-line parser, which suggests the HTTP/2 support was layered on top of the existing HTTP/1 plumbing.

Http2ExchangeCodec-> openResponseBodySource

@Override public Source openResponseBodySource(Response response) {
    return stream.getSource();
  }

It simply returns the current stream's source for reading. The concrete type is FrameSource, so it follows the same pattern as before.

FrameSource-> read

@Override public long read(Buffer sink, long byteCount) throws IOException {
      if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);

      while (true) {
        long readBytesDelivered = -1;
        IOException errorExceptionToDeliver = null;

        // 1. Decide what to do in a synchronized block.

        synchronized (Http2Stream.this) {
             
          readTimeout.enter();
          try {
            // ......
            
            if (closed) {
              throw new IOException("stream closed");

            } else if (readBuffer.size() > 0) {
              // Prepare to read bytes. Start by moving them to the caller's buffer.
              readBytesDelivered = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
              unacknowledgedBytesRead += readBytesDelivered;

              if (errorExceptionToDeliver == null
                  && unacknowledgedBytesRead
                  >= connection.okHttpSettings.getInitialWindowSize() / 2) {
                // Flow control: notify the peer that we're ready for more data! Only send a
                // WINDOW_UPDATE if the stream isn't in error.
                connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
                unacknowledgedBytesRead = 0;
              }
            } else if (!finished && errorExceptionToDeliver == null) {
              // Nothing to do. Wait until that changes then try again.
              waitForIo();
              continue;
            }
          } finally {
            readTimeout.exitAndThrowIfTimedOut();
          }
        }

        // 2. Do it outside of the synchronized block and timeout.

        if (readBytesDelivered != -1) {
          // Update connection.unacknowledgedBytesRead outside the synchronized block.
          updateConnectionFlowControl(readBytesDelivered);
          return readBytesDelivered;
        }

        if (errorExceptionToDeliver != null) {
          // We defer throwing the exception until now so that we can refill the connection
          // flow-control window. This is necessary because we don't transmit window updates until
          // the application reads the data. If we throw this prior to updating the connection
          // flow-control window, we risk having it go to 0 preventing the server from sending data.
          throw errorExceptionToDeliver;
        }

        return -1; // This source is exhausted.
      }
    }

The key addition here is flow control on the read side: when the receiver is ready for more data, it proactively sends a WINDOW_UPDATE frame to tell the sender to enlarge its window. A toy model of the threshold logic follows.
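
A toy model (not OkHttp code) of that read-side bookkeeping: the reader accumulates unacknowledged bytes and sends a WINDOW_UPDATE once it has consumed at least half of the initial window; the window size used here is just an example value:

public class WindowUpdateDemo {
  static final long INITIAL_WINDOW_SIZE = 65_535; // illustrative; OkHttp configures its own value

  long unacknowledgedBytesRead = 0;

  void onBytesRead(long read) {
    unacknowledgedBytesRead += read;
    if (unacknowledgedBytesRead >= INITIAL_WINDOW_SIZE / 2) {
      System.out.println("send WINDOW_UPDATE(" + unacknowledgedBytesRead + ")");
      unacknowledgedBytesRead = 0; // the peer may now send this many more bytes
    }
  }

  public static void main(String[] args) {
    WindowUpdateDemo reader = new WindowUpdateDemo();
    reader.onBytesRead(16_000); // below the threshold, nothing sent yet
    reader.onBytesRead(20_000); // 36 000 >= 32 767, so a WINDOW_UPDATE goes out
    reader.onBytesRead(10_000); // counter was reset, nothing sent yet
  }
}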

Summary

OkHttp's HTTP/2 handling differs substantially from HTTP/1. When an HTTP/2 connection is established, a dedicated thread is started to keep reading incoming frames and dispatch them to the corresponding streams. On top of that, HTTP/2 adds flow control: sending data shrinks the write window, and the window only grows back through WINDOW_UPDATE frames. In other words, the receiver decides how fast the sender may send, the sender has to trust it, and when the sender's window runs out it blocks until a WINDOW_UPDATE arrives. From OkHttp's logic, only DATA frames are subject to flow control; other frame types do not affect the window size.