1:基于TCP协议通讯,TCP长连接

分享到:

1:基于TCP协议通讯,TCP长连接

1:基于TCP协议通讯,TCP长连接

2:心跳测试

nsqd 和 lookupd 之间每隔15秒做一次心跳测试

 1func (n *NSQD) lookupLoop() {
 2    ......
 3    // for announcements, lookupd determines the host automatically
 4    ticker := time.Tick(15 * time.Second)
 5    for {
 6        ......
 7        select {
 8        case <-ticker:
 9        // send a heartbeat and read a response (read detects closed conns)
10        for _, lookupPeer := range lookupPeers {
11            n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer)
12            cmd := nsq.Ping()
13            _, err := lookupPeer.Command(cmd)
14            if err != nil {
15            	n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
16            }
17        }
18        ......
19        case <-n.exitChan:
20        	goto exit
21        }
22    }
23exit:
24    n.logf("LOOKUP: closing")
25}
26

nsqd使用tcp 发送 ping + 客户端信息给lookupd。 lookupd 返回 ok。完成心跳测试流程。

3:nsq 注册 lookupd

代码位置:nsqio\nsq\nsqd\lookup.go:31 第一次发送:

1cmd{Name:IDENTIFY,Params:nil,
2Body:{"broadcast_address":"apple.local",
3"hostname":"apple.local","http_port":4151,
4"tcp_port":4150,"version":"1.0.0-compat"}}

nsq 注册时,lookupd 会以ip为下标 记录nsq客户端信息,信息放入 registrationMap 里。

github.com/nsqio/nsq/nsqlookupd/lookup_protocol_v1.go #227

1client.peerInfo = &peerInfo
2if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
3    p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
4}

github.com/nsqio/nsq/nsqlookupd/registration_db.go # 71

 1// add a producer to a registration
 2func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
 3	r.Lock()
 4	defer r.Unlock()
 5	producers := r.registrationMap[k]
 6	found := false
 7	for _, producer := range producers {
 8		if producer.peerInfo.id == p.peerInfo.id {
 9			found = true
10		}
11	}
12	if found == false {
13		r.registrationMap[k] = append(producers, p)
14	}
15	return !found
16}

4:数据传输方式 使用json序列化,再通过 binary 包,把数据转为二进制。

接触到的包

1:time

2:reader

3:binary

4:atomic 善用原子操作,它会比锁更为高效

原子操作