agent udp使用
agent udp使用
agent udp使用
初始化udp
在agent开启时初始化了3个UDP服务
每个服务对应处理不同的数据格式
官方推荐使用6831端口接收数据
github.com/uber/jaeger/cmd/agent/app/flags.go #35
1var defaultProcessors = []struct {
2 model model
3 protocol protocol
4 hostPort string
5}{
6 {model: "zipkin", protocol: "compact", hostPort: ":5775"},
7 {model: "jaeger", protocol: "compact", hostPort: ":6831"},
8 {model: "jaeger", protocol: "binary", hostPort: ":6832"},
9}
UDP服务端初始化
github.com/uber/jaeger/cmd/agent/app/servers/thriftudp/transport.go #73
1func NewTUDPServerTransport(hostPort string) (*TUDPTransport, error) {
2 addr, err := net.ResolveUDPAddr("udp", hostPort)
3 if err != nil {
4 return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error())
5 }
6 conn, err := net.ListenUDP(addr.Network(), addr)
7 if err != nil {
8 return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error())
9 }
10 return &TUDPTransport{addr: conn.LocalAddr(), conn: conn}, nil
11}
初始化worker
github.com/uber/jaeger/cmd/agent/app/processors/thrift_processor.go #78
1// Serve initiates the readers and starts serving traffic
2func (s *ThriftProcessor) Serve() {
3 s.processing.Add(s.numProcessors)
4 for i := 0; i < s.numProcessors; i++ {
5 go s.processBuffer()
6 }
7
8 s.server.Serve()
9}
数据接收使用对象池和队列处理
github.com/uber/jaeger/cmd/agent/app/servers/tbuffered_server.go
1func NewTBufferedServer(
2 transport thrift.TTransport,
3 maxQueueSize int,
4 maxPacketSize int,
5 mFactory metrics.Factory,
6) (*TBufferedServer, error) {
7 //数据队列
8 dataChan := make(chan *ReadBuf, maxQueueSize)
9 //对象池初始化
10 var readBufPool = &sync.Pool{
11 New: func() interface{} {
12 return &ReadBuf{bytes: make([]byte, maxPacketSize)}
13 },
14 }
15
16 res := &TBufferedServer{dataChan: dataChan,
17 transport: transport,
18 maxQueueSize: maxQueueSize,
19 maxPacketSize: maxPacketSize,
20 readBufPool: readBufPool,
21 }
22 metrics.Init(&res.metrics, mFactory, nil)
23 return res, nil
24}
从客户端接收数据放入队列
github.com/uber/jaeger/cmd/agent/app/servers/tbuffered_server.go #80
1func (s *TBufferedServer) Serve() {
2 atomic.StoreUint32(&s.serving, 1)
3 for s.IsServing() {
4 readBuf := s.readBufPool.Get().(*ReadBuf)
5 n, err := s.transport.Read(readBuf.bytes)
6 if err == nil {
7 readBuf.n = n
8 s.metrics.PacketSize.Update(int64(n))
9 select {
10 case s.dataChan <- readBuf:
11 s.metrics.PacketsProcessed.Inc(1)
12 s.updateQueueSize(1)
13 default:
14 //这里需要注意,如果写比处理快,agent将会扔掉超出的部分数据
15 s.metrics.PacketsDropped.Inc(1)
16 }
17 } else {
18 s.metrics.ReadError.Inc(1)
19 }
20 }
21}
注意点:agent的队列默认存放1000条消息,超过部分会被扔掉。
基于服务器的承载能力可以适当调节参数,减少数据的丢失,保持调用链的完整。
- 增加队列长度(default 1000) --processor.zipkin-compact.server-queue-size
- 增加处理协成数 (default 50) --processor.zipkin-compact.workers
- 增加agent服务节点
处理队列数据
github.com/uber/jaeger/cmd/agent/app/processors/thrift_processor.go #104
1func (s *ThriftProcessor) processBuffer() {
2 for readBuf := range s.server.DataChan() {
3 protocol := s.protocolPool.Get().(thrift.TProtocol)
4 protocol.Transport().Write(readBuf.GetBytes())
5 s.server.DataRecd(readBuf) // acknowledge receipt and release the buffer
6
7 if ok, _ := s.handler.Process(protocol, protocol); !ok {
8 // TODO log the error
9 s.metrics.HandlerProcessError.Inc(1)
10 }
11 s.protocolPool.Put(protocol)
12 }
13 s.processing.Done()
14}
jaeger 解析6831端口的数据
github.com/uber/jaeger/thrift-gen/jaeger/agent.go #105
1func (p *AgentProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
2 name, _, seqId, err := iprot.ReadMessageBegin()
3
4 if err != nil {
5 return false, err
6 }
7 if processor, ok := p.GetProcessorFunction(name); ok {
8 return processor.Process(seqId, iprot, oprot)
9 }
10 iprot.Skip(thrift.STRUCT)
11 iprot.ReadMessageEnd()
12 x7 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
13 oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
14 x7.Write(oprot)
15 oprot.WriteMessageEnd()
16 oprot.Flush()
17 return false, x7
18
19}
解析EmitBatch数据
github.com/uber/jaeger/thrift-gen/agent/agent.go #187
1func (p *agentProcessorEmitBatch) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
2 args := AgentEmitBatchArgs{}
3 if err = args.Read(iprot); err != nil {
4 iprot.ReadMessageEnd()
5 return false, err
6 }
7
8 iprot.ReadMessageEnd()
9 var err2 error
10 if err2 = p.handler.EmitBatch(args.Batch); err2 != nil {
11 return true, err2
12 }
13 return true, nil
14}
数据解析完后使用tchan 提交数据
github.com/uber/jaeger/thrift-gen/jaeger/tchan-jaeger.go #39
1func (c *tchanCollectorClient) SubmitBatches(ctx thrift.Context, batches []*Batch) ([]*BatchSubmitResponse, error) {
2 var resp CollectorSubmitBatchesResult
3 args := CollectorSubmitBatchesArgs{
4 Batches: batches,
5 }
6 success, err := c.client.Call(ctx, c.thriftService, "submitBatches", &args, &resp)
7 if err == nil && !success {
8 }
9
10 return resp.GetSuccess(), err
11}