OkHttp源码解析(一)请求与响应流程
前言
前段时间再回顾计算机网络相关的知识点,但是毕竟太理论化,然后我们项目里是基于OkHttp封装的一个网络库,所以最近一段时间想基于OkHttp走一下流程,做一波计网理论知识的实践
Request
/**
* An HTTP request. Instances of this class are immutable if their {@link #body} is null or itself
* immutable.
*/
public final class Request {
final HttpUrl url;
final String method;
final Headers headers;
final @Nullable RequestBody body;
final Map<Class<?>, Object> tags;
// ......
}
Request是OkHttp中抽象的Http请求的数据结构
OkHttpClient-> newCall
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
RealCall-> newRealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.transmitter = new Transmitter(client, call);
return call;
}
RealCall
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
}
在RealCall这个构造函数,就是做一些简单的赋值工作
Transmitter
public Transmitter(OkHttpClient client, Call call) {
this.client = client;
this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
this.call = call;
this.eventListener = client.eventListenerFactory().create(call);
this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}
Transmitter的构造函数也是做了一些初始化的工作
RealCall-> enqueue
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
主要的逻辑就是RealCall构造函数中创建的transmitter的callStart方法
然后构建一个AsyncCall包裹一个Callback,并传到OkHttpClient的dispatcher中
Dispatcher-> enqueue
void enqueue(AsyncCall call) {
synchronized (this) {
// 把当前的AsyncCall添加到等待队列中
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
// 寻找同一个host的AsyncCall
AsyncCall existingCall = findExistingCallWithHost(call.host());
// 复用Call的callsPerHost
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
// 实施执行
promoteAndExecute();
}
主要逻辑就是把上一步创建的AsyncCall添加到等待队列中
然后寻找同一个host的AsyncCall对象,然后复用这个CallsPerHost
最后转调promoteAndExecute方法执行
Dispatcher-> findExistingCallWithHost
@Nullable private AsyncCall findExistingCallWithHost(String host) {
for (AsyncCall existingCall : runningAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
for (AsyncCall existingCall : readyAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
return null;
}
主要逻辑就是从Dispatcher中的running还ready队列中寻找相同host的AsyncCall
Dispatcher-> reuseCallsPerHostFrom
void reuseCallsPerHostFrom(AsyncCall other) {
this.callsPerHost = other.callsPerHost;
}
所谓的复用就是把参数中的AsyncCall对应的perHost保存起来
Dispatcher-> promoteAndExecute
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
// 创建一个执行Call列表
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
// 判断正在执行AsyncCall是否大于大于最大值,如果是就直接退出
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
// 判断AsyncCall中AtomicInteger也就是host是否打包
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
// 前面的验证都不满足,则从预备队列中移除
i.remove();
// 给AsyncCall中preHost计数器+1
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
主要的逻辑就是创建可执行Call的列表,然后遍历等待队列,从中取出对应的AsyncCall
对执行队列还有AsyncCall中的计数器进行验证
然后把这个AsyncCall从等待队列中移除,添加到executable队列以及运行队列中
然后就遍历这个可执行队列转调AsyncCall的executeOn方法对Call进行执行
AsyncCall-> executeOn
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
主要的逻辑就是让线程池执行当前的AsyncCall对象,AsyncCall本来就是一个Runnable
然后如果成功就调用dispatcher的finished方法
Dispatcher-> executorService
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
Dispatcher会构建一个线程池用于AsyncCall的调度,我们可以注意到这个线程池的核心线程数为0,然后非核心线程数为Integer.MAX_VALUE,这里的设计思想就是线程池不再去管理线程的数目,而是完全交给Dispatcher来做,然后Dispatcher通过运行队列和host数目来实现线程数目的管理
Dispatcher-> finished
void finished(AsyncCall call) {
call.callsPerHost().decrementAndGet();
finished(runningAsyncCalls, call);
}
call中host计数器的减少,转调finished方法
Dispatcher-> finished
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
主要逻辑就是把Call从运行队列中移除,然后尝试去执行等待队列中Call,如果不成功,那就说明已经没有任务在等待了,所以这个时候就可以让执行空闲任务的线程去跑了
RealCall-> execute
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.timeoutEnter();
transmitter.callStart();
try {
client.dispatcher().executed(this);
return getResponseWithInterceptorChain();
} finally {
client.dispatcher().finished(this);
}
}
主要的逻辑依然是调用OkHttpClient中的Dispatcher然后主动调executed方法执行这个RealCall对象,这时候就不再是AsyncCall了,最后不管是否成功都会调Dispatcher的finished方法
然后会通过getResponseWithInterceptorChain方法返回一个Response,应该就是处理Ok中的拦截器,不过感觉其实AsyncCall应该也是一样的,只不过还没看到AsyncCall的run方法而已
Dispatcher-> executed
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
其实就是把构建的RealCall添加到运行队列中,与enqueue不同的是这个是runningSyncCalls
小结
到这里其实Ok的Request的处理流程就结束了,做个小结
首先我们在发送请求需要Request和OkHttpClient对象,对外界来说,OkHttpClient是一个Request的处理中心
OkHttpClient会让传入的这个Request和自身进行绑定然后生成一个RealCall对象
然后RealCall对象根据同步异步两种情况对外提供接口,并根据不同情况交给Dispatcher执行调度,异步执行AsyncCall,同步执行RealCall
对Ok来说Request是Http请求抽象出来的一种数据结构,而Call是Dispatcher执行调度的一个单位
而Dispatcher调度的原理其实就是维护了各种队列:
- 对同步调度,把RealCall执行添加到运行队列
- 对异步队列,把AsyncCall添加到等待队列,然后后续会遍历等待队列,把满足条件的AsyncCall添加到运行队列中
- 然后添加了一个类似Handler中idleHandler的机制,在一个任务结束了之后会Call会通知它的Dispatcher,然后Dispatcher会尝试从等待队列中再取出Call出来执行,而如果取不到就执行空闲处理
AsyncCall-> execute
@Override protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} catch (Throwable t) {
cancel();
if (!signalledCallback) {
IOException canceledException = new IOException("canceled due to " + t);
canceledException.addSuppressed(t);
responseCallback.onFailure(RealCall.this, canceledException);
}
throw t;
} finally {
client.dispatcher().finished(this);
}
}
AsyncCall的run方法在父类中被阉割了,阉割成了execute方法,然后其实也和想的一样,AsyncCall虽然交给线程池执行,但是核心和同步一样还是通过getResponseWithInterceptorChain拿到一个Response
所以getResponseWithInterceptorChain方法其实就是Request转到Response的一个关键了
RealCall-> getResponseWithInterceptorChain
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
先把OkHttpClient也就是外界调用者设置进来的Interceptor添加到interceptors这个容器中
然后就按顺序添加Ok中自带了几个拦截器,然后创建一个Chain对象,然后转调Chain的proceed方法拿到一个Respone
Interceptor
public interface Interceptor {
Response intercept(Chain chain) throws IOException;
interface Chain {
Request request();
Response proceed(Request request) throws IOException;
/**
* Returns the connection the request will be executed on. This is only available in the chains
* of network interceptors; for application interceptors this is always null.
*/
@Nullable Connection connection();
Call call();
int connectTimeoutMillis();
Chain withConnectTimeout(int timeout, TimeUnit unit);
int readTimeoutMillis();
Chain withReadTimeout(int timeout, TimeUnit unit);
int writeTimeoutMillis();
Chain withWriteTimeout(int timeout, TimeUnit unit);
}
}
RealInterceptorChain-> proceed
@Override public Response proceed(Request request) throws IOException {
return proceed(request, transmitter, exchange);
}
RealInterceptorChain-> proceed
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// ......
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// ......
return response;
}
主要逻辑就是通过index从interceptors容器中获取对应的Interceptor,然后index自增构建一个新的RealInterceptorChain,也就代表从下一个节点开始构建了一条新的责任链
实际Response生成的逻辑是通过Interceptor的intercept方法实现的
而实际上的OkHttp的对网络的处理和逻辑都是交由具体的Interceptor来实现的
intercept方法主要流程
@Override
public Response intercept(Chain chain) throws IOException {
// ......
Request request = chain.request();
// ......
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
// ......
return response;
}
拦截器的intercept方法会先从RealInterceptorChain中拿到Request,然后在到RealInterceptorChain的proceed方法
结合proceed和intercept两个方法可以理解到OkHttp的设计,先从开始到结尾处理一波Request,然后再从尾到头处理一波Response
所以这个时候,拦截器的设置顺序就比较重要了
RealCall-> getResponseWithInterceptorChain
Response getResponseWithInterceptorChain() throws IOException {
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
// ......
}
拦截器的执行顺序就是:
- 用户自定义的interceptors
- RetryAndFollowUpInterceptor:实现重定向
- BridgeInterceptor:负责把用户构造的请求转换为发送给服务器的请求,把服务器返回的响应转换为对用户友好的响应
- CacheInterceptor:负责读取缓存以及更新缓存
- ConnectInterceptor:负责与服务器建立连接
- 用户自定义networkInterceptors
- CallServerInterceptor:负责从服务器读取响应的数据
所以这里其实体现了OkHttpClient暴露给外界两种拦截器的区别:
- interceptors:在网络请求前后执行
- networkInterceptors:在读取响应前后执行