保证消息交付至少一次
保证消息交付至少一次
保证消息交付至少一次
当nsqd向客户端推送消息时,messagePump会从memoryMsgChan中取出消息,并调用StartInFlightTimeout把消息登记到inFlightMessages中(关键一步)。
github.com/nsqio/nsq/nsqd/protocol_v2.go
1func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { // excerpt: only the memoryMsgChan branch of the loop is shown
2 for {
3 ......
4 select {
5 ......
6 case msg := <-memoryMsgChan: // a message was received from memoryMsgChan
7 ......
8 // Key step of the delivery guarantee: register msg as in flight for this client with msgTimeout.
9 subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
10 ......
11 }
12 }
13 ......
14}
github.com/nsqio/nsq/nsqd/channel.go #409
1func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
2 now := time.Now()
3 msg.clientID = clientID
4 msg.deliveryTS = now
5 ## 默认 60s
6 msg.pri = now.Add(timeout).UnixNano()
7 // 以messageId为下标把数据放到 inFlightMessages内存里
8 err := c.pushInFlightMessage(msg)
9 if err != nil {
10 return err
11 }
12 // 为msg增加一个索引,把消息放入 inFlightPqueue,并以msg.pri 从小到大排序。
13 c.addToInFlightPQ(msg)
14 return nil
15}
服务端把消息发送给客户端,等待客户端应答:
1:如果客户端返回“FIN”说明客户端已正常接收消息,服务端根据客户端返回的MessageId删除inFlightMessages的数据。
2:如果客户端返回“REQ”说明客户端接收异常,服务器根据返回的MessageId,把数据从inFlightMessages删除;若timeout为0则立即通过c.put重新写回memoryMsgChan,否则通过StartDeferredTimeout延迟后再重新入队。
github.com/nsqio/nsq/nsqd/channel.go
1func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error { // requeues the in-flight message id held by clientID, immediately or after timeout
2 // remove from inflight first
3 msg, err := c.popInFlightMessage(clientID, id)
4 if err != nil {
5 return err
6 }
7 c.removeFromInFlightPQ(msg)
8 atomic.AddUint64(&c.requeueCount, 1)
9
10 if timeout == 0 { // zero timeout: requeue right away
11 c.exitMutex.RLock() // hold the read lock across the Exiting check and the put
12 if c.Exiting() {
13 c.exitMutex.RUnlock()
14 return errors.New("exiting")
15 }
16 err := c.put(msg)
17 c.exitMutex.RUnlock()
18 return err
19 }
20
21 // deferred requeue
22 return c.StartDeferredTimeout(msg, timeout)
23}
24
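3:如果因网络问题没有收到客户端应答,queueScanLoop 将开始工作(运行nsqd时启动的服务)。queueScanLoop中的定时事件refreshTicker.C每5秒触发一次(QueueScanRefreshInterval),它只负责刷新channel列表并调整worker池的大小;真正的扫描由循环中另一个定时分支workTicker.C(QueueScanInterval,默认100ms,下面的代码中已省略)触发:随机选取若干channel交给worker,由worker调用processInFlightQueue从inFlightPqueue获取超时消息,并把它放回 channel的 memoryMsgChan里。这条消息将会被重新推送给客户端。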
1func (n *NSQD) queueScanLoop() { // excerpt: periodically hands a subset of channels to workCh
2 ......
3 // Ticker firing every QueueScanRefreshInterval (5 seconds per the original note).
4 refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
5
6 channels := n.channels()
7 n.resizePool(len(channels), workCh, responseCh, closeCh)
8
9 for {
10 select {
11 ......
12 case <-refreshTicker.C: // refresh tick: reload the channel list, resize the pool, then skip the selection below
13 channels = n.channels()
14 n.resizePool(len(channels), workCh, responseCh, closeCh)
15 continue
16 case <-n.exitChan:
17 goto exit
18 }
19 // Select at most QueueScanSelectionCount channels per pass, capped at len(channels).
20 num := n.getOpts().QueueScanSelectionCount
21 if num > len(channels) {
22 num = len(channels)
23 }
24
25 loop:
26 for _, i := range util.UniqRands(num, len(channels)) { // indices come from util.UniqRands
27 workCh <- channels[i]
28 }
29 ......
30 }
31 ......
32}
33
github.com/nsqio/nsq/nsqd/channel.go #544
1func (c *Channel) processInFlightQueue(t int64) bool { // excerpt: re-queues in-flight messages selected by PeekAndShift(t); the exit label and return are elided
2 ......
3 dirty := false
4 for {
5 c.inFlightMutex.Lock()
6 // Per the original note: using timestamp t, take a message from the in-flight priority queue whose timeout has passed.
7 msg, _ := c.inFlightPQ.PeekAndShift(t)
8 c.inFlightMutex.Unlock()
9
10 if msg == nil { // nothing returned for t: stop
11 goto exit
12 }
13 dirty = true
14 // Pop the message from the in-flight message set by its client ID and message ID.
15 _, err := c.popInFlightMessage(msg.clientID, msg.ID)
16 if err != nil {
17 goto exit
18 }
19 atomic.AddUint64(&c.timeoutCount, 1)
20 c.RLock()
21 client, ok := c.clients[msg.clientID]
22 c.RUnlock()
23 if ok { // the client is still registered on this channel: notify it of the timeout
24 client.TimedOutMessage()
25 }
26 // Put the message back on the channel (into memoryMsgChan, per the original note); the result of put is not checked here.
27 c.put(msg)
28 }
29 ......
30}