1:开启出动推送

分享到:

1:开启出动推送

1:开启出动推送

nsq_to_http 和 nsqd通过 TCP 通讯

1nsq_to_http --channel=ch --topic=test \
2--post="" \
3--nsqd-tcp-address=127.0.0.1:4150 \
4--content-type="application/x-www-form-urlencoded"

2:监听前准备工作

在监听前nsq_to_http会发送

V2

IDENTIFY

SUB test ch

RDY 200

确定要监听的主题,channel和暂接收200条消息

开启读和写事件

github.com/nsqio/go-nsq/conn.go #181

 1func (c *Conn) Connect() (*IdentifyResponse, error) {
 2    ......
 3    c.wg.Add(2)
 4    atomic.StoreInt32(&c.readLoopRunning, 1)
 5    // 读取nsq发来的数据
 6    go c.readLoop()
 7    // 对nsq响应
 8    go c.writeLoop()
 9    return resp, nil
10}
11

200个处理事件,把消息放送给订阅方

github.com/nsqio/go-nsq/consumer.go #1079

 1func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
 2    if atomic.LoadInt32(&r.connectedFlag) == 1 {
 3        panic("already connected")
 4    }
 5    
 6    atomic.AddInt32(&r.runningHandlers, int32(concurrency))
 7    for i := 0; i < concurrency; i++ {
 8        go r.handlerLoop(handler)
 9    }
10}
11

3:消息重试次数为 默认最多5次

github.com/nsqio/go-nsq/consumer.go #1126

 1func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
 2    // message passed the max number of attempts
 3    if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
 4        r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
 5        	message.ID, message.Attempts)
 6        
 7        logger, ok := handler.(FailedMessageLogger)
 8        if ok {
 9            logger.LogFailedMessage(message)
10        }
11        
12        return true
13    }
14    return false
15}

超过重试次数会发送 “FIN 消息ID” 告诉nsqd把消息删除掉

github.com/nsqio/go-nsq/conn.go #690

1func (c *Conn) onMessageFinish(m *Message) {
2    c.msgResponseChan <- &msgResponse{msg: m, cmd: Finish(m.ID), success: true}
3}

4:主动推送的担保

通过HTTP状态实现

github.com/nsqio/nsq/apps/nsq_to_http/nsq_to_http.go

 1func (p *PostPublisher) Publish(addr string, msg []byte) error {
 2    buf := bytes.NewBuffer(msg)
 3    resp, err := HTTPPost(addr, buf)
 4    if err != nil {
 5        return err
 6    }
 7    io.Copy(ioutil.Discard, resp.Body)
 8    resp.Body.Close()
 9    
10    if resp.StatusCode < 200 || resp.StatusCode >= 300 {
11    	return fmt.Errorf("got status code %d", resp.StatusCode)
12    }
13    return nil
14}
15

如果订阅者返回 HTTP状态码 2XX,nsq_to_http 返回“FIN 消息ID”告诉nsqd消息接收成功,可从缓存区删除。

如果订阅者返回 HTTP状态码 非2XX,nsq_to_http返回 “REQ 消息ID”告诉nsqd,把消息从缓存区放回队列。