声明

分享到:

声明

image

声明

  1. 这篇文章需要了解istio,k8s,golang,envoy,mixer基础知识
  2. 分析的环境为k8s,istio版本为0.8.0

遥测报告是什么

    这篇文章主要介绍mixer提供的一个GRPC接口,这个接口负责接收envoy上报的日志,并将日志在stdio和prometheus展现出来。 “遥测报告”这个词是从istio的中文翻译文档借过来,第一次听到这个词感觉很陌生,很高大上。通过了解源码,用 “日志订阅“ 这个词来理解这个接口的作用会容易点。用一句话来总结这个接口的功能:我有这些日志,你想用来做什么?stdio和prometheus只是这些日志的另一种展示形式。

 1istio.io/istio/mixer/pkg/api/grpcServer.go #187
 2func (s *grpcServer) Report(legacyCtx legacyContext.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) {
 3  ......
 4  var errors *multierror.Error
 5  for i := 0; i < len(req.Attributes); i++ {
 6    ......
 7    if i > 0 {
 8      if err := accumBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList); err != nil {
 9        ......
10        break
11      }
12    }
13    ......
14    if err := s.dispatcher.Preprocess(newctx, accumBag, reportBag); err != nil {
15      ......
16    }
17    ......
18    if err := reporter.Report(reportBag); err != nil {
19      ......
20      continue
21    }
22    ......
23  }
24  ......
25  if err := reporter.Flush(); err != nil {
26    errors = multierror.Append(errors, err)
27  }
28  reporter.Done()
29  ......
30  return reportResp, nil
31}

接收了什么数据接收 —— ReportRequest

    Report接口的第二个参数是envoy上报给mixer的数据。下面的数据来源:把日志打印到终端后再截取出来。

结构

1istio.io/api/mixer/v1/report.pb.go #22
2type ReportRequest struct {
3  ......
4  Attributes []CompressedAttributes `protobuf:"bytes,1,rep,name=attributes" json:"attributes"`
5  ......
6  DefaultWords []string 
7  ......
8  GlobalWordCount uint32 `protobuf:"varint,3,opt,name=global_word_count,json=globalWordCount,proto3" json:"global_word_count,omitempty"`
9}

接收的数据

req.Attributes[{"strings":{"131":92,"152":-1,"154":-2,"17":-7,"18":-4,"19":90,"22":92},"int64s":{"1":33314,"151":8080,"169":292,"170":918,"23":0,"27":780,"30":200},"bools":{"177":false},"timestamps":{"24":"2018-07-05T08:12:20.125365976Z","28":"2018-07-05T08:12:20.125757852Z"},"durations":{"29":426699},"bytes":{"0":"rBQDuw==","150":"AAAAAAAAAAAAAP//rBQDqg=="},"string_maps":{"15":{"entries":{"100":92,"102":-5,"118":113,"119":-3,"31":-4,"32":90,"33":-7,"55":134,"98":-6}},"26":{"entries":{"117":134,"35":136,"55":-9,"58":110,"60":-8,"82":93}}}}]

req.DefaultWords["istio-pilot.istio-system.svc.cluster.local","kubernetes://istio-pilot-8696f764dd-fqxtg.istio-system","1000","rds","3a7a649f-4eeb-4d70-972c-ad2d43a680af","172.00.00.000","/v1/routes/8088/index/sidecar~172.20.3.187~index-85df88964c-tzzds.default~default.svc.cluster.local","Thu, 05 Jul 2018 08:12:19 GMT","780","/v1/routes/9411/index/sidecar~172.00.00.000~index-85df88964c-tzzds.default~default.svc.cluster.local","bc1f172f-b8e3-4ec0-a070-f2f6de38a24f","718"]

req.GlobalWordCount178

    第一次看到这些数据的时候满脑子问号,和官网介绍的属性词汇一点关联都看不到。在这些数据里我们最主要关注Attributes下的类型:strings,int64s......和那些奇怪的数字。下面会揭开这些谜团。

数据转换 —— UpdateBagFromProto

image

globalList

