OkHttp源码解析(五)连接管理(上)
ConnectInterceptor-> intercept
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
// 如果请求方式为get,需要标记起来然后做一些额外检查
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
return realChain.proceed(request, transmitter, exchange);
}
主要的逻辑就是通过transimitter构建一个Exchange对象
然后传给下一层的拦截器
Transmitter-> newExchange
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
synchronized (connectionPool) {
if (noMoreExchanges) {
throw new IllegalStateException("released");
}
if (exchange != null) {
throw new IllegalStateException("cannot make a new request because the previous response "
+ "is still open: please call response.close()");
}
}
// attendance
ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
// 给Transmitter做赋值
synchronized (connectionPool) {
this.exchange = result;
this.exchangeRequestDone = false;
this.exchangeResponseDone = false;
return result;
}
}
转调exchangeFinder的find方法获取一个ExchangeCodec对象
然后创建一个Exchange对象
最后给Transmitter做赋值,包含创建好的exchange
ExchangeFinder-> find
public ExchangeCodec find(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
return resultConnection.newCodec(client, chain);
} catch (RouteException e) {
trackFailure();
throw e;
} catch (IOException e) {
trackFailure();
throw new RouteException(e);
}
}
首先先找到一个healthy的连接
然后通过这个连接拿到一个ExchangeCodec
ExchangeFinder-> findHealthyConnection
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
// 如果这是一个全新的连接,可以跳过健康检查
synchronized (connectionPool) {
if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
// 进行检查确认池中的连接仍好,如果不是,将其从池拿出来并重新开始
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges();
continue;
}
return candidate;
}
}
寻找连接的工作是交给findConnection去做,然后获取到了一个RealConnection之后,会对这个RealConnection进行检查
RealConnection-> isMultiplexed
public boolean isMultiplexed() {
return http2Connection != null;
}
判断是否支持多路复用,判断依据其实就是是否支持Http2.0的连接
RealConnection-> isHealthy
public boolean isHealthy(boolean doExtensiveChecks) {
if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
return false;
}
if (http2Connection != null) {
return http2Connection.isHealthy(System.nanoTime());
}
if (doExtensiveChecks) {
try {
int readTimeout = socket.getSoTimeout();
try {
socket.setSoTimeout(1);
if (source.exhausted()) {
return false; // Stream is exhausted; socket is closed.
}
return true;
} finally {
socket.setSoTimeout(readTimeout);
}
} catch (SocketTimeoutException ignored) {
// Read timed out; socket is good.
} catch (IOException e) {
return false; // Couldn't read; socket is closed.
}
}
return true;
}
先去检查socket是否关闭,socket能否进行读写
如果包含Http2的连接,检查是否可用
检查socket是否竭尽了
小结
理一理逻辑与流程,ConnectInterceptor主要的工作就是要去创建Exchange然后交给下层的拦截器,而Exchange用于发送Http请求和读取响应的类
然后创建Exchange需要ExchangeCodec、transimitter、Call等数据
Call就是我们最开始包装Request用于调度的数据结构,而Transimitter是Call创建的时候创建,同时还注册了eventListener
ExchangeCodec是一个接口,用于编码和解析Http请求和响应,使用面向接口这种设计兼容Http1和Http2
而ExchangeCodec的创建依赖于RealConnection这个连接,通过RealConnection的newCodec创建得到,所以需要先找到一个连接,针对连接Ok提供了一些缓存策略如池化,然后拿到连接之后就对连接进行有效性判断,然后就可以根据这个连接创建一个ExchangeCodec了
所以下一个关键点就在连接的创建
ExchangeFinder-> findConnection
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
RealConnection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
hasStreamFailure = false; // This is a fresh attempt.
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new exchanges.
// 尝试用一个已经分配的连接,但是可能连接不能用来创建新的Exchange
releasedConnection = transmitter.connection;
// 如果这个连接不能用来创建Exchange那就把它关掉
toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
? transmitter.releaseConnectionNoEvents()
: null;
if (transmitter.connection != null) {
// We had an already-allocated connection and it's good.
// 如果已经存在一个连接,而且这个连接的状态是好
result = transmitter.connection;
releasedConnection = null;
}
if (result == null) {
// Attempt to get a connection from the pool.
// 尝试从连接池中获取一个连接
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true;
result = transmitter.connection;
} else if (nextRouteToTry != null) {
// 修改当前路由为下一个路由
selectedRoute = nextRouteToTry;
nextRouteToTry = null;
} else if (retryCurrentRoute()) {
// 如果当前路由应该被重试,重试当前路由
selectedRoute = transmitter.connection.route();
}
}
}
closeQuietly(toClose);
// eventListener的注册时期已经很久远了,在Call的创建中
// 会创建transmitter,然后会把OkHttpClient的listener注册进来
if (releasedConnection != null) {
// 通知上层连接释放了
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
// 通知上层连接获取
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
// 如果找到了一个已经分配或者池中的连接,就可以直接返回了
return result;
}
// If we need a route selection, make one. This is a blocking operation.
// 如果需要进行路由选择,进行一次
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
List<Route> routes = null;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
// 在这里已经有了一个IP地址列表,然后会尝试获得从池中获取一个连接
routes = routeSelection.getAll();
// 在连接池中根据IP列表去获取一个连接给transmitter
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true;
result = transmitter.connection;
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
// 创建一个连接
result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
// 做TCP和TLS握手
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
connectingConnection = null;
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
// 最后一次尝试从连接池中获取连接,这种情况只可能发生在一个host下多个并发连接这种情况下
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
// 如果成功拿到则关闭我们前面创建的连接的Socket,并返回连接池中的连接
result.noNewExchanges = true;
socket = result.socket();
result = transmitter.connection;
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
// that case we will retry the route we just successfully connected with.
nextRouteToTry = selectedRoute;
} else {
// 如果失败则在连接池中放入我们刚刚创建的连接,并将其设置为transmitter中的连接
connectionPool.put(result);
transmitter.acquireConnectionNoEvents(result);
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
主要流程:
- 尝试从transmitter中拿到一个已经创建的连接,然后判断这个连接能不能用来创建一个Exchange对象,如果不能就把这个连接关闭掉,如果可以,那么就返回transmitter中这个可以用来创建Exchange对象的连接
- 如果上一步获取不到,就从连接池中获取一个连接
- 如果从连接池中依然获取不到连接,进行路由选择,进行玩路由选择后使用路由选择的结果再次从连接池中获取连接
- 如果还是获取不到连接那就创建一个表示连接的RealConnection对象,然后执行TCP以及TLS握手
- 然后会最后一次从连接池中获取连接,针对与同一host的情况,为了避免并发问题,所以会在连接池中拿到一个连接来使用,然后把创建的连接进行关闭,从而达到对同一个host的连接复用
而这这个流程中其实体现了Ok中从开头就埋下的伏笔,那就是transmitter,transmitter是在创建是在Call中的,从上面几个拦截器看其实看不太出他的作用,但是在这一层,不仅提供给上层一些连接操作的监听方式,而且还保存了一些连接
RealConnectionPool-> transmitterAcquirePooledConnection
private final Deque<RealConnection> connections = new ArrayDeque<>();
boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
@Nullable List<Route> routes, boolean requireMultiplexed) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (requireMultiplexed && !connection.isMultiplexed()) continue;
if (!connection.isEligible(address, routes)) continue;
transmitter.acquireConnectionNoEvents(connection);
return true;
}
return false;
}
如果requireMultiplexed为true说明这是一个Http/2.0的连接,然后会对connection进行判断查看它是否支持多路复用
然后对connect进行合格判断,然后就转调transmitter的acquireConnectionNoEvents方法
Transmitter-> acquireConnectionNoEvents
void acquireConnectionNoEvents(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
connection.transmitters.add(new TransmitterReference(this, callStackTrace));
}
可以看到其实到acquireConnectionNoEvents这个方法就直接对transmitter中的connection进行赋值,然后给connection中的transmitters添加一个reference
虽然暂时不知道这个reference的目的是什么什么,但是说明对于连接复用相关的机制都在ConnectionPool中,transmitter只会保存相应的connection
而transimitter是对应一个call也就是一个request的,所以一个connection可能会对应多个transmitters,所以这个connection才需要保存一份这个connection所引用的transmitter信息
RealConnection# transmitters#
final List<Reference<Transmitter>> transmitters = new ArrayList<>();
Transmitter# TransmitterReference
static final class TransmitterReference extends WeakReference<Transmitter> {
/**
* Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
* identifying the origin of connection leaks.
*/
final Object callStackTrace;
TransmitterReference(Transmitter referent, Object callStackTrace) {
super(referent);
this.callStackTrace = callStackTrace;
}
}
这个TransmitterReference是一个弱引用的子类,所以在RealConnection中保存的其实所以引用的Transmitter的弱引用,估计很可能和连接的回收策略是有关的
而接下来我们关于ConnectionPool的线索其实就剩下了findConnection中添加缓存时调到的Connection的put方法
RealConnectionPool-> put
private final Deque<RealConnection> connections = new ArrayDeque<>();
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
在添加缓存的时候会把connection添加到RealConnectionPool中维护的容器中,这是肯定会做的操作了,而在添加前做一个清除操作
RealConnectionPool# executor#
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
// 用于做垃圾回收的线程池,会清除过期的连接
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));
这个线程池的设计其实和Ok的Dispatcher的设计是一样的,核心线程数配置为0,非核心线程数的最大值配置为Integer.MAX_VALUE,就是想给RealConnectionPool更多管理线程的权限,把这些线程池的操作交给RealConnectionPool
RealConnectionPool# cleanupRunnable
private final Runnable cleanupRunnable = () -> {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (RealConnectionPool.this) {
try {
RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
主要的清除的任务转调cleanup来完成,cleanup方法会返回一个等待时间,然后RealConnectionPool就会等待这个时间,为什么要等待只需要关注唤醒的地方就可以
RealConnectionPool-> cleanup
ong cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
// 判断当前的connection是否还在被引用着
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
// 来到这里说明这个连接没有使用者
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
// 寻找空闲时间最长的connection
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
// 如果最长空闲时间大于keep-alive设置的时间
// 又或者空闲数大于最大的空闲连接数
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
// 从连接池中移除这个空闲时间最长的连接
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
// 计算当前空闲时间到keep-alive的时间距离
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
// 如果当前的连接都都是可用的,则直接返回keep-alive的时间
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
关注这个方法的关键流程:
遍历RealConnectionPool中缓存Connection的集合,然后调用pruneAndGetAllocationCount判断当前的Connection是否被引用,如果被引用着那这个Connection就不会被回收
如果Connection不被引用着, 会统计这些不被引用着的Connection的数目,而且会从这堆不被引用的Connection中找到空闲时间最长的Connection
找到这个空闲时间最长的Connection之后,如果最长空闲时间大于keep-alive设置的时间又或者当前空闲数大于最大的空闲连接数,那就从连接池中移除这个空闲时间最长的Connection
如果keep-alive的时间还没有到又或者所有连接都是可用的,则让RealConnectionPool进入等待
当它等待的时间到了,肯定就会做它要做的事情,但是所以需要寻找的还是唤醒的地方
RealConnectionPool-> connectionBecameIdle
boolean connectionBecameIdle(RealConnection connection) {
assert (Thread.holdsLock(this));
if (connection.noNewExchanges || maxIdleConnections == 0) {
connections.remove(connection);
return true;
} else {
notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
return false;
}
}
把得知一个connection变成空闲的时候就会把我们的clean up线程唤醒,反跟踪这个方法的调用处
Transmitter-> releaseConnectionNoEvents
@Nullable Socket releaseConnectionNoEvents() {
assert (Thread.holdsLock(connectionPool));
// 把connection中transmitters的引用找出来
int index = -1;
for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
Reference<Transmitter> reference = this.connection.tra.get(i);
if (reference.get() == this) {
index = i;
break;
}
}
if (index == -1) throw new IllegalStateException();
// 把transmitters的引用删除
RealConnection released = this.connection;
released.transmitters.remove(index);
this.connection = null;
if (released.transmitters.isEmpty()) {
released.idleAtNanos = System.nanoTime();
// 通知connectionPool当前这个connection已经空闲了
if (connectionPool.connectionBecameIdle(released)) {
return released.socket();
}
}
return null;
}
理一理现在的情况:
- 在ConnectionPool中会有一些过期的连接,在put的时候会执行这个cleanup线程
- cleanup线程会清除没有被引用到的而已待时间最长的连接,如果通过keep-alive设置的存活时间,cleanup线程会等待到keep-alive时间结束又或者是Transmitter主动去释放一个Connection
而接下来的关注点其实应该是怎么样去判断Connection没有被引用
RealConnectionPool-> pruneAndGetAllocationCount
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<Transmitter>> references = connection.transmitters;
for (int i = 0; i < references.size(); ) {
Reference<Transmitter> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked transmitter. This is an application bug.
TransmitterReference transmitterRef = (TransmitterReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
references.remove(i);
connection.noNewExchanges = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
其实就是通过遍历在connection中维护的transimitter引用列表,通过判断reference是否为null就知道connection是否被引用
总结
看了这么长的方法流程,先小结一下
起点是ExchangeFinder的findConnection方法,在寻找连接的过程会,会先从transimitter获取,然后从连接池根据有无路由获取
而连接池对ExchangeFinder提供了连接复用的机制,通过维护一个Deque把连接存储在其中
而在连接池中通过后台线程实现无用连接的清除,而连接池还暴露了接口给ExchangeFinder,让它可以创建一个连接并添加到连接池中