上一篇文章我们讲到go client的大概实现的大概思路,整理了相关client.go的核心源码,详情请翻阅:Golang源码深入-Go1.15.6发起http请求流程-1。笔者这一篇分享一下transport.go相关核心的代码,整理相关核心的技术点,希望读者多交流学习。
1、go client的整理流程,主要函数调用和流程如下:NewRequestWithContext->client.Do->client.do->client.send->send->rt.RoundTrip->Transport.roundTrip->Transport.getConn->Transport.queueForDial->Transport.dialConnFor->Transport.dialConn->Transport.readLoop()/Transport.writeLoop()->persistConn.roundTrip。 2、http.Client对象保存着Transport连接对象,Transport里面是一个最核心的是tcp连接池,连接池是处理http的请求,相对一个服务来说是全局的。在不同的函数中实例化这个对象处理不同的请求,在不重写Transport对象时,一个服务的连接都是默认复用。为什么是复用呢?是因为transport有个全局变量DefaultTransport,默认都是使用DefaultTransport这个全局对象。 3、http.NewRequest针对于每个请求都是独立的,每个请求request都是从http.Client里面获取连接,每个请求request都开启一个写协程处理发送请求,一个读协程处理响应请求,这个request本身则调用roundTrip函数启动for select 来监听读协程的结果,到此则请求完成。
下面我们来看核心模块代码翻译:
1 Transport.RoundTrip实现RoundTripper的方法
func (t *Transport) RoundTrip(req *Request) (*Response, error) {
return t.roundTrip(req)
}
2 Transport.roundTrip是主入口
func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx := req.Context()
trace := httptrace.ContextClientTrace(ctx)
if req.URL == nil {
req.closeBody()
return nil, errors.New("http: nil Request.URL")
}
if req.Header == nil {
req.closeBody()
return nil, errors.New("http: nil Request.Header")
}
scheme := req.URL.Scheme
isHTTP := scheme == "http" || scheme == "https"
// 下面判断request首部的有效性
if isHTTP {
for k, vv := range req.Header {
if !httpguts.ValidHeaderFieldName(k) {
req.closeBody()
return nil, fmt.Errorf("net/http: invalid header field name %q", k)
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) {
req.closeBody()
return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
}
}
}
}
origReq := req
cancelKey := cancelKey{origReq}
req = setupRewindBody(req)
if altRT := t.alternateRoundTripper(req); altRT != nil {
if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
return resp, err
}
var err error
req, err = rewindBody(req)
if err != nil {
return nil, err
}
}
if !isHTTP {
req.closeBody()
return nil, badStringError("unsupported protocol scheme", scheme)
}
if req.Method != "" && !validMethod(req.Method) {
req.closeBody()
return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
}
if req.URL.Host == "" {
req.closeBody()
return nil, errors.New("http: no Host in request URL")
}
// 下面for循环用于在request出现错误的时候进行请求重试。但不是所有的请求失败都会被尝试,如请求被取消(errRequestCanceled) 的情况是不会进行重试的。具体参见shouldRetryRequest函数
for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}
// treq gets modified by roundTrip, so we need to recreate for each retry.
treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
// 获取一条长连接,如果连接池中有现成的连接则直接返回,否则返回一条新建的连接。该连接可能是HTTP2格式的,存放在persistCnn.alt中,使用其自注册的RoundTrip处理,从getConn的实现中可以看到,一个请求只能在idle的连接上执行,反之一条连接只能同时处理一个请求。
if err != nil {
// 每个request都会在getConn中设置reqCanceler,获取连接失败,清空设置
t.setReqCanceler(cancelKey, nil)
req.closeBody()
return nil, err
}
var resp *Response
if pconn.alt != nil {
// HTTP2处理,使用HTTP2时,由于不缓存HTTP2连接,不对其做限制
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
// pconn.roundTrip中做了比较复杂的处理,该函数用于发送request并返回response。通过writeLoop发送request,通过readLoop返回response
resp, err = pconn.roundTrip(treq)
}
if err == nil {
resp.Request = origReq
return resp, nil
}
// Failed. Clean up and determine whether to retry.
if http2isNoCachedConnError(err) {
if t.removeIdleConn(pconn) {
t.decConnsPerHost(pconn.cacheKey)
}
} else if !pconn.shouldRetryRequest(req, err) {
// Issue 16465: return underlying net.Conn.Read error from peek,
// as we've historically done.
if e, ok := err.(transportReadFromServerError); ok {
err = e.err
}
return nil, err
}
testHookRoundTripRetried()
// 用于重定向场景
req, err = rewindBody(req)
if err != nil {
return nil, err
}
}
}
3 getConn用于返回一条长连接。长连接的来源有2种路径:连接池中获取;当连接池中无法获取到时会新建一条连接。
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}
w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
defer func() {
if err != nil {
w.cancel(t, err)
}
}()
// 从连接池中找一条合适的连接,如果找到则返回该连接,否则新建连接
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
// Trace only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
t.setReqCanceler(treq.cancelKey, func(error) {})
return pc, nil
}
cancelc := make(chan error, 1)
t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
// 排队等待获取连接
t.queueForDial(w)
// 通过select监听获取连接完成或者取消
select {
case <-w.ready:
// Trace success but only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
}
if w.err != nil {
// If the request has been cancelled, that's probably
// what caused w.err; if so, prefer to return the
// cancellation error (see golang.org/issue/16049).
select {
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// return below
}
}
return w.pc, w.err
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}
4 排队等待新建连接
func (t *Transport) queueForDial(w *wantConn) {
w.beforeDial()
// 如果没有限制最大连接数,直接建立连接
if t.MaxConnsPerHost <= 0 {
go t.dialConnFor(w)
return
}
t.connsPerHostMu.Lock()
defer t.connsPerHostMu.Unlock()
// 如果没超过连接数限制,直接建立连接
if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
if t.connsPerHost == nil {
t.connsPerHost = make(map[connectMethodKey]int)
}
t.connsPerHost[w.key] = n + 1
go t.dialConnFor(w)
return
}
if t.connsPerHostWait == nil {
t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
}
// 排队等待连接建立
q := t.connsPerHostWait[w.key]
q.cleanFront()
q.pushBack(w)
t.connsPerHostWait[w.key] = q
}
5 调用t.dialConn获取一个真正的*persistConn
func (t *Transport) dialConnFor(w *wantConn) {
defer w.afterDial()
// 执行新建连接,拨号功能,如果新建连接成功,则添加当前连接到连接池
pc, err := t.dialConn(w.ctx, w.cm)
delivered := w.tryDeliver(pc, err)
if err == nil && (!delivered || pc.alt != nil) {
// pconn was not passed to w,
// or it is HTTP/2 and can be shared.
// Add to the idle connection pool.
t.putOrCloseIdleConn(pc)
}
// 如果建立连接或者获取连接失败,则删除连接池中的连接。
if err != nil {
t.decConnsPerHost(w.key)
}
}
6 dialConn用于新创建一条连接,并为该连接启动readLoop和writeLoop
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
pconn = &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
trace := httptrace.ContextClientTrace(ctx)
wrapErr := func(err error) error {
if cm.proxyURL != nil {
// Return a typed error, per Issue 16997
return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
}
return err
}
// 调用注册的DialTLS处理tls。使用自注册的TLS处理函数时,transport的TLSClientConfig和TLSHandshakeTimeout
if cm.scheme() == "https" && t.hasCustomTLSDialer() {
var err error
pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
// 如果连接类型是TLS的,则需要处理TLS协商
if tc, ok := pconn.conn.(*tls.Conn); ok {
// Handshake here, in case DialTLS didn't. TLSNextProto below
// depends on it for knowing the connection state.
if trace != nil && trace.TLSHandshakeStart != nil {
trace.TLSHandshakeStart()
}
// 启动TLS协商,如果协商失败需要 关闭连接
if err := tc.Handshake(); err != nil {
go pconn.conn.Close()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(tls.ConnectionState{}, err)
}
return nil, err
}
cs := tc.ConnectionState()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(cs, nil)
}
pconn.tlsState = &cs
}
} else {
// 使用默认方式创建连接,此时会用到transport的TLSClientConfig和TLSHandshakeTimeout参数。同样注意cm.addr()
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
if err = pconn.addTLS(firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}
// 处理proxy的情况
switch {
case cm.proxyURL == nil:
// Do nothing. Not using a proxy.
case cm.proxyURL.Scheme == "socks5":
conn := pconn.conn
d := socksNewDialer("tcp", conn.RemoteAddr().String())
if u := cm.proxyURL.User; u != nil {
auth := &socksUsernamePassword{
Username: u.Username(),
}
auth.Password, _ = u.Password()
d.AuthMethods = []socksAuthMethod{
socksAuthMethodNotRequired,
socksAuthMethodUsernamePassword,
}
d.Authenticate = auth.Authenticate
}
if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
conn.Close()
return nil, err
}
case cm.targetScheme == "http":
pconn.isProxy = true
if pa := cm.proxyAuth(); pa != "" {
pconn.mutateHeaderFunc = func(h Header) {
h.Set("Proxy-Authorization", pa)
}
}
case cm.targetScheme == "https":
conn := pconn.conn
hdr := t.ProxyConnectHeader
if hdr == nil {
hdr = make(Header)
}
if pa := cm.proxyAuth(); pa != "" {
hdr = hdr.Clone()
hdr.Set("Proxy-Authorization", pa)
}
connectReq := &Request{
Method: "CONNECT",
URL: &url.URL{Opaque: cm.targetAddr},
Host: cm.targetAddr,
Header: hdr,
}
// If there's no done channel (no deadline or cancellation
// from the caller possible), at least set some (long)
// timeout here. This will make sure we don't block forever
// and leak a goroutine if the connection stops replying
// after the TCP connect.
connectCtx := ctx
if ctx.Done() == nil {
newCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
connectCtx = newCtx
}
didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
var (
resp *Response
err error // write or read error
)
// Write the CONNECT request & read the response.
go func() {
defer close(didReadResponse)
err = connectReq.Write(conn)
if err != nil {
return
}
// Okay to use and discard buffered reader here, because
// TLS server will not speak until spoken to.
br := bufio.NewReader(conn)
resp, err = ReadResponse(br, connectReq)
}()
select {
case <-connectCtx.Done():
conn.Close()
<-didReadResponse
return nil, connectCtx.Err()
case <-didReadResponse:
// resp or err now set
}
if err != nil {
conn.Close()
return nil, err
}
if resp.StatusCode != 200 {
f := strings.SplitN(resp.Status, " ", 2)
conn.Close()
if len(f) < 2 {
return nil, errors.New("unknown status code")
}
return nil, errors.New(f[1])
}
}
if cm.proxyURL != nil && cm.targetScheme == "https" {
if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
return nil, err
}
}
if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
if e, ok := alt.(http2erringRoundTripper); ok {
// pconn.conn was closed by next (http2configureTransport.upgradeFn).
return nil, e.err
}
return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
}
}
pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
// 处理请求response
go pconn.readLoop()
// 开启协程处理请求
go pconn.writeLoop()
return pconn, nil
}
7 readLoop循环接收response响应,成功获得response后会将连接返回连接池,便于后续复用。
func (pc *persistConn) readLoop() {
// 当writeLoop或readLoop(异常)跳出循环后,都需要关闭底层连接。即一条连接包含writeLoop和readLoop两个处理,任何一个loop退出(协议升级除外)则该连接不可用,readLoop跳出循环的正常原因是连接上没有待处理的请求,此时关闭连接,释放资源
closeErr := errReadLoopExiting // default value, if not changed below
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()
// 尝试将连接放回连接池
tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
closeErr = err
if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
trace.PutIdleConn(err)
}
return false
}
if trace != nil && trace.PutIdleConn != nil {
trace.PutIdleConn(nil)
}
return true
}
// 变量主要用于阻塞调用者协程读取EOF的resp.body,直到该连接重新放入连接池中。处理逻辑与上面先尝试放入连接池,然后返回response一样,便于连接快速重用
eofc := make(chan struct{})
defer close(eofc) // unblock reader on errors
// Read this once, before loop starts. (to avoid races in tests)
testHookMu.Lock()
testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
testHookMu.Unlock()
alive := true
for alive {
// 获取允许的response首部的最大字节数
pc.readLimit = pc.maxHeaderResponseSize()
_, err := pc.br.Peek(1)
pc.mu.Lock()
if pc.numExpectedResponses == 0 {
pc.readLoopPeekFailLocked(err)
pc.mu.Unlock()
return
}
pc.mu.Unlock()
rc := <-pc.reqch
trace := httptrace.ContextClientTrace(rc.req.Context())
var resp *Response
// 如果有response数据,则读取并解析为Response格式
if err == nil {
resp, err = pc.readResponse(rc, trace)
} else {
// 可能的错误如server端关闭,发送EOF
err = transportReadFromServerError{err}
closeErr = err
}
if err != nil {
if pc.readLimit <= 0 {
err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
}
select {
case rc.ch <- responseAndError{err: err}:
case <-rc.callerGone:
return
}
return
}
pc.readLimit = maxInt64 // effectively no limit for response bodies
pc.mu.Lock()
pc.numExpectedResponses--
pc.mu.Unlock()
bodyWritable := resp.bodyIsWritable()
hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
alive = false
}
if !hasBody || bodyWritable {
pc.t.setReqCanceler(rc.cancelKey, nil)
// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
// get this same conn. But we use the unbuffered channel 'rc'
// to guarantee that persistConn.roundTrip got out of its select
// potentially waiting for this persistConn to close.
// but after
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace)
if bodyWritable {
closeErr = errCallerOwnsConn
}
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Now that they've read from the unbuffered channel, they're safely
// out of the select that also waits on this goroutine to die, so
// we're allowed to exit now if needed (if alive is false)
testHookReadLoopBeforeNextRead()
continue
}
waitForBodyRead := make(chan bool, 2)
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
// 返回的resp.Body类型变为了bodyEOFSignal,如果调用者在读取resp.Body后没有关闭,会导致readLoop阻塞在下面"case bodyEOF := <-waitForBodyRead:"中
resp.Body = body
if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
resp.Body = &gzipReader{body: body}
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
resp.Uncompressed = true
}
// 此处与处理不带resp.body的场景相同
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
// reading the response body. (or for cancellation or death)
select {
case bodyEOF := <-waitForBodyRead:
// 如果读取完response的数据,则该连接可以被重用,否则直接释放。释放一个未读取完数据的连接会导致数据丢失。注意区分bodyEOF和pc.sawEOF的区别,一个是上层通道(http response.Body)关闭,一个是底层通道(TCP)关闭。
pc.t.setReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace)
// 释放阻塞的读操作
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
case <-pc.closech:
alive = false
}
testHookReadLoopBeforeNextRead()
}
}
8 writeLoop用于发送request请求
func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
// writeLoop会阻塞等待两个IO case 循环等待并处理roundTrip发来的writeRequest数据,此时需要发送request;如果底层连接关闭,则退出writeLoop
for {
select {
case wr := <-pc.writech:
startBytesWritten := pc.nwrite
// 构造request并发送request请求。waitForContinue用于处理首部含"Expect: 100-continue"的request
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
if bre, ok := err.(requestBodyReadError); ok {
err = bre.error
// Errors reading from the user's
// Request.Body are high priority.
// Set it here before sending on the
// channels below or calling
// pc.close() which tears town
// connections and causes other
// errors.
wr.req.setError(err)
}
// 请求失败时,需要关闭request和底层连接
if err == nil {
err = pc.bw.Flush()
}
if err != nil {
wr.req.Request.closeBody()
if pc.nwrite == startBytesWritten {
err = nothingWrittenError{err}
}
}
// 将结果发送给readLoop的pc.wroteRequest()函数处理
pc.writeErrCh <- err
// 将结果返回给roundTrip处理,防止响应超时
wr.ch <- err
// 如果发送request失败,需要关闭连接。writeLoop退出时会关闭pc.conn和pc.closech,同时会导致readLoop退出
if err != nil {
pc.close(err)
return
}
case <-pc.closech:
return
}
}
}
9 一个roundTrip用于处理一个request,通过for select来监听结果。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
testHookEnterRoundTrip()
// 此处与getConn中的"t.setReqCanceler(req, func(error) {})"相对应,用于判断request是否被取消, 返回false表示request被取消,不必继续后续请求,关闭连接并返回错误
if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) {
pc.t.putOrCloseIdleConn(pc)
return nil, errRequestCanceled
}
pc.mu.Lock()
// 与readLoop配合使用,表示期望的响应的个数
pc.numExpectedResponses++
headerFn := pc.mutateHeaderFunc
pc.mu.Unlock()
if headerFn != nil {
headerFn(req.extraHeaders())
}
// Ask for a compressed version if the caller didn't set their
// own value for Accept-Encoding. We only attempt to
// uncompress the gzip stream if we were the layer that
// requested it.
requestedGzip := false
// 如果需要在request中设置可接受的解码方法,则在request中添加对应的首部。仅支持gzip方式且仅在调用者没有设置这些首部时设置
if !pc.t.DisableCompression &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
req.Method != "HEAD" {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
//
// Note that we don't request this for HEAD requests,
// due to a bug in nginx:
// https://trac.nginx.org/nginx/ticket/358
// https://golang.org/issue/5522
//
// We don't request gzip if the request is for a range, since
// auto-decoding a portion of a gzipped document will just fail
// anyway. See https://golang.org/issue/8923
requestedGzip = true
req.extraHeaders().Set("Accept-Encoding", "gzip")
}
var continueCh chan struct{}
if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
continueCh = make(chan struct{}, 1)
}
// HTTP1.1默认使用长连接,当transport设置DisableKeepAlives时会导致处理每个request时都会新建一个连接。此处的处理逻辑是:如果transport设置了DisableKeepAlives,而request没有设置
if pc.t.DisableKeepAlives && !req.wantsClose() {
req.extraHeaders().Set("Connection", "close")
}
// 用于在异常场景(如request取消)下通知readLoop,roundTrip是否已经退出,防止ReadLoop发送response阻塞
gone := make(chan struct{})
defer close(gone)
defer func() {
if err != nil {
pc.t.setReqCanceler(req.cancelKey, nil)
}
}()
const debugRoundTrip = false
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
startBytesWritten := pc.nwrite
// 给writeLoop封装并发送信息,注意此处的先后顺序。首先给writeLoop发送数据,阻塞等待writeLoop接收,待writeLoop接收后才能发送数据给readLoop,因此发送request总会优先接收response
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh}
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
req: req.Request,
cancelKey: req.cancelKey,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
ctxDoneChan := req.Context().Done()
// 该循环主要用于处理获取response超时和request取消时的条件跳转。正常情况下收到reponse, 退出roundtrip函数
for {
testHookWaitResLoop()
select {
// writeLoop返回发送request后的结果
case err := <-writeErrCh:
if debugRoundTrip {
req.logf("writeErrCh resv: %T/%#v", err, err)
}
if err != nil {
pc.close(fmt.Errorf("write error: %v", err))
return nil, pc.mapRoundTripError(req, startBytesWritten, err)
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
if debugRoundTrip {
req.logf("starting timer for %v", d)
}
timer := time.NewTimer(d)
defer timer.Stop() // prevent leaks
respHeaderTimer = timer.C
}
// 处理底层连接关闭。"case <-cancelChan:"和”case <-ctxDoneChan:“为request关闭,request关闭也会导致底层连接关闭,但必须处理非上层协议导致底层连接关闭的情况。
case <-pc.closech:
if debugRoundTrip {
req.logf("closech recv: %T %#v", pc.closed, pc.closed)
}
return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
// 等待获取response超时,关闭连接
case <-respHeaderTimer:
if debugRoundTrip {
req.logf("timeout waiting for response headers.")
}
pc.close(errTimeout)
return nil, errTimeout
// 接收到readLoop返回的response结果
case re := <-resc:
// 极异常情况,直接程序panic
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
// 到这里是最终的成功返回的结果。
return re.res, nil
// request取消
case <-cancelChan:
pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
// 将关闭之后的chan置为nil,用来防止select一直进入该case(close的chan不会阻塞读,读取的数据为0)
cancelChan = nil
case <-ctxDoneChan:
pc.t.cancelRequest(req.cancelKey, req.Context().Err())
cancelChan = nil
ctxDoneChan = nil
}
}
}
关于源码就分享到这里。
主要核心流程以及功能梳理图解如下
总结
( 1 )go发起http1.1请求,遇到不关心的请求,请务必完整读取响应内容以保障连接复用性。
( 2 )如果在http.client 中没有设置transport熟悉,则会使用文章开头说的DefaultTransport,这里设置的默认最大空闲连接数MaxIdleConns,每个host最大空闲连接数MaxIdleConnsPerHost是2,每个host的最大连接数MaxConnsPerHost是0。在大量并发情况下,默认配置会造成很多链接,进而性能急剧下降。如果需要控制合适的连接数,就需要使用自定义的client和transport。配置方式如下:
t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxIdleConns = 100
t.MaxConnsPerHost = 100
t.MaxIdleConnsPerHost = 100
httpClient = &http.Client{
Timeout: 10 * time.Second,
Transport: t,
}
( 3 )http1.1线头阻塞:http中一个连接上的请求,需要等这个请求处理完了才能继续下一个请求。
参照文献
https://www.jb51.net/article/193675.htm
https://www.cnblogs.com/charlieroro/p/11409153.html