前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >OkHttp源码走心解析(很细 很长)

OkHttp源码走心解析(很细 很长)

作者头像
没关系再继续努力
发布2021-12-16 16:05:55
9780
发布2021-12-16 16:05:55
举报
文章被收录于专栏:Android面试

前言

本文是对OkHttp开源库的一个详细解析,如果你觉得自己不够了解OkHttp,想进一步学习一下,相信本文对你会有所帮助。

本文包含了详细的请求流程分析、各大拦截器解读以及自己的一点反思总结,文章很长,欢迎大家一起交流讨论。

使用方法

使用方法十分简单,分别创建一个OkHttpClient对象,一个Request对象,然后利用他们创建一个Call对象,最后调用同步请求execute()方法或者异步请求enqueue()方法来拿到Response

代码语言:javascript
复制
private final OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder()
      .url("https://github.com/")
      .build();
    //同步请求
    Response response = client.newCall(request).execute();
    //todo handle response

    //异步请求
    client.newCall(request).enqueue(new Callback() {
      @Override
      public void onFailure(@NotNull Call call, @NotNull IOException e) {
      //todo handle request failed
      }

      @Override
      public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
          //todo handle Response
      }
    });

基本对象介绍

正如使用方法中所述,我们先后构建了 OkHttpClient对象、Request对象、Call对象,那这些对象都是什么意思,有什么作用呢?这个就需要我们进一步学习了解了。

OkHttpClient

一个请求的配置类,采用了建造者模式,方便用户配置一些请求参数,如配置callTimeoutcookieinterceptor等等。

