agent udp使用

分享到:

agent udp使用

agent udp使用

初始化udp

在agent开启时初始化了3个UDP服务

每个服务对应处理不同的数据格式

官方推荐使用6831端口接收数据

github.com/uber/jaeger/cmd/agent/app/flags.go #35

1var defaultProcessors = []struct {
2	model    model
3	protocol protocol
4	hostPort string
5}{
6	{model: "zipkin", protocol: "compact", hostPort: ":5775"},
7	{model: "jaeger", protocol: "compact", hostPort: ":6831"},
8	{model: "jaeger", protocol: "binary", hostPort: ":6832"},
9}

UDP服务端初始化

github.com/uber/jaeger/cmd/agent/app/servers/thriftudp/transport.go #73

 1func NewTUDPServerTransport(hostPort string) (*TUDPTransport, error) {
 2    addr, err := net.ResolveUDPAddr("udp", hostPort)
 3    if err != nil {
 4    	return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error())
 5    }
 6    conn, err := net.ListenUDP(addr.Network(), addr)
 7    if err != nil {
 8    	return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error())
 9    }
10    return &TUDPTransport{addr: conn.LocalAddr(), conn: conn}, nil
11}

初始化worker

github.com/uber/jaeger/cmd/agent/app/processors/thrift_processor.go #78

1// Serve initiates the readers and starts serving traffic
2func (s *ThriftProcessor) Serve() {
3	s.processing.Add(s.numProcessors)
4	for i := 0; i < s.numProcessors; i++ {
5		go s.processBuffer()
6	}
7
8	s.server.Serve()
9}

数据接收使用对象池和队列处理

github.com/uber/jaeger/cmd/agent/app/servers/tbuffered_server.go

 1func NewTBufferedServer(
 2	transport thrift.TTransport,
 3	maxQueueSize int,
 4	maxPacketSize int,
 5	mFactory metrics.Factory,
 6) (*TBufferedServer, error) {
 7    //数据队列
 8    dataChan := make(chan *ReadBuf, maxQueueSize)
 9    //对象池初始化
10    var readBufPool = &sync.Pool{
11    	New: func() interface{} {
12    		return &ReadBuf{bytes: make([]byte, maxPacketSize)}
13    	},
14    }
15    
16    res := &TBufferedServer{dataChan: dataChan,
17    	transport:     transport,
18    	maxQueueSize:  maxQueueSize,
19    	maxPacketSize: maxPacketSize,
20    	readBufPool:   readBufPool,
21    }
22    metrics.Init(&res.metrics, mFactory, nil)
23    return res, nil
24}

从客户端接收数据放入队列

github.com/uber/jaeger/cmd/agent/app/servers/tbuffered_server.go #80

 1func (s *TBufferedServer) Serve() {
 2	atomic.StoreUint32(&s.serving, 1)
 3	for s.IsServing() {
 4		readBuf := s.readBufPool.Get().(*ReadBuf)
 5		n, err := s.transport.Read(readBuf.bytes)
 6		if err == nil {
 7			readBuf.n = n
 8			s.metrics.PacketSize.Update(int64(n))
 9			select {
10			case s.dataChan <- readBuf:
11				s.metrics.PacketsProcessed.Inc(1)
12				s.updateQueueSize(1)
13			default:
14			   //这里需要注意,如果写比处理快,agent将会扔掉超出的部分数据
15			   s.metrics.PacketsDropped.Inc(1)
16			}
17		} else {
18			s.metrics.ReadError.Inc(1)
19		}
20	}
21}

注意点:agent的队列默认存放1000条消息,超过部分会被扔掉。

基于服务器的承载能力可以适当调节参数,减少数据的丢失,保持调用链的完整。

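  • 增加队列长度(default 1000) --processor.zipkin-compact.server-queue-size
  • 增加处理协程数 (default 50) --processor.zipkin-compact.workers
  • 增加agent服务节点

注:参数名中的 zipkin-compact 对应5775端口的服务;如果使用的是6831端口,对应的参数前缀为 --processor.jaeger-compact(6832端口为 --processor.jaeger-binary)。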

处理队列数据

github.com/uber/jaeger/cmd/agent/app/processors/thrift_processor.go #104

 1func (s *ThriftProcessor) processBuffer() {
 2	for readBuf := range s.server.DataChan() {
 3		protocol := s.protocolPool.Get().(thrift.TProtocol)
 4		protocol.Transport().Write(readBuf.GetBytes())
 5		s.server.DataRecd(readBuf) // acknowledge receipt and release the buffer
 6
 7		if ok, _ := s.handler.Process(protocol, protocol); !ok {
 8			// TODO log the error
 9			s.metrics.HandlerProcessError.Inc(1)
10		}
11		s.protocolPool.Put(protocol)
12	}
13	s.processing.Done()
14}

jaeger 解析6831端口的数据

github.com/uber/jaeger/thrift-gen/jaeger/agent.go #105

 1func (p *AgentProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
 2	name, _, seqId, err := iprot.ReadMessageBegin()
 3
 4	if err != nil {
 5		return false, err
 6	}
 7	if processor, ok := p.GetProcessorFunction(name); ok {
 8		return processor.Process(seqId, iprot, oprot)
 9	}
10	iprot.Skip(thrift.STRUCT)
11	iprot.ReadMessageEnd()
12	x7 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
13	oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
14	x7.Write(oprot)
15	oprot.WriteMessageEnd()
16	oprot.Flush()
17	return false, x7
18
19}

解析EmitBatch数据

github.com/uber/jaeger/thrift-gen/agent/agent.go #187

 1func (p *agentProcessorEmitBatch) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
 2	args := AgentEmitBatchArgs{}
 3	if err = args.Read(iprot); err != nil {
 4		iprot.ReadMessageEnd()
 5		return false, err
 6	}
 7
 8	iprot.ReadMessageEnd()
 9	var err2 error
10	if err2 = p.handler.EmitBatch(args.Batch); err2 != nil {
11		return true, err2
12	}
13	return true, nil
14}

数据解析完后使用tchan 提交数据

github.com/uber/jaeger/thrift-gen/jaeger/tchan-jaeger.go #39

 1func (c *tchanCollectorClient) SubmitBatches(ctx thrift.Context, batches []*Batch) ([]*BatchSubmitResponse, error) {
 2	var resp CollectorSubmitBatchesResult
 3	args := CollectorSubmitBatchesArgs{
 4		Batches: batches,
 5	}
 6	success, err := c.client.Call(ctx, c.thriftService, "submitBatches", &args, &resp)
 7	if err == nil && !success {
 8	}
 9
10	return resp.GetSuccess(), err
11}