TChannel 接收 agent 提交过来的数据

分享到:

TChannel 接收 agent 提交过来的数据

TChannel 接收 agent 提交过来的数据

github.com/jaegertracing/jaeger/cmd/collector/app/span_handler.go #69

 1func (jbh *jaegerBatchesHandler) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
 2	responses := make([]*jaeger.BatchSubmitResponse, 0, len(batches))
 3	for _, batch := range batches {
 4		mSpans := make([]*model.Span, 0, len(batch.Spans))
 5		for _, span := range batch.Spans {
 6			mSpan := jConv.ToDomainSpan(span, batch.Process)
 7			mSpans = append(mSpans, mSpan)
 8		}
 9		oks, err := jbh.modelProcessor.ProcessSpans(mSpans, JaegerFormatType)
10		if err != nil {
11			return nil, err
12		}
13		batchOk := true
14		for _, ok := range oks {
15			if !ok {
16				batchOk = false
17				break
18			}
19		}
20		res := &jaeger.BatchSubmitResponse{
21			Ok: batchOk,
22		}
23		responses = append(responses, res)
24	}
25	return responses, nil
26}

经过处理后数据放入BoundedQueue(有界队列)

github.com/jaegertracing/jaeger/cmd/collector/app/span_processor.go #130

 1func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat string) bool {
 2	spanCounts := sp.metrics.GetCountsForFormat(originalFormat)
 3	spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span)
 4
 5	if !sp.filterSpan(span) {
 6		spanCounts.Rejected.Inc(int64(1))
 7		return true // as in "not dropped", because it's actively rejected
 8	}
 9	item := &queueItem{
10		queuedTime: time.Now(),
11		span:       span,
12	}
13	addedToQueue := sp.queue.Produce(item)
14	if !addedToQueue {
15		sp.metrics.ErrorBusy.Inc(1)
16	}
17	return addedToQueue
18}
19

启用50个协程(goroutine),处理队列消息

github.com/jaegertracing/jaeger/pkg/queue/bounded_queue.go #53

 1func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) {
 2    var startWG sync.WaitGroup
 3    for i := 0; i < num; i++ {
 4        q.stopWG.Add(1)
 5        //这个WG是否多余?
 6        startWG.Add(1)
 7        go func() {
 8            startWG.Done()
 9            defer q.stopWG.Done()
10            for {
11                select {
12                case item := <-q.items:
13                    atomic.AddInt32(&q.size, -1)
14                    consumer(item)
15                case <-q.stopCh:
16                    return
17                }
18            }
19        }()
20    }
21    startWG.Wait()
22}

collector 在处理队列数据的时候和 agent 一样:队列已满、来不及处理时,新提交的数据会被直接丢弃

可以通过配置参数优化

  • --collector.num-workers (default 50)

  • --collector.queue-size (default 2000)

  • 增加collector服务节点

从队列拿出来后经过处理,把数据存入cassandra数据库

github.com/jaegertracing/jaeger/cmd/collector/app/span_processor.go #101

1func (sp *spanProcessor) saveSpan(span *model.Span) {
2	startTime := time.Now()
3	if err := sp.spanWriter.WriteSpan(span); err != nil {
4		sp.logger.Error("Failed to save span", zap.Error(err))
5	} else {
6		sp.metrics.SavedBySvc.ReportServiceNameForSpan(span)
7	}
8	sp.metrics.SaveLatency.Record(time.Now().Sub(startTime))
9}

github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/writer.go #122

 1func (s *SpanWriter) WriteSpan(span *model.Span) error {
 2	ds := dbmodel.FromDomain(span)
 3	mainQuery := s.session.Query(
 4		insertSpan,
 5		ds.TraceID,
 6		ds.SpanID,
 7		ds.SpanHash,
 8		ds.ParentID,
 9		ds.OperationName,
10		ds.Flags,
11		ds.StartTime,
12		ds.Duration,
13		ds.Tags,
14		ds.Logs,
15		ds.Refs,
16		ds.Process,
17	)
18
19	if err := s.writerMetrics.traces.Exec(mainQuery, s.logger); err != nil {
20		return s.logError(ds, err, "Failed to insert span", s.logger)
21	}
22	if err := s.saveServiceNameAndOperationName(ds.ServiceName, ds.OperationName); err != nil {
23		// should this be a soft failure?
24		return s.logError(ds, err, "Failed to insert service name and operation name", s.logger)
25	}
26
27	if err := s.indexByTags(span, ds); err != nil {
28		return s.logError(ds, err, "Failed to index tags", s.logger)
29	}
30
31	if err := s.indexBySerice(span.TraceID, ds); err != nil {
32		return s.logError(ds, err, "Failed to index service name", s.logger)
33	}
34
35	if err := s.indexByOperation(span.TraceID, ds); err != nil {
36		return s.logError(ds, err, "Failed to index operation name", s.logger)
37	}
38
39	if err := s.indexByDuration(ds, span.StartTime); err != nil {
40		return s.logError(ds, err, "Failed to index duration", s.logger)
41	}
42	return nil

保存 ServiceName 和 OperationName(saveServiceNameAndOperationName)时,collector 借助缓存(key/value 和 LRU)

借助缓存,Jaeger实现不重复写入Service和OperationName,是否已经写入通过缓存判断,不查询cassandra,减少了查询压力。

github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/service_names.go #69

 1func (s *ServiceNamesStorage) Write(serviceName string) error {
 2    var err error
 3    query := s.session.Query(s.InsertStmt)
 4    if inCache := checkWriteCache(serviceName, s.serviceNames, s.writeCacheTTL); !inCache {
 5        q := query.Bind(serviceName)
 6        err2 := s.metrics.Exec(q, s.logger)
 7        if err2 != nil {
 8        	err = err2
 9        }
10    }
11    return err
12}

    在默认情况下,ServiceName 缓存长度为 10000,OperationName 缓存长度为十万;如果超过限制,就会出现重复写入。从实际情况考虑,这样的限制是否够用?其实 Uber 发展到现在也只有 1000 多个 ServiceName,所以这个设置可以满足很多公司。