代码语言:javascript
复制
open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {

  constructor() : this(Builder())

  class Builder constructor() {
    //调度器
    internal var dispatcher: Dispatcher = Dispatcher()
    //连接池
    internal var connectionPool: ConnectionPool = ConnectionPool()
    //整体流程拦截器
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    //网络流程拦截器
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
    //流程监听器
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
    //连接失败时是否重连
    internal var retryOnConnectionFailure = true
    //服务器认证设置
    internal var authenticator: Authenticator = Authenticator.NONE
    //是否重定向
    internal var followRedirects = true
    //是否从HTTP重定向到HTTPS
    internal var followSslRedirects = true
    //cookie设置
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
    //缓存设置
    internal var cache: Cache? = null
    //DNS设置
    internal var dns: Dns = Dns.SYSTEM
    //代理设置
    internal var proxy: Proxy? = null
    //代理选择器设置
    internal var proxySelector: ProxySelector? = null
    //代理服务器认证设置
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE
    //socket配置
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()
    //https socket配置
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
    internal var x509TrustManagerOrNull: X509TrustManager? = null
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
    //协议
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
    //域名校验
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
    internal var certificateChainCleaner: CertificateChainCleaner? = null
    //请求超时
    internal var callTimeout = 0
    //连接超时
    internal var connectTimeout = 10_000
    //读取超时
    internal var readTimeout = 10_000
    //写入超时
    internal var writeTimeout = 10_000
    internal var pingInterval = 0
    internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
    internal var routeDatabase: RouteDatabase? = null
    
···省略代码···

Request

同样是请求参数的配置类,也同样采用了建造者模式,但相比于OkHttpClientRequest就十分简单了,只有四个参数,分别是请求URL请求方法请求头请求体

代码语言:javascript
复制
class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl,
  @get:JvmName("method") val method: String,
  @get:JvmName("headers") val headers: Headers,
  @get:JvmName("body") val body: RequestBody?,
  internal val tags: Map<Class<*>, Any>
) {

  open class Builder {
    //请求的URL
    internal var url: HttpUrl? = null
    //请求方法,如:GET、POST..
    internal var method: String
    //请求头
    internal var headers: Headers.Builder
    //请求体
    internal var body: RequestBody? = null
  ···省略代码···

Call

请求调用接口,表示这个请求已经准备好可以执行,也可以取消只能执行一次

代码语言:javascript
复制
interface Call : Cloneable {
  /** 返回发起此调用的原始请求 */
  fun request(): Request

  /**
   * 同步请求,立即执行。
   * 
   * 抛出两种异常:
   * 1. 请求失败抛出IOException;
   * 2. 如果在执行过一回的前提下再次执行抛出IllegalStateException;*/
  @Throws(IOException::class)
  fun execute(): Response

  /**
   * 异步请求,将请求安排在将来的某个时间点执行。
   * 如果在执行过一回的前提下再次执行抛出IllegalStateException */
  fun enqueue(responseCallback: Callback)

  /** 取消请求。已经完成的请求不能被取消 */
  fun cancel()

  /** 是否已被执行  */
  fun isExecuted(): Boolean

  /** 是否被取消   */
  fun isCanceled(): Boolean

  /** 一个完整Call请求流程的超时时间配置,默认选自[OkHttpClient.Builder.callTimeout] */
  fun timeout(): Timeout

  /** 克隆这个call,创建一个新的相同的Call */
  public override fun clone(): Call

  /** 利用工厂模式来让 OkHttpClient 来创建 Call对象 */
  fun interface Factory {
    fun newCall(request: Request): Call
  }
}

RealCall

OkHttpClient 中,我们利用 newCall 方法来创建一个 Call 对象,但从源码中可以看出,newCall 方法返回的是一个 RealCall 对象。

代码语言:javascript
复制
OkHttpClient.kt

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

RealCallCall接口的具体实现类,是应用端与网络层的连接桥,展示应用端原始的请求与连接数据,以及网络层返回的response及其它数据流。 通过使用方法也可知,创建RealCall对象后,就要调用同步或异步请求方法,所以它里面还包含同步请求 execute()异步请求 enqueue()方法。(后面具体展开分析)

AsyncCall

异步请求调用,是RealCall的一个内部类,就是一个Runnable,被调度器中的线程池所执行。

代码语言:javascript
复制
inner class AsyncCall(
    //用户传入的响应回调方法
    private val responseCallback: Callback
  ) : Runnable {
    //同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性
    @Volatile var callsPerHost = AtomicInteger(0)
      private set

    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

···省略代码···

    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        //调用线程池执行
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        //请求失败,调用 Callback.onFailure() 方法
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          //请求失败,调用调度器finish方法
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          //请求成功,获取到服务器返回的response
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          //调用 Callback.onResponse() 方法,将 response 传递出去
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            //请求失败,调用 Callback.onFailure() 方法
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          //请求出现异常,调用cancel方法来取消请求
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            //请求失败,调用 Callback.onFailure() 方法
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          //请求结束,调用调度器finish方法
          client.dispatcher.finished(this)
        }
      }
    }
  }

Dispatcher

调度器,用来调度Call对象,同时包含线程池与异步请求队列,用来存放与执行AsyncCall对象。

代码语言:javascript
复制
class Dispatcher constructor() {
  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        //创建一个缓存线程池,来处理请求调用
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

  /** 已准备好的异步请求队列 */
  @get:Synchronized
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** 正在运行的异步请求队列, 包含取消但是还未finish的AsyncCall */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** 正在运行的同步请求队列, 包含取消但是还未finish的RealCall */
  private val runningSyncCalls = ArrayDeque<RealCall>()

···省略代码···
}

总结一下

对象

作用

Call

请求调用接口,表示这个请求已经准备好可以执行,也可以被取消,只能执行一次。

RealCall

Call接口的具体实现类,是应用与网络层之间的连接桥,包含OkHttpClient与Request信息。

AsyncCall

异步请求调用,其实就是个Runnable,会被放到线程池中进行处理。

Dispatcher

调度器,用来调度Call对象,同时包含线程池与异步请求队列,用来存放与执行AsyncCall对象。

Request

请求类,包含url、method、headers、body。

Response

网络层返回的响应数据。

Callback

响应回调函数接口,包含onFailure、onResponse 两个方法。

流程分析

