介绍nsq时常见的图。通过源码解剖它的实现。
介绍nsq时常见的图。通过源码解剖它的实现。
介绍nsq时常见的图。通过源码解剖它的实现。
messagePump 源码
github.com/nsqio/nsq/nsqd/topic.go #215
1 func (t *Topic) messagePump() {
2 ......
3 for {
4 select {
5 case msg = <-memoryMsgChan:
6 ......
7 case <-t.exitChan:
8 goto exit
9 }
10
11 for i, channel := range chans {
12 chanMsg := msg
13
14 // copy the message because each channel
15 // needs a unique instance but...
16 // fastpath to avoid copy if its the first channel
17 // (the topic already created the first copy)
18 if i > 0 {
19 chanMsg = NewMessage(msg.ID, msg.Body)
20 chanMsg.Timestamp = msg.Timestamp
21 chanMsg.deferred = msg.deferred
22 }
23 if chanMsg.deferred != 0 {
24 channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
25 continue
26 }
27 err := channel.PutMessage(chanMsg)
28 ......
29 }
30 }
31 ......
32}
创建topic时,都会产生一个消息分发messagePump服务。
github.com/nsqio/nsq/nsqd/topic.go #44
1func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
2 t := &Topic{
3 ......
4 }
5 ......
6 t.waitGroup.Wrap(func() { t.messagePump() })
7 t.ctx.nsqd.Notify(t)
8 return t
9}