声明

分享到:

声明

image

声明

  • tchannel-go版本为v1.12.0
  • 阅读本篇文章需要go语言,HTTP2——多路复用基础

前言

    UBER的RPC框架TChannel有一个闪亮点————多路复用。对于多路复用是如何实现一直都很好奇,所以抽了点时间看了TChannel多路复用的实现源码,并整理成这篇文章。文章主要从客户端【发起请求】到服务端【响应请求】一条完整请求来看多路复用整个生命周期的实现。

客户端发起调用

客户端调用我们把这个过程分成4个步骤:

 1. 出站握手
 
 2. 复用链接
 
 3. 消息交换
 
 4. 有序写入——发起请求

出站握手

 1github.com/uber/tchannel-go/preinit_connection.go #35
 2func (ch *Channel) outboundHandshake(ctx context.Context, c net.Conn, outboundHP string, events connectionEvents) (_ *Connection, err error) {
 3  ......
 4  msg := &initReq{initMessage: ch.getInitMessage(ctx, 1)}
 5  if err := ch.writeMessage(c, msg); err != nil {
 6    return nil, err
 7  }
 8  ......
 9  res := &initRes{}
10  id, err := ch.readMessage(c, res)
11  if err != nil {
12    return nil, err
13  }
14  ......
15
16  return ch.newConnection(c, 1 /* initialID */, outboundHP, remotePeer, remotePeerAddress, events), nil
17}

    在开始请求前,TChannel有一次握手,这次握手不是TCP/IP的三次握手,是为了确认服务端能够正常响应。 如果服务端能够正常响应,则这条TCP链接将会被复用。

 1func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP string, remotePeer PeerInfo,
 2	remotePeerAddress peerAddressComponents, events connectionEvents) *Connection {
 3  ......
 4  connID := _nextConnID.Inc()
 5  ......
 6  c := &Connection{
 7    channelConnectionCommon: ch.channelConnectionCommon,
 8
 9    connID:             connID,
10    conn:               conn,
11    opts:               opts,
12    state:              connectionActive,
13    sendCh:             make(chan *Frame, opts.SendBufferSize),
14    ......
15    inbound:            newMessageExchangeSet(log, messageExchangeSetInbound),
16    outbound:           newMessageExchangeSet(log, messageExchangeSetOutbound),
17    ......
18  }
19
20  ......
21  // Connections are activated as soon as they are created.
22  c.callOnActive()
23  go c.readFrames(connID)
24  go c.writeFrames(connID)
25  return c
26}

    当握手成功,这条链接随后会被放入Peer,以备其他请求使用。同时会启动2个协程,“readFrames” 用于读取服务端的响应,“writeFrames”把数据写入TCP链接里面,关于这2个协程的作用下面会详细介绍。

复用链接

 1github.com/uber/tchannel-go/peer.go #361
 2func (p *Peer) getActiveConnLocked() (*Connection, bool) {
 3  allConns := len(p.inboundConnections) + len(p.outboundConnections)
 4  if allConns == 0 {
 5    return nil, false
 6  }
 7
 8  // We cycle through the connection list, starting at a random point
 9  // to avoid always choosing the same connection.
10  startOffset := peerRng.Intn(allConns)
11  for i := 0; i < allConns; i++ {
12    connIndex := (i + startOffset) % allConns
13    if conn := p.getConn(connIndex); conn.IsActive() {
14      return conn, true
15    }
16  }
17
18  return nil, false
19}

    复用链接是多路复用很关键的一步,和HTTP的复用不同,HTTP链接需要响应成功后才能被复用,而多路复用链接只要被创建了就能被复用。

消息交换 —— 无序响应

 1github.com/uber/tchannel-go/mex.go #306
 2func (mexset *messageExchangeSet) newExchange(ctx context.Context, framePool FramePool,
 3	msgType messageType, msgID uint32, bufferSize int) (*messageExchange, error) {
 4  ......
 5  mex := &messageExchange{
 6    msgType:   msgType,
 7    msgID:     msgID,
 8    ctx:       ctx,
 9    //请求会等待Frame的写入
10    recvCh:    make(chan *Frame, bufferSize),
11    errCh:     newErrNotifier(),
12    mexset:    mexset,
13    framePool: framePool,
14  }
15
16  mexset.Lock()
17  //保存messageExchange
18  addErr := mexset.addExchange(mex)
19  mexset.Unlock()
20  ......
21  mexset.onAdded()
22
23  ......
24  return mex, nil
25}

    在客户端发起多个请求的时候,由于只有一个TCP链接,如何知道哪个响应是对应哪个请求?为了能够正确响应,TChannel使用了MessageExchange,一个请求对应一个MessageExchange。客户端会以stream id 为下标索引,保存所有的MessageExchange。当有一个请求时,它会阻塞在MessageExchange.recvCh, 响应回来会根据响应的stream id获取对应的MessageExchange, 并把帧放到 MessageExchange.recvCh 从而实现无序响应。

有序写入——发起请求

