为每条消息生成一个message Id
为每条消息生成一个message Id
curl -d 'hello world' 'http://127.0.0.1:4151/put?topic=test'
为每条消息生成一个message Id
github.com/nsqio/nsq/nsqd/http.go # 217
message结构体信息
github.com/nsqio/nsq/nsqd/message.go # 18
1type Message struct {
2 ID MessageID
3 Body []byte
4 Timestamp int64
5 Attempts uint16
6
7 // for in-flight handling
8 deliveryTS time.Time
9 clientID int64
10 pri int64
11 index int
12 deferred time.Duration
13}
将消息放入memoryMsgChan 共享通道
1func (t *Topic) put(m *Message) error {
2 select {
3 //当t.memoryMsgChan 满写不进,channel把数据写入文件
4 case t.memoryMsgChan <- m:
5 default:
6 b := bufferPoolGet()
7 err := writeMessageToBackend(b, m, t.backend)
8 bufferPoolPut(b)
9 t.ctx.nsqd.SetHealth(err)
10 if err != nil {
11 // TODO Error handle
12 ......
13 }
14 }
15 return nil
16}
默认放1W条数据,超过放入磁盘
topic的backend实现在 github.com/nsqio/go-diskqueue/diskqueue.go
如何放入磁盘
github.com/nsqio/nsq/nsqd/message.go # 40
数据保存格式
1binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
2binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))
3
4n, err := w.Write(buf[:])
5total += int64(n)
6if err != nil {
7 return total, err
8}
9
10n, err = w.Write(m.ID[:])
11total += int64(n)
12if err != nil {
13 return total, err
14}
15
16n, err = w.Write(m.Body)
消息组成:将 Message 的Timestamp,Attempts,ID,Body, 再加上消息的长度len(len+Timestamp+Attempts+ID+Body),组成一条消息。
消息与消息之间有4个字节的空格
当数据大于maxBytesPerFile(默认100M),将会对文件进行切割。
github.com/nsqio/go-diskqueue/diskqueue.go # 373
1if d.writePos > d.maxBytesPerFile {
2 d.writeFileNum++
3 d.writePos = 0
4
5 // sync every time we start writing to a new file
6 // sync 里会把 writePost,readPos,writeFileNum等信息进行持久化
7 err = d.sync()
8 if err != nil {
9 d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
10 }
11
12 if d.writeFile != nil {
13 d.writeFile.Close()
14 d.writeFile = nil
15 }
16}
writePos
作用:用于追加数据。记录文件最后偏移位置。退出nsq会持久化保存,开启会初始化。