OkHttp源码解析(五)连接管理(上)

Android框架学习

Posted by Cc1over on January 12, 2020

OkHttp源码解析(五)连接管理(上)

ConnectInterceptor-> intercept

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    // 如果请求方式为get,需要标记起来然后做一些额外检查
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

    return realChain.proceed(request, transmitter, exchange);
  }

主要的逻辑就是通过transimitter构建一个Exchange对象

然后传给下一层的拦截器

Transmitter-> newExchange

Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  synchronized (connectionPool) {
    if (noMoreExchanges) {
      throw new IllegalStateException("released");
    }
    if (exchange != null) {
      throw new IllegalStateException("cannot make a new request because the previous response "
          + "is still open: please call response.close()");
    }
  }
  // attendance
  ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
  Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
  // 给Transmitter做赋值
  synchronized (connectionPool) {
    this.exchange = result;
    this.exchangeRequestDone = false;
    this.exchangeResponseDone = false;
    return result;
  }
}

转调exchangeFinder的find方法获取一个ExchangeCodec对象

然后创建一个Exchange对象

最后给Transmitter做赋值,包含创建好的exchange

ExchangeFinder-> find

public ExchangeCodec find(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      trackFailure();
      throw e;
    } catch (IOException e) {
      trackFailure();
      throw new RouteException(e);
    }
  }

首先先找到一个healthy的连接

然后通过这个连接拿到一个ExchangeCodec

ExchangeFinder-> findHealthyConnection

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      // 如果这是一个全新的连接,可以跳过健康检查   
      synchronized (connectionPool) {
        if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      // 进行检查确认池中的连接仍好,如果不是,将其从池拿出来并重新开始  
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        candidate.noNewExchanges();
        continue;
      }

      return candidate;
    }
  }

寻找连接的工作是交给findConnection去做,然后获取到了一个RealConnection之后,会对这个RealConnection进行检查

RealConnection-> isMultiplexed

public boolean isMultiplexed() {
   return http2Connection != null;
}

判断是否支持多路复用,判断依据其实就是是否支持Http2.0的连接

RealConnection-> isHealthy

public boolean isHealthy(boolean doExtensiveChecks) {
    if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
      return false;
    }

    if (http2Connection != null) {
      return http2Connection.isHealthy(System.nanoTime());
    }

    if (doExtensiveChecks) {
      try {
        int readTimeout = socket.getSoTimeout();
        try {
          socket.setSoTimeout(1);
          if (source.exhausted()) {
            return false; // Stream is exhausted; socket is closed.
          }
          return true;
        } finally {
          socket.setSoTimeout(readTimeout);
        }
      } catch (SocketTimeoutException ignored) {
        // Read timed out; socket is good.
      } catch (IOException e) {
        return false; // Couldn't read; socket is closed.
      }
    }

    return true;
  }

先去检查socket是否关闭,socket能否进行读写

如果包含Http2的连接,检查是否可用

检查socket是否竭尽了

小结

理一理逻辑与流程,ConnectInterceptor主要的工作就是要去创建Exchange然后交给下层的拦截器,而Exchange用于发送Http请求和读取响应的类

然后创建Exchange需要ExchangeCodec、transimitter、Call等数据

Call就是我们最开始包装Request用于调度的数据结构,而Transimitter是Call创建的时候创建,同时还注册了eventListener

ExchangeCodec是一个接口,用于编码和解析Http请求和响应,使用面向接口这种设计兼容Http1和Http2

而ExchangeCodec的创建依赖于RealConnection这个连接,通过RealConnection的newCodec创建得到,所以需要先找到一个连接,针对连接Ok提供了一些缓存策略如池化,然后拿到连接之后就对连接进行有效性判断,然后就可以根据这个连接创建一个ExchangeCodec了

所以下一个关键点就在连接的创建

