OkHttp源码解析(六)连接管理(下)

Android框架学习

Posted by Cc1over on January 12, 2020

OkHttp源码解析(六)连接管理(下)

前言

本来想一篇笔记把Ok的连接管理走完,但是最后发现东西有点多,所以在上一篇笔记只追看到findConnection方法引申出来的ConnectionPool和连接复用机制,这一篇笔记就把剩下的连接创建过程以及路由相关点走完

ExchangeFinder-> findConnection

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; // This is a fresh attempt.

      // Attempt to use an already-allocated connection. We need to be careful here because our
      // already-allocated connection may have been restricted from creating new exchanges.
      // 尝试用一个已经分配的连接,但是可能连接不能用来创建新的Exchange
      releasedConnection = transmitter.connection;
      // 如果这个连接不能用来创建Exchange那就把它关掉  
      toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;

      if (transmitter.connection != null) {
        // We had an already-allocated connection and it's good.
        // 如果已经存在一个连接,而且这个连接的状态是好  
        result = transmitter.connection;
        releasedConnection = null;
      }

      if (result == null) {
        // Attempt to get a connection from the pool.
        // 尝试从连接池中获取一个连接  
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if (nextRouteToTry != null) {
          // 修改当前路由为下一个路由  
          selectedRoute = nextRouteToTry;
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
          // 如果当前路由应该被重试,重试当前路由  
          selectedRoute = transmitter.connection.route();
        }
      }
    }
    closeQuietly(toClose);
    // eventListener的注册时期已经很久远了,在Call的创建中
    // 会创建transmitter,然后会把OkHttpClient的listener注册进来
    if (releasedConnection != null) {
      // 通知上层连接释放了  
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      // 通知上层连接获取  
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      // 如果找到了一个已经分配或者池中的连接,就可以直接返回了
      return result;
    }

    // If we need a route selection, make one. This is a blocking operation.
    // 如果需要进行路由选择,进行一次
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        // 在这里已经有了一个IP地址列表,然后会尝试获得从池中获取一个连接  
        routes = routeSelection.getAll();
        // 在连接池中根据IP列表去获取一个连接给transmitter  
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }
      
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        // 创建一个连接  
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    // 做TCP和TLS握手
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      // 最后一次尝试从连接池中获取连接,这种情况只可能发生在一个host下多个并发连接这种情况下  
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        // 如果成功拿到则关闭我们前面创建的连接的Socket,并返回连接池中的连接  
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;

        // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
        // that case we will retry the route we just successfully connected with.
        nextRouteToTry = selectedRoute;
      } else {
        // 如果失败则在连接池中放入我们刚刚创建的连接,并将其设置为transmitter中的连接  
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }

主要流程:

  • 尝试从transmitter中拿到一个已经创建的连接,然后判断这个连接能不能用来创建一个Exchange对象,如果不能就把这个连接关闭掉,如果可以,那么就返回transmitter中这个可以用来创建Exchange对象的连接
  • 如果上一步获取不到,就从连接池中获取一个连接
  • 如果从连接池中依然获取不到连接,进行路由选择,进行玩路由选择后使用路由选择的结果再次从连接池中获取连接
  • 如果还是获取不到连接那就创建一个表示连接的RealConnection对象,然后执行TCP以及TLS握手
  • 然后会最后一次从连接池中获取连接,针对与同一host的情况,为了避免并发问题,所以会在连接池中拿到一个连接来使用,然后把创建的连接进行关闭,从而达到对同一个host的连接复用

而这这个流程中其实体现了Ok中从开头就埋下的伏笔,那就是transmitter,transmitter是在创建是在Call中的,从上面几个拦截器看其实看不太出他的作用,但是在这一层,不仅提供给上层一些连接操作的监听方式,而且还保存了一些连接

RealConnection

public RealConnection(RealConnectionPool connectionPool, Route route) {
    this.connectionPool = connectionPool;
    this.route = route;
 }

RealConnection-> connect