1istio.io/istio/mixer/pkg/attribute/list.gen.go #13
2globalList = []string{
3    "source.ip",
4    "source.port",
5    "source.name",
6    ......
7}

UpdateBagFromProto

 1istio.io/istio/mixer/pkg/attribute/mutableBag.go #3018
 2func (mb *MutableBag) UpdateBagFromProto(attrs *mixerpb.CompressedAttributes, globalWordList []string) error {
 3  messageWordList := attrs.Words
 4  ......
 5  lg("  setting string attributes:")
 6  for k, v := range attrs.Strings {
 7    name, e = lookup(k, e, globalWordList, messageWordList)
 8    value, e = lookup(v, e, globalWordList, messageWordList)
 9    if err := mb.insertProtoAttr(name, value, seen, lg); err != nil {
10      return err
11    }
12  }
13  lg("  setting int64 attributes:")
14  ......
15  lg("  setting double attributes:")
16  ......
17  lg("  setting bool attributes:")
18  ......
19  lg("  setting timestamp attributes:")
20  ......
21  lg("  setting duration attributes:")
22  ......
23  lg("  setting bytes attributes:")
24  ......
25  lg("  setting string map attributes:")
26
27  ......
28  return e
29}

    Istio属性是强类型,所以在数据转换会根据类型一一转换。从上图可以看出由DefaultWordsglobalList组成一个词典,而 Attributes 记录了上报数据的位置,经过 UpdateBagFromProto的处理,最终转换为:官方的属性词汇

转换结果

1connection.mtls               : false
2context.protocol              : http
3destination.port              : 8080
4......
5request.host                  : rds
6request.method                : GET
7......

数据加工 —— Preprocess

    这个方法在k8s环境下的结果是追加数据

 1istio.io/istio/mixer/template/template.gen.go #33425
 2outBag := newWrapperAttrBag(
 3  func(name string) (value interface{}, found bool) {
 4    field := strings.TrimPrefix(name, fullOutName)
 5    if len(field) != len(name) && out.WasSet(field) {
 6      switch field {
 7      case "source_pod_ip":
 8        return []uint8(out.SourcePodIp), true
 9      case "source_pod_name":
10        return out.SourcePodName, true
11        ......
12      default:
13        return nil, false
14      }
15    }
16    return attrs.Get(name)
17  }
18  ......
19)
20return mapper(outBag)

最终追加的数据

1destination.labels            : map[istio:pilot pod-template-hash:4252932088]
2destination.namespace         : istio-system
3......

数据分发 —— Report

    Report会把数据分发到Variety = istio_adapter_model_v1beta1.TEMPLATE_VARIETY_REPORT Template 里,当然还有一些过滤条件,在当前环境下会分发到 logentry Metric

 1istio.io/istio/mixer/pkg/runtime/dispatcher/session.go #105
 2func (s *session) dispatch() error {
 3  ......
 4  for _, destination := range destinations.Entries() {
 5    var state *dispatchState
 6    if s.variety == tpb.TEMPLATE_VARIETY_REPORT {
 7      state = s.reportStates[destination]
 8      if state == nil {
 9        state = s.impl.getDispatchState(ctx, destination)
10        s.reportStates[destination] = state
11      }
12    }
13
14    for _, group := range destination.InstanceGroups {
15      ......
16      for j, input := range group.Builders {
17        ......
18        var instance interface{}
19        //把日志绑定到 Template里
20        if instance, err = input.Builder(s.bag); err != nil{
21          ......
22          continue
23        }
24        ......
25        if s.variety == tpb.TEMPLATE_VARIETY_REPORT {
26          state.instances = append(state.instances, instance)
27          continue
28        }
29        ......
30      }
31    }
32  }
33  ......
34  return nil
35}