ExchangeFinder-> findConnection

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; // This is a fresh attempt.

      // Attempt to use an already-allocated connection. We need to be careful here because our
      // already-allocated connection may have been restricted from creating new exchanges.
      // 尝试用一个已经分配的连接,但是可能连接不能用来创建新的Exchange
      releasedConnection = transmitter.connection;
      // 如果这个连接不能用来创建Exchange那就把它关掉  
      toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;

      if (transmitter.connection != null) {
        // We had an already-allocated connection and it's good.
        // 如果已经存在一个连接,而且这个连接的状态是好  
        result = transmitter.connection;
        releasedConnection = null;
      }

      if (result == null) {
        // Attempt to get a connection from the pool.
        // 尝试从连接池中获取一个连接  
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if (nextRouteToTry != null) {
          // 修改当前路由为下一个路由  
          selectedRoute = nextRouteToTry;
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
          // 如果当前路由应该被重试,重试当前路由  
          selectedRoute = transmitter.connection.route();
        }
      }
    }
    closeQuietly(toClose);
    // eventListener的注册时期已经很久远了,在Call的创建中
    // 会创建transmitter,然后会把OkHttpClient的listener注册进来
    if (releasedConnection != null) {
      // 通知上层连接释放了  
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      // 通知上层连接获取  
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      // 如果找到了一个已经分配或者池中的连接,就可以直接返回了
      return result;
    }

    // If we need a route selection, make one. This is a blocking operation.
    // 如果需要进行路由选择,进行一次
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        // 在这里已经有了一个IP地址列表,然后会尝试获得从池中获取一个连接  
        routes = routeSelection.getAll();
        // 在连接池中根据IP列表去获取一个连接给transmitter  
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }
      
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        // 创建一个连接  
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    // 做TCP和TLS握手
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      // 最后一次尝试从连接池中获取连接,这种情况只可能发生在一个host下多个并发连接这种情况下  
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        // 如果成功拿到则关闭我们前面创建的连接的Socket,并返回连接池中的连接  
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;

        // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
        // that case we will retry the route we just successfully connected with.
        nextRouteToTry = selectedRoute;
      } else {
        // 如果失败则在连接池中放入我们刚刚创建的连接,并将其设置为transmitter中的连接  
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }

主要流程:

  • 尝试从transmitter中拿到一个已经创建的连接,然后判断这个连接能不能用来创建一个Exchange对象,如果不能就把这个连接关闭掉,如果可以,那么就返回transmitter中这个可以用来创建Exchange对象的连接
  • 如果上一步获取不到,就从连接池中获取一个连接
  • 如果从连接池中依然获取不到连接,进行路由选择,进行玩路由选择后使用路由选择的结果再次从连接池中获取连接
  • 如果还是获取不到连接那就创建一个表示连接的RealConnection对象,然后执行TCP以及TLS握手
  • 然后会最后一次从连接池中获取连接,针对与同一host的情况,为了避免并发问题,所以会在连接池中拿到一个连接来使用,然后把创建的连接进行关闭,从而达到对同一个host的连接复用

而这这个流程中其实体现了Ok中从开头就埋下的伏笔,那就是transmitter,transmitter是在创建是在Call中的,从上面几个拦截器看其实看不太出他的作用,但是在这一层,不仅提供给上层一些连接操作的监听方式,而且还保存了一些连接

RealConnectionPool-> transmitterAcquirePooledConnection

private final Deque<RealConnection> connections = new ArrayDeque<>();

boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
      @Nullable List<Route> routes, boolean requireMultiplexed) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (requireMultiplexed && !connection.isMultiplexed()) continue;
      if (!connection.isEligible(address, routes)) continue;
      transmitter.acquireConnectionNoEvents(connection);
      return true;
    }
    return false;
  }

如果requireMultiplexed为true说明这是一个Http/2.0的连接,然后会对connection进行判断查看它是否支持多路复用

然后对connect进行合格判断,然后就转调transmitter的acquireConnectionNoEvents方法

Transmitter-> acquireConnectionNoEvents

void acquireConnectionNoEvents(RealConnection connection) {
    assert (Thread.holdsLock(connectionPool));

    if (this.connection != null) throw new IllegalStateException();
    this.connection = connection;
    connection.transmitters.add(new TransmitterReference(this, callStackTrace));
  }

可以看到其实到acquireConnectionNoEvents这个方法就直接对transmitter中的connection进行赋值,然后给connection中的transmitters添加一个reference

虽然暂时不知道这个reference的目的是什么什么,但是说明对于连接复用相关的机制都在ConnectionPool中,transmitter只会保存相应的connection

而transimitter是对应一个call也就是一个request的,所以一个connection可能会对应多个transmitters,所以这个connection才需要保存一份这个connection所引用的transmitter信息

RealConnection# transmitters#

final List<Reference<Transmitter>> transmitters = new ArrayList<>();

Transmitter# TransmitterReference

static final class TransmitterReference extends WeakReference<Transmitter> {
    /**
     * Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
     * identifying the origin of connection leaks.
     */
    final Object callStackTrace;

    TransmitterReference(Transmitter referent, Object callStackTrace) {
      super(referent);
      this.callStackTrace = callStackTrace;
    }
  }

这个TransmitterReference是一个弱引用的子类,所以在RealConnection中保存的其实所以引用的Transmitter的弱引用,估计很可能和连接的回收策略是有关的

而接下来我们关于ConnectionPool的线索其实就剩下了findConnection中添加缓存时调到的Connection的put方法

RealConnectionPool-> put

private final Deque<RealConnection> connections = new ArrayDeque<>();

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

在添加缓存的时候会把connection添加到RealConnectionPool中维护的容器中,这是肯定会做的操作了,而在添加前做一个清除操作

RealConnectionPool# executor#