先写入队列

 1github.com/uber/tchannel-go/reqres.go #139
 2func (w *reqResWriter) flushFragment(fragment *writableFragment) error {
 3  ......
 4  frame := fragment.frame.(*Frame)
 5  ......
 6  select {
 7  ......
 8  case w.conn.sendCh <- frame:
 9    return nil
10  }
11}

获取队列数据,写入TCP链接

 1github.com/uber/tchannel-go/connection.go #706
 2func (c *Connection) writeFrames(_ uint32) {
 3  for {
 4    select {
 5    case f := <-c.sendCh:
 6      ......
 7      err := f.WriteOut(c.conn)
 8      ......
 9    }
10  }
11}

    在多路复用中,只有一条TCP链接,为了避免客户端同时写入链接里,TChannel先把帧写入队列“sendCh”,再使用一个消费者获取队列数据,然后有序写入链接里面。

帧结构

 1github.com/uber/tchannel-go/frame.go #107
 2// A Frame is a header and payload
 3type Frame struct {
 4	buffer       []byte // full buffer, including payload and header
 5	headerBuffer []byte // slice referencing just the header
 6
 7	// The header for the frame
 8	Header FrameHeader
 9
10	// The payload for the frame
11	Payload []byte
12}
13
14// FrameHeader is the header for a frame, containing the MessageType and size
15type FrameHeader struct {
16	// The size of the frame including the header
17	size uint16
18
19	// The type of message represented by the frame
20	messageType messageType
21
22	// Left empty
23	reserved1 byte
24
25	// The id of the message represented by the frame
26	ID uint32 //指Stream ID
27
28	// Left empty
29	reserved [8]byte
30}

    帧被分为2部分,一部分是Header Frame(只有16字节);另一部分是Data Frame。这2部分数据按照一定格式标准转成二进制数据进行传输。

服务端响应

服务端响应我们把这个过程分成3个步骤:

 1. 入站握手
 
 2. 读取请求数据
 
 3. 有序写入——响应结果

入站握手

 1github.com/uber/tchannel-go/preinit_connection.go #69
 2func (ch *Channel) inboundHandshake(ctx context.Context, c net.Conn, events connectionEvents) (_ *Connection, err error) {
 3  id := uint32(math.MaxUint32)
 4  ......
 5  req := &initReq{}
 6  id, err = ch.readMessage(c, req)
 7  if err != nil {
 8    return nil, err
 9  }
10  ......
11  res := &initRes{initMessage: ch.getInitMessage(ctx, id)}
12  if err := ch.writeMessage(c, res); err != nil {
13    return nil, err
14  }
15  return ch.newConnection(c, 0 /* initialID */, "" /* outboundHP */, remotePeer, remotePeerAddress, events), nil
16}

    入站握手是对客户端出站握手的响应,当握手成功,服务端这边也会调用newConnection,启动“readFrames” 和 “writeFrames”协程,等待客户端请求。

读取请求数据

 1github.com/uber/tchannel-go/connection.go #615
 2func (c *Connection) readFrames(_ uint32) {
 3  headerBuf := make([]byte, FrameHeaderSize)
 4  ......
 5  for {
 6    ......
 7    //先读头部
 8    if _, err := io.ReadFull(c.conn, headerBuf); err != nil {
 9      handleErr(err)
10      return
11    }
12    frame := c.opts.FramePool.Get()
13
14    if err := frame.ReadBody(headerBuf, c.conn); err != nil {
15      handleErr(err)
16      c.opts.FramePool.Release(frame)
17      return
18    }
19    //handle  frame
20    ......
21  }
22}
23

    在服务端会监听握手成功的链接,如果客户端发送了请求,就会读取链接里面的数据。读取分2步:

  • 先读取Header Frame(16字节)

    Header Frame 的长度固定为16字节,这里面有stream Id 和 Data Frame的长度

  • 再读取Data Frame

    从Header Frame获取到 Data Frame的长度后,根据长度从链接读取指定的字节长度,就获取到正确的Data Frame。

有序写入——响应结果

    服务端的有序写入和客户端的有序写入是一样的功能,只是所处的角色不一样,这里不再重复。

客户端获取响应结果

客户端获取响应结果我们把这个过程分成2个步骤:

 1. 读取响应结果
 
 2. 找到MessageExchange响应

读取响应结果

    客户端获取响应结果和服务端的读取请求数据也是相同的功能,这里不再重复。

找到MessageExchange响应

 1github.com/uber/tchannel-go/mex.go #429
 2func (mexset *messageExchangeSet) forwardPeerFrame(frame *Frame) error {
 3  ......
 4  mexset.RLock()
 5  mex := mexset.exchanges[frame.Header.ID]
 6  mexset.RUnlock()
 7  ......
 8  //把帧交给MessageExchange.recvCh
 9  if err := mex.forwardPeerFrame(frame); err != nil {
10    ......
11    return err
12  }
13
14  return nil
15}

    在客户端发起调用时介绍过,它会阻塞在MessageExchange.recvCh,当响应回来时会根据stream Id(上面的frame.Header.ID) 找到对应的MessageExchange,并把frame放入recvCh,完成响应。这一步就体现在上面的代码。

结语

    至此UBER的RPC框架TChannel————多路复用介绍完,感谢UBER团队的贡献,让我收益很多。