数据展示 —— 异步Flush

    Flush是让 logentryMetric 调用各自的 adapter 对数据进行处理,由于各自的 adapter没有依赖关系所以这里使用了golang的协程进行异步处理。

 1istio.io/istio/mixer/pkg/runtime/dispatcher/session.go #200
 2func (s *session) dispatchBufferedReports() {
 3    // Ensure that we can run dispatches to all destinations in parallel.
 4    s.ensureParallelism(len(s.reportStates))
 5
 6    // dispatch the buffered dispatchStates we've got
 7    for k, v := range s.reportStates {
 8        //在这里会把 v 放入协程进行处理
 9      s.dispatchToHandler(v)
10      delete(s.reportStates, k)
11    }
12    //等待所有adapter完成
13    s.waitForDispatched()
14}

协程池

    从上面看到 v 被放入协程进行处理,其实mixer在这里使用了协程池。使用协程池可以减少协程的创建和销毁,还可以控制服务中协程的多少,从而减少对系统的资源占用。mixer的协程池属于提前创建一定数量的协程,提供给业务使用,如果协程池处理不完业务的工作,需要阻塞等待。下面是mixer使用协程池的步骤。

  • 初始化协程池

    建立一个有长度的 channel,我们可以叫它队列。

 1istio.io/istio/mixer/pkg/pool/goroutine.go 
 2func NewGoroutinePool(queueDepth int, singleThreaded bool) *GoroutinePool {
 3  gp := &GoroutinePool{
 4    queue:          make(chan work, queueDepth),
 5    singleThreaded: singleThreaded,
 6  }
 7
 8  gp.AddWorkers(1)
 9  return gp
10}
  • 把任务放入队列

    把可执行的函数和参数当成一个任务放入队列

1func (gp *GoroutinePool) ScheduleWork(fn WorkFunc, param interface{}) {
2    if gp.singleThreaded {
3        fn(param)
4    } else {
5        gp.queue <- work{fn: fn, param: param}
6    }
7}
  • 让工人工作

    想要用多少工人可以按资源分配,工人不断从队列获取任务执行

 1func (gp *GoroutinePool) AddWorkers(numWorkers int) {
 2  if !gp.singleThreaded {
 3    gp.wg.Add(numWorkers)
 4    for i := 0; i < numWorkers; i++ {
 5      go func() {
 6        for work := range gp.queue {
 7          work.fn(work.param)
 8        }
 9        gp.wg.Done()
10      }()
11    }
12  }
13}

logentry 的 adapter 将数据打印到终端(stdio)

  • adapter 交互

    每个Template 都有自己的 DispatchReport,它负责和 adapter交互,并对日志进行展示。

 1istio.io/istio/mixer/template/template.gen.go #1311
 2logentry.TemplateName: {
 3    Name:  logentry.TemplateName,
 4    Impl:  "logentry",
 5    CtrCfg:   &logentry.InstanceParam{},
 6    Variety:  istio_adapter_model_v1beta1.TEMPLATE_VARIETY_REPORT,
 7    ......
 8    DispatchReport: func(ctx context.Context, handler adapter.Handler, inst []interface{}) error {
 9        ......
10        instances := make([]*logentry.Instance, len(inst))
11        for i, instance := range inst {
12          instances[i] = instance.(*logentry.Instance)
13        }
14
15        // Invoke the handler.
16        if err := handler.(logentry.Handler).HandleLogEntry(ctx, instances); err != nil {
17            return fmt.Errorf("failed to report all values: %v", err)
18        }
19        return nil
20    },
21}
  • 日志数据整理
 1istio.io/istio/mixer/adapter/stdio/stdio.go #53
 2func (h *handler) HandleLogEntry(_ context.Context, instances []*logentry.Instance) error {
 3    var errors *multierror.Error
 4
 5    fields := make([]zapcore.Field, 0, 6)
 6    for _, instance := range instances {
 7      ......
 8      for _, varName := range h.logEntryVars[instance.Name] {
 9          //过滤adapter不要的数据
10        if value, ok := instance.Variables[varName]; ok {
11            fields = append(fields, zap.Any(varName, value))
12        }
13      }
14      if err := h.write(entry, fields); err != nil {
15          errors = multierror.Append(errors, err)
16      }
17      fields = fields[:0]
18    }
19    return errors.ErrorOrNil()
20}

    每个adapter 都有自己想要的数据,这些数据可在启动文件 istio-demo.yaml 下配置。

 1apiVersion: "config.istio.io/v1alpha2"
 2    kind: logentry
 3    metadata:
 4      name: accesslog
 5      namespace: istio-system
 6    spec:
 7      severity: '"Info"'
 8      timestamp: request.time
 9      variables:
