1:开启出动推送
1:开启出动推送
1:开启出动推送
nsq_to_http 和 nsqd通过 TCP 通讯
1nsq_to_http --channel=ch --topic=test \
2--post="" \
3--nsqd-tcp-address=127.0.0.1:4150 \
4--content-type="application/x-www-form-urlencoded"
2:监听前准备工作
在监听前nsq_to_http会发送
V2
IDENTIFY
SUB test ch
RDY 200
确定要监听的主题,channel和暂接收200条消息
开启读和写事件
github.com/nsqio/go-nsq/conn.go #181
1func (c *Conn) Connect() (*IdentifyResponse, error) {
2 ......
3 c.wg.Add(2)
4 atomic.StoreInt32(&c.readLoopRunning, 1)
5 // 读取nsq发来的数据
6 go c.readLoop()
7 // 对nsq响应
8 go c.writeLoop()
9 return resp, nil
10}
11
200个处理事件,把消息放送给订阅方
github.com/nsqio/go-nsq/consumer.go #1079
1func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
2 if atomic.LoadInt32(&r.connectedFlag) == 1 {
3 panic("already connected")
4 }
5
6 atomic.AddInt32(&r.runningHandlers, int32(concurrency))
7 for i := 0; i < concurrency; i++ {
8 go r.handlerLoop(handler)
9 }
10}
11
3:消息重试次数为 默认最多5次
github.com/nsqio/go-nsq/consumer.go #1126
1func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
2 // message passed the max number of attempts
3 if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
4 r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
5 message.ID, message.Attempts)
6
7 logger, ok := handler.(FailedMessageLogger)
8 if ok {
9 logger.LogFailedMessage(message)
10 }
11
12 return true
13 }
14 return false
15}
超过重试次数会发送 “FIN 消息ID” 告诉nsqd把消息删除掉
github.com/nsqio/go-nsq/conn.go #690
1func (c *Conn) onMessageFinish(m *Message) {
2 c.msgResponseChan <- &msgResponse{msg: m, cmd: Finish(m.ID), success: true}
3}
4:主动推送的担保
通过HTTP状态实现
github.com/nsqio/nsq/apps/nsq_to_http/nsq_to_http.go
1func (p *PostPublisher) Publish(addr string, msg []byte) error {
2 buf := bytes.NewBuffer(msg)
3 resp, err := HTTPPost(addr, buf)
4 if err != nil {
5 return err
6 }
7 io.Copy(ioutil.Discard, resp.Body)
8 resp.Body.Close()
9
10 if resp.StatusCode < 200 || resp.StatusCode >= 300 {
11 return fmt.Errorf("got status code %d", resp.StatusCode)
12 }
13 return nil
14}
15
如果订阅者返回 HTTP状态码 2XX,nsq_to_http 返回“FIN 消息ID”告诉nsqd消息接收成功,可从缓存区删除。
如果订阅者返回 HTTP状态码 非2XX,nsq_to_http返回 “REQ 消息ID”告诉nsqd,把消息从缓存区放回队列。