声明

分享到:

声明

image

声明

  1. 这篇文章需要了解istio,k8s,golang,envoy基础知识
  2. 分析的环境为k8s,istio版本为0.8.0

pilot-discovery的作用

envoy提供一套通用的数据面接口,通过接口可以动态实现服务发现和配置。在istio中需要集成k8s,consul等服务发现系统,所以需要一个中介整理在k8s,consul服务注册和配置信息,并提供给envoy。

envoy v1 API 和 v2 API区别

v1版本API和v2版本API有一段历史,详情可看官网博客。在envoy开源之初,使用HTTP+轮询的方式实现动态服务发现和配置,但是这种方式存在以下缺点:

  1. 由于接口数据使用弱类型,导致实现一些通用服务比较困难。
  2. 控制面更喜欢使用推送的方式,来减少数据在更新时传输的时间。

随着和Google合作加强,官方使用GRPC + push开发了v2版本API,实现了v1版本的SDS/CDS/RDS/LDS接口,继续支持JSON/YAML数据格式,还增加了ADS(把SDS/CDS/RDS/LDS4个接口合在一下),HDS等接口。

建立基础缓存数据

其实pilot-discovery已经算是一个小型的非持久性key/value数据库了,它把istio的配置信息和服务注册信息都进行了缓存。这样可以使配置更快的生效。

缓存了什么数据

  • istio配置
 1istio.io/istio/pilot/pkg/model/config.go
 2var (
 3    ......
 4    // RouteRule describes route rules
 5    RouteRule = ProtoSchema{
 6      Type:        "route-rule",
 7      ......
 8    }
 9    // VirtualService describes v1alpha3 route rules
10    VirtualService = ProtoSchema{
11      Type:        "virtual-service",
12      ......
13    }
14    // Gateway describes a gateway (how a proxy is exposed on the network)
15    Gateway = ProtoSchema{
16      Type:        "gateway",
17      ......
18    }
19    // IngressRule describes ingress rules
20    IngressRule = ProtoSchema{
21      Type:        "ingress-rule",
22      ......
23    }
24)

做过新手任务的同学,应该都很熟悉上面的Type,就是配置信息里面的 kind,配置信息保存进k8s后,会被pilot-discovery通过api-server爬过来进行缓存。

1apiVersion: networking.istio.io/v1alpha3
2kind: VirtualService
3metadata:
4  name: reviews
5  ......
  • 从k8s获取的服务注册信息
 1> istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go #102
 2func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
 3    ......
 4  out.services = out.createInformer(&v1.Service{}, "Service", options.ResyncPeriod,
 5    func(opts meta_v1.ListOptions) (runtime.Object, error) {
 6      return client.CoreV1().Services(options.WatchedNamespace).List(opts)
 7    },
 8    func(opts meta_v1.ListOptions) (watch.Interface, error) {
 9      return client.CoreV1().Services(options.WatchedNamespace).Watch(opts)
10    })
11    ......
12  out.nodes = out.createInformer(&v1.Node{}, "Node", options.ResyncPeriod,
13    func(opts meta_v1.ListOptions) (runtime.Object, error) {
14      return client.CoreV1().Nodes().List(opts)
15    },
16    func(opts meta_v1.ListOptions) (watch.Interface, error) {
17      return client.CoreV1().Nodes().Watch(opts)
18    })
19    ......
20  return out
21}

还有其他数据不一一列出,从上面可以看出,建立缓存都是通过List和Watch方式进行(istio的配置数据也一样),List:第一次初始化数据,Watch:通过轮询的方式获取数据并缓存。

  • 转化成的请求地址
 1https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/httpapispecs?limit=500&resourceVersion=0
 2https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/servicerolebindings?limit=500&resourceVersion=0
 3https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/virtualservices?limit=500&resourceVersion=0
 4https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/quotaspecbindings?limit=500&resourceVersion=0
 5https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/serviceroles?limit=500&resourceVersion=0
 6https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/serviceentries?limit=500&resourceVersion=0
 7https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/routerules?limit=500&resourceVersion=0
 8https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/egressrules?limit=500&resourceVersion=0
 9https://{k8s.ip}:443/apis/authentication.istio.io/v1alpha1/policies?limit=500&resourceVersion=0
10https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/httpapispecbindings?limit=500&resourceVersion=0
11https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/destinationrules?limit=500&resourceVersion=0
12https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/quotaspecs?limit=500&resourceVersion=0
13https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/gateways?limit=500&resourceVersion=0
14https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/destinationpolicies?limit=500&resourceVersion=0
15
16https://{k8s.ip}:443/api/v1/nodes?limit=500&resourceVersion=0
17https://{k8s.ip}:443/api/v1/namespaces/istio-system/configmaps/istio-ingress-controller-leader-istio
18https://{k8s.ip}:443/api/v1/services?limit=500&resourceVersion=0
19https://{k8s.ip}:443/api/v1/endpoints?limit=500&resourceVersion=0
20https://{k8s.ip}:443/api/v1/pods?limit=500&resourceVersion=0