10        originIp: origin.ip | ip("0.0.0.0")
11        sourceIp: source.ip | ip("0.0.0.0")
12        sourceService: source.service | ""
13        ......
  • 展示结果

    下面日志从mixer终端截取

1{"level":"info","time":"2018-07-15T09:27:30.739801Z","instance":"accesslog.logentry.istio-system","apiClaims":"",
2"apiKey":"","apiName":"","apiVersion":"","connectionMtls":false,"destinationIp":"10.00.0.00",
3"destinationNamespace":"istio-system"......}

问题

通过分析这个接口源码我们发现了一些问题:

  1. 接口需要处理完所有 adapter才响应返回
  2. 如果协程池出现阻塞,接口需要一直等待

    基于以上二点我们联想到:如果协程池出现阻塞,这个接口响应相应会变慢,是否会影响到业务的请求?从国人翻译的一篇istio官方博客Mixer 和 SPOF 的迷思里知道,envoy数据上报是通过“fire-and-forget“模式异步完成。但由于没有C++基础,所以我不太明白这里面的“fire-and-forget“是如何实现。

    因为存在上面的疑问,所以我们进行了一次模拟测试。这次测试的假设条件:接口出现了阻塞,分别延迟了50ms,100ms,150ms,200ms,250ms,300ms【模拟阻塞时间】,在相同压力下,观察对业务请求是否有影响。

  • 环境: mac Air 下的 docker for k8s
  • 压测工具:hey
  • 压力:-c 50 -n 200【电脑配置不高】
  • 电脑配置 i5 4G
  • 压测命令:hey -c 50 -n 200 http://127.0.0.1:30935/sleep
  • 被压测的服务代码
  • mixer接口添加延迟代码:
1func (s *grpcServer) Report(legacyCtx legacyContext.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) {
2    time.Sleep(50 * time.Microsecond)
3    ......
4    return reportResp, nil
5}

注意

压测的每个数据结果都是经过预热后,压测10次并从中获取中位数得到。

结果:

image

    从上图我们可以看出随着延迟的增加,业务处理的QPS也在下降。这说明在当前0.8.0版本下,协程池处理任务不够快【进比出快】,出现了阻塞现象,会影响到业务的请求。当然我们可以通过横向扩展mixer或增加协程池里的工人数量来解决。但是我觉得主要的问题出在阻塞这步上。如果没有阻塞,就不会影响业务

Jaeger相互借鉴,避免阻塞

    这里日志数据处理场景和之前了解的Jaeger很像。Jaeger和mixer处理的都是日志数据,所以它们之间可以相互借鉴。Jaeger也有它自己的协程池,而且和mixer的协程池思想是一样的,虽然实现细节不一样。那如果遇到进比出快的情况Jaeger是如何处理的呢?具体的场景可以看这里

 1github.com/jaegertracing/jaeger/pkg/queue/bounded_queue.go #76
 2func (q *BoundedQueue) Produce(item interface{}) bool {
 3    if atomic.LoadInt32(&q.stopped) != 0 {
 4        q.onDroppedItem(item)
 5        return false
 6    }
 7    select {
 8    case q.items <- item:
 9        atomic.AddInt32(&q.size, 1)
10        return true
11    default:
12        //丢掉数据
13        if q.onDroppedItem != nil {
14            q.onDroppedItem(item)
15        }
16        return false
17    }
18}

    上面是Jaeger的源码,这里和mixer 的 ScheduleWork 相对应,其中一个区别是如果Jaeger的队列items满了,还有数据进来,数据将会被丢掉,从而避免了阻塞。这个思路也可以用在mixer的日志处理上,牺牲一些日志数据,保证业务请求稳定。毕竟业务的位置是最重要的。

相关博客

Mixer 的适配器模型