public void connect(int connectTimeout, int readTimeout, int writeTimeout,
                    int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
                    EventListener eventListener) {
    if (protocol != null) throw new IllegalStateException("already connected");
    RouteException routeException = null;
    List connectionSpecs = route.address().connectionSpecs();
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    // ......
    while (true) {
        try {
            if (route.requiresTunnel()) {
                // 如果使用了隧道技术,调用connectTunnel方法
                connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
                if (rawSocket == null) {
                    // We were unable to connect the tunnel but properly closed down our resources.
                    break;
                }
            } else {
                // 未使用隧道技术,调用connectSocket方法
                connectSocket(connectTimeout, readTimeout, call, eventListener);
            }
            // 建立协议
            establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
            eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
            break;
        } catch (IOException e) {
            //......
        }
    }
    // ......
}

如果采用了隧道技术,调用connectTunnel方法执行连接

如果没有采用隧道技术,调用connectSocket方法执行连接

RealConnection-> connectTunnel

private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,
      EventListener eventListener) throws IOException {
    Request tunnelRequest = createTunnelRequest();
    HttpUrl url = tunnelRequest.url();
    for (int i = 0; i < 21; i++) {
      // 采用connectSocket
      connectSocket(connectTimeout, readTimeout, call, eventListener);
      // 创建一个隧道请求
      tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
      // 隧道创建成功,在这里退出
      if (tunnelRequest == null) break; // Tunnel successfully created.

      // The proxy decided to close the connection after an auth challenge. We need to create a new
      // connection, but this time with the auth credentials.
      closeQuietly(rawSocket);
      rawSocket = null;
      sink = null;
      source = null;
      eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);
    }
  }

在隧道连接的逻辑中,依然还是通过调用connectSocket的方式构建连接,但是除此之外,还会调用createTunel创建一个隧道请求

并且在隧道连接过程中添加了重试机会,有21次的重试机会

RealConnection-> createTunnel

private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
      HttpUrl url) throws IOException {
    // Make an SSL Tunnel on the first message pair of each SSL + proxy connection.
    // 构建一个Http请求行
    String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
    while (true) {
      Http1ExchangeCodec tunnelCodec = new Http1ExchangeCodec(null, null, source, sink);
      source.timeout().timeout(readTimeout, MILLISECONDS);
      sink.timeout().timeout(writeTimeout, MILLISECONDS);
      tunnelCodec.writeRequest(tunnelRequest.headers(), requestLine);
      tunnelCodec.finishRequest();
      // 发起隧道请求   
      Response response = tunnelCodec.readResponseHeaders(false)
          .request(tunnelRequest)
          .build();
      tunnelCodec.skipConnectBody(response);

      switch (response.code()) {
        // 如果响应200,也就是隧道建立成功,返回null      
        case HTTP_OK:
          // Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If
          // that happens, then we will have buffered bytes that are needed by the SSLSocket!
          // This check is imperfect: it doesn't tell us whether a handshake will succeed, just
          // that it will almost certainly fail because the proxy has sent unexpected data.  
          if (!source.getBuffer().exhausted() || !sink.buffer().exhausted()) {
            throw new IOException("TLS tunnel buffered too many bytes!");
          }
          return null;
        // 如果需要进行代理认证  
        case HTTP_PROXY_AUTH:
          // 代理认真失败
          tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
          if (tunnelRequest == null) throw new IOException("Failed to authenticate with proxy");
          // 代理认证成功但是需要关闭连接    
          if ("close".equalsIgnoreCase(response.header("Connection"))) {
            return tunnelRequest;
          }
          break;

        default:
          throw new IOException(
              "Unexpected response code for CONNECT: " + response.code());
      }
    }
  }

主要的逻辑就是构建一个隧道申请的请求发送出去,然后根据响应情况区分处理

RealConnection-> connectSocket

private void connectSocket(int connectTimeout, int readTimeout, Call call,
      EventListener eventListener) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();
    // 创建一个Socket
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    eventListener.connectStart(call, route.socketAddress(), proxy);
    rawSocket.setSoTimeout(readTimeout);
    try {
      // 针对不同平台使用了不同的连接方式  
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      // ......
    }

    // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    // More details:
    // https://github.com/square/okhttp/issues/3245
    // https://android-review.googlesource.com/#/c/271775/
    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      // .......
    }
  }

