This article covers the http2 source code; the previous two mainly walked through the http1 implementation and are worth reading first if you are interested: Golang源码深入-Go1.15.6发起http请求流程-1 and Golang源码深入-Go1.15.6发起http请求流程-2.
The http2 protocol is now used by a great many websites; through multiplexing, binary framing, Header compression, and other techniques it greatly improves performance.
The core flow of the http2 source code is diagrammed below:
Now let's walk through the source of each module in turn:
1 RoundTrip: the http2 entry function
// RoundTrip is the entry point into the http2 Transport.
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
	return t.RoundTripOpt(req, RoundTripOpt{})
}
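Before diving deeper, here is a minimal, self-contained sketch of how user code reaches this entry point (the target URL is illustrative): any http.Client whose Transport is an http2.Transport funnels every request through the RoundTrip above.

package main

import (
	"fmt"
	"net/http"

	"golang.org/x/net/http2"
)

func main() {
	// Every request sent through this client enters via Transport.RoundTrip.
	client := &http.Client{Transport: &http2.Transport{}}
	resp, err := client.Get("https://example.com") // illustrative URL
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Proto) // prints "HTTP/2.0" when the server negotiates h2
}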
2 RoundTripOpt: the function that actually processes the request
// RoundTripOpt implements the request flow behind RoundTrip.
func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
	// The scheme must be https, unless the client explicitly allows plain HTTP.
	if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) {
		return nil, errors.New("http2: unsupported scheme")
	}
	// Normalize scheme and host into a "host:port" address.
	addr := authorityAddr(req.URL.Scheme, req.URL.Host)
	// http2 requests have a retry mechanism: this version retries up to 6
	// times, and only for errors that shouldRetryRequest deems retryable.
	for retry := 0; ; retry++ {
		// Get a connection: reuse one from the pool or dial a new one.
		cc, err := t.connPool().GetClientConn(req, addr)
		if err != nil {
			t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
			return nil, err
		}
		reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1)
		// Emit the httptrace GotConn event.
		traceGotConn(req, cc, reused)
		// roundTrip does the actual request/response work.
		res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req)
		if err != nil && retry <= 6 {
			// Decide whether this error is retryable (rewinding the body if so).
			if req, err = shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil {
				// The first retry happens immediately, with no pause.
				if retry == 0 {
					continue
				}
				// Otherwise compute a randomized exponential backoff.
				backoff := float64(uint(1) << (uint(retry) - 1))
				backoff += backoff * (0.1 * mathrand.Float64())
				// Wait on either the backoff timer or request cancellation:
				// the timer means retry; a done context means return.
				select {
				case <-time.After(time.Second * time.Duration(backoff)):
					continue
				case <-req.Context().Done():
					return nil, req.Context().Err()
				}
			}
		}
		// The error was not retryable, or retries are exhausted: return it.
		if err != nil {
			t.vlogf("RoundTrip failure: %v", err)
			return nil, err
		}
		// Success: return the response.
		return res, nil
	}
}
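The backoff line deserves a closer look: the wait doubles on each retry (1s, 2s, 4s, 8s, ...) with up to 10% random jitter added on top. A standalone sketch that reproduces just this arithmetic:

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// Mirrors the backoff computation in RoundTripOpt for retries 1..6.
	for retry := 1; retry <= 6; retry++ {
		backoff := float64(uint(1) << (uint(retry) - 1))
		backoff += backoff * (0.1 * rand.Float64())
		fmt.Printf("retry %d: wait ~%.2fs\n", retry, backoff)
	}
}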
3 GetClientConn: fetch a connection for the request.
func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
	return p.getClientConn(req, addr, dialOnMiss)
}
4 getClientConn: the implementation behind GetClientConn.
func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
	// Requests with Connection: close get their own dedicated connection.
	if isConnectionCloseRequest(req) && dialOnMiss {
		// It gets its own connection.
		traceGetConn(req, addr)
		const singleUse = true
		cc, err := p.t.dialClientConn(addr, singleUse)
		if err != nil {
			return nil, err
		}
		return cc, nil
	}
	// Take the pool-wide lock.
	p.mu.Lock()
	// Walk the pooled connections for this addr (keyed by host:port).
	for _, cc := range p.conns[addr] {
		// Check whether this connection can take another request.
		if st := cc.idleState(); st.canTakeNewRequest {
			if p.shouldTraceGetConn(st) {
				traceGetConn(req, addr)
			}
			// Found a usable connection: unlock and return it.
			p.mu.Unlock()
			return cc, nil
		}
	}
	// dialOnMiss is false when this pool is not allowed to dial on its own
	// (the no-dial pool variant used when http2 is integrated into net/http);
	// in that case report a cache miss instead of creating a connection.
	if !dialOnMiss {
		p.mu.Unlock()
		return nil, ErrNoCachedConn
	}
	traceGetConn(req, addr)
	// Cache miss: start a dial for this addr, or join one already in flight.
	call := p.getStartDialLocked(addr)
	// Release the lock, then wait for the dial to finish.
	p.mu.Unlock()
	<-call.done
	return call.res, call.err
}
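As an aside, the !dialOnMiss branch is best understood via ConfigureTransport: when http2 is wired into a standard net/http Transport, the pool variant used there appears to call getClientConn with dialOnMiss=false, because net/http owns connection establishment, so the pool reports ErrNoCachedConn instead of dialing. A minimal sketch of that wiring:

package main

import (
	"net/http"

	"golang.org/x/net/http2"
)

func main() {
	// A standard HTTP/1 transport...
	t1 := &http.Transport{}
	// ...upgraded so it can also speak HTTP/2. The http2.Transport
	// registered here uses a no-dial connection pool, i.e. getClientConn
	// with dialOnMiss=false, since net/http dials the connections.
	if err := http2.ConfigureTransport(t1); err != nil {
		panic(err)
	}
	client := &http.Client{Transport: t1}
	_ = client // use client.Get(...) as usual
}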
5 getStartDialLocked: start a new dial, or join one already in flight.
// requires p.mu is held.
func (p *clientConnPool) getStartDialLocked(addr string) *dialCall {
	// If a dial to this addr is already in flight, share it.
	if call, ok := p.dialing[addr]; ok {
		// A dial is already in-flight. Don't start another.
		return call
	}
	// Otherwise create a new dialCall and register it as in-flight.
	call := &dialCall{p: p, done: make(chan struct{})}
	if p.dialing == nil {
		p.dialing = make(map[string]*dialCall)
	}
	p.dialing[addr] = call
	// Dial on a separate goroutine; it fills in call.res / call.err.
	go call.dial(addr)
	// Return the (possibly still pending) dialCall.
	return call
}
6 dial: establish the new connection
// run in its own goroutine.
func (c *dialCall) dial(addr string) {
	const singleUse = false // shared conn
	// Establish the client connection.
	c.res, c.err = c.p.t.dialClientConn(addr, singleUse)
	close(c.done)
	c.p.mu.Lock()
	// The dial has finished, so remove it from the in-flight map;
	// p.dialing only tracks dials that are still in progress. Without
	// this delete, a later cache miss would find this stale dialCall
	// and reuse its result instead of dialing a fresh connection.
	delete(c.p.dialing, addr)
	if c.err == nil {
		// Initialize p.conns and p.keys, and store the new connection.
		c.p.addConnLocked(addr, c.res)
	}
	c.p.mu.Unlock()
}
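Taken together, getStartDialLocked and dial implement the classic "single-flight" pattern: concurrent requests for the same addr share one in-flight dial, and the delete above clears the finished dialCall so a later cache miss dials afresh. A minimal standalone sketch of the same pattern (the names here are illustrative, not the library's):

package main

import (
	"fmt"
	"net"
	"sync"
)

type call struct {
	done chan struct{}
	conn net.Conn
	err  error
}

type dialGroup struct {
	mu       sync.Mutex
	inFlight map[string]*call
}

func (g *dialGroup) dial(addr string) (net.Conn, error) {
	g.mu.Lock()
	if c, ok := g.inFlight[addr]; ok { // someone is already dialing addr
		g.mu.Unlock()
		<-c.done
		return c.conn, c.err
	}
	c := &call{done: make(chan struct{})}
	if g.inFlight == nil {
		g.inFlight = make(map[string]*call)
	}
	g.inFlight[addr] = c
	g.mu.Unlock()

	go func() {
		c.conn, c.err = net.Dial("tcp", addr)
		close(c.done) // wake every waiter
		g.mu.Lock()
		delete(g.inFlight, addr) // finished: future misses dial afresh
		g.mu.Unlock()
	}()
	<-c.done
	return c.conn, c.err
}

func main() {
	var g dialGroup
	conn, err := g.dial("example.com:443")
	fmt.Println(conn, err)
}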
7 dialClientConn: dial the TLS-over-TCP connection
func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
	// Split out the host for the TLS config (SNI and certificate checks).
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, err
	}
	// Dial a TLS connection over TCP.
	tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host))
	if err != nil {
		return nil, err
	}
	// Wrap the new connection in a ClientConn.
	return t.newClientConn(tconn, singleUse)
}
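newTLSConfig (not shown) essentially makes sure the handshake advertises HTTP/2 via ALPN; without that, the server never agrees to speak h2 on the connection. A standalone sketch of the equivalent handshake using crypto/tls directly (the host is illustrative):

package main

import (
	"crypto/tls"
	"fmt"
)

func main() {
	conn, err := tls.Dial("tcp", "example.com:443", &tls.Config{
		ServerName: "example.com",
		NextProtos: []string{"h2"}, // ALPN: offer HTTP/2
	})
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// The Transport verifies that the negotiated protocol really is "h2";
	// otherwise the peer only speaks HTTP/1.1 and the conn is rejected.
	fmt.Println(conn.ConnectionState().NegotiatedProtocol)
}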
8 newClientConn: set up http2 communication on top of the TLS-secured TCP connection
func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
	cc := &ClientConn{
		t:                     t,
		tconn:                 c,
		readerDone:            make(chan struct{}),
		nextStreamID:          1,
		maxFrameSize:          16 << 10,           // spec default
		initialWindowSize:     65535,              // spec default
		maxConcurrentStreams:  1000,               // "infinite", per spec. 1000 seems good enough.
		peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
		streams:               make(map[uint32]*clientStream),
		singleUse:             singleUse,
		wantSettingsAck:       true,
		pings:                 make(map[[8]byte]chan struct{}),
	}
	// Copy the configured idle timeout onto the new connection and arm
	// the idle timer.
	if d := t.idleConnTimeout(); d != 0 {
		cc.idleTimeout = d
		cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
	}
	// Verbose logging.
	if VerboseLogs {
		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
	}
	// Initialize the ClientConn's I/O machinery.
	cc.cond = sync.NewCond(&cc.mu)
	cc.flow.add(int32(initialWindowSize))
	// TODO: adjust this writer size to account for frame size +
	// MTU + crypto/tls record padding.
	cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr})
	cc.br = bufio.NewReader(c)
	cc.fr = NewFramer(cc.bw, cc.br)
	cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
	cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
	// TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on
	// henc in response to SETTINGS frames?
	cc.henc = hpack.NewEncoder(&cc.hbuf)
	if t.AllowHTTP {
		cc.nextStreamID = 3
	}
	if cs, ok := c.(connectionStater); ok {
		state := cs.ConnectionState()
		cc.tlsState = &state
	}
	initialSettings := []Setting{
		{ID: SettingEnablePush, Val: 0},
		{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
	}
	if max := t.maxHeaderListSize(); max != 0 {
		initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
	}
	cc.bw.Write(clientPreface)
	cc.fr.WriteSettings(initialSettings...)
	cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
	cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
	cc.bw.Flush()
	if cc.werr != nil {
		cc.Close()
		return nil, cc.werr
	}
	// Start a goroutine that reads responses off the connection.
	go cc.readLoop()
	return cc, nil
}
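The handshake at the end is worth spelling out: clientPreface is the fixed 24-byte string "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" required by the spec, and it must be followed immediately by a SETTINGS frame. A sketch of the same first bytes using the public Framer API (the setting values here are illustrative, not the Transport's exact defaults):

package main

import (
	"bytes"
	"fmt"

	"golang.org/x/net/http2"
)

func main() {
	var buf bytes.Buffer
	// 1. The fixed connection preface.
	buf.WriteString("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
	// 2. An initial SETTINGS frame, then a connection-level WINDOW_UPDATE,
	// mirroring the order in newClientConn.
	fr := http2.NewFramer(&buf, nil)
	fr.WriteSettings(
		http2.Setting{ID: http2.SettingEnablePush, Val: 0},
		http2.Setting{ID: http2.SettingInitialWindowSize, Val: 4 << 20}, // illustrative
	)
	fr.WriteWindowUpdate(0, 1<<20) // illustrative increment
	fmt.Printf("first bytes on the wire: %q...\n", buf.Bytes()[:24])
}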
9 readLoop
// readLoop runs in its own goroutine and handles all reads on the connection.
func (cc *ClientConn) readLoop() {
	// Set up the read-loop state.
	rl := &clientConnReadLoop{cc: cc}
	defer rl.cleanup()
	// run does the actual work.
	cc.readerErr = rl.run()
	if ce, ok := cc.readerErr.(ConnectionError); ok {
		cc.wmu.Lock()
		cc.fr.WriteGoAway(0, ErrCode(ce), nil)
		cc.wmu.Unlock()
	}
}
10 clientConnReadLoop.run
func (rl *clientConnReadLoop) run() error {
	cc := rl.cc
	// If keep-alives are disabled, or this is a single-use connection,
	// close it as soon as it goes idle.
	rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
	gotReply := false // ever saw a HEADERS reply
	gotSettings := false
	readIdleTimeout := cc.t.ReadIdleTimeout
	var t *time.Timer
	if readIdleTimeout != 0 {
		t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
		defer t.Stop()
	}
	// Loop forever, reading one frame at a time.
	for {
		// Read the next frame.
		f, err := cc.fr.ReadFrame()
		// If a read-idle timeout is configured, a read should push back the
		// health-check ping, so reset the timer after every ReadFrame.
		if t != nil {
			t.Reset(readIdleTimeout)
		}
		if err != nil {
			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
		}
		// A stream-level error resets just that stream and keeps reading.
		if se, ok := err.(StreamError); ok {
			if cs := cc.streamByID(se.StreamID, false); cs != nil {
				cs.cc.writeStreamReset(cs.ID, se.Code, err)
				cs.cc.forgetStreamID(cs.ID)
				if se.Cause == nil {
					se.Cause = cc.fr.errDetail
				}
				rl.endStreamError(cs, se)
			}
			continue
		// Any other error tears down the whole read loop.
		} else if err != nil {
			return err
		}
		if VerboseLogs {
			cc.vlogf("http2: Transport received %s", summarizeFrame(f))
		}
		if !gotSettings {
			// The server must speak SETTINGS first; anything else is a
			// PROTOCOL_ERROR.
			if _, ok := f.(*SettingsFrame); !ok {
				cc.logf("protocol error: received %T before a SETTINGS frame", f)
				return ConnectionError(ErrCodeProtocol)
			}
			gotSettings = true
		}
		maybeIdle := false // whether frame might transition us to idle
		switch f := f.(type) {
		// Response headers.
		case *MetaHeadersFrame:
			err = rl.processHeaders(f)
			maybeIdle = true
			gotReply = true
		// Response body data.
		case *DataFrame:
			err = rl.processData(f)
			maybeIdle = true
		// The peer is telling us to stop creating streams on this connection.
		case *GoAwayFrame:
			err = rl.processGoAway(f)
			maybeIdle = true
		// A stream was reset (RST_STREAM) by the peer.
		case *RSTStreamFrame:
			err = rl.processResetStream(f)
			maybeIdle = true
		// Peer settings for this connection.
		case *SettingsFrame:
			err = rl.processSettings(f)
		// Server push: the peer wants to initiate a stream.
		case *PushPromiseFrame:
			err = rl.processPushPromise(f)
		// Flow-control window updates.
		case *WindowUpdateFrame:
			err = rl.processWindowUpdate(f)
		// Keep-alive / health-check pings.
		case *PingFrame:
			err = rl.processPing(f)
		default:
			cc.logf("Transport: unhandled response frame type %T", f)
		}
		if err != nil {
			if VerboseLogs {
				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
			}
			return err
		}
		// If we have seen a reply, this frame may have left the connection
		// idle, and closeWhenIdle is set, close the connection once idle.
		if rl.closeWhenIdle && gotReply && maybeIdle {
			cc.closeIfIdle()
		}
	}
}
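The shape of this loop, one reader goroutine, one ReadFrame call, one type switch, is also available through the package's public Framer API. A minimal sketch (rw is assumed to be a connection on which the preface has already been exchanged):

package main

import (
	"io"

	"golang.org/x/net/http2"
)

// readFrames mirrors the structure of clientConnReadLoop.run.
func readFrames(rw io.ReadWriter) error {
	fr := http2.NewFramer(rw, rw)
	for {
		f, err := fr.ReadFrame()
		if err != nil {
			return err
		}
		switch f.(type) {
		case *http2.DataFrame:
			// body bytes for some stream
		case *http2.HeadersFrame:
			// an HPACK-encoded header block
		case *http2.SettingsFrame:
			// peer settings; must be acknowledged
		case *http2.PingFrame:
			// must be echoed back with the ACK flag
		case *http2.GoAwayFrame:
			// the peer is shutting the connection down
		}
	}
}

func main() {}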
11 processData: handles http2 DATA frames (the response body)
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
	cc := rl.cc
	cs := cc.streamByID(f.StreamID, f.StreamEnded())
	data := f.Data()
	// The stream may no longer exist.
	if cs == nil {
		cc.mu.Lock()
		neverSent := cc.nextStreamID
		cc.mu.Unlock()
		// A DATA frame for a stream ID we never sent is a protocol error.
		if f.StreamID >= neverSent {
			// We never asked for this.
			cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
			return ConnectionError(ErrCodeProtocol)
		}
		// We probably did ask for this, but canceled. Just ignore it.
		// TODO: be stricter here? only silently ignore things which
		// we canceled, but not things which were closed normally
		// by the peer? Tough without accumulating too much state.
		// But at least return their flow control:
		if f.Length > 0 {
			cc.mu.Lock()
			cc.inflow.add(int32(f.Length))
			cc.mu.Unlock()
			cc.wmu.Lock()
			cc.fr.WriteWindowUpdate(0, uint32(f.Length))
			cc.bw.Flush()
			cc.wmu.Unlock()
		}
		return nil
	}
	if !cs.firstByte {
		cc.logf("protocol error: received DATA before a HEADERS frame")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if f.Length > 0 {
		// A response to a HEAD request must not carry a body.
		if cs.req.Method == "HEAD" && len(data) > 0 {
			cc.logf("protocol error: received DATA on a HEAD request")
			rl.endStreamError(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeProtocol,
			})
			return nil
		}
		// Check connection-level flow control.
		cc.mu.Lock()
		if cs.inflow.available() >= int32(f.Length) {
			cs.inflow.take(int32(f.Length))
		} else {
			cc.mu.Unlock()
			return ConnectionError(ErrCodeFlowControl)
		}
		// Return any padded flow control now, since we won't
		// refund it later on body reads.
		var refund int
		if pad := int(f.Length) - len(data); pad > 0 {
			refund += pad
		}
		// Return len(data) now if the stream is already closed,
		// since data will never be read.
		didReset := cs.didReset
		if didReset {
			refund += len(data)
		}
		// Send WINDOW_UPDATE frames for whatever was refunded.
		if refund > 0 {
			cc.inflow.add(int32(refund))
			cc.wmu.Lock()
			cc.fr.WriteWindowUpdate(0, uint32(refund))
			if !didReset {
				cs.inflow.add(int32(refund))
				cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
			}
			cc.bw.Flush()
			cc.wmu.Unlock()
		}
		cc.mu.Unlock()
		// Hand the payload to the stream's read buffer, where the
		// response body reader will pick it up.
		if len(data) > 0 && !didReset {
			if _, err := cs.bufPipe.Write(data); err != nil {
				rl.endStreamError(cs, err)
				return err
			}
		}
	}
	// If this is the final frame of the stream, endStream closes out the
	// response body.
	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
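The flow-control bookkeeping above is the subtle part: every DATA frame consumes f.Length bytes (payload plus padding) from both the stream window and the connection window, and padding is refunded immediately via WINDOW_UPDATE because body reads will never return it. A toy model of just that arithmetic (the window type here is illustrative, not the library's flow struct):

package main

import "fmt"

type window struct{ avail int32 }

func (w *window) take(n int32) bool {
	if w.avail < n {
		return false // the real code returns ErrCodeFlowControl here
	}
	w.avail -= n
	return true
}

func (w *window) add(n int32) { w.avail += n }

func main() {
	conn := window{avail: 65535}   // connection-level receive window
	stream := window{avail: 65535} // stream-level receive window

	frameLen := int32(16384) // f.Length: payload + padding
	dataLen := int32(16000)  // len(f.Data()): payload only

	stream.take(frameLen)
	conn.take(frameLen)

	// Padding will never reach the body reader, so refund it now
	// (the real code also writes a WINDOW_UPDATE frame for each refund).
	if pad := frameLen - dataLen; pad > 0 {
		conn.add(pad)
		stream.add(pad)
	}
	fmt.Println(conn.avail, stream.avail) // 49535 49535
}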
12 roundTrip: writes the request headers and handles the response
func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAfterReqBodyWrite bool, err error) {
	// Reject connection-level headers that are invalid in HTTP/2.
	if err := checkConnHeaders(req); err != nil {
		return nil, false, err
	}
	// The connection is in use again, so stop the idle timer.
	if cc.idleTimer != nil {
		cc.idleTimer.Stop()
	}
	// Validate and collect the request's Trailer keys.
	trailers, err := commaSeparatedTrailers(req)
	if err != nil {
		return nil, false, err
	}
	hasTrailers := trailers != ""
	cc.mu.Lock()
	if err := cc.awaitOpenSlotForRequest(req); err != nil {
		cc.mu.Unlock()
		return nil, false, err
	}
	// Work out the request body length.
	body := req.Body
	contentLen := actualContentLength(req)
	hasBody := contentLen != 0
	// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
	// If the caller didn't set its own encoding, ask the server for gzip.
	var requestedGzip bool
	if !cc.t.disableCompression() &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		// Request gzip only, not deflate. Deflate is ambiguous and
		// not as universally supported anyway.
		// See: https://zlib.net/zlib_faq.html#faq39
		//
		// Note that we don't request this for HEAD requests,
		// due to a bug in nginx:
		// http://trac.nginx.org/nginx/ticket/358
		// https://golang.org/issue/5522
		//
		// We don't request gzip if the request is for a range, since
		// auto-decoding a portion of a gzipped document will just fail
		// anyway. See https://golang.org/issue/8923
		requestedGzip = true
	}
	// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
	// sent by writeRequestBody below, along with any Trailers,
	// again in form HEADERS{1}, CONTINUATION{0,})
	// HPACK-encode the request headers.
	hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen)
	if err != nil {
		cc.mu.Unlock()
		return nil, false, err
	}
	cs := cc.newStream()
	cs.req = req
	cs.trace = httptrace.ContextClientTrace(req.Context())
	cs.requestedGzip = requestedGzip
	bodyWriter := cc.t.getBodyWriterState(cs, body)
	cs.on100 = bodyWriter.on100
	defer func() {
		cc.wmu.Lock()
		werr := cc.werr
		cc.wmu.Unlock()
		if werr != nil {
			cc.Close()
		}
	}()
	cc.wmu.Lock()
	endStream := !hasBody && !hasTrailers
	// Write the HEADERS (and any CONTINUATION) frames for the request.
	werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
	cc.wmu.Unlock()
	traceWroteHeaders(cs.trace)
	cc.mu.Unlock()
	if werr != nil {
		if hasBody {
			req.Body.Close() // per RoundTripper contract
			bodyWriter.cancel()
		}
		cc.forgetStreamID(cs.ID)
		// Don't bother sending a RST_STREAM (our write already failed;
		// no need to keep writing)
		traceWroteRequest(cs.trace, werr)
		return nil, false, werr
	}
	var respHeaderTimer <-chan time.Time
	if hasBody {
		bodyWriter.scheduleBodyWrite()
	} else {
		traceWroteRequest(cs.trace, nil)
		if d := cc.responseHeaderTimeout(); d != 0 {
			timer := time.NewTimer(d)
			defer timer.Stop()
			respHeaderTimer = timer.C
		}
	}
	readLoopResCh := cs.resc
	bodyWritten := false
	ctx := req.Context()
	handleReadLoopResponse := func(re resAndError) (*http.Response, bool, error) {
		res := re.res
		if re.err != nil || res.StatusCode > 299 {
			// On error or status code 3xx, 4xx, 5xx, etc abort any
			// ongoing write, assuming that the server doesn't care
			// about our request body. If the server replied with 1xx or
			// 2xx, however, then assume the server DOES potentially
			// want our body (e.g. full-duplex streaming:
			// golang.org/issue/13444). If it turns out the server
			// doesn't, they'll RST_STREAM us soon enough. This is a
			// heuristic to avoid adding knobs to Transport. Hopefully
			// we can keep it.
			bodyWriter.cancel()
			cs.abortRequestBodyWrite(errStopReqBodyWrite)
			if hasBody && !bodyWritten {
				<-bodyWriter.resc
			}
		}
		if re.err != nil {
			cc.forgetStreamID(cs.ID)
			return nil, cs.getStartedWrite(), re.err
		}
		res.Request = req
		res.TLS = cc.tlsState
		return res, false, nil
	}
	// Block on a select loop. The read goroutine started in newClientConn
	// delivers the response on readLoopResCh (cs.resc); the other cases
	// handle timeouts, cancellation, and stream resets.
	for {
		select {
		// A response (or read error) arrived from the read loop.
		case re := <-readLoopResCh:
			return handleReadLoopResponse(re)
		// The response-header timeout fired.
		case <-respHeaderTimer:
			if !hasBody || bodyWritten {
				cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
			} else {
				bodyWriter.cancel()
				cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
				<-bodyWriter.resc
			}
			cc.forgetStreamID(cs.ID)
			return nil, cs.getStartedWrite(), errTimeout
		// The request's context was canceled or its deadline passed.
		case <-ctx.Done():
			if !hasBody || bodyWritten {
				cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
			} else {
				bodyWriter.cancel()
				cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
				<-bodyWriter.resc
			}
			cc.forgetStreamID(cs.ID)
			return nil, cs.getStartedWrite(), ctx.Err()
		// The request was canceled via req.Cancel.
		case <-req.Cancel:
			if !hasBody || bodyWritten {
				cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
			} else {
				bodyWriter.cancel()
				cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
				<-bodyWriter.resc
			}
			cc.forgetStreamID(cs.ID)
			return nil, cs.getStartedWrite(), errRequestCanceled
		// The peer reset the stream.
		case <-cs.peerReset:
			// processResetStream already removed the
			// stream from the streams map; no need for
			// forgetStreamID.
			return nil, cs.getStartedWrite(), cs.resetErr
		// The body writer finished (or failed).
		case err := <-bodyWriter.resc:
			bodyWritten = true
			// Prefer the read loop's response, if available. Issue 16102.
			select {
			case re := <-readLoopResCh:
				return handleReadLoopResponse(re)
			default:
			}
			if err != nil {
				cc.forgetStreamID(cs.ID)
				return nil, cs.getStartedWrite(), err
			}
			if d := cc.responseHeaderTimeout(); d != 0 {
				timer := time.NewTimer(d)
				defer timer.Stop()
				respHeaderTimer = timer.C
			}
		}
	}
}
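From the caller's point of view, the ctx.Done() case above is what makes per-request timeouts work: when the deadline passes, roundTrip resets the stream and returns ctx.Err(). A small usage sketch (the URL and timeout are illustrative):

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"golang.org/x/net/http2"
)

func main() {
	client := &http.Client{Transport: &http2.Transport{}}
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, "GET", "https://example.com", nil)
	if err != nil {
		panic(err)
	}
	// If the deadline fires before the response headers arrive,
	// roundTrip returns via its ctx.Done() branch with ctx.Err().
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}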
1、An http2 frame starts with a fixed 9-byte header; the FrameHeader struct that represents it looks like this:
type FrameHeader struct {
	// valid is an internal flag set once the header has been read;
	// it is not part of the 9 wire bytes.
	valid bool
	// Type is the frame type: 1 byte (8 bits) on the wire.
	Type FrameType
	// Flags holds the type-specific flags: 1 byte (8 bits).
	Flags Flags
	// Length is the payload length: 3 bytes (24 bits), so a frame
	// payload can be at most 2^24-1 bytes (~16MB).
	Length uint32
	// StreamID identifies the stream: 31 bits on the wire. Stream 0
	// is reserved for frames that apply to the connection as a whole
	// rather than to an individual stream.
	StreamID uint32
}
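On the wire those 9 bytes are laid out as a 24-bit length, an 8-bit type, an 8-bit flags byte, and finally one reserved bit plus the 31-bit stream ID. A hand-rolled parser, for illustration only (the library's readFrameHeader does the equivalent):

package main

import (
	"encoding/binary"
	"fmt"
)

func parseFrameHeader(buf [9]byte) (length uint32, typ, flags uint8, streamID uint32) {
	length = uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])
	typ = buf[3]
	flags = buf[4]
	streamID = binary.BigEndian.Uint32(buf[5:9]) & 0x7fffffff // drop the reserved bit
	return
}

func main() {
	// A DATA frame (type 0x0) on stream 1 carrying 5 payload bytes,
	// with the END_STREAM flag (0x1) set.
	hdr := [9]byte{0x00, 0x00, 0x05, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01}
	l, t, fl, id := parseFrameHeader(hdr)
	fmt.Printf("length=%d type=%d flags=%#x stream=%d\n", l, t, fl, id)
}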
2、The http2 DATA frame is represented as:
type DataFrame struct {
	FrameHeader
	data []byte
}
3、The http2 header protocol is implemented by the HeadersFrame struct, which carries the HPACK-encoded header blocks of http2 responses.
4、As for standing up a local http2 server and stepping through the request flow with a debugger: the author first built a working example and then single-stepped through the source to follow its execution. The example code is collected in the previous article, published alongside this one, for anyone interested.
Summary
1: http2 runs over a long-lived TCP connection, layering the http2 framing protocol on top of TCP.
2: The FrameHeader layout (length-prefixed frames) is what gives http2 message boundaries on TCP's byte stream, avoiding the classic "sticky packet" problem.
3: By default, the http2 transport retries a request up to 6 times.
4: Through multiplexing, binary framing, Header compression, and other techniques, http2 greatly improves performance.
References
https://httpwg.org/http2-spec/draft-ietf-httpbis-http2bis.html#name-data
Note:
1、In the interest of rigor: many details of the flow are not covered here in full; readers should consult the source and judge for themselves.