介绍完了对象,接下来就根据使用方法,具体看一下源码吧。

同步请求

同步请求的使用方法。

代码语言:javascript
复制
client.newCall(request).execute();

newCall方法就是创建一个RealCall对象,然后执行其execute()方法。

代码语言:javascript
复制
  RealCall.kt
  
  override fun execute(): Response {
    //CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    //请求超时开始计时
    timeout.enter()
    //开启请求监听
    callStart()
    try {
      //调用调度器中的 executed() 方法,调度器只是将 call 加入到了runningSyncCalls队列中
      client.dispatcher.executed(this)
      //调用getResponseWithInterceptorChain 方法拿到 response
      return getResponseWithInterceptorChain()
    } finally {
      //执行完毕,调度器将该 call 从 runningSyncCalls队列中移除
      client.dispatcher.finished(this)
    }
  }

调用调度器executed方法,就是将当前的RealCall对象加入到runningSyncCalls队列中,然后调用getResponseWithInterceptorChain方法拿到response

异步请求

在来看看异步请求。

代码语言:javascript
复制
  RealCall.kt

  override fun enqueue(responseCallback: Callback) {
    //CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    //开启请求监听
    callStart()
    //新建一个AsyncCall对象,通过调度器enqueue方法加入到readyAsyncCalls队列中
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

然后调用调度器的enqueue方法,

代码语言:javascript
复制
  Dispatcher.kt
  
  internal fun enqueue(call: AsyncCall) {
    //加锁,保证线程安全
    synchronized(this) {
      //将该请求调用加入到 readyAsyncCalls 队列中
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        //通过域名来查找有没有相同域名的请求,有则复用。
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    //执行请求
    promoteAndExecute()
  }


  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    //判断是否有请求正在执行
    val isRunning: Boolean
    //加锁,保证线程安全
    synchronized(this) {
      //遍历 readyAsyncCalls 队列
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()
        //runningAsyncCalls 的数量不能大于最大并发请求数 64
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        //同域名最大请求数5,同一个域名最多允许5条线程同时执行请求
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        //从 readyAsyncCalls 队列中移除,并加入到 executableCalls 及 runningAsyncCalls 队列中
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      //通过运行队列中的请求数量来判断是否有请求正在执行
      isRunning = runningCallsCount() > 0
    }

    //遍历可执行队列,调用线程池来执行AsyncCall
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

调度器的enqueue方法就是将AsyncCall加入到readyAsyncCalls队列中,然后调用promoteAndExecute方法来执行请求,promoteAndExecute方法做的其实就是遍历readyAsyncCalls队列,然后将符合条件的请求用线程池执行,也就是会执行AsyncCall.run()方法。

AsyncCall 方法的具体代码看基本对象介绍 AsyncCall,这边就不在此展示了,简单来说就是调用getResponseWithInterceptorChain方法拿到response,然后通过Callback.onResponse方法传递出去。反之,如果请求失败,捕获了异常,就通过Callback.onFailure将异常信息传递出去。 最终,请求结束,调用调度器finish方法。

代码语言:javascript
复制
  Dispatcher.kt

  /** 异步请求调用结束方法 */
  internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

  /** 同步请求调用结束方法 */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      //将当前请求调用从 正在运行队列 中移除
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    //继续执行剩余请求,将call从readyAsyncCalls中取出加入到runningAsyncCalls,然后执行
    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      //如果执行完了所有请求,处于闲置状态,调用闲置回调方法
      idleCallback.run()
    }
  }

获取Response

接着就是看看getResponseWithInterceptorChain方法是如何拿到response的。