主要还是创建socket,通过socket的读写Okio实现对网络数据的读写

所以其实Ok就是基于Socket之上自己做了一套Http协议的实现

RealConnection-> establishProtocol

private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
      int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
    // 不是https
    if (route.address().sslSocketFactory() == null) {
      // 如果是Http2.0协议  
      if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
        socket = rawSocket;
        protocol = Protocol.H2_PRIOR_KNOWLEDGE;
        startHttp2(pingIntervalMillis);
        return;
      }
      // Http1.1
      socket = rawSocket;
      protocol = Protocol.HTTP_1_1;
      return;
    }

    eventListener.secureConnectStart(call);
    // Tls握手
    connectTls(connectionSpecSelector);
    eventListener.secureConnectEnd(call, handshake);
    // 如果不是Htps而且是Http2.0
    if (protocol == Protocol.HTTP_2) {
      startHttp2(pingIntervalMillis);
    }
  }

在构建完socket之后会根据协议去做TLS握手或者开启对Http2.0的支持

RealConnection-> startHttp2

private void startHttp2(int pingIntervalMillis) throws IOException {
    socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
    http2Connection = new Http2Connection.Builder(true)
        .socket(socket, route.address().url().host(), source, sink)
        .listener(this)
        .pingIntervalMillis(pingIntervalMillis)
        .build();
    http2Connection.start();
  }

创建了一个Http2Connection对象,然后转调start方法

Http2Connection-> start

void start(boolean sendConnectionPreface) throws IOException {
    if (sendConnectionPreface) {
      writer.connectionPreface();
      writer.settings(okHttpSettings);
      int windowSize = okHttpSettings.getInitialWindowSize();
      if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
        writer.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
      }
    }
    new Thread(readerRunnable).start(); // Not a daemon thread.
  }

发送一个preface作为使用Http2.0的一个标识,并且preface后面必须跟着一个setting帧

而开启的读线程就是用于读取服务器返回的preface,然后进行字符串的匹配

RealConnection-> connectTls

private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    Address address = route.address();
    SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
    boolean success = false;
    SSLSocket sslSocket = null;
    try {
      // Create the wrapper over the connected socket.
      // 构建一个SSLSocket对象包装原来创建的Socket   
      sslSocket = (SSLSocket) sslSocketFactory.createSocket(
          rawSocket, address.url().host(), address.url().port(), true /* autoClose */);

      // Configure the socket's ciphers, TLS versions, and extensions.
      // 配置TLS信息,如版本,密码,扩展名  
      ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
      if (connectionSpec.supportsTlsExtensions()) {
        Platform.get().configureTlsExtensions(
            sslSocket, address.url().host(), address.protocols());
      }

      // Force handshake. This can throw!
      // 执行TLS握手
      sslSocket.startHandshake();
      // block for session establishment
      // 阻塞获取SSLSession  
      SSLSession sslSocketSession = sslSocket.getSession();
      Handshake unverifiedHandshake = Handshake.get(sslSocketSession);

      // Verify that the socket's certificates are acceptable for the target host.
      // 验证证书的有效性  
      if (!address.hostnameVerifier().verify(address.url().host(), sslSocketSession)) {
        List<Certificate> peerCertificates = unverifiedHandshake.peerCertificates();
        if (!peerCertificates.isEmpty()) {
          X509Certificate cert = (X509Certificate) peerCertificates.get(0);
          throw new SSLPeerUnverifiedException(
              "Hostname " + address.url().host() + " not verified:"
                  + "\n    certificate: " + CertificatePinner.pin(cert)
                  + "\n    DN: " + cert.getSubjectDN().getName()
                  + "\n    subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
        } else {
          throw new SSLPeerUnverifiedException(
              "Hostname " + address.url().host() + " not verified (no certificates)");
        }
      }

      // Check that the certificate pinner is satisfied by the certificates presented.
      address.certificatePinner().check(address.url().host(),
          unverifiedHandshake.peerCertificates());

      // Success! Save the handshake and the ALPN protocol.
      // 握手成功,保存握手信息  
      String maybeProtocol = connectionSpec.supportsTlsExtensions()
          ? Platform.get().getSelectedProtocol(sslSocket)
          : null;
      socket = sslSocket;
      source = Okio.buffer(Okio.source(socket));
      sink = Okio.buffer(Okio.sink(socket));
      handshake = unverifiedHandshake;
      protocol = maybeProtocol != null
          ? Protocol.get(maybeProtocol)
          : Protocol.HTTP_1_1;
      success = true;
    } catch (AssertionError e) {
      if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
      throw e;
    } finally {
      if (sslSocket != null) {
        Platform.get().afterHandshake(sslSocket);
      }
      if (!success) {
        closeQuietly(sslSocket);
      }
    }
  }

