介绍nsq时常见的图。通过源码解剖它的实现。

分享到:

介绍nsq时常见的图。通过源码解剖它的实现。

image

介绍nsq时常见的图。通过源码解剖它的实现。

messagePump 源码

github.com/nsqio/nsq/nsqd/topic.go #215

 1 func (t *Topic) messagePump() {
 2    ......
 3    for {
 4    	select {
 5    	case msg = <-memoryMsgChan:
 6    ......
 7    	case <-t.exitChan:
 8    		goto exit
 9    	}
10
11        for i, channel := range chans {
12            chanMsg := msg
13        
14            // copy the message because each channel
15            // needs a unique instance but...
16            // fastpath to avoid copy if its the first channel
17            // (the topic already created the first copy)
18            if i > 0 {
19            	chanMsg = NewMessage(msg.ID, msg.Body)
20            	chanMsg.Timestamp = msg.Timestamp
21            	chanMsg.deferred = msg.deferred
22            }
23            if chanMsg.deferred != 0 {
24            	channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
25            	continue
26            }
27            err := channel.PutMessage(chanMsg)
28            ......
29        }
30    }
31    ......
32}

创建topic时,都会产生一个消息分发messagePump服务。

github.com/nsqio/nsq/nsqd/topic.go #44

1func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
2    t := &Topic{
3    ......
4    }
5    ......
6    t.waitGroup.Wrap(func() { t.messagePump() })
7    t.ctx.nsqd.Notify(t)
8    return t
9}

向topic发送消息时会触发一个memoryMsgChan事件。通过这个事件,消息被分发到topic的各个channel下。