代码语言:javascript
复制
  internal fun getResponseWithInterceptorChain(): Response {
    //拦截器列表
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    //构建拦截器责任链
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )
    //如果call请求完成,那就意味着交互完成了,没有更多的东西来交换了
    var calledNoMoreExchanges = false
    try {
      //执行拦截器责任链来获取 response
      val response = chain.proceed(originalRequest)
      //如果被取消,关闭响应,抛出异常
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

简单概括一下:这里采用了责任链设计模式,通过拦截器构建了以RealInterceptorChain责任链,然后执行proceed方法来得到response

那么,这又涉及拦截器是什么?拦截器责任链又是什么?

Interceptor

只声明了一个拦截器方法,在子类中具体实现,还包含一个Chain接口,核心方法是proceed(request)处理请求来获取response

代码语言:javascript
复制
fun interface Interceptor {
  /** 拦截方法 */
  @Throws(IOException::class)
  fun intercept(chain: Chain): Response

  interface Chain {
    /** 原始请求数据 */
    fun request(): Request

    /** 核心方法,处理请求,获取response */
    @Throws(IOException::class)
    fun proceed(request: Request): Response
    
    fun connection(): Connection?

    fun call(): Call

    fun connectTimeoutMillis(): Int

    fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain

    fun readTimeoutMillis(): Int

    fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain

    fun writeTimeoutMillis(): Int

    fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
  }
}

RealInterceptorChain

拦截器链就是实现Interceptor.Chain接口,重点就是复写的proceed方法。

代码语言:javascript
复制
class RealInterceptorChain(
  internal val call: RealCall,
  private val interceptors: List<Interceptor>,
  private val index: Int,
  internal val exchange: Exchange?,
  internal val request: Request,
  internal val connectTimeoutMillis: Int,
  internal val readTimeoutMillis: Int,
  internal val writeTimeoutMillis: Int
) : Interceptor.Chain {

···省略代码···
  private var calls: Int = 0
  override fun call(): Call = call
  override fun request(): Request = request

  @Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    //index+1, 复制创建新的责任链,也就意味着调用责任链中的下一个处理者,也就是下一个拦截器
    val next = copy(index = index + 1, request = request)
    //取出当前拦截器
    val interceptor = interceptors[index]

    //执行当前拦截器的拦截方法
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }
}

链式调用,最终会执行拦截器列表中的每个拦截器,返回Response

拦截器

OK,接下来就该看看拦截器列表中的具体拦截器了。

先上各类拦截器的总结,按顺序:

  1. client.interceptors:这是由开发者设置的,会在所有的拦截器处理之前进行最早的拦截处理,可用于添加一些公共参数,如自定义header自定义log等等。
  2. RetryAndFollowUpInterceptor:这里会对连接做一些初始化工作,以及请求失败的重试工作,重定向的后续请求工作。跟他的名字一样,就是做重试工作还有一些连接跟踪工作。
  3. BridgeInterceptor:是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,以及将网络请求返回回来的响应转换为用户可用的响应。
  4. CacheInterceptor:这里主要是缓存的相关处理,会根据用户在OkHttpClient里定义的缓存配置,然后结合请求新建一个缓存策略,由它来判断是使用网络还是缓存来构建response
  5. ConnectInterceptor:这里主要就是负责建立连接,会建立TCP连接或者TLS连接
  6. client.networkInterceptors:这里也是开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。
  7. CallServerInterceptor:这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,将请求头与请求体发送给服务器,以及解析服务器返回的response

接下来我们按顺序,从上往下,对这些拦截器进行一一解读。

client.interceptors

这是用户自己定义的拦截器,称为应用拦截器,会保存在OkHttpClientinterceptors: List<Interceptor>列表中。 他是拦截器责任链中的第一个拦截器,也就是说会第一个执行拦截方法,我们可以通过它来添加自定义Header信息,如:

代码语言:javascript
复制
class HeaderInterceptor implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request().newBuilder()
                .addHeader("device-android", "xxxxxxxxxxx")
                .addHeader("country-code", "ZH")
                .build();
        return chain.proceed(request);
    }
}

//然后在 OkHttpClient 中加入
OkHttpClient client = new OkHttpClient.Builder()
    .connectTimeout(60, TimeUnit.SECONDS)
    .readTimeout(15, TimeUnit.SECONDS)
    .writeTimeout(15, TimeUnit.SECONDS)
    .cookieJar(new MyCookieJar())
    .addInterceptor(new HeaderInterceptor())//添加自定义Header拦截器
    .build();

