OkHttp3源码分析二

1.前言

  • Android开发过程中,使用第三方的框架库已成家常便饭,使用第三方好处避免重复造轮子、降低成本、提升效率、降低风险等等,当遇到框架库不能满足现有业务、框架库设计缺陷或者漏洞、API使用深度不够时,如果停留只会使用层面,就会增加修改过程的难度,所以对于使用的框架库最好还是有个系统的认识;

  • 本文带大家深入讲解 OkHttp


2.目录


3.定义

  • 一款处理网络请求的开源项目,由Square公司贡献。

4.作用

  • 通过建造者模式(Builder Pattern)方式,完成复杂的网络请求。

5.特点

  • 1.同时支持HTTP1.1与支持HTTP2.0
  • 2.同时支持同步与异步请求;
  • 3.同时具备HTTP与WebSocket功能;
  • 4.拥有自动维护的socket连接池,减少握手次数;
  • 5.拥有队列线程池,轻松写并发;
  • 6.拥有Interceptors(拦截器),轻松处理请求与响应额外需求(例:请求失败重试、响应内容重定向等等);

6.OkHttp系统图


7.OkHttpClient(封装请求参数)

  • OkHttpClient
    通过建造者模式方式,完成请求参数配置。常用如下:

  • connectTimeout :连接超时

    • readTimeout:读取超时
    • writeTimeout:写入超时
    • pingInterval:websocket情况下连接心跳间隔
    • interceptors:自定义拦截器
    • networkInterceptors:自定义网络连接成功的拦截器
  • OkHttpClient 除了完成请求参数的配置之外,还提供获取WebSocket、Call(Call实现类为RealCall,下文会介绍)相关类;

7.1 WebSocket

  • WebSocket是一种在单个TCP连接上进行全双工通信的协议,支持服务器想客户端的发送请求,由OkHttpClient创建,源码如下:
1
2
3
4
5
6
7
8
/**
* Uses {@code request} to connect a new web socket.
*/
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);// 1
webSocket.connect(this);
return webSocket;
}

注释1:WebSocket是一个接口,它的实现类RealWebSocket,该类完成WebSocket的连接、数据请求与接收功能。

7.2 Call初始化

1
2
3
4
5
6
7
8
9
10
11
//OkHttpClient 初始化Call的函数
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
// RealCall 初始化函数
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}

8.RealCall同步异步

  • RealCall 是真正触发网络请求的类(实现Call接口,一次请求 = 一个RealCall实例),它提供了同步请求、异步请求

8.1 同步请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  @Override public Response execute() throws IOException {
// 处理不能重复请求,因为一个RealCall对应一个请求。
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);//1
Response result = getResponseWithInterceptorChain();//2
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());//3
return chain.proceed(originalRequest);//4
}

注释1:将RealCall实例添加至Dispatcher中(下文会介绍Dispatcher)。
注释2:通过getResponseWithInterceptorChain()获取响应。
注释3:通过封装好的拦截器集合,获取第一个拦截器的任务。
注释4:触发第一个拦截器的任务,该任务就触发一下拦截器的任务,以此类推,原理(Android事件传递机制)如下图:

8.2 异步请求

1
2
3
4
5
6
7
8
9
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));//1
}

注释1:把AsyncCall请求对象传递进Dispatcher线程池管理;

  • AsyncCall 将请求业务放入到Runnable中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final class AsyncCall extends NamedRunnable {
// ......
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();//2
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}

注释2:通过getResponseWithInterceptorChain()获取响应;


9.Dispatcher线程池

  • Dispatcher 管理网络请求的线程池,其实就是把同步(RealCall)与异步(AsyncCall)的请求放进集合中统一管理,然后通过线程池执行AsyncCall的请求。

9.1 Dispatcher中同步(RealCall)

  • RealCall在Dispatcher中,其实主要就是一个存储功能(即用一个集合把RealCall的请求进行存储)。

9.2 Dispatcher中异步(AsyncCall)

  • AsyncCall在Dispatcher中,除了使用集合存储AsyncCall的请求,Dispatcher还初始化了一个线程池(ThreadPoolExecutor)处理AsyncCall的网络请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public final class Dispatcher {
//最大请求数量
private int maxRequests = 64;
//相同host的最大请求数据
private int maxRequestsPerHost = 5;
// ......
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));//1
}
return executorService;
}
// ......
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {//2
runningAsyncCalls.add(call);//3
executorService().execute(call);//4
} else {
readyAsyncCalls.add(call);//5
}
}
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);//6
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();//7
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
// ......
}

注释1:获取自定义线程池;
注释2:判断正在执行的异步请求数量与请求集合中相同host的数量是否满足,如果满足就添加到执行中的集合中,并添加至线程池中执行请求;如果不满足就添加至待执行请求的集合中,等待执行中的请求完成之后,再执行相同host数量判断满足才添加至线程池中执行请求;
注释3:将请求对象AsyncCall添加进请求执行的集合中;
注释4:将请求对象AsyncCall添加进线程池中执行;
注释5:当不满足执行条件时(注释2),把请求对象添加至待执行的集合中;
注释6:每当一个请求执行完毕时,就会调用finished()去掉对应集合中的存储对象,并在次判断待执行的集合中是否有满足条件的请求,若满足就添加至执行的集合与线程池中执行,若不满足继续等待下一个请求完成再次判断。
注释7:判断待执行的集合中是否满足可执行的对象。


