TChannel 接收 agent 提交过来的数据
TChannel 接收 agent 提交过来的数据
TChannel 接收 agent 提交过来的数据
github.com/jaegertracing/jaeger/cmd/collector/app/span_handler.go #69
1func (jbh *jaegerBatchesHandler) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
2 responses := make([]*jaeger.BatchSubmitResponse, 0, len(batches))
3 for _, batch := range batches {
4 mSpans := make([]*model.Span, 0, len(batch.Spans))
5 for _, span := range batch.Spans {
6 mSpan := jConv.ToDomainSpan(span, batch.Process)
7 mSpans = append(mSpans, mSpan)
8 }
9 oks, err := jbh.modelProcessor.ProcessSpans(mSpans, JaegerFormatType)
10 if err != nil {
11 return nil, err
12 }
13 batchOk := true
14 for _, ok := range oks {
15 if !ok {
16 batchOk = false
17 break
18 }
19 }
20 res := &jaeger.BatchSubmitResponse{
21 Ok: batchOk,
22 }
23 responses = append(responses, res)
24 }
25 return responses, nil
26}
经过处理后数据放入BoundedQueue(有界队列)
github.com/jaegertracing/jaeger/cmd/collector/app/span_processor.go #130
1func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat string) bool {
2 spanCounts := sp.metrics.GetCountsForFormat(originalFormat)
3 spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span)
4
5 if !sp.filterSpan(span) {
6 spanCounts.Rejected.Inc(int64(1))
7 return true // as in "not dropped", because it's actively rejected
8 }
9 item := &queueItem{
10 queuedTime: time.Now(),
11 span: span,
12 }
13 addedToQueue := sp.queue.Produce(item)
14 if !addedToQueue {
15 sp.metrics.ErrorBusy.Inc(1)
16 }
17 return addedToQueue
18}
19
启用50个协程,处理队列消息
github.com/jaegertracing/jaeger/pkg/queue/bounded_queue.go #53
1func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) {
2 var startWG sync.WaitGroup
3 for i := 0; i < num; i++ {
4 q.stopWG.Add(1)
5 //这个WG是否多余?
6 startWG.Add(1)
7 go func() {
8 startWG.Done()
9 defer q.stopWG.Done()
10 for {
11 select {
12 case item := <-q.items:
13 atomic.AddInt32(&q.size, -1)
14 consumer(item)
15 case <-q.stopCh:
16 return
17 }
18 }
19 }()
20 }
21 startWG.Wait()
22}
collector 在处理队列数据的时候和agent一样,处理不完会直接扔掉
可以通过配置参数优化
- --collector.num-workers (default 50)
- --collector.queue-size (default 2000)
- 增加collector服务节点
从队列拿出来后经过处理,把数据存入cassandra数据库
github.com/jaegertracing/jaeger/cmd/collector/app/span_processor.go #101
1func (sp *spanProcessor) saveSpan(span *model.Span) {
2 startTime := time.Now()
3 if err := sp.spanWriter.WriteSpan(span); err != nil {
4 sp.logger.Error("Failed to save span", zap.Error(err))
5 } else {
6 sp.metrics.SavedBySvc.ReportServiceNameForSpan(span)
7 }
8 sp.metrics.SaveLatency.Record(time.Now().Sub(startTime))
9}
github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/writer.go #122
1func (s *SpanWriter) WriteSpan(span *model.Span) error {
2 ds := dbmodel.FromDomain(span)
3 mainQuery := s.session.Query(
4 insertSpan,
5 ds.TraceID,
6 ds.SpanID,
7 ds.SpanHash,
8 ds.ParentID,
9 ds.OperationName,
10 ds.Flags,
11 ds.StartTime,
12 ds.Duration,
13 ds.Tags,
14 ds.Logs,
15 ds.Refs,
16 ds.Process,
17 )
18
19 if err := s.writerMetrics.traces.Exec(mainQuery, s.logger); err != nil {
20 return s.logError(ds, err, "Failed to insert span", s.logger)
21 }
22 if err := s.saveServiceNameAndOperationName(ds.ServiceName, ds.OperationName); err != nil {
23 // should this be a soft failure?
24 return s.logError(ds, err, "Failed to insert service name and operation name", s.logger)
25 }
26
27 if err := s.indexByTags(span, ds); err != nil {
28 return s.logError(ds, err, "Failed to index tags", s.logger)
29 }
30
31 if err := s.indexBySerice(span.TraceID, ds); err != nil {
32 return s.logError(ds, err, "Failed to index service name", s.logger)
33 }
34
35 if err := s.indexByOperation(span.TraceID, ds); err != nil {
36 return s.logError(ds, err, "Failed to index operation name", s.logger)
37 }
38
39 if err := s.indexByDuration(ds, span.StartTime); err != nil {
40 return s.logError(ds, err, "Failed to index duration", s.logger)
41 }
42 return nil
保存saveServiceNameAndOperationName,collector借助缓存(key/value 和 lru)
借助缓存,Jaeger实现不重复写入Service和OperationName,是否已经写入通过缓存判断,不查询cassandra,减少了查询压力。
github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/service_names.go #69
1func (s *ServiceNamesStorage) Write(serviceName string) error {
2 var err error
3 query := s.session.Query(s.InsertStmt)
4 if inCache := checkWriteCache(serviceName, s.serviceNames, s.writeCacheTTL); !inCache {
5 q := query.Bind(serviceName)
6 err2 := s.metrics.Exec(q, s.logger)
7 if err2 != nil {
8 err = err2
9 }
10 }
11 return err
12}
在默认情况下,ServiceName缓存长度为10000,OperationName缓存长度为十万,如果超过限制则会重复写入。从实际情况考虑,这样的限制是否够用?其实Uber发展到现在也只有1000多个ServiceName,所以这个设置可以满足很多公司。