主要逻辑:

  • 基于之前建立的Socket建立包装类SSLSocket
  • 对 TLS 相关信息进行配置
  • 通过SSLSocket进行握手
  • 验证一些证书相关信息

小结

紧接上一篇的连接复用机制,连接的创建主要流程还是:

创建socket,如果如果使用了隧道技术,还需要采用CONNECT的请求方式去请求隧道创建

socket的创建完成以及隧道的创建完成后

会根据对Http2.0以及Https协议进行额外的处理:

对Http2.0:发送一个preface字符串以及紧跟一个setting帧发送给服务端,然后开启一个线程读取服务器返回的preface,判断是否对应的字符串

对Https:

  • 基于之前建立的Socket建立包装类SSLSocket
  • 对 TLS 相关信息进行配置
  • 通过SSLSocket进行握手
  • 验证一些证书相关信息

ExchangeFinder-> findConnection

再次回顾ExchangeFinder中寻找一个Connection的流程:

  • 尝试从transmitter中拿到一个已经创建的连接,然后判断这个连接能不能用来创建一个Exchange对象,如果不能就把这个连接关闭掉,如果可以,那么就返回transmitter中这个可以用来创建Exchange对象的连接
  • 如果上一步获取不到,就从连接池中获取一个连接
  • 如果从连接池中依然获取不到连接,进行路由选择,进行玩路由选择后使用路由选择的结果再次从连接池中获取连接
  • 如果还是获取不到连接那就创建一个表示连接的RealConnection对象,然后执行TCP以及TLS握手
  • 然后会最后一次从连接池中获取连接,针对与同一host的情况,为了避免并发问题,所以会在连接池中拿到一个连接来使用,然后把创建的连接进行关闭,从而达到对同一个host的连接复用

通过之前看的连接复用机制,通过连接新建,解决了ExchangeFinder操作的ConnectionPool背后的工作原理,但是还剩下两次从连接池中获取的变数,那就是路由

connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)
connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, false)

两次从连接池中获取的逻辑中routes参数第一次会为空,第二次为路由选择后的IP地址列表

而transmitterAcquirePooledConnection传入的routes参数其实只会用于做校验,而不会直接保存或设置到RealConnection,而IP的设置其实就是在RealConnection创建的时候

而真正创建给连接绑定路由的操作其实就在连接的创建过程中

result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
// ......
connectionPool.routeDatabase.connected(result.route());

构造函数中会把selectedRoute保存在RealConnection中,然后执行RouteDatabase的connected方法连接Connection的路由信息

Route

public final class Route {
  final Address address;
  final Proxy proxy;
  final InetSocketAddress inetSocketAddress;
  // ......
}    

路由信息包含了三个部分,一个是对应的地址,一个是代理,还有一个InetSocketAddress,而这个InteSocketAddress会根据代理的情况有不同的取值:

  • 没有代理的情况下它包含的信息是经过了 DNS 解析的 IP 以及协议的端口号
  • SOCKS 代理的情况下,它包含了 HTTP 服务器的域名和协议端口号
  • HTTP 代理的情况下,它包含了代理服务器经过了 DNS 解析的 IP 地址及端口号

而Proxy代理也就有三种不同的取值了:

  • 不使用代理
  • HTTP 代理
  • SOCKS 代理

先看一下连接是怎么管理路由信息的

RouteDatabase

