保证消息交付至少一次

分享到:

保证消息交付至少一次

保证消息交付至少一次

当客户端获取消息时,会触发memoryMsgChan事件,在这个事件中服务器会把消息放入inFlightMessages中(关键一步)。

github.com/nsqio/nsq/nsqd/protocol_v2.go

 1func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
 2    for {
 3        ......
 4    	select {
 5        ......
 6    	case msg := <-memoryMsgChan:
 7            ......
 8            //消息担保的关键一步
 9            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
10            ......
11    	}
12    }
13    ......
14}

github.com/nsqio/nsq/nsqd/channel.go #409

 1func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
 2    now := time.Now()
 3    msg.clientID = clientID
 4    msg.deliveryTS = now
 5    ## 默认 60s
 6    msg.pri = now.Add(timeout).UnixNano()
 7    // 以messageId为下标把数据放到 inFlightMessages内存里
 8    err := c.pushInFlightMessage(msg)
 9    if err != nil {
10    	return err
11    }
12    // 为msg增加一个索引,把消息放入 inFlightPqueue,并以msg.pri 从小到大排序。 
13    c.addToInFlightPQ(msg)
14    return nil
15}

服务端把消息发送给客户端,等待客户端应答:

1:如果客户端返回“FIN”说明客户端已正常接受消息,服务端根据客户端返回的MessageId删除inFlightMessages的数据。

2:如果客户端返回“REQ”说明客户端接收异常,服务器根据返回的MessageId,把数据从inFlightMessages删除,并重新写回memoryMsgChan。

github.com/nsqio/nsq/nsqd/channel.go

 1func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error {
 2	// remove from inflight first
 3	msg, err := c.popInFlightMessage(clientID, id)
 4	if err != nil {
 5		return err
 6	}
 7	c.removeFromInFlightPQ(msg)
 8	atomic.AddUint64(&c.requeueCount, 1)
 9
10	if timeout == 0 {
11		c.exitMutex.RLock()
12		if c.Exiting() {
13			c.exitMutex.RUnlock()
14			return errors.New("exiting")
15		}
16		err := c.put(msg)
17		c.exitMutex.RUnlock()
18		return err
19	}
20
21	// deferred requeue
22	return c.StartDeferredTimeout(msg, timeout)
23}
24

3:如果因网络问题没有收到客户端应答,queueScanLoop 将开始工作(运行nsqd时启动的服务)。在queueScanLoop中有一个定时事件refreshTicker.C,每5秒执行一次,从inFlightPqueue获取超时消息,并把它放回 channel的 memoryMsgChan里。这条消息将会被重新推送给客户端。

 1func (n *NSQD) queueScanLoop() {
 2    ......
 3    // 5秒
 4    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
 5    
 6    channels := n.channels()
 7    n.resizePool(len(channels), workCh, responseCh, closeCh)
 8
 9    for {
10    	select {
11        ......
12    	case <-refreshTicker.C:
13    		channels = n.channels()
14    		n.resizePool(len(channels), workCh, responseCh, closeCh)
15    		continue
16    	case <-n.exitChan:
17    		goto exit
18    	}
19        // 每5秒最多只处理4个channel
20    	num := n.getOpts().QueueScanSelectionCount
21    	if num > len(channels) {
22    		num = len(channels)
23    	}
24    
25    loop:
26    	for _, i := range util.UniqRands(num, len(channels)) {
27    		workCh <- channels[i]
28    	}
29        ......
30    }
31    ......
32}
33

github.com/nsqio/nsq/nsqd/channel.go #544

 1func (c *Channel) processInFlightQueue(t int64) bool {
 2    ......
 3    dirty := false
 4    for {
 5        c.inFlightMutex.Lock()
 6        // 根据当前时间戳,从inFlightPqueue获取已超时的msg
 7        msg, _ := c.inFlightPQ.PeekAndShift(t)
 8        c.inFlightMutex.Unlock()
 9        
10        if msg == nil {
11        	goto exit
12        }
13        dirty = true
14        // 根据msg.Id从inFlightMessages弹出消息
15        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
16        if err != nil {
17        	goto exit
18        }
19        atomic.AddUint64(&c.timeoutCount, 1)
20        c.RLock()
21        client, ok := c.clients[msg.clientID]
22        c.RUnlock()
23        if ok {
24        	client.TimedOutMessage()
25        }
26        // 将消息重新放入 channel的 memoryMsgChan 里
27        c.put(msg)
28    }
29    ......
30}

在这里可以看出,消息还是不能百分百成功投送,在消息从队列(memoryMsgChan)取出,放入缓存区(inFlightMessages),在这步之后nsq非正常关闭,就会导致该消息的丢失。