1:使用TCP协议获取

分享到:

1:使用TCP协议获取

images

1:使用TCP协议获取

go-nsq 客户端发送消息:

 1V2
 2
 3IDENTIFY
 4
 5SUB is_test2 ch
 6
 7RDY 1
 8
 9RDY 1
10
11FIN 07ee6e602a3fa000

当服务器收到客户端的请求后,会启动messagePump服务。这个服务有点像提前监听:先开启监听服务,至于监听谁你以后再告诉我。

github.com/nsqio/nsq/nsqd/protocol_v2.go #206

V2

github.com/nsqio/nsq/nsqd/tcp.go #33

版本号,用于优雅升级

IDENTIFY

github.com/nsqio/nsq/nsqd/protocol_v2.go #355

验证和初始化客户端通信信息

SUB is_test2 ch

github.com/nsqio/nsq/nsqd/protocol_v2.go #589

 1func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
 2    ......
 3    topic := p.ctx.nsqd.GetTopic(topicName)
 4    channel := topic.GetChannel(channelName)
 5    channel.AddClient(client.ID, client)
 6    
 7    atomic.StoreInt32(&client.State, stateSubscribed)
 8    client.Channel = channel
 9    // update message pump
10    client.SubEventChan <- channel
11    ......
12}
13

确定将要处理的channel

触发 client.SubEventChan 事件,告诉messagePump服务需要监听的channel。

 1func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
 2
 3    for {
 4        ......
 5    	select {
 6        ......
 7    	case subChannel = <-subEventChan:
 8    		// you can't SUB anymore
 9    		subEventChan = nil
10        ......
11    	}
12    }
13    ......
14}

RDY 1 go-nsq客户端发送2次,应该是个BUG。

 1func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
 2
 3    for {
 4    
 5        if subChannel == nil || !client.IsReadyForMessages() {
 6        .....
 7        } else if flushed {
 8            // last iteration we flushed...
 9            // do not select on the flusher ticker channel
10            // 监听 channel
11            memoryMsgChan = subChannel.memoryMsgChan
12            backendMsgChan = subChannel.backend.ReadChan()
13            flusherChan = nil
14        } else {
15            ......
16        }
17    
18        ......
19    	select {
20    	......
21    	//没有看出作用
22    	case <-client.ReadyStateChan:
23        ......
24    	case msg := <-memoryMsgChan:
25            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
26            	continue
27            }
28            msg.Attempts++
29            
30            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
31            client.SendingMessage()
32            err = p.SendMessage(client, msg, &buf)
33            if err != nil {
34            	goto exit
35            }
36            flushed = false
37            ......
38    	}
39    }
40    ......
41}

经过SUB后,客户端会发送RDY告诉服务器,已准备好接收。服务器再从memoryMsgChan 获取数据,响应客户端同时将数据放入 inFlightMessages(消息担保)。

FIN 07ee6e602a3fa000

github.com/nsqio/nsq/nsqd/protocol_v2.go #667

07ee6e602a3fa000 为MessageId,根据这个MessageId删除inFlightMessages里的数据完成整个投送过程。