key的生成

在pilot-discovery中把缓存数据分了二大类,一类istio配置信息,另一类服务注册信息。这二类又进行了细分,分别为virtualservices,routerules,nodes,pods等,最后再以k8s空间/应用名作为下标缓存数据。

 1k8s.io/client-go/tools/cache/store.go #76
 2func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
 3  if key, ok := obj.(ExplicitKey); ok {
 4    return string(key), nil
 5  }
 6  meta, err := meta.Accessor(obj)
 7  if err != nil {
 8    return "", fmt.Errorf("object has no meta: %v", err)
 9  }
10  if len(meta.GetNamespace()) > 0 {
11    return meta.GetNamespace() + "/" + meta.GetName(), nil
12  }
13  return meta.GetName(), nil
14}
15
16default/sleep
17kube-system/grafana
18istio-system/servicegraph

存储数据

  • List 和 Watch

上面提到建立缓存都是通过List和Watch方式进行,来看看它的实现。

 1k8s.io/client-go/tools/cache/reflector.go #239
 2func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 3  ......
 4  list, err := r.listerWatcher.List(options)
 5  ......
 6  resourceVersion = listMetaInterface.GetResourceVersion()
 7  items, err := meta.ExtractList(list)
 8  ......
 9  //缓存数据
10  if err := r.syncWith(items, resourceVersion); err != nil {
11    return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
12  }
13  ......
14  for {
15    ......
16    w, err := r.listerWatcher.Watch(options)
17    ......
18    if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
19      ......
20      return nil
21    }
22  }
23}
  • 如何更新缓存

List可以看做第一次初始化数据,Watch更像是监听数据的变化状态:添加,修改和删除。针对这些状态对缓存的数据做增、删、改。

 1k8s.io/client-go/tools/cache/reflector.go #358
 2func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
 3......
 4loop:
 5  for {
 6    select {
 7    case <-stopCh:
 8      return errorStopRequested
 9    case err := <-errc:
10      return err
11    case event, ok := <-w.ResultChan():
12      ......
13      switch event.Type {
14      case watch.Added:
15        err := r.store.Add(event.Object)
16        ......
17      case watch.Modified:
18        err := r.store.Update(event.Object)
19        ......
20      case watch.Deleted:
21      ......
22        err := r.store.Delete(event.Object)
23        ......
24      default:
25        utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
26      }
27      ......
28    }
29  }
30  ......
31  return nil
32}
  • 限速访问 --令牌桶算法

刚刚看到监听数据的变化是通过for{} 不断请求k8s的api-server接口,如果不加限制,那就成了DDOS攻击了,所以pilot-discovery使用了流量控制

1k8s.io/client-go/rest/config.go
2const (
3  DefaultQPS   float32 = 5.0
4  DefaultBurst int     = 10
5)