RetryAndFollowUpInterceptor

第二个拦截器,从它的名字也可知道,它负责请求失败的重试工作与重定向的后续请求工作,同时它会对连接做一些初始化工作。

代码语言:javascript
复制
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    var request = chain.request
    val call = realChain.call
    var followUpCount = 0
    var priorResponse: Response? = null
    var newExchangeFinder = true
    var recoveredFailures = listOf<IOException>()
    while (true) {
      //这里会新建一个ExchangeFinder,ConnectInterceptor会使用到
      call.enterNetworkInterceptorExchange(request, newExchangeFinder)

      var response: Response
      var closeActiveExchange = true
      try {
        if (call.isCanceled()) {
          throw IOException("Canceled")
        }

        try {
          response = realChain.proceed(request)
          newExchangeFinder = true
        } catch (e: RouteException) {
          //尝试通过路由连接失败。该请求将不会被发送。
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
            throw e.firstConnectException.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e.firstConnectException
          }
          newExchangeFinder = false
          continue
        } catch (e: IOException) {
          //尝试与服务器通信失败。该请求可能已发送。
          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
            throw e.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e
          }
          newExchangeFinder = false
          continue
        }

        // Attach the prior response if it exists. Such responses never have a body.
        //尝试关联上一个response,注意:body是为null
        if (priorResponse != null) {
          response = response.newBuilder()
              .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
              .build()
        }

        val exchange = call.interceptorScopedExchange
        //会根据 responseCode 来判断,构建一个新的request并返回来重试或者重定向
        val followUp = followUpRequest(response, exchange)

        if (followUp == null) {
          if (exchange != null && exchange.isDuplex) {
            call.timeoutEarlyExit()
          }
          closeActiveExchange = false
          return response
        }
        //如果请求体是一次性的,不需要再次重试
        val followUpBody = followUp.body
        if (followUpBody != null && followUpBody.isOneShot()) {
          closeActiveExchange = false
          return response
        }

        response.body?.closeQuietly()

        //最大重试次数,不同的浏览器是不同的,比如:Chrome为21,Safari则是16
        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }

        request = followUp
        priorResponse = response
      } finally {
        call.exitNetworkInterceptorExchange(closeActiveExchange)
      }
    }
  }

  /** 判断是否要进行重连,false->不尝试重连;true->尝试重连。*/
  private fun recover(
    e: IOException,
    call: RealCall,
    userRequest: Request,
    requestSendStarted: Boolean
  ): Boolean {
    //客户端禁止重试
    if (!client.retryOnConnectionFailure) return false

    //不能再次发送该请求体
    if (requestSendStarted && requestIsOneShot(e, userRequest)) return false

    //发生的异常是致命的,无法恢复,如:ProtocolException
    if (!isRecoverable(e, requestSendStarted)) return false

    //没有更多的路由来尝试重连
    if (!call.retryAfterFailure()) return false

    // 对于失败恢复,使用带有新连接的相同路由选择器
    return true
  }
···省略代码··· 

BridgeInterceptor

从它的名字可以看出,他的定位是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,比如:添加 Content-Type,添加 Cookie,添加User-Agent等等。再将服务器返回的response做一些处理转换为客户端需要的response。比如:移除响应头中的Content-EncodingContent-Length等等。

代码语言:javascript
复制
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    //获取原始请求数据
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()
    //重新构建请求头,请求体信息
    val body = userRequest.body

    val contentType = body.contentType()
    requestBuilder.header("Content-Type", contentType.toString())
    requestBuilder.header("Content-Length", contentLength.toString())
    requestBuilder.header("Transfer-Encoding", "chunked")
    requestBuilder.header("Host", userRequest.url.toHostHeader())
    requestBuilder.header("Connection", "Keep-Alive")

   ···省略代码···
   
    //添加cookie
    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }
    //添加user-agent
    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }
    //重新构建一个Request,然后执行下一个拦截器来处理该请求
    val networkResponse = chain.proceed(requestBuilder.build())

    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    //创建一个新的responseBuilder,目的是将原始请求数据构建到response中
    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)

    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        //修改response header信息,移除Content-Encoding,Content-Length信息
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        //修改response body信息
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }
    
    return responseBuilder.build()
   