/**
 * Background threads are used to cleanup expired connections. There will be at most a single
 * thread running per connection pool. The thread pool executor permits the pool itself to be
 * garbage collected.
 */
// 用于做垃圾回收的线程池,会清除过期的连接
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));

这个线程池的设计其实和Ok的Dispatcher的设计是一样的,核心线程数配置为0,非核心线程数的最大值配置为Integer.MAX_VALUE,就是想给RealConnectionPool更多管理线程的权限,把这些线程池的操作交给RealConnectionPool

RealConnectionPool# cleanupRunnable

private final Runnable cleanupRunnable = () -> {
    while (true) {
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (RealConnectionPool.this) {
          try {
            RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };

主要的清除的任务转调cleanup来完成,cleanup方法会返回一个等待时间,然后RealConnectionPool就会等待这个时间,为什么要等待只需要关注唤醒的地方就可以

RealConnectionPool-> cleanup

ong cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        // 判断当前的connection是否还在被引用着 
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
        // 来到这里说明这个连接没有使用者
        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        // 寻找空闲时间最长的connection  
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }
       
      // 如果最长空闲时间大于keep-alive设置的时间
      // 又或者空闲数大于最大的空闲连接数
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block). 
        // 从连接池中移除这个空闲时间最长的连接  
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // A connection will be ready to evict soon.
        // 计算当前空闲时间到keep-alive的时间距离  
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
        // 如果当前的连接都都是可用的,则直接返回keep-alive的时间
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use. 
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
  }

关注这个方法的关键流程:

遍历RealConnectionPool中缓存Connection的集合,然后调用pruneAndGetAllocationCount判断当前的Connection是否被引用,如果被引用着那这个Connection就不会被回收

如果Connection不被引用着, 会统计这些不被引用着的Connection的数目,而且会从这堆不被引用的Connection中找到空闲时间最长的Connection

找到这个空闲时间最长的Connection之后,如果最长空闲时间大于keep-alive设置的时间又或者当前空闲数大于最大的空闲连接数,那就从连接池中移除这个空闲时间最长的Connection

如果keep-alive的时间还没有到又或者所有连接都是可用的,则让RealConnectionPool进入等待

当它等待的时间到了,肯定就会做它要做的事情,但是所以需要寻找的还是唤醒的地方

RealConnectionPool-> connectionBecameIdle

boolean connectionBecameIdle(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (connection.noNewExchanges || maxIdleConnections == 0) {
      connections.remove(connection);
      return true;
    } else {
      notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
      return false;
    }
  }

把得知一个connection变成空闲的时候就会把我们的clean up线程唤醒,反跟踪这个方法的调用处

Transmitter-> releaseConnectionNoEvents

@Nullable Socket releaseConnectionNoEvents() {
    assert (Thread.holdsLock(connectionPool));
    // 把connection中transmitters的引用找出来
    int index = -1;
    for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
      Reference<Transmitter> reference = this.connection.tra.get(i);
      if (reference.get() == this) {
        index = i;
        break;
      }
    }

    if (index == -1) throw new IllegalStateException();
    // 把transmitters的引用删除
    RealConnection released = this.connection;
    released.transmitters.remove(index);
    this.connection = null;
     
    if (released.transmitters.isEmpty()) {
      released.idleAtNanos = System.nanoTime();
      // 通知connectionPool当前这个connection已经空闲了  
      if (connectionPool.connectionBecameIdle(released)) {
        return released.socket();
      }
    }

    return null;
  }

理一理现在的情况:

  • 在ConnectionPool中会有一些过期的连接,在put的时候会执行这个cleanup线程
  • cleanup线程会清除没有被引用到的而已待时间最长的连接,如果通过keep-alive设置的存活时间,cleanup线程会等待到keep-alive时间结束又或者是Transmitter主动去释放一个Connection

而接下来的关注点其实应该是怎么样去判断Connection没有被引用

RealConnectionPool-> pruneAndGetAllocationCount

private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<Transmitter>> references = connection.transmitters;
    for (int i = 0; i < references.size(); ) {
      Reference<Transmitter> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked transmitter. This is an application bug.
      TransmitterReference transmitterRef = (TransmitterReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);

      references.remove(i);
      connection.noNewExchanges = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }

其实就是通过遍历在connection中维护的transimitter引用列表,通过判断reference是否为null就知道connection是否被引用

总结

看了这么长的方法流程,先小结一下

起点是ExchangeFinder的findConnection方法,在寻找连接的过程会,会先从transimitter获取,然后从连接池根据有无路由获取

而连接池对ExchangeFinder提供了连接复用的机制,通过维护一个Deque把连接存储在其中

而在连接池中通过后台线程实现无用连接的清除,而连接池还暴露了接口给ExchangeFinder,让它可以创建一个连接并添加到连接池中