这样理解这个配置吧,如果1秒内访问次数大于10,那么在接下来的访问中一秒最多只能访问5次。

 1k8s.io/client-go/rest/request.go #616
 2func (r *Request) request(fn func(*http.Request, *http.Response)) error {
 3  ......
 4  retries := 0
 5  for {
 6    ......
 7    if retries > 0 {
 8      ......
 9      //使用令牌桶算法
10      r.tryThrottle()
11    }
12    resp, err := client.Do(req)
13    ......
14    done := func() bool {
15      ......
16      retries++
17      ......
18  }
19}
  • 协程安全map

在golang中使用内存key/value缓存非常简单,定义变量 map[string]interface{},再往里面放入数据就可以了。但是map结构为非协程安全,所以像pilot-discovery这种小型数据库,同时存在读和写,如果不加上锁,很容易出现争抢共享资源问题。所以需要加锁:thread_safe_store.go

 1type ThreadSafeStore interface {
 2  Add(key string, obj interface{})
 3  Update(key string, obj interface{})
 4  Delete(key string)
 5  Get(key string) (item interface{}, exists bool)
 6  List() []interface{}
 7  ListKeys() []string
 8  Replace(map[string]interface{}, string)
 9  Index(indexName string, obj interface{}) ([]interface{}, error)
10  IndexKeys(indexName, indexKey string) ([]string, error)
11  ListIndexFuncValues(name string) []string
12  ByIndex(indexName, indexKey string) ([]interface{}, error)
13  GetIndexers() Indexers
14
15  // AddIndexers adds more indexers to this store.  If you call this after you already have data
16  // in the store, the results are undefined.
17  AddIndexers(newIndexers Indexers) error
18  Resync() error
19}

提供接口

不管是v1 API还是v2 API,都是基于基础缓存的数据,按照envoy的接口文档,把数据拼接成envoy想要的数据。

暴露v1 API RESTFUL

  • 暴露的接口

pilot-discovery暴露了SDS/CDS/RDS/LDS接口,envoy再使用轮询的方式,通过这些接口获取配置信息

 1istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #376
 2func (ds *DiscoveryService) Register(container *restful.Container) {
 3  ws := &restful.WebService{}
 4  ws.Produces(restful.MIME_JSON)
 5  ......
 6  ws.Route(ws.
 7    GET(fmt.Sprintf("/v1/registration/{%s}", ServiceKey)).
 8    To(ds.ListEndpoints).
 9    Doc("SDS registration").
10    Param(ws.PathParameter(ServiceKey, "tuple of service name and tag name").DataType("string")))
11  ......
12  ws.Route(ws.
13    GET(fmt.Sprintf("/v1/clusters/{%s}/{%s}", ServiceCluster, ServiceNode)).
14    To(ds.ListClusters).
15    Doc("CDS registration").
16    Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
17    Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
18  ......
19  ws.Route(ws.
20    GET(fmt.Sprintf("/v1/routes/{%s}/{%s}/{%s}", RouteConfigName, ServiceCluster, ServiceNode)).
21    To(ds.ListRoutes).
22    Doc("RDS registration").
23    Param(ws.PathParameter(RouteConfigName, "route configuration name").DataType("string")).
24    Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
25    Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
26  ......
27  ws.Route(ws.
28    GET(fmt.Sprintf("/v1/listeners/{%s}/{%s}", ServiceCluster, ServiceNode)).
29    To(ds.ListListeners).
30    Doc("LDS registration").
31    Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
32    Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
33  ......
34  container.Add(ws)
35}
  • 建立二级缓存

这里的缓存可以这样理解:我们平常开发中,从数据库获取数据,经过逻辑处理,再把最终结果进行缓存,返回给客户端,下次进来,就从缓存获取数据。同理v1 API的接口从基础缓存获取了数据后,把这些数据拼接成envoy需要的格式数据,再把这些数据缓存,返回给envoy。

  1. ListEndpoints(EDS)

其他几个接口方式一样,不一一列出

 1istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #567
 2> func (ds *DiscoveryService) ListEndpoints(request *restful.Request, response *restful.Response) {
 3  ......
 4  key := request.Request.URL.String()
 5  out, resourceCount, cached := ds.sdsCache.cachedDiscoveryResponse(key)
 6  //没有缓存
 7  if !cached {
 8    /**
 9    逻辑处理
10    **/
11    ......
12    resourceCount = uint32(len(endpoints))
13    if resourceCount > 0 {
14      //缓存数据
15      ds.sdsCache.updateCachedDiscoveryResponse(key, resourceCount, out)
16    }
17  }
18  observeResources(methodName, resourceCount)
19  writeResponse(response, out)
20}

暴露v2 API GRPC

我也是刚刚接触GRPC的双向流,我对它的理解是:一个长连接,客户端和服务端可以相互交互。在这里的用法是,客户端envoy打开一个GRPC连接,初始时pilot-discovery把数据响应给envoy,接下来,如果有数据变动,pilot-discovery通过GRPC把数据推给envoy。

  • ADS聚合接口

聚合接口就是把SDS/CDS/RDS/LDS的配置数据都放在一个接口上。实现有点长,缩减只剩一个接口,但方式是一样的。

 1istio.io/istio/pilot/pkg/proxy/envoy/v2/ads.go #237
 2func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
 3  ......
 4  var receiveError error
 5  reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
 6  go receiveThread(con, reqChannel, &receiveError)
 7
 8  for {
 9    // Block until either a request is received or the ticker ticks
10    select {
11    case discReq, ok = <-reqChannel:
12      ......
13      switch discReq.TypeUrl {
14      case ClusterType:
15      ......
16      case ListenerType:
17      ......
18      case RouteType:
19      ......
20      case EndpointType:
21      ......
22        //推送数据
23        err := s.pushEds(con)
24        if err != nil {
25          return err
26        }
27        ......
28      }
29    ......
30    //通过监听事件触发推送数据
31    case <-con.pushChannel:
32      ......
33      if len(con.Clusters) > 0 {
34        err := s.pushEds(con)
35        if err != nil {
36          return err
37        }
38      }
39      ......
40    }
41  }
42}

清二级缓存和触发推送

  • 主动触发

清除二级缓存和触发推送在这里其实都是同一个触发点:就是数据变动的时候。数据的变动应该是无序的,但是在更新配置的时候应该井然有序的进行。所以这里使用了任务队列,让事件一件一件接着做。

  1. 初始化List和Watch,注册Add,Update,Delete事件。
 1istio.io/istio/pilot/pkg/config/kube/crd/controller.go #133
 2func (c *controller) createInformer(
 3  o runtime.Object,
 4  otype string,
 5  resyncPeriod time.Duration,
 6  lf cache.ListFunc,
 7  wf cache.WatchFunc) cacheHandler {
 8  ......
 9  informer.AddEventHandler(
10    cache.ResourceEventHandlerFuncs{
11      AddFunc: func(obj interface{}) {
12        ......
13        c.queue.Push(kube.NewTask(handler.Apply, obj, model.EventAdd))
14      },
15      ......
16    })
17  return cacheHandler{informer: informer, handler: handler}
18}

当事件被触发都会执行handler.Apply,再执行注册的方法。

1istio.io/istio/pilot/pkg/serviceregistry/kube/queue.go #142
2func (ch *ChainHandler) Apply(obj interface{}, event model.Event) error {
3  for _, f := range ch.funcs {
4    if err := f(obj, event); err != nil {
5      return err
6    }
7  }
8  return nil
9}
  1. 注册方法
 1istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #328
 2func NewDiscoveryService(ctl model.Controller, configCache model.ConfigStoreCache,
 3  environment model.Environment, o DiscoveryServiceOptions) (*DiscoveryService, error) {
 4  ......
 5  serviceHandler := func(*model.Service, model.Event) { out.clearCache() }
 6  if err := ctl.AppendServiceHandler(serviceHandler); err != nil {
 7    return nil, err
 8  }
 9  instanceHandler := func(*model.ServiceInstance, model.Event) { out.clearCache() }
10  if err := ctl.AppendInstanceHandler(instanceHandler); err != nil {
11    return nil, err
12  }
13
14  if configCache != nil {
15    ......
16    configHandler := func(model.Config, model.Event) { out.clearCache() }
17    for _, descriptor := range model.IstioConfigTypes {
18      configCache.RegisterEventHandler(descriptor.Type, configHandler)
19    }
20  }
21
22  return out, nil
23}

方法 out.clearCache(),实现了清二级缓存和推送数据

 1istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #480
 2func (ds *DiscoveryService) clearCache() {
 3  ......
 4  //清二级缓存
 5  ds.sdsCache.clear()
 6  ds.cdsCache.clear()
 7  ds.rdsCache.clear()
 8  ds.ldsCache.clear()
 9  if V2ClearCache != nil {
10    //把数据推送到envoy
11    V2ClearCache()
12  }
13}
  • 手动触发

在pilot-discovery开放了一个清二级缓存的接口。

 1istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #436
 2func (ds *DiscoveryService) Register(container *restful.Container) {
 3  ws := &restful.WebService{}
 4  ws.Produces(restful.MIME_JSON)
 5  ......
 6  ws.Route(ws.
 7    POST("/cache_stats_delete").
 8    To(ds.ClearCacheStats).
 9    Doc("Clear discovery service cache stats"))
10  container.Add(ws)
11}

小知识

万恶的panic

在开发使用golang的过程中或多或少都接触过panic。例如引入了一些喜欢用panic的第三包,断言错误等触发了panic,导致整个服务都挂掉。为了避免这些问题,我们一般都是使用recover来接收panic,但一直觉得自己的处理方式不是很好。所以这次源码分析特意看了k8s的go客户端是如何处理panic问题,毕竟是Google出品。

 1k8s.io/apimachinery/pkg/util/runtime/runtime.go #47
 2func HandleCrash(additionalHandlers ...func(interface{})) {
 3  if r := recover(); r != nil {
 4    //默认会打印 出现panic问题的文件和行数
 5    for _, fn := range PanicHandlers {
 6      fn(r)
 7    }
 8    //留给使用方,出现了panic你还想如何处理
 9    for _, fn := range additionalHandlers {
10      fn(r)
11    }
12    //如果你确认,可以直接panic
13    if ReallyCrash {
14      // Actually proceed to panic.
15      panic(r)
16    }
17  }
18}

从上面看出k8s客户端的处理方式和我们的想法一样,不过它的封装更友好。在k8s的go客户端中HandleCrash,更喜欢和for{}一起使用。

 1k8s.io/apimachinery/pkg/watch/streamwatcher.go #88
 2func (sw *StreamWatcher) receive() {
 3  ......
 4  defer utilruntime.HandleCrash()
 5  for {
 6    ......
 7  }
 8}
 9
10k8s.io/client-go/tools/record/event.go #224
11func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
12  ......
13  go func() {
14    defer utilruntime.HandleCrash()
15    for {
16      ......
17    }
18  }()
19  return watcher
20}

结语

这次的源码分析中,不单单了解了pilot-discovery的设计实现,还通过k8s的go客户端学习到了延迟队列,流量控制,协程安全数据库等相关的实现和应用场景,收获不少。