···省略代码···

CacheInterceptor

用户可以通过OkHttpClient.cache来配置缓存,缓存拦截器通过CacheStrategy来判断是使用网络还是缓存来构建response

代码语言:javascript
复制
class CacheInterceptor(internal val cache: Cache?) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val call = chain.call()
    //通过request从OkHttpClient.cache中获取缓存
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()
    //创建一个缓存策略,用来确定怎么使用缓存
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    //为空表示不使用网络,反之,则表示使用网络
    val networkRequest = strategy.networkRequest
    //为空表示不使用缓存,反之,则表示使用缓存
    val cacheResponse = strategy.cacheResponse
    //追踪网络与缓存的使用情况
    cache?.trackResponse(strategy)
    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
    //有缓存但不适用,关闭它
    if (cacheCandidate != null && cacheResponse == null) {
      cacheCandidate.body?.closeQuietly()
    }

    //如果网络被禁止,但是缓存又是空的,构建一个code为504的response,并返回
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build().also {
            listener.satisfactionFailure(call, it)
          }
    }

    //如果我们禁用了网络不使用网络,且有缓存,直接根据缓存内容构建并返回response
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build().also {
            listener.cacheHit(call, it)
          }
    }
    //为缓存添加监听
    if (cacheResponse != null) {
      listener.cacheConditionalHit(call, cacheResponse)
    } else if (cache != null) {
      listener.cacheMiss(call)
    }

    var networkResponse: Response? = null
    try {
      //责任链往下处理,从服务器返回response 赋值给 networkResponse
      networkResponse = chain.proceed(networkRequest)
    } finally {
      //捕获I/O或其他异常,请求失败,networkResponse为空,且有缓存的时候,不暴露缓存内容。
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    //如果有缓存
    if (cacheResponse != null) {
      //且网络返回response code为304的时候,使用缓存内容新构建一个Response返回。
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response.also {
          listener.cacheHit(call, it)
        }
      } else {
        //否则关闭缓存响应体
        cacheResponse.body?.closeQuietly()
      }
    }

    //构建网络请求的response
    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    //如果cache不为null,即用户在OkHttpClient中配置了缓存,则将上一步新构建的网络请求response存到cache中
    if (cache != null) {
      //根据response的code,header以及CacheControl.noStore来判断是否可以缓存
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // 将该response存入缓存
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response).also {
          if (cacheResponse != null) {
            listener.cacheMiss(call)
          }
        }
      }
      //根据请求方法来判断缓存是否有效,只对Get请求进行缓存,其它方法的请求则移除
      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          //缓存无效,将该请求缓存从client缓存配置中移除
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
  }
  
···省略代码···  

ConnectInterceptor

负责实现与服务器真正建立起连接,

代码语言:javascript
复制
object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    //初始化一个exchange对象
    val exchange = realChain.call.initExchange(chain)
    //根据这个exchange对象来复制创建一个新的连接责任链
    val connectedChain = realChain.copy(exchange = exchange)
    //执行该连接责任链
    return connectedChain.proceed(realChain.request)
  }
}

一扫下来,代码十分简单,拦截方法里就只有三步。

  1. 初始化一个exchange对象。
  2. 然后根据这个exchange对象来复制创建一个新的连接责任链。
  3. 执行该连接责任链。

那这个exchange对象又是什么呢?

代码语言:javascript
复制
RealCall.kt

