为每条消息生成一个message Id

分享到:

为每条消息生成一个message Id

curl -d 'hello world' 'http://127.0.0.1:4151/put?topic=test'

为每条消息生成一个message Id

github.com/nsqio/nsq/nsqd/http.go # 217

message结构体信息

github.com/nsqio/nsq/nsqd/message.go # 18

 1type Message struct {
 2	ID        MessageID
 3	Body      []byte
 4	Timestamp int64
 5	Attempts  uint16
 6
 7	// for in-flight handling
 8	deliveryTS time.Time
 9	clientID   int64
10	pri        int64
11	index      int
12	deferred   time.Duration
13}

将消息放入memoryMsgChan 共享通道

 1func (t *Topic) put(m *Message) error {
 2    select {
 3    //当t.memoryMsgChan 满写不进,channel把数据写入文件
 4    case t.memoryMsgChan <- m:
 5    default:
 6    	b := bufferPoolGet()
 7    	err := writeMessageToBackend(b, m, t.backend)
 8    	bufferPoolPut(b)
 9    	t.ctx.nsqd.SetHealth(err)
10    	if err != nil {
11    	// TODO Error handle
12        ...... 
13    	}
14    }
15    return nil
16}

默认放1W条数据,超过放入磁盘

topic的backend实现在 github.com/nsqio/go-diskqueue/diskqueue.go

如何放入磁盘

github.com/nsqio/nsq/nsqd/message.go # 40

数据保存格式

images

 1binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
 2binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))
 3
 4n, err := w.Write(buf[:])
 5total += int64(n)
 6if err != nil {
 7	return total, err
 8}
 9
10n, err = w.Write(m.ID[:])
11total += int64(n)
12if err != nil {
13	return total, err
14}
15
16n, err = w.Write(m.Body)

消息组成:将 Message 的Timestamp,Attempts,ID,Body, 再加上消息的长度len(len+Timestamp+Attempts+ID+Body),组成一条消息。

消息与消息之间有4个字节的空格

当数据大于maxBytesPerFile(默认100M),将会对文件进行切割。

github.com/nsqio/go-diskqueue/diskqueue.go # 373

 1if d.writePos > d.maxBytesPerFile {
 2    d.writeFileNum++
 3    d.writePos = 0
 4    
 5    // sync every time we start writing to a new file
 6    // sync 里会把 writePost,readPos,writeFileNum等信息进行持久化
 7    err = d.sync()
 8    if err != nil {
 9    	d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
10    }
11    
12    if d.writeFile != nil {
13    	d.writeFile.Close()
14    	d.writeFile = nil
15    }
16}

writePos

作用:用于追加数据。记录文件最后偏移位置。退出nsq会持久化保存,开启会初始化。

readPos 和writePos相对

接触的包

os.Sync
go文件操作大全