/**
 * 失败路由的黑名单,以避免在与目标地址建立新连接时出现。 这是
 * 用来使OkHttp可以从错误中学习:如果尝试连接到失败
 * 特定的IP地址或代理服务器,该故障会被记住,并且备用路由
 * 首选。
 */
final class RouteDatabase {
  private final Set<Route> failedRoutes = new LinkedHashSet<>();

  /** Records a failure connecting to {@code failedRoute}. */
  public synchronized void failed(Route failedRoute) {
    failedRoutes.add(failedRoute);
  }

  /** Records success connecting to {@code route}. */
  public synchronized void connected(Route route) {
    failedRoutes.remove(route);
  }

  /** Returns true if {@code route} has failed recently and should be avoided. */
  public synchronized boolean shouldPostpone(Route route) {
    return failedRoutes.contains(route);
  }
}

可见其实RouteDatabase它保存了一份失败的路由信息列表,不可以重复而且以链表的方式关联起来

从注释可以看到RouteDatabase这个对象用于保存一些失败的连接,作为备用路由的首先,而这份失败的路由信息被保存在ConnectionPool中了

而路由选择的逻辑依然还是要回到findConnection方法中

ExchangeFinder-> findConnection

// ......
 boolean newRouteSelection = false;
 if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
    newRouteSelection = true;
    routeSelection = routeSelector.next();
 }

在两次从连接池获取连接中,或通过routeSelector进行路由选择

RouteSelector-> next

public Selection next() throws IOException {
    // 判断是否有下一个代理
    if (!hasNext()) {
      throw new NoSuchElementException();
    }

    // Compute the next set of routes to attempt.
    // 构建IP地址列表集合
    List<Route> routes = new ArrayList<>();
    while (hasNextProxy()) {
      // Postponed routes are always tried last. For example, if we have 2 proxies and all the
      // routes for proxy1 should be postponed, we'll move to proxy2. Only after we've exhausted
      // all the good routes will we attempt the postponed routes.
      // 获取下一个代理
      Proxy proxy = nextProxy();
      for (int i = 0, size = inetSocketAddresses.size(); i < size; i++) {
        Route route = new Route(address, proxy, inetSocketAddresses.get(i));
        // 如果这个路由失败过  
        if (routeDatabase.shouldPostpone(route)) {
          postponedRoutes.add(route);
        } else {
          // 正常情况下
          routes.add(route);
        }
      }

      if (!routes.isEmpty()) {
        break;
      }
    }
    
    if (routes.isEmpty()) {
      // We've exhausted all Proxies so fallback to the postponed routes.
      routes.addAll(postponedRoutes);
      postponedRoutes.clear();
    }

    return new Selection(routes);
  }

首先拿到下一个代理

然后通过routeDatabase判断这个路由是否失败过,而routeDatabase维护了一个失败路由信息的集合,判断也只是判断当前这个Route是否在集合中

而对于路由选择,会优先选择正常情况,当实在没有可用的路由的时候,会从失败过的路由中进行重试

RouteSelector-> nextProxy

private Proxy nextProxy() throws IOException {
    if (!hasNextProxy()) {
      throw new SocketException("No route to " + address.url().host()
          + "; exhausted proxy configurations: " + proxies);
    }
    Proxy result = proxies.get(nextProxyIndex++);
    resetNextInetSocketAddress(result);
    return result;
  }

从proxies中获取下一个代理信,然后根据代理信息调整Route的InetSocketAddress

RouteSelector-> resetNextInetSocketAddress

