OkHttp源码解析(一)请求与响应流程

Android框架学习

Posted by Cc1over on January 10, 2020

OkHttp源码解析(一)请求与响应流程

前言

前段时间再回顾计算机网络相关的知识点,但是毕竟太理论化,然后我们项目里是基于OkHttp封装的一个网络库,所以最近一段时间想基于OkHttp走一下流程,做一波计网理论知识的实践

Request

/**
 * An HTTP request. Instances of this class are immutable if their {@link #body} is null or itself
 * immutable.
 */
public final class Request {
  final HttpUrl url;
  final String method;
  final Headers headers;
  final @Nullable RequestBody body;
  final Map<Class<?>, Object> tags;
  // ......
}

Request是OkHttp中抽象的Http请求的数据结构

OkHttpClient-> newCall

@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}

RealCall-> newRealCall

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.transmitter = new Transmitter(client, call);
    return call;
 }

RealCall

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
}

在RealCall这个构造函数,就是做一些简单的赋值工作

Transmitter

public Transmitter(OkHttpClient client, Call call) {
    this.client = client;
    this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
    this.call = call;
    this.eventListener = client.eventListenerFactory().create(call);
    this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
} 

Transmitter的构造函数也是做了一些初始化的工作

RealCall-> enqueue

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.callStart();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

主要的逻辑就是RealCall构造函数中创建的transmitter的callStart方法

然后构建一个AsyncCall包裹一个Callback,并传到OkHttpClient的dispatcher中

Dispatcher-> enqueue

void enqueue(AsyncCall call) {
    synchronized (this) {
      // 把当前的AsyncCall添加到等待队列中
      readyAsyncCalls.add(call);

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        // 寻找同一个host的AsyncCall
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        // 复用Call的callsPerHost
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    // 实施执行
    promoteAndExecute();
  }

主要逻辑就是把上一步创建的AsyncCall添加到等待队列中

然后寻找同一个host的AsyncCall对象,然后复用这个CallsPerHost

最后转调promoteAndExecute方法执行

Dispatcher-> findExistingCallWithHost

@Nullable private AsyncCall findExistingCallWithHost(String host) {
    for (AsyncCall existingCall : runningAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    for (AsyncCall existingCall : readyAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    return null;
  }

主要逻辑就是从Dispatcher中的running还ready队列中寻找相同host的AsyncCall

Dispatcher-> reuseCallsPerHostFrom

void reuseCallsPerHostFrom(AsyncCall other) {
      this.callsPerHost = other.callsPerHost;
 }

所谓的复用就是把参数中的AsyncCall对应的perHost保存起来

Dispatcher-> promoteAndExecute

 private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));
    // 创建一个执行Call列表
    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
        // 判断正在执行AsyncCall是否大于大于最大值,如果是就直接退出
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        // 判断AsyncCall中AtomicInteger也就是host是否打包  
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
        // 前面的验证都不满足,则从预备队列中移除
        i.remove();
        // 给AsyncCall中preHost计数器+1  
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

主要的逻辑就是创建可执行Call的列表,然后遍历等待队列,从中取出对应的AsyncCall

对执行队列还有AsyncCall中的计数器进行验证

然后把这个AsyncCall从等待队列中移除,添加到executable队列以及运行队列中

然后就遍历这个可执行队列转调AsyncCall的executeOn方法对Call进行执行

AsyncCall-> executeOn

void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

主要的逻辑就是让线程池执行当前的AsyncCall对象,AsyncCall本来就是一个Runnable

然后如果成功就调用dispatcher的finished方法

Dispatcher-> executorService

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

Dispatcher会构建一个线程池用于AsyncCall的调度,我们可以注意到这个线程池的核心线程数为0,然后非核心线程数为Integer.MAX_VALUE,这里的设计思想就是线程池不再去管理线程的数目,而是完全交给Dispatcher来做,然后Dispatcher通过运行队列和host数目来实现线程数目的管理

Dispatcher-> finished

void finished(AsyncCall call) {
    call.callsPerHost().decrementAndGet();
    finished(runningAsyncCalls, call);
  }

call中host计数器的减少,转调finished方法

Dispatcher-> finished

private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

主要逻辑就是把Call从运行队列中移除,然后尝试去执行等待队列中Call,如果不成功,那就说明已经没有任务在等待了,所以这个时候就可以让执行空闲任务的线程去跑了

RealCall-> execute

 @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.timeoutEnter();
    transmitter.callStart();
    try {
      client.dispatcher().executed(this);
      return getResponseWithInterceptorChain();
    } finally {
      client.dispatcher().finished(this);
    }
  }

主要的逻辑依然是调用OkHttpClient中的Dispatcher然后主动调executed方法执行这个RealCall对象,这时候就不再是AsyncCall了,最后不管是否成功都会调Dispatcher的finished方法