internal fun initExchange(chain: RealInterceptorChain): Exchange {
    ...省略代码...
    //这里的exchangeFinder就是在RetryAndFollowUpInterceptor中创建的
    val exchangeFinder = this.exchangeFinder!!
    //返回一个ExchangeCodec(是个编码器,为request编码以及为response解码)
    val codec = exchangeFinder.find(client, chain)
    //根据exchangeFinder与codec新构建一个Exchange对象,并返回
    val result = Exchange(this, eventListener, exchangeFinder, codec)
  ...省略代码...
    return result
  }

具体看看ExchangeFinder.find()这一步,

代码语言:javascript
复制
ExchangeFinder.kt

fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      //查找合格可用的连接,返回一个 RealConnection 对象
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      //根据连接,创建并返回一个请求响应编码器:Http1ExchangeCodec 或者 Http2ExchangeCodec,分别对应Http1协议与Http2协议
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

继续往下看findHealthyConnection方法

代码语言:javascript
复制
ExchangeFinder.kt

  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      //重点:查找连接
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )
      //检查该连接是否合格可用,合格则直接返回该连接
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }
      //如果该连接不合格,标记为不可用,从连接池中移除
      candidate.noNewExchanges()
    ...省略代码...
    }
  }

简单概括一下就是:通过findConnection方法来查找连接,找到连接后判断是否是合格可用的,合格就直接返回该连接。

所以核心方法就是findConnection,我们继续深入看看该方法:

代码语言:javascript
复制
private fun findConnection(
    connectTimeout: Int, 
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    //第一次,尝试重连 call 中的 connection,不需要去重新获取连接
    val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      //如果 call 中的 connection 还没有释放,就重用它。
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      //如果 call 中的 connection 已经被释放,关闭Socket.
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    //需要一个新的连接,所以重置一些状态
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    //第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    //连接池中是空的,准备下次尝试连接的路由
    val routes: List<Route>?
    val route: Route
    
    ...省略代码...

      //第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    //第四次,手动创建一个新连接
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    //第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。
    //这一步主要是为了校验一下,比如已经有了一条连接了,就可以直接复用,而不用使用手动创建的新连接。
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      nextRouteToTry = route
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    synchronized(newConnection) {
      //将手动创建的新连接放入连接池
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }

在代码中可以看出,一共做了5次尝试去得到连接:

  1. 第一次,尝试重连 call 中的 connection,不需要去重新获取连接。
  2. 第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用。
  3. 第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用。
  4. 第四次,手动创建一个新连接。
  5. 第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。

OK,到了这一步,就算建立起了连接。

client.networkInterceptors

该拦截器称为网络拦截器,与client.interceptors一样也是由用户自己定义的,同样是以列表的形式存在OkHttpClient中。

那这两个拦截器有什么不同呢?

其实他两的不同都是由于他们所处的位置不同所导致的,应用拦截器处于第一个位置,所以无论如何它都会被执行,而且只会执行一次。而网络拦截器处于倒数第二的位置,它不一定会被执行,而且可能会被执行多次,比如:在RetryAndFollowUpInterceptor失败或者CacheInterceptor直接返回缓存的情况下,我们的网络拦截器是不会被执行的。

CallServerInterceptor

到了这里,客户端与服务器已经建立好了连接,接着就是将请求头与请求体发送给服务器,以及解析服务器返回的response了。

代码语言:javascript
复制
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    try {
      //写入请求头
      exchange.writeRequestHeaders(request)
      //如果不是GET请求,并且请求体不为空
      if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
        //当请求头为"Expect: 100-continue"时,在发送请求体之前需要等待服务器返回"HTTP/1.1 100 Continue" 的response,如果没有等到该response,就不发送请求体。
        //POST请求,先发送请求头,在获取到100继续状态后继续发送请求体
        if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
          //刷新请求,即发送请求头
          exchange.flushRequest()
          //解析响应头
          responseBuilder = exchange.readResponseHeaders(expectContinue = true)
          exchange.responseHeadersStart()
          invokeStartEvent = false
        }
        //写入请求体
        if (responseBuilder == null) {
          if (requestBody.isDuplex()) {
            //如果请求体是双公体,就先发送请求头,稍后在发送请求体
            exchange.flushRequest()
            val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
            //写入请求体
            requestBody.writeTo(bufferedRequestBody)
          } else {
            //如果获取到了"Expect: 100-continue"响应,写入请求体
            val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
            requestBody.writeTo(bufferedRequestBody)
            bufferedRequestBody.close()
          }
       ···省略代码···
        //请求结束,发送请求体
        exchange.finishRequest()
    ···省略代码···

    try {
      if (responseBuilder == null) {
        //读取响应头
        responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
        ···省略代码···
      //构建一个response
      var response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      var code = response.code
      ···省略代码···
      return response

···省略代码···

简单概括一下:写入发送请求头,然后根据条件是否写入发送请求体,请求结束。解析服务器返回的请求头,然后构建一个新的response,并返回。 这里CallServerInterceptor是拦截器责任链中最后一个拦截器了,所以他不会再调用chain.proceed()方法往下执行,而是将这个构建的response往上传递给责任链中的每个拦截器。

拦截器流程图
拦截器流程图

总结

我们分析了请求的流程,包括同步请求与异步请求,还仔细分析了拦截器责任链中的每个拦截器,现在画一个流程图,简单总结一下,你可以对照着流程图,在走一遍流程。

完整流程图
完整流程图

反思

设计模式

  1. 建造者模式:不论是在OkHttpClientRequest还是Response中都用到了建造者模式,因为这几个类中都有很多参数,需要供用户选择需要的参数来构建其想要的实例,所以在开源库中,Build模式是很常见的。
  2. 工厂方法模式:帮助生成复杂对象,如: OkHttpClient.newCall(request Request) 来创建 Call 对象
  3. 责任链模式:这个就用的很绝妙了,将7个拦截器构成拦截器责任链,然后按顺序从上往下执行,得到Response后,从下往上传回去。

线程安全

AsyncCall 类中的 callsPerHost 变量,使用了 Volatile + AtomicInteger来修饰,从而保证在多线程下的线程安全。

代码语言:javascript
复制
inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    //同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性
    @Volatile var callsPerHost = AtomicInteger(0)
      private set
    ...省略代码...

数据结构

为什么readyAsyncCalls runningAsyncCalls runningSyncCalls采用ArrayDeque呢?

两个点回答、他们都是用来存放网络请求的,这些请求需要做到先到先得,所以采用队列。、根据代码所示,当执行enqueue时,我们需要遍历readyAsyncCalls,将符合执行条件的Call加入到runningAsyncCalls,这相对比于链表来说,数组的查找效率要更高,所以采用ArrayDeque

结尾

到此,关于OkHttp的源码解析就介绍啦。

其实学习源码的最好方式,就是自己将代码克隆下来,然后对着使用方法,按流程,一步一步往下走。

其实分享文章的最大目的正是等待着有人指出我的错误,如果你发现哪里有错误,请毫无保留的指出即可,虚心请教。

视频:

资深架构师逐题详解Android大厂精选高频面试题之OkHttp

Android(安卓)开发零基础从入门到精通之OkHttp

原文: https://juejin.cn/post/7033307467199021086

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 使用方法
  • 基本对象介绍
    • OkHttpClient
      • Request
        • Call
          • RealCall
            • AsyncCall
              • Dispatcher
                • 总结一下
                • 流程分析
                  • 同步请求
                    • 异步请求
                      • 获取Response
                        • Interceptor
                        • RealInterceptorChain
                    • 拦截器
                      • client.interceptors
                        • RetryAndFollowUpInterceptor
                          • BridgeInterceptor
                            • CacheInterceptor
                              • ConnectInterceptor
                                • client.networkInterceptors
                                  • CallServerInterceptor
                                  • 总结
                                  • 反思
                                    • 设计模式
                                      • 线程安全
                                        • 数据结构
                                        • 结尾
                                        领券
                                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档