10.Interceptor拦截器及调用链

  • Interceptor 拦截器,供使用者可在请求过程或者响应过程中自定义额外的业务处理(例如:最常见的请求失败重试、响应数据的重定向等等)。
  • OkHttp3中,除了可自定义额外的拦截器之外,它内部也存储一些固定的拦截器处理其内部业务逻辑,下面就会介绍它们(RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor、CallServerInterceptor);

10.1 RetryAndFollowUpInterceptor

  • 定义:重定向拦截器;
  • 作用:在无法请求服务器或者请求失败时,服务器会告诉客户端可以处理请求的url,然后重定向拦截器承当重新请求新url的作用(服务器返回3XX错误码为重定向,可以通过响应头的Location获取新请求的url);

10.2 BridgeInterceptor

  • 定义:桥拦截器;
  • 作用:封装请求头(Content-Type、Connection、Cookie…)与响应头(“Content-Encoding…)的信息。

10.3 CacheInterceptor

  • 定义:缓存拦截器;
  • 作用:为网络请求提供缓存功能,加快相同请求的访问速度,减少资源损耗。

10.4 ConnectInterceptor

  • 定义:连接拦截器;
  • 作用:与服务器建立通讯连接。

10.5 CallServerInterceptor

  • 定义:请求服务器拦截器;
  • 作用:与服务器进行数据通讯(包含请求头、请求内容)。

10.6 调用链

  • 上文也提到任务链结构图(责任链模式):

  • 其实它的原理类似于(Android 事件传递机制),向下传递请求,向上反馈响应,在调用RealInterceptorChain的proceed()时,创建下一个拦截器的任务,并通过拦截器中intercept()把任务传递至当前拦截器进行关联,然后以此类推,相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class RealInterceptorChain implements Interceptor.Chain {
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
// ......
// 创建下一个拦截器的任务
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// 获取当前任务中的拦截器
Interceptor interceptor = interceptors.get(index);
// 将下一个任务传递至当前拦截器中进行关联,并在拦截器中传入的任务触发执行下一个拦截器
Response response = interceptor.intercept(next);
// ......
return response;
}
}

11.缓存机制CacheInterceptor

  • 看看缓存机制CacheInterceptor的实现原理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public final class CacheInterceptor implements Interceptor {
// ....
@Override public Response intercept(Chain chain) throws IOException {
// 根据请求内容通过Cache类判断是否已经存在响应的缓存信息
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
// 通过请求对象与缓存对象获取缓存策略,根据请求头的内容(Date、Expires、Last-Modified....)制定缓存策略
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
// 记录请求过程的相关数据(请求次数、缓存次数....)
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// 当请求头中包含only-if-cached时,networkRequest 与 cacheResponse 都为空,表示不进行网络请求
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// 不进行网络请求,并请求头包含缓存标识时,构建缓存内容.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
// 执行下一个拦截器任务
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// 如果本地有缓存,并且响应结果为没有修改时,直接从本地缓存获取相关信息数据
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// 更新响应对象至缓存中
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
// 没有响应缓存时,封装请求返回的响应并添加至本地缓存中
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// 添加本地缓存中
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
// 去掉(POST\PUT\DELETE\MOVE\PATCH)请求方法的本地缓存
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
}

12.连接与请求(StreamAllocation,RealConnection,HttpCodec)

  • OkHttp3把网络连接、请求数据通讯过程封装StreamAllocation,RealConnection,HttpCodec中;
  • StreamAllocation:负责初始化RealConnection、HttpCodec,并将前2者与RealCall进行关联;
    1.StreamAllocation初始化(在RetryAndFollowUpInterceptor进行实例化)
1
2
3
4
5
6
7
8
9
10
public final class RetryAndFollowUpInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
//......
// 实例化
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
//......
}
}

2.RealConnection与HttpCodec初始化(RealConnection在ConnectInterceptor中通过StreamAllocation的newStream()初始化,而HttpCodec在RealConnection中被初始化)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public final class ConnectInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// 获取HttpCodec 对象
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
public final class StreamAllocation {
//......
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
// 初始化RealConnection,最后会调用RealConnection的connect函数建立网络连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
// 通过RealConnection 实例化HttpCpdec
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
}
public final class RealConnection extends Http2Connection.Listener implements Connection {
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
//判断HTTP是否为Http2,如果是实例化Http2Codec,否则实例化Http1Codec
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
}
  • RealConnection:真正的负责完成网络连接;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public final class RealConnection extends Http2Connection.Listener implements Connection {
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
if (protocol != null) throw new IllegalStateException("already connected");
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
} else {
if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
throw new RouteException(new UnknownServiceException(
"H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
}
}
while (true) {
try {
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break;
}
} else {
// 最后会调用socket.connect()进行网络连接
connectSocket(connectTimeout, readTimeout, call, eventListener);
}
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
+ MAX_TUNNEL_ATTEMPTS);
throw new RouteException(exception);
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
}
  • HttpCodec:

    负责完成发送请求头与数据内容(使用okio完成数据的写入与读出);

    • Http1Codec:处理HTTP1.1协议的数据传递。
    • Http2Codec:处理HTTP2.0协议的数据传递。

13.实例

  • 编写一个简单GET请求:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void get(){
OkHttpClient okHttpClient = new OkHttpClient()
.newBuilder()
// .addInterceptor()
.readTimeout(30, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder()
// .header()
.url("http://www.baidu.com")
.build();
okHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
}

14.总结

  • 到此,Android OkHttp3就讲解完毕。
坚持原创技术分享,您的支持将鼓励我继续创作!