然后会通过getResponseWithInterceptorChain方法返回一个Response,应该就是处理Ok中的拦截器,不过感觉其实AsyncCall应该也是一样的,只不过还没看到AsyncCall的run方法而已

Dispatcher-> executed

synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
}

其实就是把构建的RealCall添加到运行队列中,与enqueue不同的是这个是runningSyncCalls

小结

到这里其实Ok的Request的处理流程就结束了,做个小结

首先我们在发送请求需要Request和OkHttpClient对象,对外界来说,OkHttpClient是一个Request的处理中心

OkHttpClient会让传入的这个Request和自身进行绑定然后生成一个RealCall对象

然后RealCall对象根据同步异步两种情况对外提供接口,并根据不同情况交给Dispatcher执行调度,异步执行AsyncCall,同步执行RealCall

对Ok来说Request是Http请求抽象出来的一种数据结构,而Call是Dispatcher执行调度的一个单位

而Dispatcher调度的原理其实就是维护了各种队列:

  • 对同步调度,把RealCall执行添加到运行队列
  • 对异步队列,把AsyncCall添加到等待队列,然后后续会遍历等待队列,把满足条件的AsyncCall添加到运行队列中
  • 然后添加了一个类似Handler中idleHandler的机制,在一个任务结束了之后会Call会通知它的Dispatcher,然后Dispatcher会尝试从等待队列中再取出Call出来执行,而如果取不到就执行空闲处理

AsyncCall-> execute

@Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } catch (Throwable t) {
        cancel();
        if (!signalledCallback) {
          IOException canceledException = new IOException("canceled due to " + t);
          canceledException.addSuppressed(t);
          responseCallback.onFailure(RealCall.this, canceledException);
        }
        throw t;
      } finally {
        client.dispatcher().finished(this);
      }
    }

AsyncCall的run方法在父类中被阉割了,阉割成了execute方法,然后其实也和想的一样,AsyncCall虽然交给线程池执行,但是核心和同步一样还是通过getResponseWithInterceptorChain拿到一个Response

所以getResponseWithInterceptorChain方法其实就是Request转到Response的一个关键了

RealCall-> getResponseWithInterceptorChain

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

先把OkHttpClient也就是外界调用者设置进来的Interceptor添加到interceptors这个容器中

然后就按顺序添加Ok中自带了几个拦截器,然后创建一个Chain对象,然后转调Chain的proceed方法拿到一个Respone

Interceptor

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;

    /**
     * Returns the connection the request will be executed on. This is only available in the chains
     * of network interceptors; for application interceptors this is always null.
     */
    @Nullable Connection connection();

    Call call();

    int connectTimeoutMillis();

    Chain withConnectTimeout(int timeout, TimeUnit unit);

    int readTimeoutMillis();

    Chain withReadTimeout(int timeout, TimeUnit unit);

    int writeTimeoutMillis();

    Chain withWriteTimeout(int timeout, TimeUnit unit);
  }
}

RealInterceptorChain-> proceed

 @Override public Response proceed(Request request) throws IOException {
    return proceed(request, transmitter, exchange);
  }

RealInterceptorChain-> proceed

public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;
 
    // ......
    
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

   // ......

    return response;
  }

主要逻辑就是通过index从interceptors容器中获取对应的Interceptor,然后index自增构建一个新的RealInterceptorChain,也就代表从下一个节点开始构建了一条新的责任链

实际Response生成的逻辑是通过Interceptor的intercept方法实现的

而实际上的OkHttp的对网络的处理和逻辑都是交由具体的Interceptor来实现的

intercept方法主要流程

@Override 
public Response intercept(Chain chain) throws IOException {
    // ......
    Request request = chain.request();
    // ......
    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
    // ......
    return response;     
}

拦截器的intercept方法会先从RealInterceptorChain中拿到Request,然后在到RealInterceptorChain的proceed方法

结合proceed和intercept两个方法可以理解到OkHttp的设计,先从开始到结尾处理一波Request,然后再从尾到头处理一波Response

所以这个时候,拦截器的设置顺序就比较重要了

RealCall-> getResponseWithInterceptorChain

Response getResponseWithInterceptorChain() throws IOException {   
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    // ......
}    

拦截器的执行顺序就是:

  • 用户自定义的interceptors
  • RetryAndFollowUpInterceptor:实现重定向
  • BridgeInterceptor:负责把用户构造的请求转换为发送给服务器的请求,把服务器返回的响应转换为对用户友好的响应
  • CacheInterceptor:负责读取缓存以及更新缓存
  • ConnectInterceptor:负责与服务器建立连接
  • 用户自定义networkInterceptors
  • CallServerInterceptor:负责从服务器读取响应的数据

所以这里其实体现了OkHttpClient暴露给外界两种拦截器的区别:

  • interceptors:在网络请求前后执行
  • networkInterceptors:在读取响应前后执行