/** Prepares the socket addresses to attempt for the current proxy or host. */
  private void resetNextInetSocketAddress(Proxy proxy) throws IOException {
    // Clear the addresses. Necessary if getAllByName() below throws!
    inetSocketAddresses = new ArrayList<>();

    String socketHost;
    int socketPort;
    // 如果代理方式是直连或者socket代理
    if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.SOCKS) {
      // 目标主机和目标端口都是原主机和原端口  
      socketHost = address.url().host();
      socketPort = address.url().port();
    } else {
      // 如果是Http代理  
      SocketAddress proxyAddress = proxy.address();
      if (!(proxyAddress instanceof InetSocketAddress)) {
        throw new IllegalArgumentException(
            "Proxy.address() is not an " + "InetSocketAddress: " + proxyAddress.getClass());
      }
      InetSocketAddress proxySocketAddress = (InetSocketAddress) proxyAddress;
      // 目的主机就变成了代理服务器的主机,目的端口号依然是原端口号   
      socketHost = getHostString(proxySocketAddress);
      socketPort = proxySocketAddress.getPort();
    }

    if (socketPort < 1 || socketPort > 65535) {
      throw new SocketException("No route to " + socketHost + ":" + socketPort
          + "; port is out of range");
    }
    // socket代理直接填入ip地址,无需进行DNS解析
    if (proxy.type() == Proxy.Type.SOCKS) {
      inetSocketAddresses.add(InetSocketAddress.createUnresolved(socketHost, socketPort));
    } else {
      eventListener.dnsStart(call, socketHost);
      
      // Try each address for best behavior in mixed IPv4/IPv6 environments.
      // 进行dns解析
      List<InetAddress> addresses = address.dns().(socketHost);
      if (addresses.isEmpty()) {
        throw new UnknownHostException(address.dns() + " returned no addresses for " + socketHost);
      }

      eventListener.dnsEnd(call, socketHost, addresses);
      // 转存到inetSocketAddresses
      for (int i = 0, size = addresses.size(); i < size; i++) {
        InetAddress inetAddress = addresses.get(i);
        inetSocketAddresses.add(new InetSocketAddress(inetAddress, socketPort));
      }
    }
  }

主要是对三种代理方式执行了不同的处理:

  • 直连模式:使用DNS对目标服务器的地址进行解析,之后将解析后的IP地址及端口号填入
  • Socket代理:直接填入代理服务器的域名及端口号
  • Http代理:使用DNS对代理服务器地址进行解析,将解析后的IP地址及端口号填入

然后给上一层返回拿到的路径的IP端口

RouteSelector-> next

方法流程:

  • 从已经初始化好的代理列表中获取下一个代理
  • 然后通过routeDatabase判断这个路由是否失败过,而routeDatabase维护了一个失败路由信息的集合,判断也只是判断当前这个Route是否在集合中
  • 而对于路由选择,会优先选择正常情况,当实在没有可用的路由的时候,会从失败过的路由中进行重试
  • 把正常的路由信息保存在routes中用于构建Selection,而Selection只是route信息列表的封装,对外提供访问正常路由信息的操作接口

RouteSelector.Selection

public static final class Selection {
    private final List<Route> routes;
    private int nextRouteIndex = 0;

    Selection(List<Route> routes) {
      this.routes = routes;
    }

    public boolean hasNext() {
      return nextRouteIndex < routes.size();
    }

    public Route next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return routes.get(nextRouteIndex++);
    }

    public List<Route> getAll() {
      return new ArrayList<>(routes);
    }
  }

可见其实在连接获取中进行路由选择的时候,其实这些路由信息已经是被收集完缓存在RouteSelector.Selection的Route集合中了

而其实在ExchangeFinder中连接管理依赖于ConnectionPool,路由选择依赖RouteSelector,而ExchangeFinder寻找连接这个方法就是在各种黑盒子提供的服务下完成自己的连接获取逻辑

下一个关注点应该是路由信息的初始化的操作以及时机,追踪到ExchangeFinder创建时候RouteSelector创建

RouteSelector

/* State for negotiating the next proxy to use. */
private List<Proxy> proxies = Collections.emptyList();
/* State for negotiating the next socket address to use. */
private List<InetSocketAddress> inetSocketAddresses = Collections.emptyList();
/* State for negotiating failed routes */
private final List<Route> postponedRoutes = new ArrayList<>();

/**
 * Selects routes to connect to an origin server. Each connection requires a choice of proxy server,
 * IP address, and TLS mode. Connections may also be recycled.
 */
RouteSelector(Address address, RouteDatabase routeDatabase, Call call,
      EventListener eventListener) {
    this.address = address;
    this.routeDatabase = routeDatabase;
    this.call = call;
    this.eventListener = eventListener;

    resetNextProxy(address.url(), address.proxy());
  }

