声明
声明
声明
- 这篇文章需要了解istio,k8s,golang,envoy基础知识
- 分析的环境为k8s,istio版本为0.8.0
pilot-discovery的作用
envoy提供一套通用的数据面接口,通过接口可以动态实现服务发现和配置。在istio中需要集成k8s,consul等服务发现系统,所以需要一个中介整理在k8s,consul服务注册和配置信息,并提供给envoy。
envoy v1 API 和 v2 API区别
v1版本API和v2版本API有一段历史,详情可看官网博客。在envoy开源之初,使用HTTP+轮询的方式实现动态服务发现和配置,但是这种方式存在以下缺点:
- 由于接口数据使用弱类型,导致实现一些通用服务比较困难。
- 控制面更喜欢使用推送的方式,来减少数据在更新时传输的时间。
随着和Google合作加强,官方使用GRPC + push开发了v2版本API,实现了v1版本的SDS/CDS/RDS/LDS接口,继续支持
JSON/YAML
数据格式,还增加了ADS(把SDS/CDS/RDS/LDS4个接口合在一下),HDS等接口。
建立基础缓存数据
其实pilot-discovery已经算是一个小型的非持久性key/value数据库了,它把istio的配置信息和服务注册信息都进行了缓存。这样可以使配置更快的生效。
缓存了什么数据
- istio配置
1istio.io/istio/pilot/pkg/model/config.go
2var (
3 ......
4 // RouteRule describes route rules
5 RouteRule = ProtoSchema{
6 Type: "route-rule",
7 ......
8 }
9 // VirtualService describes v1alpha3 route rules
10 VirtualService = ProtoSchema{
11 Type: "virtual-service",
12 ......
13 }
14 // Gateway describes a gateway (how a proxy is exposed on the network)
15 Gateway = ProtoSchema{
16 Type: "gateway",
17 ......
18 }
19 // IngressRule describes ingress rules
20 IngressRule = ProtoSchema{
21 Type: "ingress-rule",
22 ......
23 }
24)
做过新手任务的同学,应该都很熟悉上面的
Type
,就是配置信息里面的kind
,配置信息保存进k8s后,会被pilot-discovery通过api-server爬过来进行缓存。
1apiVersion: networking.istio.io/v1alpha3
2kind: VirtualService
3metadata:
4 name: reviews
5 ......
- 从k8s获取的服务注册信息
1> istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go #102
2func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
3 ......
4 out.services = out.createInformer(&v1.Service{}, "Service", options.ResyncPeriod,
5 func(opts meta_v1.ListOptions) (runtime.Object, error) {
6 return client.CoreV1().Services(options.WatchedNamespace).List(opts)
7 },
8 func(opts meta_v1.ListOptions) (watch.Interface, error) {
9 return client.CoreV1().Services(options.WatchedNamespace).Watch(opts)
10 })
11 ......
12 out.nodes = out.createInformer(&v1.Node{}, "Node", options.ResyncPeriod,
13 func(opts meta_v1.ListOptions) (runtime.Object, error) {
14 return client.CoreV1().Nodes().List(opts)
15 },
16 func(opts meta_v1.ListOptions) (watch.Interface, error) {
17 return client.CoreV1().Nodes().Watch(opts)
18 })
19 ......
20 return out
21}
还有其他数据不一一列出,从上面可以看出,建立缓存都是通过List和Watch方式进行(istio的配置数据也一样),List:第一次初始化数据,Watch:通过轮询的方式获取数据并缓存。
- 转化成的请求地址
1https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/httpapispecs?limit=500&resourceVersion=0
2https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/servicerolebindings?limit=500&resourceVersion=0
3https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/virtualservices?limit=500&resourceVersion=0
4https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/quotaspecbindings?limit=500&resourceVersion=0
5https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/serviceroles?limit=500&resourceVersion=0
6https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/serviceentries?limit=500&resourceVersion=0
7https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/routerules?limit=500&resourceVersion=0
8https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/egressrules?limit=500&resourceVersion=0
9https://{k8s.ip}:443/apis/authentication.istio.io/v1alpha1/policies?limit=500&resourceVersion=0
10https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/httpapispecbindings?limit=500&resourceVersion=0
11https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/destinationrules?limit=500&resourceVersion=0
12https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/quotaspecs?limit=500&resourceVersion=0
13https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/gateways?limit=500&resourceVersion=0
14https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/destinationpolicies?limit=500&resourceVersion=0
15
16https://{k8s.ip}:443/api/v1/nodes?limit=500&resourceVersion=0
17https://{k8s.ip}:443/api/v1/namespaces/istio-system/configmaps/istio-ingress-controller-leader-istio
18https://{k8s.ip}:443/api/v1/services?limit=500&resourceVersion=0
19https://{k8s.ip}:443/api/v1/endpoints?limit=500&resourceVersion=0
20https://{k8s.ip}:443/api/v1/pods?limit=500&resourceVersion=0
key的生成
在pilot-discovery中把缓存数据分了二大类,一类istio配置信息,另一类服务注册信息。这二类又进行了细分,分别为virtualservices,routerules,nodes,pods等,最后再以
k8s空间/应用名
作为下标缓存数据。
1k8s.io/client-go/tools/cache/store.go #76
2func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
3 if key, ok := obj.(ExplicitKey); ok {
4 return string(key), nil
5 }
6 meta, err := meta.Accessor(obj)
7 if err != nil {
8 return "", fmt.Errorf("object has no meta: %v", err)
9 }
10 if len(meta.GetNamespace()) > 0 {
11 return meta.GetNamespace() + "/" + meta.GetName(), nil
12 }
13 return meta.GetName(), nil
14}
15
16default/sleep
17kube-system/grafana
18istio-system/servicegraph
存储数据
- List 和 Watch
上面提到建立缓存都是通过List和Watch方式进行,来看看它的实现。
1k8s.io/client-go/tools/cache/reflector.go #239
2func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
3 ......
4 list, err := r.listerWatcher.List(options)
5 ......
6 resourceVersion = listMetaInterface.GetResourceVersion()
7 items, err := meta.ExtractList(list)
8 ......
9 //缓存数据
10 if err := r.syncWith(items, resourceVersion); err != nil {
11 return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
12 }
13 ......
14 for {
15 ......
16 w, err := r.listerWatcher.Watch(options)
17 ......
18 if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
19 ......
20 return nil
21 }
22 }
23}
- 如何更新缓存
List可以看做第一次初始化数据,Watch更像是监听数据的变化状态:添加,修改和删除。针对这些状态对缓存的数据做增、删、改。
1k8s.io/client-go/tools/cache/reflector.go #358
2func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
3......
4loop:
5 for {
6 select {
7 case <-stopCh:
8 return errorStopRequested
9 case err := <-errc:
10 return err
11 case event, ok := <-w.ResultChan():
12 ......
13 switch event.Type {
14 case watch.Added:
15 err := r.store.Add(event.Object)
16 ......
17 case watch.Modified:
18 err := r.store.Update(event.Object)
19 ......
20 case watch.Deleted:
21 ......
22 err := r.store.Delete(event.Object)
23 ......
24 default:
25 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
26 }
27 ......
28 }
29 }
30 ......
31 return nil
32}
- 限速访问 --令牌桶算法
刚刚看到监听数据的变化是通过for{} 不断请求k8s的api-server接口,如果不加限制,那就成了DDOS攻击了,所以pilot-discovery使用了流量控制。
1k8s.io/client-go/rest/config.go
2const (
3 DefaultQPS float32 = 5.0
4 DefaultBurst int = 10
5)
这样理解这个配置吧,如果1秒内访问次数大于10,那么在接下来的访问中一秒最多只能访问5次。
1k8s.io/client-go/rest/request.go #616
2func (r *Request) request(fn func(*http.Request, *http.Response)) error {
3 ......
4 retries := 0
5 for {
6 ......
7 if retries > 0 {
8 ......
9 //使用令牌桶算法
10 r.tryThrottle()
11 }
12 resp, err := client.Do(req)
13 ......
14 done := func() bool {
15 ......
16 retries++
17 ......
18 }
19}
- 协程安全map
在golang中使用内存key/value缓存非常简单,定义变量
map[string]interface{}
,再往里面放入数据就可以了。但是map结构为非协程安全,所以像pilot-discovery这种小型数据库,同时存在读和写,如果不加上锁,很容易出现争抢共享资源问题。所以需要加锁:thread_safe_store.go
1type ThreadSafeStore interface {
2 Add(key string, obj interface{})
3 Update(key string, obj interface{})
4 Delete(key string)
5 Get(key string) (item interface{}, exists bool)
6 List() []interface{}
7 ListKeys() []string
8 Replace(map[string]interface{}, string)
9 Index(indexName string, obj interface{}) ([]interface{}, error)
10 IndexKeys(indexName, indexKey string) ([]string, error)
11 ListIndexFuncValues(name string) []string
12 ByIndex(indexName, indexKey string) ([]interface{}, error)
13 GetIndexers() Indexers
14
15 // AddIndexers adds more indexers to this store. If you call this after you already have data
16 // in the store, the results are undefined.
17 AddIndexers(newIndexers Indexers) error
18 Resync() error
19}
提供接口
不管是v1 API还是v2 API,都是基于基础缓存的数据,按照envoy的接口文档,把数据拼接成envoy想要的数据。
暴露v1 API RESTFUL
- 暴露的接口
pilot-discovery暴露了
SDS/CDS/RDS/LDS
接口,envoy再使用轮询的方式,通过这些接口获取配置信息
1istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #376
2func (ds *DiscoveryService) Register(container *restful.Container) {
3 ws := &restful.WebService{}
4 ws.Produces(restful.MIME_JSON)
5 ......
6 ws.Route(ws.
7 GET(fmt.Sprintf("/v1/registration/{%s}", ServiceKey)).
8 To(ds.ListEndpoints).
9 Doc("SDS registration").
10 Param(ws.PathParameter(ServiceKey, "tuple of service name and tag name").DataType("string")))
11 ......
12 ws.Route(ws.
13 GET(fmt.Sprintf("/v1/clusters/{%s}/{%s}", ServiceCluster, ServiceNode)).
14 To(ds.ListClusters).
15 Doc("CDS registration").
16 Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
17 Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
18 ......
19 ws.Route(ws.
20 GET(fmt.Sprintf("/v1/routes/{%s}/{%s}/{%s}", RouteConfigName, ServiceCluster, ServiceNode)).
21 To(ds.ListRoutes).
22 Doc("RDS registration").
23 Param(ws.PathParameter(RouteConfigName, "route configuration name").DataType("string")).
24 Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
25 Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
26 ......
27 ws.Route(ws.
28 GET(fmt.Sprintf("/v1/listeners/{%s}/{%s}", ServiceCluster, ServiceNode)).
29 To(ds.ListListeners).
30 Doc("LDS registration").
31 Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
32 Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
33 ......
34 container.Add(ws)
35}
- 建立二级缓存
这里的缓存可以这样理解:我们平常开发中,从数据库获取数据,经过逻辑处理,再把最终结果进行缓存,返回给客户端,下次进来,就从缓存获取数据。同理v1 API的接口从基础缓存获取了数据后,把这些数据拼接成envoy需要的格式数据,再把这些数据缓存,返回给envoy。
- ListEndpoints(EDS)
其他几个接口方式一样,不一一列出
1istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #567
2> func (ds *DiscoveryService) ListEndpoints(request *restful.Request, response *restful.Response) {
3 ......
4 key := request.Request.URL.String()
5 out, resourceCount, cached := ds.sdsCache.cachedDiscoveryResponse(key)
6 //没有缓存
7 if !cached {
8 /**
9 逻辑处理
10 **/
11 ......
12 resourceCount = uint32(len(endpoints))
13 if resourceCount > 0 {
14 //缓存数据
15 ds.sdsCache.updateCachedDiscoveryResponse(key, resourceCount, out)
16 }
17 }
18 observeResources(methodName, resourceCount)
19 writeResponse(response, out)
20}
暴露v2 API GRPC
我也是刚刚接触GRPC的双向流,我对它的理解是:一个长连接,客户端和服务端可以相互交互。在这里的用法是,客户端envoy打开一个GRPC连接,初始时pilot-discovery把数据响应给envoy,接下来,如果有数据变动,pilot-discovery通过GRPC把数据推给envoy。
- ADS聚合接口
聚合接口就是把
SDS/CDS/RDS/LDS
的配置数据都放在一个接口上。实现有点长,缩减只剩一个接口,但方式是一样的。
1istio.io/istio/pilot/pkg/proxy/envoy/v2/ads.go #237
2func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
3 ......
4 var receiveError error
5 reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
6 go receiveThread(con, reqChannel, &receiveError)
7
8 for {
9 // Block until either a request is received or the ticker ticks
10 select {
11 case discReq, ok = <-reqChannel:
12 ......
13 switch discReq.TypeUrl {
14 case ClusterType:
15 ......
16 case ListenerType:
17 ......
18 case RouteType:
19 ......
20 case EndpointType:
21 ......
22 //推送数据
23 err := s.pushEds(con)
24 if err != nil {
25 return err
26 }
27 ......
28 }
29 ......
30 //通过监听事件触发推送数据
31 case <-con.pushChannel:
32 ......
33 if len(con.Clusters) > 0 {
34 err := s.pushEds(con)
35 if err != nil {
36 return err
37 }
38 }
39 ......
40 }
41 }
42}
清二级缓存和触发推送
- 主动触发
清除二级缓存和触发推送在这里其实都是同一个触发点:就是数据变动的时候。数据的变动应该是无序的,但是在更新配置的时候应该井然有序的进行。所以这里使用了任务队列,让事件一件一件接着做。
- 初始化List和Watch,注册Add,Update,Delete事件。
1istio.io/istio/pilot/pkg/config/kube/crd/controller.go #133
2func (c *controller) createInformer(
3 o runtime.Object,
4 otype string,
5 resyncPeriod time.Duration,
6 lf cache.ListFunc,
7 wf cache.WatchFunc) cacheHandler {
8 ......
9 informer.AddEventHandler(
10 cache.ResourceEventHandlerFuncs{
11 AddFunc: func(obj interface{}) {
12 ......
13 c.queue.Push(kube.NewTask(handler.Apply, obj, model.EventAdd))
14 },
15 ......
16 })
17 return cacheHandler{informer: informer, handler: handler}
18}
当事件被触发都会执行
handler.Apply
,再执行注册的方法。
1istio.io/istio/pilot/pkg/serviceregistry/kube/queue.go #142
2func (ch *ChainHandler) Apply(obj interface{}, event model.Event) error {
3 for _, f := range ch.funcs {
4 if err := f(obj, event); err != nil {
5 return err
6 }
7 }
8 return nil
9}
- 注册方法
1istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #328
2func NewDiscoveryService(ctl model.Controller, configCache model.ConfigStoreCache,
3 environment model.Environment, o DiscoveryServiceOptions) (*DiscoveryService, error) {
4 ......
5 serviceHandler := func(*model.Service, model.Event) { out.clearCache() }
6 if err := ctl.AppendServiceHandler(serviceHandler); err != nil {
7 return nil, err
8 }
9 instanceHandler := func(*model.ServiceInstance, model.Event) { out.clearCache() }
10 if err := ctl.AppendInstanceHandler(instanceHandler); err != nil {
11 return nil, err
12 }
13
14 if configCache != nil {
15 ......
16 configHandler := func(model.Config, model.Event) { out.clearCache() }
17 for _, descriptor := range model.IstioConfigTypes {
18 configCache.RegisterEventHandler(descriptor.Type, configHandler)
19 }
20 }
21
22 return out, nil
23}
方法
out.clearCache()
,实现了清二级缓存和推送数据
1istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #480
2func (ds *DiscoveryService) clearCache() {
3 ......
4 //清二级缓存
5 ds.sdsCache.clear()
6 ds.cdsCache.clear()
7 ds.rdsCache.clear()
8 ds.ldsCache.clear()
9 if V2ClearCache != nil {
10 //把数据推送到envoy
11 V2ClearCache()
12 }
13}
- 手动触发
在pilot-discovery开放了一个清二级缓存的接口。
1istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #436
2func (ds *DiscoveryService) Register(container *restful.Container) {
3 ws := &restful.WebService{}
4 ws.Produces(restful.MIME_JSON)
5 ......
6 ws.Route(ws.
7 POST("/cache_stats_delete").
8 To(ds.ClearCacheStats).
9 Doc("Clear discovery service cache stats"))
10 container.Add(ws)
11}
小知识
万恶的panic
在开发使用golang的过程中或多或少都接触过panic。例如引入了一些喜欢用panic的第三包,断言错误等触发了panic,导致整个服务都挂掉。为了避免这些问题,我们一般都是使用
recover
来接收panic,但一直觉得自己的处理方式不是很好。所以这次源码分析特意看了k8s的go客户端是如何处理panic问题,毕竟是Google出品。
1k8s.io/apimachinery/pkg/util/runtime/runtime.go #47
2func HandleCrash(additionalHandlers ...func(interface{})) {
3 if r := recover(); r != nil {
4 //默认会打印 出现panic问题的文件和行数
5 for _, fn := range PanicHandlers {
6 fn(r)
7 }
8 //留给使用方,出现了panic你还想如何处理
9 for _, fn := range additionalHandlers {
10 fn(r)
11 }
12 //如果你确认,可以直接panic
13 if ReallyCrash {
14 // Actually proceed to panic.
15 panic(r)
16 }
17 }
18}
从上面看出k8s客户端的处理方式和我们的想法一样,不过它的封装更友好。在k8s的go客户端中
HandleCrash
,更喜欢和for{}一起使用。
1k8s.io/apimachinery/pkg/watch/streamwatcher.go #88
2func (sw *StreamWatcher) receive() {
3 ......
4 defer utilruntime.HandleCrash()
5 for {
6 ......
7 }
8}
9
10k8s.io/client-go/tools/record/event.go #224
11func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
12 ......
13 go func() {
14 defer utilruntime.HandleCrash()
15 for {
16 ......
17 }
18 }()
19 return watcher
20}
结语
这次的源码分析中,不单单了解了pilot-discovery的设计实现,还通过k8s的go客户端学习到了延迟队列,流量控制,协程安全数据库等相关的实现和应用场景,收获不少。