1:使用TCP协议获取
1:使用TCP协议获取
1:使用TCP协议获取
go-nsq 客户端发送消息:
1V2
2
3IDENTIFY
4
5SUB is_test2 ch
6
7RDY 1
8
9RDY 1
10
11FIN 07ee6e602a3fa000
当服务器收到客户端的请求后,会启动messagePump服务。这个服务有点像提前监听:先开启监听服务,至于监听谁你以后再告诉我。
github.com/nsqio/nsq/nsqd/protocol_v2.go #206
V2
github.com/nsqio/nsq/nsqd/tcp.go #33
版本号,用于优雅升级
IDENTIFY
github.com/nsqio/nsq/nsqd/protocol_v2.go #355
验证和初始化客户端通信信息
SUB is_test2 ch
github.com/nsqio/nsq/nsqd/protocol_v2.go #589
1func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
2 ......
3 topic := p.ctx.nsqd.GetTopic(topicName)
4 channel := topic.GetChannel(channelName)
5 channel.AddClient(client.ID, client)
6
7 atomic.StoreInt32(&client.State, stateSubscribed)
8 client.Channel = channel
9 // update message pump
10 client.SubEventChan <- channel
11 ......
12}
13
确定将要处理的channel
触发 client.SubEventChan 事件,告诉messagePump服务需要监听的channel。
1func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
2
3 for {
4 ......
5 select {
6 ......
7 case subChannel = <-subEventChan:
8 // you can't SUB anymore
9 subEventChan = nil
10 ......
11 }
12 }
13 ......
14}
RDY 1 go-nsq客户端发送2次,应该是个BUG。
1func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
2
3 for {
4
5 if subChannel == nil || !client.IsReadyForMessages() {
6 .....
7 } else if flushed {
8 // last iteration we flushed...
9 // do not select on the flusher ticker channel
10 // 监听 channel
11 memoryMsgChan = subChannel.memoryMsgChan
12 backendMsgChan = subChannel.backend.ReadChan()
13 flusherChan = nil
14 } else {
15 ......
16 }
17
18 ......
19 select {
20 ......
21 //没有看出作用
22 case <-client.ReadyStateChan:
23 ......
24 case msg := <-memoryMsgChan:
25 if sampleRate > 0 && rand.Int31n(100) > sampleRate {
26 continue
27 }
28 msg.Attempts++
29
30 subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
31 client.SendingMessage()
32 err = p.SendMessage(client, msg, &buf)
33 if err != nil {
34 goto exit
35 }
36 flushed = false
37 ......
38 }
39 }
40 ......
41}
经过SUB后,客户端会发送RDY告诉服务器,已准备好接收。服务器再从memoryMsgChan 获取数据,响应客户端同时将数据放入 inFlightMessages(消息担保)。
FIN 07ee6e602a3fa000
github.com/nsqio/nsq/nsqd/protocol_v2.go #667
07ee6e602a3fa000 为MessageId,根据这个MessageId删除inFlightMessages里的数据完成整个投送过程。