做了一些简单赋值操作,然后根据url与代理信息转调resetNextProxy方法

RouteSelector-> resetNextProxy

private void resetNextProxy(HttpUrl url, Proxy proxy) {
    if (proxy != null) {
      // If the user specifies a proxy, try that and only that.
      // 如果用户指明了一个代理,使用用户指定的代理  
      proxies = Collections.singletonList(proxy);
    } else {
      // Try each of the ProxySelector choices until one connection succeeds.
      // 使用ProxySelector选择路由  
      List<Proxy> proxiesOrNull = address.proxySelector().select(url.uri());
      proxies = proxiesOrNull != null && !proxiesOrNull.isEmpty()
          ? Util.immutableList(proxiesOrNull)
          : Util.immutableList(Proxy.NO_PROXY);
    }
    nextProxyIndex = 0;
  }

主要的逻辑就是先查看用户有没有设置代理,如果用户有设置代理,优先使用用户设置的代理

如果用户没有设置代理,那就尝试用ProxySelector获取代理列表,可以通过OkHttpClient配置,默认情况下会使用系统配置的代理列表

总结

结合上一篇文章,其实整个获取连接操作背后的机制都已经走了一遍了,再次总结一下从连接获取流程以及背后引发的机制

连接获取流程:

  • 尝试从transmitter中拿到一个已经创建的连接,然后判断这个连接能不能用来创建一个Exchange对象,如果不能就把这个连接关闭掉,如果可以,那么就返回transmitter中这个可以用来创建Exchange对象的连接
  • 如果上一步获取不到,就从连接池中获取一个连接
  • 如果从连接池中依然获取不到连接,进行路由选择,进行玩路由选择后使用路由选择的结果再次从连接池中获取连接
  • 如果还是获取不到连接那就创建一个表示连接的RealConnection对象,然后执行TCP以及TLS握手
  • 然后会最后一次从连接池中获取连接,针对与同一host的情况,为了避免并发问题,所以会在连接池中拿到一个连接来使用,然后把创建的连接进行关闭,从而达到对同一个host的连接复用

路由选择:

  • 路由选择器会在ExchangeFinder创建的时候进行创建,在这个过程路由选择会去收集所有相关的代理信息,为后续提供录音选择服务
  • 然后进行路由选择的本质就是走一下这些代理的路径,拿到对应的IP地址列表,而根据不同的代理方式进行不一样的操作:
    • 直连模式:使用DNS对目标服务器的地址进行解析,之后将解析后的IP地址及端口号填入
    • Socket代理:直接填入代理服务器的域名及端口号
    • Http代理:使用DNS对代理服务器地址进行解析,将解析后的IP地址及端口号填入
  • 然后根据当前代理路径的IP地址列表构建路由信息,而这个过程会优先选择正常的路由信息,失败过的会先跳过,而一旦找不到正常的路由信息,就会做降级处理,从失败的路由信息中进行重试

连接池复用:

  • ConnectionPool通过Deque做连接的缓存,对外提供了添加缓存和访问缓存的操作接口
  • 并且通过Transmitter的弱引用集合判断当前的连接是否没有被使用
  • 通过维护一个后台线程去定时回收没有被使用而且空闲时间最长的连接
  • 对于空闲时间最长而且没有到达keep-alive时间的连接,会主动等待时间差,在Transmitter主动释放持有连接时会唤醒正在等待的这个后台线程
  • 从池中经过第一次获取后而且路由选择后仍获取不到连接,这个时机会去创建一个新的连接RealConection

连接创建:

  • 如果使用了隧道技术,需要先通过一个CONNECT请求方式的Http请求去请求构建隧道
  • 如果没有使用隧道以及隧道建立后通过Socket的去实现连接
  • 然后根据协议做不同处理:
    • Http2.0:发送preface以及setting帧作为Http2.0开启的确认,然后开启线程匹配服务端传来的preface进行字符串的匹配
    • Https:执行TLS握手,用的SSLSocket包装原来的Socket,然后利用SSLSocket实现TLS握手,以及添加证书验证