client-go 简介
client-go 简介
[TOC]
client-go 简介
本次使用的版本为:
kubernetes-1.17.0/client-go v0.17.0(2019.12.08)
不要直接go get k8s.io/client-go
(以前发布过v12.0.0的版本)
使用go get k8s.io/client-go@v0.17.0
Kubernetes官方从2016年8月份开始,将Kubernetes资源操作相关的核心源码抽取出来,独立出来一个项目Client-go,作为官方提供的Go client。Kubernetes的部分代码也是基于这个client实现的,所以对这个client的质量、性能等方面还是非常有信心的。
client-go是一个调用kubernetes集群资源对象API的客户端,即通过client-go实现对kubernetes集群中资源对象(包括deployment、service、ingress、replicaSet、pod、namespace、node等)的增删改查等操作。大部分对kubernetes进行前置API封装的二次开发都通过client-go这个第三方包来实现。
源码简介
主要package
kubernetes: 访问 Kubernetes API的一系列的clientset
discovery:通过Kubernetes API 进行服务发现
dynamic:对任意Kubernetes对象执行通用操作的动态client
transport:启动连接和鉴权auth
tools/cache:controllers控制器
Client结构
RESTClient
RESTClient是最基础的,相当于的底层基础结构,可以直接通过 是RESTClient提供的RESTful方法如Get(),Put(),Post(),Delete()进行交互
- 同时支持Json 和 protobuf
- 支持所有原生资源和CRDs
- 但是,一般而言,为了更为优雅的处理,需要进一步封装,通过Clientset封装RESTClient,然后再对外提供接口和服务
RESTClient 封装了指定资源URL的通用Kubernetes API的访问姿势
Clientset
Clientset是调用Kubernetes资源对象最常用的client,可以操作所有的资源对象,包含RESTClient。需要指定Group、指定Version,然后根据Resource获取
- 优雅的姿势是利用一个controller对象,再加上Informer
Clientset 是一系列的clients的group组合,注意每个group在一个Clientset中只包含一个版本。
Clientset包含了appsV1、coreV1,这中间包含了RESTClient,因此Clientset是基于RESTClient的。
DynamicClient
Dynamic client 是一种动态的 client,它能处理 kubernetes 所有的资源。不同于 clientset,dynamic client 返回的对象是一个 map[string]interface{},如果一个 controller 中需要控制所有的 API,可以使用dynamic client,目前它在 garbage collector 和 namespace controller中被使用。
- 只支持JSON
dynamic client针对的是所有资源,但是只支持Json;
1# dynamic/interface.go
2
3type ResourceInterface interface {
4 Create(obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
5 Update(obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
6 UpdateStatus(obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)
7 Delete(name string, options *metav1.DeleteOptions, subresources ...string) error
8 DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
9 Get(name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
10 List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
11 Watch(opts metav1.ListOptions) (watch.Interface, error)
12 Patch(name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
13}
对象资源的操作接口
默认的每一种资源对象都有一个interface,封装了对象的CURD方法和list/watch方法
如 Deployment(kubernetes/typed/apps/v1/deployment.go):
1// DeploymentInterface has methods to work with Deployment resources.
2type DeploymentInterface interface {
3 Create(*v1.Deployment) (*v1.Deployment, error)
4 Update(*v1.Deployment) (*v1.Deployment, error)
5 UpdateStatus(*v1.Deployment) (*v1.Deployment, error)
6 Delete(name string, options *metav1.DeleteOptions) error
7 DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
8 Get(name string, options metav1.GetOptions) (*v1.Deployment, error)
9 List(opts metav1.ListOptions) (*v1.DeploymentList, error)
10 Watch(opts metav1.ListOptions) (watch.Interface, error)
11 Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Deployment, err error)
12 GetScale(deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
13 UpdateScale(deploymentName string, scale *autoscalingv1.Scale) (*autoscalingv1.Scale, error)
14
15 DeploymentExpansion
16}
在Kubernetes中,所有对象资源的操作方式都是统一的,有个interface当做虚基类,包含资源的所有操作方法,然后各个子类继承然后实现它们,子类中的实现定义会针对不同的资源有不同诠释
client-go架构图
- List/Watch:List是列举apiserver中对象的接口,Watch是监控apiserver资源变化的接口;
- Reflector:我习惯成称之为反射器,实现对apiserver指定类型对象的监控,其中反射实现的就是把监控的结果实例化成具体的对象;
- DeltaIFIFO:将Reflector监控的变化的对象形成一个FIFO队列,此处的Delta就是变化,DeltaFIFO我们已经有文章详细介绍了;
- LocalStore:指的就是Indexer的实现cache,这里面缓存的就是apiserver中的对象(其中有一部分可能还在DeltaFIFO中),此时使用者再查询对象的时候就直接从cache中查找,减少了apiserver的压力;
- Callbacks:通知回调函数,Infomer感知的所有对象变化都是通过回调函数通知使用者(Listener);
自己结合源码的理解如下:
client-go components
Reflector(反射器)
在包缓存内的类型reflector中定义的Reflector监视指定资源类型(kind)的Kubernetes API。执行此操作的函数是ListAndWatch。监视可以是内置资源,也可以是自定义资源。当反射器通过监视API接收关于新资源实例的存在的通知时,它使用相应的列表API获取新创建的对象并将其放入WETCHANDER函数内的Delta FIFO队列中。
Reflector 源码
reflector使用listerWatcher获取资源,并将其保存在store中,此处的store就是DeltaFIFO,Reflector核心处理函数为ListAndWatch(client-go/tools/cache/reflector.go)
1// client-go/tools/cache/reflector.go
2type Reflector struct {
3 // name identifies this reflector. By default it will be a file:line if possible.
4 name string
5 // metrics tracks basic metric information about the reflector
6 metrics *reflectorMetrics// 但凡遇到metrics多半是用于做监控的,可以忽略
7
8 // The type of object we expect to place in the store.
9 expectedType reflect.Type// 反射的类型,也就是要监控的对象类型,比如Pod
10 // The destination to sync up with the watch source
11 store Store// 存储,就是DeltaFIFO,为什么,后面会有代码证明
12 // listerWatcher is used to perform lists and watches.
13 listerWatcher ListerWatcher// 这个是用来从apiserver获取资源用的
14 // period controls timing between one watch ending and
15 // the beginning of the next one.
16 period time.Duration// 反射器在List和Watch的时候理论上是死循环,只有出现错误才会退出
17 //period这个变量用在出错后多长时间再执行List和Watch,默认值是1秒钟
18 resyncPeriod time.Duration// 重新同步的周期,很多人肯定认为这个同步周期指的是从apiserver的同步周期
19 // resyncPeriod其实这里面同步指的是shared_informer使用者需要定期同步全量对象
20 ShouldResync func() bool// 如果需要同步,调用这个函数问一下,当然前提是该函数指针不为空
21 // clock allows tests to manipulate time
22 clock clock.Clock// 时钟
23 // lastSyncResourceVersion is the resource version token last
24 // observed when doing a sync with the underlying store
25 // it is thread safe, but not synchronized with the underlying store
26 lastSyncResourceVersion string// 最后一次同步的资源版本
27 // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
28 lastSyncResourceVersionMutex sync.RWMutex// 还专门为最后一次同步的资源版本弄了个锁
29 // WatchListPageSize is the requested chunk size of initial and resync watch lists.
30 // Defaults to pager.PageSize.
31 WatchListPageSize int64
32}
- listerWatcher用于获取和监控资源,lister可以获取对象的全量,watcher可以获取对象的增量(变化);
- 系统会周期性的执行list-watch的流程,一旦过程中失败就要重新执行流程,这个重新执行的周期就是period指定的;
- expectedType规定了监控对象的类型,非此类型的对象将会被忽略;
- 实例化后的expectedType类型的对象会被添加到store中;
- kubernetes资源在apiserver中都是有版本的,对象的任何除了修改(添加、删除、更新)都会造成资源版本更新,所以lastSyncResourceVersion就是指的这个版本;
- 如果使用者需要定期同步全量对象,那么Reflector就会定期产生全量对象的同步事件给DeltaFIFO;
ListAndWatch在Reflector.Run函数中启动,并以Reflector.period周期性进行调度。ListAndWatch使用resourceVersion来获取资源的增量变化:在List时会获取资源的首个resourceVersion值,在Watch的时候会使用List获取的resourceVersion来获取资源的增量变化,然后将获取到的资源的resourceVersion保存起来,作为下一次Watch的基线。
1// 代码源自client-go/tools/cache/reflector.go
2func (r *Reflector) Run(stopCh <-chan struct{}) {
3 // func Until(f func(), period time.Duration, stopCh <-chan struct{})是下面函数的声明
4 // 这里面我们不用关心wait.Until是如何实现的,只要知道他调用函数f会被每period周期执行一次
5 // 意思就是f()函数执行完毕再等period时间后在执行一次,也就是r.ListAndWatch()会被周期性的调用
6 wait.Until(func() {
7 if err := r.ListAndWatch(stopCh); err != nil {
8 utilruntime.HandleError(err)
9 }
10 }, r.period, stopCh)
11}
12
13// 代码源自client-go/tools/cache/reflector.go
14func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
15 var resourceVersion string
16 // 很多存储类的系统都是这样设计的,数据采用版本的方式记录,数据每变化(添加、删除、更新)都会触发版本更新,
17 // 这样的做法可以避免全量数据访问。以apiserver资源监控为例,只要监控比缓存中资源版本大的对象就可以了,
18 // 把变化的部分更新到缓存中就可以达到与apiserver一致的效果,一般资源的初始版本为0,从0版本开始列举就是全量的对象了
19 options := metav1.ListOptions{ResourceVersion: "0"}
20 // 与监控相关的内容不多解释
21 r.metrics.numberOfLists.Inc()
22 start := r.clock.Now()
23 // 列举资源,这部分是apimachery相关的内容,读者感兴趣可以自己了解
24 list, err := r.listerWatcher.List(options)
25 if err != nil {
26 return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
27 }
28 // 还是监控相关的
29 r.metrics.listDuration.Observe(time.Since(start).Seconds())
30 // 下面的代码主要是利用apimachinery相关的函数实现,就是把列举返回的结果转换为对象数组
31 // 下面的代码大部分来自apimachinery,此处不做过多说明,读者只要知道实现什么功能就行了
32 listMetaInterface, err := meta.ListAccessor(list)
33 if err != nil {
34 return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
35 }
36 resourceVersion = listMetaInterface.GetResourceVersion()
37
38 items, err := meta.ExtractList(list)
39 if err != nil {
40 return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
41 }
42 // 和监控相关的内容
43 r.metrics.numberOfItemsInList.Observe(float64(len(items)))
44 // 以上部分都是对象实例化的过程,可以称之为反射,也是Reflector这个名字的主要来源,本文不是讲解反射原理的,
45 // 而是作为SharedInformer的前端,所以我们重点介绍的是对象在SharedInformer中流转过程,所以反射原理部分不做为重点讲解
46 // 这可是真正从apiserver同步过来的全量对象,所以要同步到DeltaFIFO中
47 if err := r.syncWith(items, resourceVersion); err != nil {
48 return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
49 }
50 // 设置最新的同步的对象版本
51 r.setLastSyncResourceVersion(resourceVersion)
52 // 下面要启动一个后台协程实现定期的同步操作,这个同步就是将SharedInformer里面的对象全量以同步事件的方式通知使用者
53 // 我们暂且称之为“后台同步协程”,Run()函数退出需要后台同步协程退出,所以下面的cancelCh就是干这个用的
54 // 利用defer close(cancelCh)实现的,而resyncerrc是后台同步协程反向通知Run()函数的报错通道
55 // 当后台同步协程出错,Run()函数接收到信号就可以退出了
56 resyncerrc := make(chan error, 1)
57 cancelCh := make(chan struct{})
58 defer close(cancelCh)
59 // 下面这个匿名函数就是后台同步协程的函数了
60 go func() {
61 // resyncCh返回的就是一个定时器,如果resyncPeriod这个为0那么就会返回一个永久定时器,cleanup函数是用来清理定时器的
62 resyncCh, cleanup := r.resyncChan()
63 defer func() {
64 cleanup()
65 }()
66 // 死循环等待各种信号
67 for {
68 // 只有定时器有信号才继续处理,其他的都会退出
69 select {
70 case <-resyncCh:
71 case <-stopCh:
72 return
73 case <-cancelCh:
74 return
75 }
76 // ShouldResync是个函数地址,创建反射器对象的时候传入,即便时间到了,也要通过函数问问是否需要同步
77 if r.ShouldResync == nil || r.ShouldResync() {
78 // 我们知道这个store是DeltaFIFO,DeltaFIFO.Resync()做了什么,读者自行温习相关的文章~
79 // 就在这里实现了我们前面提到的同步,从这里看所谓的同步就是以全量对象同步事件的方式通知使用者
80 if err := r.store.Resync(); err != nil {
81 resyncerrc <- err
82 return
83 }
84 }
85 // 清理掉当前的计时器,获取下一个同步时间定时器
86 cleanup()
87 resyncCh, cleanup = r.resyncChan()
88 }
89 }()
90
91 // 前面已经列举了全量对象,接下来就是watch的逻辑了
92 for {
93 // 如果有退出信号就立刻返回,否则就会往下走,因为有default.
94 select {
95 case <-stopCh:
96 return nil
97 default:
98 }
99
100 // 计算watch的超时时间
101 timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
102 // 设置watch的选项,因为前期列举了全量对象,从这里只要监听最新版本以后的资源就可以了
103 // 如果没有资源变化总不能一直挂着吧?也不知道是卡死了还是怎么了,所以有一个超时会好一点
104 options = metav1.ListOptions{
105 ResourceVersion: resourceVersion,
106 TimeoutSeconds: &timeoutSeconds,
107 }
108 // 监控相关
109 r.metrics.numberOfWatches.Inc()
110 // 开始监控对象
111 w, err := r.listerWatcher.Watch(options)
112 // watch产生错误了,大部分错误就要退出函数然后再重新来一遍流程
113 if err != nil {
114 switch err {
115 case io.EOF:
116 case io.ErrUnexpectedEOF:
117 default:
118 utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
119 }
120 // 类似于网络拒绝连接的错误要等一会儿再试,因为可能网络繁忙
121 if urlError, ok := err.(*url.Error); ok {
122 if opError, ok := urlError.Err.(*net.OpError); ok {
123 if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
124 time.Sleep(time.Second)
125 continue
126 }
127 }
128 }
129 return nil
130 }
131
132 // watch返回是流,apiserver会将变化的资源通过这个流发送出来,client-go最终通过chan实现的
133 // 所以watchHandler()是一个需要持续从chan读取数据的流程,所以需要传入resyncerrc和stopCh
134 // 用于异步通知退出或者后台同步协程错误
135 if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
136 if err != errorStopRequested {
137 glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
138 }
139 return nil
140 }
141 }
142}
143
144上面的函数中,调用了两个私有函数,分别为syncWith()和watchHandler()。syncWith()用于实现一次从apiserver全量对象的同步,这里的同步和我们上面提到的同步不是一回事,这里指的是从apiserver的同步。watchHandler是实现监控apiserver资源变化的处理过程,主要就是把apiserver的资源变化转换为DeltaFIFO调用。我们接下来就看这两个函数的具体实现
145
146接下来我们就要看看watchHandler做了什么?
147
148// 代码源自client-go/tools/cache/reflector.go
149// 实现apiserver全量对象的同步
150func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
151 // 做一次slice类型转换
152 found := make([]interface{}, 0, len(items))
153 for _, item := range items {
154 found = append(found, item)
155 }
156 // 直接调用了DeltaFIFO的Replace()接口,这个接口就是用于同步全量对象的
157 return r.store.Replace(found, resourceVersion)
158}
159// 实现从watch返回的chan中持续读取变化的资源,并转换为DeltaFIFO相应的调用
160func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
161 start := r.clock.Now()
162 eventCount := 0
163 // 监控相关
164 defer func() {
165 r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
166 r.metrics.watchDuration.Observe(time.Since(start).Seconds())
167 }()
168
169 // 这里就开始无限循环的从chan中读取资源的变化,也可以理解为资源的增量变化,同时还要监控各种信号
170loop:
171 for {
172 select {
173 // 系统退出信号
174 case <-stopCh:
175 return errorStopRequested
176 // 后台同步协程出错信号
177 case err := <-errc:
178 return err
179 // watch函数返回的是一个chan,通过这个chan持续的读取对象
180 case event, ok := <-w.ResultChan():
181 // 如果不OK,说明chan关闭了,就要重新获取,这里面我们可以推测这个chan可能会运行过程中重新创建
182 // 否则就应该退出而不是继续循环
183 if !ok {
184 break loop
185 }
186 // 看来event可以作为错误的返回值,挺有意思,而不是通过关闭chan,这种方式可以传递错误信息,关闭chan做不到
187 if event.Type == watch.Error {
188 return apierrs.FromObject(event.Object)
189 }
190 // 这里面就是利用反射实例化对象了,而且判断了对象类型是我们设定的类型
191 if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
192 utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
193 continue
194 }
195 // 和list操作相似,也要获取对象的版本,要更新缓存中的版本,下次watch就可以忽略这些资源了
196 meta, err := meta.Accessor(event.Object)
197 if err != nil {
198 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
199 continue
200 }
201 newResourceVersion := meta.GetResourceVersion()
202 // 根据事件的类型做不同的DeltaFIFO的操作
203 switch event.Type {
204 // 向DeltaFIFO添加一个添加的Delta
205 case watch.Added:
206 err := r.store.Add(event.Object)
207 if err != nil {
208 utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
209 }
210 // 更新对象,向DeltaFIFO添加一个更新的Delta
211 case watch.Modified:
212 err := r.store.Update(event.Object)
213 if err != nil {
214 utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
215 }
216 // 删除对象,向DeltaFIFO添加一个删除的Delta
217 case watch.Deleted:
218 err := r.store.Delete(event.Object)
219 if err != nil {
220 utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
221 }
222 // 其他类型就不知道干什么了,只能报错
223 default:
224 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
225 }
226 // 更新最新资源版本
227 *resourceVersion = newResourceVersion
228 r.setLastSyncResourceVersion(newResourceVersion)
229 eventCount++
230 }
231 }
232 // watch返回时间非常短而且没有任何事件要处理,这个属于异常现象,因为我们watch是设置了超时的
233 watchDuration := r.clock.Now().Sub(start)
234 if watchDuration < 1*time.Second && eventCount == 0 {
235 r.metrics.numberOfShortWatches.Inc()
236 return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
237 }
238
239 return nil
240}
241
242// 代码源自client-go/tools/cache/reflector.go
243func (r *Reflector) setLastSyncResourceVersion(v string) {
244 // 设置已经获取到资源的最新版本
245 r.lastSyncResourceVersionMutex.Lock()
246 defer r.lastSyncResourceVersionMutex.Unlock()
247 r.lastSyncResourceVersion = v
248
249 rv, err := strconv.Atoi(v)
250 if err == nil {
251 r.metrics.lastResourceVersion.Set(float64(rv))
252 }
253}
254
255// 获取resync定时器,叫定时器比较好理解,叫chan很难和定时关联起来
256func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
257 // 如果resyncPeriod说明就不用定时同步,返回的是永久超时的定时器
258 if r.resyncPeriod == 0 {
259 return neverExitWatch, func() bool { return false }
260 }
261 // 构建定时起
262 t := r.clock.NewTimer(r.resyncPeriod)
263 return t.C(), t.Stop
264}
如可以使用如下命令获取Pod的resourceVersion
1kubectl get pod podname -o yaml|grep resourceVersion
上图中的Resync触发的Sync动作,其作用与Replace中的第三步相同,用于将knowObject中的对象与DeltaFIFO中同步。这种操作是有必要的
总结
- Reflector利用apiserver的client列举全量对象(版本为0以后的对象全部列举出来)
- 将全量对象采用Replace()接口同步到DeltaFIFO中,并且更新资源的版本号,这个版本号后续会用到;
- 开启一个协程定时执行resync,如果没有设置定时同步则不会执行,同步就是把全量对象以同步事件的方式通知出去;
- 通过apiserver的client监控(watch)资源,监控的当前资源版本号以后的对象,因为之前的都已经获取到了;
- 一旦有对象发生变化,那么就会根据变化的类型(新增、更新、删除)调用DeltaFIFO的相应接口,产生一个相应的对象Delta,同时更新当前资源的版本;
通过Kubernetes API监控Kubernetes的资源类型
- 采用List、Watch机制
- 可以Watch任何资源包括CRD
- 添加object对象到FIFO队列,然后Informer会从队列里面取数据
(非组件)DeltaFIFO 源码
DeltaFIFO的源码注释写的比较清楚,它是一个生产者-消费者队列(先入先出),生产者为Reflector,消费者为Pop()函数,从架构图中可以看出DeltaFIFO的数据来源为Reflector,通过Pop操作消费数据,消费的数据一方面存储到Indexer中,另一方面可以通过informer的handler进行处理(见下文)。informer的handler处理的数据需要与存储在Indexer中的数据匹配。需要注意的是,Pop的单位是一个Deltas,而不是Delta。
DeltaFIFO同时实现了Queue和Store接口。DeltaFIFO使用Deltas保存了对象状态的变更(Add/Delete/Update)信息(如Pod的删除添加等),Deltas缓存了针对相同对象的多个状态变更信息,如Pod的Deltas[0]可能更新了标签,Deltas[1]可能删除了该Pod。最老的状态变更信息为Newest(),最新的状态变更信息为Oldest()。使用中,获取DeltaFIFO中对象的key以及获取DeltaFIFO都以最新状态为准。
1# tools/cache/delta_fifo.go
2// DeltaType is the type of a change (addition, deletion, etc)
3type DeltaType string // Delta的类型用字符串表达
4
5// Change type definition
6const (
7 Added DeltaType = "Added"
8 Updated DeltaType = "Updated"
9 Deleted DeltaType = "Deleted"
10 // The other types are obvious. You'll get Sync deltas when:
11 // * A watch expires/errors out and a new list/watch cycle is started.
12 // * You've turned on periodic syncs.
13 // (Anything that trigger's DeltaFIFO's Replace() method.)
14 Sync DeltaType = "Sync" // 同步
15)
16
17// Delta is the type stored by a DeltaFIFO. It tells you what change
18// happened, and the object's state after* that change.
19//
20// [*] Unless the change is a deletion, and then you'll get the final
21// state of the object before it was deleted.
22type Delta struct {
23 Type DeltaType // Delta类型,比如增、减,后面有详细说明
24 Object interface{} // 对象,Delta的粒度是一个对象
25}
26
27// Deltas is a list of one or more 'Delta's to an individual object.
28// The oldest delta is at index 0, the newest delta is the last one.
29type Deltas []Delta // Delta数组
我们再说一说如下几个类型,因为他们定义在DeltaFIFO的文件中,而且在很多地方应用:
1# tools/cache/delta_fifo.go
2
3// 这个接口类型就是下面面两个接口类型的组合了
4// A KeyListerGetter is anything that knows how to list its keys and look up by key.
5type KeyListerGetter interface {
6 KeyLister
7 KeyGetter
8}
9// 这是一个非常通用的接口类型,只定义了一个接口函数,就是返回所有的keys
10// A KeyLister is anything that knows how to list its keys.
11type KeyLister interface {
12 ListKeys() []string
13}
14// 这也是一个非常通用的接口类型,只定义了一个接口函数,就是通过key获取对象
15// A KeyGetter is anything that knows how to get the value stored under a given key.
16type KeyGetter interface {
17 GetByKey(key string) (interface{}, bool, error)
18}
有没有发现上面两个接口在client-go.tools.cache.Store这个接口类型中也存在,也就是说实现了Store接口的类型同时也实现了上面三个接口
接下来再来认识一个类型:
1# tools/cache/fifo.go
2// 这个才是FIFO的抽象,DeltaFIFO只是FIFO的一种实现。
3// Queue is exactly like a Store, but has a Pop() method too.
4type Queue interface {
5 Store // 实现了存储接口,这个很好理解,FIFO也是一种存储
6 // 在存储的基础上增加了Pop接口,用于弹出对象
7 // Pop blocks until it has something to process.
8 // It returns the object that was process and the result of processing.
9 // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
10 // should be requeued before releasing the lock on the queue.
11 Pop(PopProcessFunc) (interface{}, error)
12 // 对象如果不在队列中就添加
13 // AddIfNotPresent adds a value previously
14 // returned by Pop back into the queue as long
15 // as nothing else (presumably more recent)
16 // has since been added.
17 AddIfNotPresent(interface{}) error
18 // 通过Replace()放入第一批对象到队列中并且已经被Pop()全部取走
19 // HasSynced returns true if the first batch of items has been popped
20 HasSynced() bool
21 // 关闭队列
22 // Close queue
23 Close()
24}
25
DeltaFIFO实现
我们先来看看DeltaFIFO的类型定义:
1# tools/cache/delta_fifo.go
2// DeltaFIFO is like FIFO, but allows you to process deletes.
3//
4// DeltaFIFO is a producer-consumer queue, where a Reflector is
5// intended to be the producer, and the consumer is whatever calls
6// the Pop() method.
7//
8// DeltaFIFO solves this use case:
9// * You want to process every object change (delta) at most once.
10// * When you process an object, you want to see everything
11// that's happened to it since you last processed it.
12// * You want to process the deletion of objects.
13// * You might want to periodically reprocess objects.
14//
15// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
16// interface{} to satisfy the Store/Queue interfaces, but it
17// will always return an object of type Deltas.
18//
19// A note on threading: If you call Pop() in parallel from multiple
20// threads, you could end up with multiple threads processing slightly
21// different versions of the same object.
22//
23// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
24// to list keys that are "known", for the purpose of figuring out which
25// items have been deleted when Replace() or Delete() are called. The deleted
26// object will be included in the DeleteFinalStateUnknown markers. These objects
27// could be stale.
28type DeltaFIFO struct {
29 // lock/cond protects access to 'items' and 'queue'.
30 lock sync.RWMutex// 读写锁,因为涉及到同时读写,读写锁性能要高
31 cond sync.Cond// 给Pop()接口使用,在没有对象的时候可以阻塞,内部锁复用读写锁
32 // 这个应该是Store的本质了,按照kv的方式存储对象,但是存储的是对象的Deltas数组
33 // We depend on the property that items in the set are in
34 // the queue and vice versa, and that all Deltas in this
35 // map have at least one Delta.
36 items map[string]Deltas
37 queue []string// 这个是为先入先出实现的,存储的就是对象的键
38 // 通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
39 // populated is true if the first batch of items inserted by Replace() has been populated
40 // or Delete/Add/Update was called first.
41 populated bool
42 // initialPopulationCount is the number of items inserted by the first call of Replace()
43 initialPopulationCount int// 通过Replace()接口将第一批对象放入队列的对象数量
44
45 // keyFunc is used to make the key used for queued item
46 // insertion and retrieval, and should be deterministic.
47 keyFunc KeyFunc// 对象键计算函数,在Indexer那篇文章介绍过
48 // 前面介绍就是为了这是用,该对象指向的就是Indexer,
49 // knownObjects list keys that are "known", for the
50 // purpose of figuring out which items have been deleted
51 // when Replace() or Delete() is called.
52 knownObjects KeyListerGetter
53
54 // Indication the queue is closed.
55 // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
56 // Currently, not used to gate any of CRED operations.
57 closed bool// 是否已经关闭的标记
58 closedLock sync.Mutex// 专为关闭设计的锁
59}
60
61var (
62 _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
63)
queueActionLocked函数
1// 代码源自client-go/tools/cache/delta_fifo.go
2// 从函数名称来看把“动作”放入队列中,这个动作就是DeltaType,而且已经加锁了
3func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
4 // 前面提到的计算对象键的函数
5 id, err := f.KeyOf(obj)
6 if err != nil {
7 return KeyError{obj, err}
8 }
9 // 如果是同步,并且对象未来会被删除,那么就直接返回,没必要记录这个动作了
10 // 肯定有人会问为什么Add/Delete/Update这些动作可以,因为同步对于已经删除的对象是没有意义的
11 // 已经删除的对象后续跟添加、更新有可能,因为同名的对象又被添加了,删除也是有可能
12 // 删除有些复杂,后面会有说明
13 if actionType == Sync && f.willObjectBeDeletedLocked(id) {
14 return nil
15 }
16 // 同一个对象的多次操作,所以要追加到Deltas数组中
17 newDeltas := append(f.items[id], Delta{actionType, obj})
18 // 合并操作,去掉冗余的delta
19 newDeltas = dedupDeltas(newDeltas)
20 // 判断对象是否已经存在
21 _, exists := f.items[id]
22 // 合并后操作有可能变成没有Delta么?后面的代码分析来看应该不会,所以暂时不知道这个判断目的
23 if len(newDeltas) > 0 {
24 // 如果对象没有存在过,那就放入队列中,如果存在说明已经在queue中了,也就没必要再添加了
25 if !exists {
26 f.queue = append(f.queue, id)
27 }
28 // 更新Deltas数组,通知所有调用Pop()的人
29 f.items[id] = newDeltas
30 f.cond.Broadcast()
31 } else if exists {
32 // 直接把对象删除,这段代码我不知道什么条件会进来,因为dedupDeltas()肯定有返回结果的
33 // 后面会有dedupDeltas()详细说明
34 delete(f.items, id)
35 }
36 return nil
37}
- DeltaFIFO生产者和消费者是异步的,如果同一个目标的频繁操作,前面操作还缓存在队列中的时候,那么队列就要缓冲对象的所有操作,那可以将多个操作合并么?这是下面讨论的了;
- 对于更新这种类型的操作在没有全量基础的情况下是没法合并的,同时我们还不知道具体是什么类型的对象,所以能合并的也就是有添加/删除,两个添加/删除操作其实可以视为一个;
合并操作的具体实现:
1// 代码源自client-go/tools/cache/delta_fifo.go
2func dedupDeltas(deltas Deltas) Deltas {
3 // 小于2个delta,那就是1个呗,没啥好合并的
4 n := len(deltas)
5 if n < 2 {
6 return deltas
7 }
8 // 取出最后两个
9 a := &deltas[n-1]
10 b := &deltas[n-2]
11 // 判断如果是重复的,那就删除这两个delta把合并后的追加到Deltas数组尾部
12 if out := isDup(a, b); out != nil {
13 d := append(Deltas{}, deltas[:n-2]...)
14 return append(d, *out)
15 }
16 return deltas
17}
18// 判断两个Delta是否是重复的
19func isDup(a, b *Delta) *Delta {
20 // 只有一个判断,只能判断是否为删除类操作,和我们上面的判断相同
21 // 这个函数的本意应该还可以判断多种类型的重复,当前来看只能有删除这一种能够合并
22 if out := isDeletionDup(a, b); out != nil {
23 return out
24 }
25
26 return nil
27}
28// 判断是否为删除类的重复
29func isDeletionDup(a, b *Delta) *Delta {
30 // 二者都是删除那肯定有一个是重复的
31 if b.Type != Deleted || a.Type != Deleted {
32 return nil
33 }
34 // 理论上返回最后一个比较好,但是对象已经不再系统监控范围,前一个删除状态是好的
35 if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
36 return a
37 }
38 return b
39}
因为系统对于删除的对象有DeletedFinalStateUnknown这个状态,所以会存在两次删除的情况,但是两次添加同一个对象由于apiserver可以保证对象的唯一性,所以处理中就没有考虑合并两次添加操作。
接下来我们来看看Replace()函数的实现,这个也是Store定义的接口:
1// 代码源自client-go/tools/cache/delta_fifo.go
2func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
3 f.lock.Lock()
4 defer f.lock.Unlock()
5 keys := make(sets.String, len(list))
6 // 遍历所有的输入目标
7 for _, item := range list {
8 // 计算目标键
9 key, err := f.KeyOf(item)
10 if err != nil {
11 return KeyError{item, err}
12 }
13 // 记录处理过的目标键,采用set存储,是为了后续快速查找
14 keys.Insert(key)
15 // 因为输入是目标全量,所以每个目标相当于重新同步了一次
16 if err := f.queueActionLocked(Sync, item); err != nil {
17 return fmt.Errorf("couldn't enqueue object: %v", err)
18 }
19 }
20 // 如果没有存储的话,自己存储的就是所有的老对象,目的要看看那些老对象不在全量集合中,那么就是删除的对象了
21 if f.knownObjects == nil {
22 // 遍历所有的元素
23 for k, oldItem := range f.items {
24 // 这个目标在输入的对象中存在就可以忽略
25 if keys.Has(k) {
26 continue
27 }
28 // 输入对象中没有,说明对象已经被删除了。
29 var deletedObj interface{}
30 if n := oldItem.Newest(); n != nil {
31 deletedObj = n.Object
32 }
33 // 终于看到哪里用到DeletedFinalStateUnknown了,队列中存储对象的Deltas数组中
34 // 可能已经存在Delete了,避免重复,采用DeletedFinalStateUnknown这种类型
35 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
36 return err
37 }
38 }
39
40 // 如果populated还没有设置,说明是第一次并且还没有任何修改操作执行过
41 if !f.populated {
42 f.populated = true
43 f.initialPopulationCount = len(list) // 记录第一次通过来的对象数量
44 }
45
46 return nil
47 }
48 // 下面处理的就是检测某些目标删除但是Delta没有在队列中
49 // 从存储中获取所有对象键
50 knownKeys := f.knownObjects.ListKeys()
51 queuedDeletions := 0
52 for _, k := range knownKeys {
53 // 对象还存在那就忽略
54 if keys.Has(k) {
55 continue
56 }
57 // 获取对象
58 deletedObj, exists, err := f.knownObjects.GetByKey(k)
59 if err != nil {
60 deletedObj = nil
61 glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
62 } else if !exists {
63 deletedObj = nil
64 glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
65 }
66 // 累积删除的对象数量
67 queuedDeletions++
68 // 把对象删除的Delta放入队列
69 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
70 return err
71 }
72 }
73 // 和上面的代码差不多,只是计算initialPopulationCount值的时候增加了删除对象的数量
74 if !f.populated {
75 f.populated = true
76 f.initialPopulationCount = len(list) + queuedDeletions
77 }
78
79 return nil
80}
从Replace()的实现来看,主要用于实现对象的全量更新。这个可以理解为DeltaFIFO在必要的时刻做一次全量更新,这个时刻可以是定期的,也可以是事件触发的。由于DeltaFIFO对外输出的就是所有目标的增量变化,所以每次全量更新都要判断对象是否已经删除,因为在全量更新前可能没有收到目标删除的请求。这一点与cache不同,cache的Replace()相当于重建,因为cache就是对象全量的一种内存映射,所以Replace()就等于重建。
那我来问题一个非常有水平的问题,为什么knownObjects为nil时需要对比队列和对象全量来判断对象是否删除,而knownObjects不为空的时候就不需要了?如果读者想判断自己是否已经全部理解可以不看下面自己想想。
我们前面说过,knownObjects就是Indexer(具体实现是cache),而开篇的那副图已经非常明确的描述了二者以及使用之间的关系。也就是说knownObjects有的对象就是使用者知道的所有对象,此时即便队列(DeltaFIFO)中有相应的对象,在更新的全量对象中又被删除了,那就没必要通知使用者对象删除了,这种情况可以假想为系统短时间添加并删除了对象,对使用者来说等同于没有这个对象。
现在,我们来看看Queue相对于Stored扩展的Pop接口:
1// 代码源自client-go/tools/cache/delta_fifo.go
2func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
3 f.lock.Lock()
4 defer f.lock.Unlock()
5 for {
6 // 队列中有数据么?
7 for len(f.queue) == 0 {
8 // 看来是先判断的是否有数据,后判断是否关闭,这个和chan像
9 if f.IsClosed() {
10 return nil, FIFOClosedError
11 }
12 // 没数据那就等待把
13 f.cond.Wait()
14 }
15 // 取出第一个对象
16 id := f.queue[0]
17 // 数组缩小,相当于把数组中的第一个元素弹出去了,这个不多解释哈
18 f.queue = f.queue[1:]
19 // 取出对象,因为queue中存的是对象键
20 item, ok := f.items[id]
21 // 同步对象计数减一,当减到0就说明外部已经全部同步完毕了
22 if f.initialPopulationCount > 0 {
23 f.initialPopulationCount--
24 }
25 // 对象不存在,这个是什么情况?貌似我们在合并对象的时候代码上有这个逻辑,估计永远不会执行
26 if !ok {
27 continue
28 }
29 // 把对象删除
30 delete(f.items, id)
31 // Pop()需要传入一个回调函数,用于处理对象
32 err := process(item)
33 // 如果需要重新入队列,那就重新入队列
34 if e, ok := err.(ErrRequeue); ok {
35 f.addIfNotPresent(id, item)
36 err = e.Err
37 }
38
39 return item, err
40 }
41}
上面分析的函数基本上就算是把DeltaFIFO核心逻辑分析完毕了,下面我们就把其他的接口函数简单过一下
1// 代码源自client-go/tools/cache/delta_fifo.go
2func (f *DeltaFIFO) HasSynced() bool {
3 f.lock.Lock()
4 defer f.lock.Unlock()
5 // 这里就比较明白了,一次同步全量对象后,并且全部Pop()出去才能算是同步完成
6 // 其实这里所谓的同步就是全量内容已经进入Indexer,Indexer已经是系统中对象的全量快照了
7 return f.populated && f.initialPopulationCount == 0
8}
9// 添加不存在的对象
10func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
11 // 这个要求放入的必须是Deltas数组,就是通过Pop()弹出的对象
12 deltas, ok := obj.(Deltas)
13 if !ok {
14 return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
15 }
16 // 多个Delta都是一个对象,所以用最新的就可以了
17 id, err := f.KeyOf(deltas.Newest().Object)
18 if err != nil {
19 return KeyError{obj, err}
20 }
21 // 后面有实现
22 f.lock.Lock()
23 defer f.lock.Unlock()
24 f.addIfNotPresent(id, deltas)
25 return nil
26}
27// 这个是添加不存在对象的实现
28func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
29 f.populated = true
30 // 这里判断的对象是否存在
31 if _, exists := f.items[id]; exists {
32 return
33 }
34 // 放入队列中
35 f.queue = append(f.queue, id)
36 f.items[id] = deltas
37 f.cond.Broadcast()
38}
39// 重新同步,这个在cache实现是空的,这里面有具体实现
40func (f *DeltaFIFO) Resync() error {
41 f.lock.Lock()
42 defer f.lock.Unlock()
43 // 如果没有Indexer那么重新同步是没有意义的,因为连同步了哪些对象都不知道
44 if f.knownObjects == nil {
45 return nil
46 }
47 // 列举Indexer里面所有的对象键
48 keys := f.knownObjects.ListKeys()
49 // 遍历对象键,为每个对象产生一个同步的Delta
50 for _, k := range keys {
51 // 具体实现后面有介绍
52 if err := f.syncKeyLocked(k); err != nil {
53 return err
54 }
55 }
56 return nil
57}
58// 具体对象同步实现接口
59func (f *DeltaFIFO) syncKeyLocked(key string) error {
60 // 获取对象
61 obj, exists, err := f.knownObjects.GetByKey(key)
62 if err != nil {
63 glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
64 return nil
65 } else if !exists {
66 glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
67 return nil
68 }
69 // 计算对象的键值,有人会问对象键不是已经传入了么?那个是存在Indexer里面的对象键,可能与这里的计算方式不同
70 id, err := f.KeyOf(obj)
71 if err != nil {
72 return KeyError{obj, err}
73 }
74 // 对象已经在存在,说明后续会通知对象的新变化,所以再加更新也没意义
75 if len(f.items[id]) > 0 {
76 return nil
77 }
78 // 添加对象同步的这个Delta
79 if err := f.queueActionLocked(Sync, obj); err != nil {
80 return fmt.Errorf("couldn't queue object: %v", err)
81 }
82 return nil
83}
总结
-
判断是否已同步populated和initialPopulationCount这两个变量存在的目的是什么?我的理解是否已同步指的是第一次从apiserver获取全量对象是否已经全部通知到外部,也就是通过Pop()被取走。所谓的同步就是指apiserver的状态已经同步到缓存中了,也就是Indexer中;
-
接口AddIfNotPresent()存在的目的是什么,只有在Pop()函数中使用了一次,但是在调用这个接口的时候已经从map中删除了,所以肯定不存在。这个接口在我看来主要用来保险的,因为Pop()本身就存在重入队列的可能,外部如果判断返回错误重入队列就可能会重复;
DeltaFIFO结构中比较难以理解的是knownObjects,它的类型为KeyListerGetter。其接口中的方法ListKeys和GetByKey也是Store接口中的方法,因此knownObjects能够被赋值为实现了Store的类型指针;同样地,由于Indexer继承了Store方法,因此knownObjects能够被赋值为实现了Indexer的类型指针。
DeltaFIFO.knownObjects.GetByKey就是执行的store.go中的GetByKey函数,用于获取Indexer中的对象键。
initialPopulationCount用于表示是否完成全量同步,initialPopulationCount在Replace函数中增加,在Pop函数中减小,当initialPopulationCount为0且populated为true时表示Pop了所有Replace添加到DeltaFIFO中的对象,populated用于判断是DeltaFIFO中是否为初始化状态(即没有处理过任何对象)。
在NewSharedIndexInformer(client-go/tools/cache/shared_informer.go)函数中使用下面进行初始化一个sharedIndexInformer,即使用函数DeletionHandlingMetaNamespaceKeyFunc初始化indexer,并在sharedIndexInformer.Run中将该indexer作为knownObjects入参,最终初始化为一个DeltaFIFO。
1NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) //NewDeltaFIFO
2---
3fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) //sharedIndexInformer.Run
knownObjects实际使用时为Indexer,它对应图中的localStore,DeltaFIFO根据其保存的对象状态变更消息处理(增/删/改/同步)knownObjects中相应的对象。其中同步(Sync)Detals中即将被删除的对象是没有意义的(参见willObjectBeDeletedLocked函数)。
ListWatch的list步骤中会调用Replace(client-go/tools/cache/delta_fifo.go)函数来对DeltaFIFO进行全量更新,包括3个步骤:
- Sync所有DeltaFIFO中的对象,将输入对象全部加入DeltaFIFO;
- 如果knownObjects为空,则删除DeltaFIFO中不存在于输入对象的对象,使DeltaFIFO中的有效对象(非DeletedFinalStateUnknown)等同于输入对象;
- 如果knownObjects非空,获取knownObjects中不存在于输入对象的对象,并在DeltaFIFO中删除这些对象。
第2步好理解,knownObjects为空,只需要更新DeltaFIFO即可。第3步中,当knownObjects非空时,需要以knowObjects为基准进行对象的删除,否则会造成indexer中的数据与apiserver的数据不一致,举个例子,比如knownObjects中的对象为{obj1, obj2, obj3},而DeltaFIFO中待处理的对象为{obj2, obj3,obj4},如果仅按照2步骤进行处理,会导致knownObjects中残留obj1,因此需要在DeltaFIFO中添加删除obj1变更消息。从下面ShareInformer章节的图中可以看出,knownObjects(即Indexer)的数据只能通过DeltaFIFO变更。
其它源码
ShareInformer源码
下图为SharedInformer的运行图。可以看出SharedInformer启动了controller,reflector,并将其与Indexer结合起来。
注:不同颜色表示不同的chan,相同颜色表示在同一个chan中的处理 SharedInformer.Run启动了两个chan,s.c.Run为controller的入口,s.c.Run函数中会Pop DeltaFIFO中的元素,并根据DeltaFIFO的元素的类型(Sync/Added/Updated/Deleted)进两类处理,一类会使用indexer.Update,indexer,Add,indexer.Delete对保存的在Store中的数据进行处理;另一类会根据DeltaFIFO的元素的类型将其封装为sharedInformer内部类型updateNotification,addNotification,deleteNotification,传递给s.processor.Listeners.addCh,后续给注册的pl.handler处理。
s.processor.run主要用于处理注册的handler,processorListener.run函数接受processorListener.nextCh中的值,将其作为参数传递给handler进行处理。而processorListener.pop负责将processorListener.addCh中的元素缓存到p.pendingNotifications,并读取p.pendingNotifications中的元素,将其传递到processorListener.nextCh。即processorListener.pop负责管理数据,processorListener.run负责使用processorListener.pop管理的数据进行处理。
1// client-go/tools/cache/controller.go
2type ResourceEventHandler interface {
3 OnAdd(obj interface{})
4 OnUpdate(oldObj, newObj interface{})
5 OnDelete(obj interface{})
6}
sharedIndexInformer有3个状态:启动前,启动后,停止后,由started, stopped两个bool值表示。
stopped=true表示inforer不再运作且不能添加新的handler(因为即使添加了也不会运行)
informer启动前和停止后允许添加新的indexer(sharedIndexInformer.AddIndexers),但不能在informer运行时添加,因为此时需要通过listwatch以及handler等一系列处理来操作sharedIndexInformer.inxder。如果允许同时使用sharedIndexInformer.AddIndexers,可能会造成数据不一致。
还有一个状态sharedProcessor.listenersStarted,用于表示是否所有的s.processor.Listeners都已经启动,如果已经启动,则在添加新的processorListener时,需要运行新添加的processorListener,否则仅仅添加即可(添加后同样会被sharedProcessor.run调度)
1// 代码源自client-go/tools/cache/shared_informer.go
2type SharedInformer interface {
3 // 添加资源事件处理器,关于ResourceEventHandler的定义在下面
4 // 相当于注册回调函数,当有资源变化就会通过回调通知使用者,是不是能和上面介绍的Controller可以联系上了?
5 // 为什么是Add不是Reg,说明可以支持多个handler
6 AddEventHandler(handler ResourceEventHandler)
7 // 上面添加的是不需要周期同步的处理器,下面的接口添加的是需要周期同步的处理器,周期同步上面提了好多遍了,不赘述
8 AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
9 // Store这个有专门的文章介绍,这个函数就是获取Store的接口,说明SharedInformer内有Store对象
10 GetStore() Store
11 // Controller在上面的章节介绍了,说明SharedInformer内有Controller对象
12 GetController() Controller
13 // 这个应该是SharedInformer的核心逻辑实现的地方
14 Run(stopCh <-chan struct{})
15 // 因为有Store,这个函数就是告知使用者Store里面是否已经同步了apiserver的资源,这个接口很有用
16 // 当创建完SharedInformer后,通过Reflector从apiserver同步全量对象,然后在通过DeltaFIFO一个一个的同志到cache
17 // 这个接口就是告知使用者,全量的对象是不是已经同步到了cache,这样就可以从cache列举或者查询了
18 HasSynced() bool
19 // 最新同步资源的版本,这个就不多说了,通过Controller(Controller通过Reflector)实现
20 LastSyncResourceVersion() string
21}
22// 扩展了SharedInformer类型,从类型名字上看共享的是Indexer,Indexer也是一种Store的实现
23type SharedIndexInformer interface {
24 // 继承了SharedInformer
25 SharedInformer
26 // 扩展了Indexer相关的接口
27 AddIndexers(indexers Indexers) error
28 GetIndexer() Indexer
29}
30// 代码源自client-go/tools/cache/controller.go,SharedInformer使用者如果需要处理资源的事件
31// 那么就要自己实现相应的回调函数
32type ResourceEventHandler interface {
33 // 添加对象回调函数
34 OnAdd(obj interface{})
35 // 更新对象回调函数
36 OnUpdate(oldObj, newObj interface{})
37 // 删除对象回调函数
38 OnDelete(obj interface{})
39}
40
41// 代码源自client-go/tools/cache/shared_informer.go
42type sharedIndexInformer struct {
43 // Indexer也是一种Store,这个我们知道的,Controller负责把Reflector和FIFO逻辑串联起来
44 // 所以这两个变量就涵盖了开篇那张图里面的Reflector、DeltaFIFO和LocalStore(cache)
45 indexer Indexer
46 controller Controller
47 // sharedIndexInformer把上面提到的ResourceEventHandler进行了在层封装,并统一由sharedProcessor管理,后面章节专门介绍
48 processor *sharedProcessor
49 // CacheMutationDetector其实没啥用,我理解是开发者自己实现的一个调试工具,用来发现对象突变的
50 // 实现方法也比较简单,DeltaFIFO弹出的对象在处理前先备份(深度拷贝)一份,然后定期比对两个对象是否相同
51 // 如果不同那就报警,说明处理过程中有人修改过对象,这个功能默认是关闭,所以我说没啥用
52 cacheMutationDetector CacheMutationDetector
53 // 这两个变量是给Reflector用的,我们知道Reflector是在Controller创建的
54 listerWatcher ListerWatcher
55 objectType runtime.Object
56 // 定期同步的周期,因为可能存在多个ResourceEventHandler,就有可能存在多个同步周期,sharedIndexInformer采用最小的周期
57 // 这个周期值就存储在resyncCheckPeriod中,通过AddEventHandler()添加的处理器都采用defaultEventHandlerResyncPeriod
58 resyncCheckPeriod time.Duration
59 defaultEventHandlerResyncPeriod time.Duration
60 // 时钟
61 clock clock.Clock
62 // 启动、停止标记,肯定有人会问为啥用两个变量,一个变量不就可以实现启动和停止了么?
63 // 其实此处是三个状态,启动前,已启动和已停止,start表示了两个状态,而且为启动标记专门做了个锁
64 // 说明启动前和启动后有互斥的资源操作
65 started, stopped bool
66 startedLock sync.Mutex
67
68 // 这个名字起的也是够了,因为DeltaFIFO每次Pop()的时候需要传入一个函数用来处理Deltas
69 // 处理Deltas也就意味着要把消息通知给处理器,如果此时调用了AddEventHandler()
70 // 就会存在崩溃的问题,所以要有这个锁,阻塞Deltas....细想名字也没毛病~
71 blockDeltas sync.Mutex
72}
CacheMutationDetector
CacheMutationDetector这个就是检测对象在过程中突变的,何所谓突变呢?突变就是莫名其妙的修改了,如何实现突变检测,也是比较简单的。CacheMutationDetector对所有的对象做了一次深度拷贝(DeepCopy),然后定期比较两个对象是否一致,当发现有不同时说明对象突变了,然后就panic。我认为CacheMutationDetector是用来调试的,因为代码默认是关闭的:
1// 代码源自client-go/tools/cache/mutation_detector.go
2// 默认关闭突变检测
3var mutationDetectionEnabled = false
4// 但是可以通过环境变量的KUBE_CACHE_MUTATION_DETECTOR开启
5func init() {
6 mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
7}
8
9// 这个是突变检测的类型抽象
10type CacheMutationDetector interface {
11 AddObject(obj interface{}) // 用于记录所有的对象
12 Run(stopCh <-chan struct{}) // 开启协程定期比对
13}
14// 创建CacheMutationDetector对象
15func NewCacheMutationDetector(name string) CacheMutationDetector {
16 // 如果没有开启选项就构造一个什么都不做的对象
17 if !mutationDetectionEnabled {
18 return dummyMutationDetector{}
19 }
20 // 如果开启了选项,那么就构造一个默认的突变检测器
21 glog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
22 return &defaultCacheMutationDetector{name: name, period: 1 * time.Second}
23}
24// 这就是什么都不做的突变检测器
25type dummyMutationDetector struct{}
26func (dummyMutationDetector) Run(stopCh <-chan struct{}) {
27}
28func (dummyMutationDetector) AddObject(obj interface{}) {
29}
sharedProcessor
有没有感觉shared这个词被kubernetes玩儿坏了(继controller之后有一个背玩儿坏的单词),sharedProcessor这又shared啥了?首先需要知道Processor的定义,这里定义的Processor就是处理事件的东西。什么事件,就是SharedInformer向外部通知的事件。因为官方代码没有注释,我猜是shared是同一个SharedInformer,有没有很绕嘴?还有更绕的在后面呢,我们还要了解一个新的类型,那就是processorListener,processor刚说完,又来了个Listener!
通过SharedInformer.AddEventHandler()添加的处理器最终就会封装成processorListener,然后通过sharedProcessor管理起来,通过processorListener的封装就可以达到所谓的有事处理,没事挂起。
processorListener
processorListener可以理解为两个核心功能,一个是processor,一个是listener,用一句话概括,有事做事没事挂起。先看看processorListener的定义:
1// 代码源自clien-go/tools/cache/shared_informer.go
2type processorListener struct {
3 // nextCh、addCh、handler、pendingNotifications的用法请参看我的《golang的chan有趣用法》里面有相关的例子
4 // 总结这四个变量实现了事件的输入、缓冲、处理,事件就是apiserver资源的变化
5 nextCh chan interface{}
6 addCh chan interface{}
7 handler ResourceEventHandler
8 pendingNotifications buffer.RingGrowing
9 // 下面四个变量就是跟定时同步相关的了,requestedResyncPeriod是处理器设定的定时同步周期
10 // resyncPeriod是跟sharedIndexInformer对齐的同步时间,因为sharedIndexInformer管理了多个处理器
11 // 最终所有的处理器都会对齐到一个周期上,nextResync就是下一次同步的时间点
12 requestedResyncPeriod time.Duration
13 resyncPeriod time.Duration
14 nextResync time.Time
15 resyncLock sync.Mutex
16}
17
18// 代码源自client-go/tools/cache/shared_informer.go
19// 对,就这么简单,通过addCh传入,这里面的notification就是我们所谓的事件
20func (p *processorListener) add(notification interface{}) {
21 p.addCh <- notification
22}
23
24// 代码源自client-go/tools/cache/shared_informer.go
25// 这个函数是通过sharedProcessor利用wait.Group启动的,读者可以自行查看wait.Group
26func (p *processorListener) pop() {
27 defer utilruntime.HandleCrash()
28 // nextCh是在这里,函数退出前析构的
29 defer close(p.nextCh)
30 // 临时变量,下面会用到
31 var nextCh chan<- interface{}
32 var notification interface{}
33 // 进入死循环啦
34 for {
35 select {
36 // 有两种情况,nextCh还没有初始化,这个语句就会被阻塞,这个我在《深入浅出golang之chan》说过
37 // nextChan后面会赋值为p.nextCh,因为p.nextCh也是无缓冲的chan,数据不发送成功就阻塞
38 case nextCh <- notification:
39 // 如果发送成功了,那就从缓冲中再取一个事件出来
40 var ok bool
41 notification, ok = p.pendingNotifications.ReadOne()
42 if !ok {
43 // 如果没有事件,那就把nextCh再次设置为nil,接下来对于nextCh操作还会被阻塞
44 nextCh = nil
45 }
46 // 从p.addCh读取一个事件出来,这回看到消费p.addCh的地方了
47 case notificationToAdd, ok := <-p.addCh:
48 // 说明p.addCh关闭了,只能退出
49 if !ok {
50 return
51 }
52 // notification为空说明当前还没发送任何事件给处理器
53 if notification == nil {
54 // 那就把刚刚获取的事件通过p.nextCh发送个处理器
55 notification = notificationToAdd
56 nextCh = p.nextCh
57 } else {
58 // 上一个事件还没有发送成功,那就先放到缓存中
59 // pendingNotifications可以想象为一个slice,这样方便理解,是一个动态的缓存,
60 p.pendingNotifications.WriteOne(notificationToAdd)
61 }
62 }
63 }
64}
65pop()函数实现的非常巧妙,利用一个协程就把接收、缓冲、发送全部解决了。它充分的利用了golang的select可以同时操作多个chan的特性,同时从addChd读取数据从nextCh发送数据,这两个chan任何一个完成都可以激活协程。对于C/C++程序猿理解起来有点费劲,但这就是GO的魅力所在。接下来,我们看看从nextCh读取事件后是如何处理的:
66
67
68// 代码源自client-go/tools/cache/shared_informer.go
69// 这个也是sharedProcessor通过wait.Group启动的
70func (p *processorListener) run() {
71 // 因为wait.Until需要传入退出信号的chan
72 stopCh := make(chan struct{})
73 // wait.Until不多说了,我在前期不点的文章中说过了,只要没有收到退出信号就会周期的执行传入的函数
74 wait.Until(func() {
75 // wait.ExponentialBackoff()和wait.Until()类似,wait.Until()是无限循环
76 // wait.ExponentialBackoff()是尝试几次,每次等待时间会以指数上涨
77 err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
78 // 这也是chan的range用法,可以参看我的《深入浅出golang的chan》了解细节
79 for next := range p.nextCh {
80 // 判断事件类型,这里面的handler就是调用SharedInfomer.AddEventHandler()传入的
81 // 理论上处理的不是Deltas么?怎么变成了其他类型,这是SharedInformer做的二次封装,后面会看到
82 switch notification := next.(type) {
83 case updateNotification:
84 p.handler.OnUpdate(notification.oldObj, notification.newObj)
85 case addNotification:
86 p.handler.OnAdd(notification.newObj)
87 case deleteNotification:
88 p.handler.OnDelete(notification.oldObj)
89 default:
90 utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
91 }
92 }
93
94 return true, nil
95 })
96
97 // 执行到这里只能是nextCh已经被关闭了,所以关闭stopCh,通知wait.Until()退出
98 if err == nil {
99 close(stopCh)
100 }
101 }, 1*time.Minute, stopCh)
102}
因为processorListener其他函数没啥大用,上面两个函数就就已经把核心功能都实现了。processorListener就是实现了事件的缓冲和处理,此处的处理就是使用者传入的函数。在没有事件的时候可以阻塞处理器,当事件较多是可以把事件缓冲起来,实现了事件分发器与处理器的异步处理。
processorListener的run()和pop()函数是sharedProcessor启动的协程调用的,所以下面就要对sharedProcessor进行分析了。
sharedProcessor管理processorListener
sharedProcessor的定义如下:
1// client-go/tools/cache/shared_informer.go
2// sharedProcessor是通过数组组织处理器的,只是分了需要定时同步和不需要要同步两类
3type sharedProcessor struct {
4 listenersStarted bool // 所有处理器是否已经启动的标识
5 listenersLock sync.RWMutex // 读写锁
6 listeners []*processorListener // 通用的处理器
7 syncingListeners []*processorListener // 需要定时同步的处理器
8 clock clock.Clock // 时钟
9 wg wait.Group // 前面讲过了processorListener每个需要两个协程,
10 // 用wait.Group来管理所有处理器的携程,保证他们都能退出
11}
12// 代码源自client-go/tools/cache/shared_informer.go
13// 添加处理器,sharedIndexInformer.AddEventHandler()就会调用这个函数实现处理器的添加
14func (p *sharedProcessor) addListener(listener *processorListener) {
15 // 加锁,这个很好理解
16 p.listenersLock.Lock()
17 defer p.listenersLock.Unlock()
18 // 把处理器添加到数组中
19 p.addListenerLocked(listener)
20 // 通过wait.Group启动两个协程,做的事情我们在processorListener说过了,这里就是我们上面提到的启动两个协程的地方
21 // 这个地方判断了listenersStarted,这说明sharedProcessor在启动前、后都可以添加处理器
22 if p.listenersStarted {
23 p.wg.Start(listener.run)
24 p.wg.Start(listener.pop)
25 }
26}
27// 把处理器添加到数组中
28func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
29 // 两类(定时同步和不同步)的处理器数组都添加了,这是因为没有定时同步的也会用默认的时间,后面我们会看到
30 // 那么问题来了,那还用两个数组干什么呢?
31 p.listeners = append(p.listeners, listener)
32 p.syncingListeners = append(p.syncingListeners, listener)
33}
34
35// 代码源自client-go/tools/cache/shared_informer.go
36// 通过函数名称也能感觉到分发的感觉~sync表示obj对象是否为同步事件对象
37func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
38 // 加锁没毛病
39 p.listenersLock.RLock()
40 defer p.listenersLock.RUnlock()
41
42 // 无论是否为sync,添加处理器的代码中我们知道两个数组都会被添加,所以判断不判断没啥区别~
43 // 所以我的猜测是代码以前实现的是明显区分两类的,但随着代码的更新二者的界限已经没那么明显了
44 if sync {
45 for _, listener := range p.syncingListeners {
46 listener.add(obj)
47 }
48 } else {
49 for _, listener := range p.listeners {
50 listener.add(obj)
51 }
52 }
53}
54// 代码源自client-go/tools/cache/shared_informer.go
55func (p *sharedProcessor) run(stopCh <-chan struct{}) {
56 // 启动前、后对于添加处理器的逻辑是不同,启动前的处理器是不会立刻启动连个协程执行处理器的pop()和run()函数的
57 // 而是在这里统一的启动
58 func() {
59 p.listenersLock.RLock()
60 defer p.listenersLock.RUnlock()
61 // 遍历所有的处理器,然后为处理器启动两个后台协程
62 for _, listener := range p.listeners {
63 p.wg.Start(listener.run)
64 p.wg.Start(listener.pop)
65 }
66 p.listenersStarted = true
67 }()
68 // 等待退出信号
69 <-stopCh
70 p.listenersLock.RLock()
71 defer p.listenersLock.RUnlock()
72 // 关闭addCh,processorListener.pop()这个协程就会退出,不明白的可以再次回顾代码
73 // 因为processorListener.pop()会关闭processorListener.nextCh,processorListener.run()就会退出
74 // 所以这里只要关闭processorListener.addCh就可以自动实现两个协程的退出,不得不说设计的还是挺巧妙的
75 for _, listener := range p.listeners {
76 close(listener.addCh)
77 }
78 // 等待所有的协程退出,这里指的所有协程就是所有处理器的那两个协程
79 p.wg.Wait()
80}
SharedInformer实现
1// 代码源自client-go/tools/cache/shared_informer.go
2// lw:这个是apiserver客户端相关的,用于Reflector从apiserver获取资源,所以需要外部提供
3// objType:这个SharedInformer监控的对象类型
4// resyncPeriod:同步周期,SharedInformer需要多长时间给使用者发送一次全量对象的同步时间
5func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
6 // 还是用SharedIndexInformer实现的
7 return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
8}
9// 创建SharedIndexInformer对象,其中大部分参数再上面的函数已经介绍了
10// indexers:需要外部提供计算对象索引键的函数,也就是这里面的对象需要通过什么方式创建索引
11func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
12 realClock := &clock.RealClock{}
13 sharedIndexInformer := &sharedIndexInformer{
14 // 管理所有处理器用的,这个上面的章节解释了
15 processor: &sharedProcessor{clock: realClock},
16 // 其实就是在构造cache,读者可以自行查看NewIndexer()的实现,
17 // 在cache中的对象用DeletionHandlingMetaNamespaceKeyFunc计算对象键,用indexers计算索引键
18 // 可以想象成每个对象键是Namespace/Name,每个索引键是Namespace,即按照Namesapce分类
19 // 因为objType决定了只有一种类型对象,所以Namesapce是最大的分类
20 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
21 // 下面这两主要就是给Controller用,确切的说是给Reflector用的
22 listerWatcher: lw,
23 objectType: objType,
24 // 无论是否需要定时同步,SharedInformer都提供了一个默认的同步时间,当然这个是外部设置的
25 resyncCheckPeriod: defaultEventHandlerResyncPeriod,
26 defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
27 // 默认没有开启的对象突变检测器,没啥用,也不多介绍
28 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
29 clock: realClock,
30 }
31 return sharedIndexInformer
32}
33// 代码源自client-go/tools/cache/shared_informer.go
34// 添加没有指定同步周期的事件处理器
35func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
36 // defaultEventHandlerResyncPeriod是默认的同步周期,在创建SharedInformer的时候设置的
37 s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
38}
39// 添加需要定期同步的事件处理器
40func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
41 // 因为是否已经开始对于添加事件处理器的方式不同,后面会有介绍,所以此处加了锁
42 s.startedLock.Lock()
43 defer s.startedLock.Unlock()
44
45 // 如果已经结束了,那就可以直接返回了
46 if s.stopped {
47 return
48 }
49 // 如果有同步周期,==0就是永远不用同步
50 if resyncPeriod > 0 {
51 // 同步周期不能太短,太短对于系统来说反而是个负担,大量的无效计算浪费在这上面
52 if resyncPeriod < minimumResyncPeriod {
53 resyncPeriod = minimumResyncPeriod
54 }
55 // SharedInformer管理了很多处理器,每个处理器都有自己的同步周期,所以此处要统一成一个,称之为对齐
56 // SharedInformer会选择所有处理器中最小的那个作为所有处理器的同步周期,称为对齐后的同步周期
57 // 此处就要判断是不是比当前对齐后的同步周期还要小
58 if resyncPeriod < s.resyncCheckPeriod {
59 // 如果已经启动了,那么只能用和大家一样的周期
60 if s.started {
61 resyncPeriod = s.resyncCheckPeriod
62 // 如果没启动,那就让大家都用最新的对齐同步周期
63 } else {
64 s.resyncCheckPeriod = resyncPeriod
65 s.processor.resyncCheckPeriodChanged(resyncPeriod)
66 }
67 }
68 }
69 // 创建处理器,代码一直用listener,可能想强调没事件就挂起把,我反而想用处理器这个名词
70 // determineResyncPeriod()这个函数读者自己分析把,非常简单,这里面只要知道创建了处理器就行了
71 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
72 // 如果没有启动,那么直接添加处理器就可以了
73 if !s.started {
74 s.processor.addListener(listener)
75 return
76 }
77
78 // 这个锁就是暂停再想所有的处理器分发事件用的,因为这样会遍历所有的处理器,此时添加会有风险
79 s.blockDeltas.Lock()
80 defer s.blockDeltas.Unlock()
81 // 添加处理器
82 s.processor.addListener(listener)
83 // 这里有意思啦,遍历缓冲中的所有对象,通知处理器,因为SharedInformer已经启动了,可能很多对象已经让其他的处理器处理过了,
84 // 所以这些对象就不会再通知新添加的处理器,此处就是解决这个问题的
85 for _, item := range s.indexer.List() {
86 listener.add(addNotification{newObj: item})
87 }
88}
89// 代码源自client-go/tools/cache/shared_informer.go
90// sharedIndexInformer的核心逻辑函数
91func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
92 defer utilruntime.HandleCrash()
93 // 在此处构造的DeltaFIFO
94 fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
95 // 这里的Config是我们介绍Reflector时介绍的那个Config
96 cfg := &Config{
97 // 我前面一直在说Reflector输入到DeltaFIFO,这里算是直接证明了
98 Queue: fifo,
99 // 下面这些变量我们在Reflector都说了,这里赘述
100 ListerWatcher: s.listerWatcher,
101 ObjectType: s.objectType,
102 FullResyncPeriod: s.resyncCheckPeriod,
103 RetryOnError: false,
104 ShouldResync: s.processor.shouldResync,
105 // 这个才是重点,Controller调用DeltaFIFO.Pop()接口传入的就是这个回调函数,也是我们后续重点介绍的
106 Process: s.HandleDeltas,
107 }
108 // 创建Controller,这个不用多说了
109 func() {
110 s.startedLock.Lock()
111 defer s.startedLock.Unlock()
112
113 s.controller = New(cfg)
114 s.controller.(*controller).clock = s.clock
115 s.started = true
116 }()
117 // 这个processorStopCh 是给sharedProcessor和cacheMutationDetector传递退出信号的
118 // 因为这里要创建两个协程运行sharedProcessor和cacheMutationDetector的核心函数
119 processorStopCh := make(chan struct{})
120 var wg wait.Group
121 defer wg.Wait() // Wait for Processor to stop
122 defer close(processorStopCh) // Tell Processor to stop
123 wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
124 wg.StartWithChannel(processorStopCh, s.processor.run)
125
126 // Run()函数都退出了,也就应该设置结束的标识了
127 defer func() {
128 s.startedLock.Lock()
129 defer s.startedLock.Unlock()
130 s.stopped = true
131 }()
132 // 启动Controller,Controller一旦运行,整个流程就开始启动了,所以叫Controller也不为过
133 // 毕竟Controller是SharedInformer的发动机嘛
134 s.controller.Run(stopCh)
135}
136
137sharedIndexInformer通过Run()函数启动了Controller和sharedProcess(),Controller通过DeltaFIFO.Pop()函数弹出Deltas,并调用函数处理,这个处理函数就是sharedIndexInformer.HandleDeltas(),这个函数是衔接Controller和sharedProcess的关键点,他把Deltas转换为sharedProcess需要的各种Notification类型。下面我们就对这个函数进行代码分析:
138
139// 代码源自client-go/tools/cache/shared_informer.go
140func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
141 // 看到这里就知道为啥起名为blockDeltas了,这是阻塞处理器Deltas啊~因为分发事件到处理器,所以要加锁
142 s.blockDeltas.Lock()
143 defer s.blockDeltas.Unlock()
144
145 // Deltas里面包含了一个对象的多个增量操作,所以要从最老的Delta到最先的Delta遍历处理
146 for _, d := range obj.(Deltas) {
147 // 根据不同的Delta做不同的操作,但是大致分为对象添加、删除两大类操作
148 // 所有的操作都要先同步到cache在通知处理器,这样保持处理器和cache的状态是一致的
149 switch d.Type {
150 // 同步、添加、更新都是对象添加类的造作,至于是否是更新还要看cache是否有这个对象
151 case Sync, Added, Updated:
152 // 看看对象是不是有定时同步产生的事件
153 isSync := d.Type == Sync
154 // 检测突变,没啥用
155 s.cacheMutationDetector.AddObject(d.Object)
156 // 如果cache中有的对象,一律看做是更新事件
157 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
158 // 把对象更新到cache中
159 if err := s.indexer.Update(d.Object); err != nil {
160 return err
161 }
162 // 通知处理器处理事件
163 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
164 // cache中没有的对象,一律看做是新增事件
165 } else {
166 // 把对象添加到cache中
167 if err := s.indexer.Add(d.Object); err != nil {
168 return err
169 }
170 // 通知处理器处理器事件
171 s.processor.distribute(addNotification{newObj: d.Object}, isSync)
172 }
173 // 对象被删除
174 case Deleted:
175 // 从cache中删除对象
176 if err := s.indexer.Delete(d.Object); err != nil {
177 return err
178 }
179 // 通知所有的处理器对象被删除了
180 s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
181 }
182 }
183 return nil
184}
总结
-
利用apiserver的api实现资源的列举和监控(Reflector实现);
-
利用cache存储apiserver中的部分对象,通过对象类型进行制定,并在cache中采用Namespace做对象的索引
-
先通过apiserver的api将对象的全量列举出来存储在cache中,然后再watch资源,一旦有变化就更新cache中;
-
更新到cache中的过程通过DeltaFIFO实现的有顺序的更新,因为资源状态是通过全量+增量方式实现同步的,所以顺序错误会造成状态不一致;
-
使用者可以注册回调函数(类似挂钩子),在更新到cache的同时通知使用者处理,为了保证回调处理不被某一个处理器阻塞,SharedInformer实现了processorListener异步缓冲处理;
-
整个过程是Controller是发动机,驱动整个流程运转;
用一幅图来总结SharedInformer(其中Reflector.resync()因为是个匿名函数,所以用斜体,其实是不存在这个函数的)
ListerWatcher
Lister用于获取某个资源(如Pod)的全量,Watcher用于获取某个资源的增量变化。实际使用中Lister和Watcher都从apiServer获取资源信息,Lister一般用于首次获取某资源(如Pod)的全量信息,而Watcher用于持续获取该资源的增量变化信息。Lister和Watcher的接口定义如下,使用NewListWatchFromClient函数来初始化ListerWatcher
1// client-go/tools/cache/listwatch.go
2type Lister interface {
3 // List should return a list type object; the Items field will be extracted, and the
4 // ResourceVersion field will be used to start the watch in the right place.
5 List(options metav1.ListOptions) (runtime.Object, error)
6}
7
8// Watcher is any object that knows how to start a watch on a resource.
9type Watcher interface {
10 // Watch should begin a watch at the specified version.
11 Watch(options metav1.ListOptions) (watch.Interface, error)
12}
13
14// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
15type ListerWatcher interface {
16 Lister
17 Watcher
18}
在workqueue的例子中可以看到调用NewListWatchFromClient的地方,该例子会从clientset.CoreV1().RESTClient()获取"pods"的相关信息。
ListerWatcher是针对某一类对象的,比如Pod,不是所有对象的,这个在构造ListerWatcher对象的时候由apiserver的client类型决定了。
1// client-go/examples/workqueue/main.go
2// create the pod watcher
3podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
cache.NewListWatchFromClient函数中的资源名称可以从types.go中获得
1/ k8s.io/api/core/v1/types.go
2const (
3 // Pods, number
4 ResourcePods ResourceName = "pods"
5 // Services, number
6 ResourceServices ResourceName = "services"
7 // ReplicationControllers, number
8 ResourceReplicationControllers ResourceName = "replicationcontrollers"
9 // ResourceQuotas, number
10 ResourceQuotas ResourceName = "resourcequotas"
11 // ResourceSecrets, number
12 ResourceSecrets ResourceName = "secrets"
13 // ResourceConfigMaps, number
14 ResourceConfigMaps ResourceName = "configmaps"
15 // ResourcePersistentVolumeClaims, number
16 ResourcePersistentVolumeClaims ResourceName = "persistentvolumeclaims"
17 // ResourceServicesNodePorts, number
18 ResourceServicesNodePorts ResourceName = "services.nodeports"
19 // ResourceServicesLoadBalancers, number
20 ResourceServicesLoadBalancers ResourceName = "services.loadbalancers"
21 // CPU request, in cores. (500m = .5 cores)
22 ResourceRequestsCPU ResourceName = "requests.cpu"
23 // Memory request, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
24 ResourceRequestsMemory ResourceName = "requests.memory"
25 // Storage request, in bytes
26 ResourceRequestsStorage ResourceName = "requests.storage"
27 // Local ephemeral storage request, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
28 ResourceRequestsEphemeralStorage ResourceName = "requests.ephemeral-storage"
29 // CPU limit, in cores. (500m = .5 cores)
30 ResourceLimitsCPU ResourceName = "limits.cpu"
31 // Memory limit, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
32 ResourceLimitsMemory ResourceName = "limits.memory"
33 // Local ephemeral storage limit, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
34 ResourceLimitsEphemeralStorage ResourceName = "limits.ephemeral-storage"
35)
除了可以从CoreV1版本的API group获取RESTClient信息外,还可以从下面Clientset结构体定义的API group中获取信息
1// client-go/kubernetes/clientset.go
2type Clientset struct {
3 *discovery.DiscoveryClient
4 admissionregistrationV1beta1 *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
5 appsV1 *appsv1.AppsV1Client
6 appsV1beta1 *appsv1beta1.AppsV1beta1Client
7 appsV1beta2 *appsv1beta2.AppsV1beta2Client
8 auditregistrationV1alpha1 *auditregistrationv1alpha1.AuditregistrationV1alpha1Client
9 authenticationV1 *authenticationv1.AuthenticationV1Client
10 authenticationV1beta1 *authenticationv1beta1.AuthenticationV1beta1Client
11 authorizationV1 *authorizationv1.AuthorizationV1Client
12 authorizationV1beta1 *authorizationv1beta1.AuthorizationV1beta1Client
13 autoscalingV1 *autoscalingv1.AutoscalingV1Client
14 autoscalingV2beta1 *autoscalingv2beta1.AutoscalingV2beta1Client
15 autoscalingV2beta2 *autoscalingv2beta2.AutoscalingV2beta2Client
16 batchV1 *batchv1.BatchV1Client
17 batchV1beta1 *batchv1beta1.BatchV1beta1Client
18 batchV2alpha1 *batchv2alpha1.BatchV2alpha1Client
19 certificatesV1beta1 *certificatesv1beta1.CertificatesV1beta1Client
20 coordinationV1beta1 *coordinationv1beta1.CoordinationV1beta1Client
21 coordinationV1 *coordinationv1.CoordinationV1Client
22 coreV1 *corev1.CoreV1Client
23 eventsV1beta1 *eventsv1beta1.EventsV1beta1Client
24 extensionsV1beta1 *extensionsv1beta1.ExtensionsV1beta1Client
25 networkingV1 *networkingv1.NetworkingV1Client
26 networkingV1beta1 *networkingv1beta1.NetworkingV1beta1Client
27 nodeV1alpha1 *nodev1alpha1.NodeV1alpha1Client
28 nodeV1beta1 *nodev1beta1.NodeV1beta1Client
29 policyV1beta1 *policyv1beta1.PolicyV1beta1Client
30 rbacV1 *rbacv1.RbacV1Client
31 rbacV1beta1 *rbacv1beta1.RbacV1beta1Client
32 rbacV1alpha1 *rbacv1alpha1.RbacV1alpha1Client
33 schedulingV1alpha1 *schedulingv1alpha1.SchedulingV1alpha1Client
34 schedulingV1beta1 *schedulingv1beta1.SchedulingV1beta1Client
35 schedulingV1 *schedulingv1.SchedulingV1Client
36 settingsV1alpha1 *settingsv1alpha1.SettingsV1alpha1Client
37 storageV1beta1 *storagev1beta1.StorageV1beta1Client
38 storageV1 *storagev1.StorageV1Client
39 storageV1alpha1 *storagev1alpha1.StorageV1alpha1Client
40}
RESTClient()的返回值为Interface接口类型,该类型中包含如下对资源的操作方法,如Get()就封装了HTTP的Get方法。NewListWatchFromClient初始化ListWatch的时候使用了Get方法
1// client-go/rest/client.go
2type Interface interface {
3 GetRateLimiter() flowcontrol.RateLimiter
4 Verb(verb string) *Request
5 Post() *Request
6 Put() *Request
7 Patch(pt types.PatchType) *Request
8 Get() *Request
9 Delete() *Request
10 APIVersion() schema.GroupVersion
11}
Controller
controller的结构如下,其包含一个配置变量config,在注释中可以看到Config.Queue就是DeltaFIFO。controller定义了如何调度Reflector。
此controller非我们比较熟悉的controller-manager管理的各种各样的controller,kubernetes里面controller简直是泛滥啊。这里的controller定义在client-go/tools/cache/controller.go中,目的是用来把Reflector、DeltaFIFO组合起来形成一个相对固定的、标准的处理流程。理解了Controller,基本就算把SharedInfomer差不多搞懂了。话不多说,先上代码:
1// 代码源自client-go/tools/cache/controller.go
2type Config struct {
3 Queue // SharedInformer使用DeltaFIFO
4 ListerWatcher // 这个用来构造Reflector
5 Process ProcessFunc // 这个在调用DeltaFIFO.Pop()使用,弹出对象要如何处理
6 ObjectType runtime.Object // 对象类型,这个肯定是Reflector使用
7 FullResyncPeriod time.Duration // 全量同步周期,这个在Reflector使用
8 ShouldResync ShouldResyncFunc // Reflector在全量更新的时候会调用该函数询问
9 RetryOnError bool // 错误是否需要尝试
10}
11
12
13
14// 这是一个Controller的抽象
15type Controller interface {
16 Run(stopCh <-chan struct{}) // 核心流程函数
17 HasSynced() bool // apiserver中的对象是否已经同步到了Store中
18 LastSyncResourceVersion() string // 最新的资源版本号
19}
20
21// 代码源自client-go/tools/cache/controller.go
22// controller是Controller的实现类型
23type controller struct {
24 config Config // 配置,上面有讲解
25 reflector *Reflector // 反射器
26 reflectorMutex sync.RWMutex // 反射器的锁
27 clock clock.Clock // 时钟
28}
29// 核心业务逻辑实现
30func (c *controller) Run(stopCh <-chan struct{}) {
31 defer utilruntime.HandleCrash()
32 // 创建一个协程,如果收到系统退出的信号就关闭队列,相当于在这里析构的队列
33 go func() {
34 <-stopCh
35 c.config.Queue.Close()
36 }()
37 // 创建Reflector,传入的参数都是我们上一个章节解释过的,这里不赘述
38 r := NewReflector(
39 c.config.ListerWatcher,
40 c.config.ObjectType,
41 c.config.Queue,
42 c.config.FullResyncPeriod,
43 )
44 // r.ShouldResync的存在就是为了以后使用少些一点代码?否则直接使用c.config.ShouldResync不就完了么?不明白用意
45 r.ShouldResync = c.config.ShouldResync
46 r.clock = c.clock
47 // 记录反射器
48 c.reflectorMutex.Lock()
49 c.reflector = r
50 c.reflectorMutex.Unlock()
51 // wait.Group不是本章的讲解内容,只要把它理解为类似barrier就行了
52 // 被他管理的所有的协程都退出后调用Wait()才会退出,否则就会被阻塞
53 var wg wait.Group
54 defer wg.Wait()
55 // StartWithChannel()会启动协程执行Reflector.Run(),同时接收到stopCh信号就会退出协程
56 wg.StartWithChannel(stopCh, r.Run)
57 // wait.Until()在前面的章节讲过了,周期性的调用c.processLoop(),这里来看是1秒
58 // 不用担心调用频率太高,正常情况下c.processLoop是不会返回的,除非遇到了解决不了的错误,因为他是个循环
59 wait.Until(c.processLoop, time.Second, stopCh)
60}
61
62// 代码源自client-go/tools/cache/controller.go
63func (c *controller) processLoop() {
64 for {
65 // 从队列中弹出一个对象,然后处理它,这才是最主要的部分,这个c.config.Process是构造Controller的时候通过Config传进来的
66 // 所以这个读者要特别注意了,这个函数其实是ShareInformer传进来的,所以在分析SharedInformer的时候要重点分析的
67 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
68 if err != nil {
69 // 如果FIFO关闭了那就退出
70 if err == FIFOClosedError {
71 return
72 }
73 // 如果错误可以再试试
74 if c.config.RetryOnError {
75 c.config.Queue.AddIfNotPresent(obj)
76 }
77 }
78 }
79}
80
81// 代码源自client-go/tools/cache/controller.go
82// HasSynced() 调用的就是DeltaFIFO.HasSynced()实现的
83func (c *controller) HasSynced() bool {
84 return c.config.Queue.HasSynced()
85}
86// LastSyncResourceVersion() 是利用Reflector实现的
87func (c *controller) LastSyncResourceVersion() string {
88 if c.reflector == nil {
89 return ""
90 }
91 return c.reflector.LastSyncResourceVersion()
92}
controller的框架比较简单它使用wg.StartWithChannel启动Reflector.Run,相当于启动了一个DeltaFIFO的生产者(wg.StartWithChannel(stopCh, r.Run)表示可以将r.Run放在独立的协程运行,并可以使用stopCh来停止r.Run);使用wait.Until来启动一个消费者(wait.Until(c.processLoop, time.Second, stopCh)表示每秒会触发一次c.processLoop,但如果c.processLoop在1秒之内没有结束,则运行c.processLoop继续运行,不会结束其运行状态)
processLoop的框架也很简单,它运行了DeltaFIFO.Pop函数,用于消费DeltaFIFO中的对象,并在DeltaFIFO.Pop运行失败后可能重新处理该对象(AddIfNotPresent)
SharedInformerFactory 源码
介绍
SharedInformerFactory就是构造各种Informer的地方,每个SharedInformer其实只负责一种对象,在构造SharedInformer的时候指定了对象类型。SharedInformerFactory可以构造Kubernetes里所有对象的Informer,而且主要用在controller-manager这个服务中。因为controller-manager负责管理绝大部分controller,每类controller不仅需要自己关注的对象的informer,同时也可能需要其他对象的Informer(比如ReplicationController也需要PodInformer,否则他无法感知Pod的启动和关闭,也就达不到监控的目的了),所以一个SharedInformerFactory可以让所有的controller共享使用同一个类对象的Informer。 虽然有同名的,但是他们在不同的包中,虽然代码上有很多相似的地方,但是确实是完全独立的对象。
SharedInformerFactory
1// 代码源自client-go/informers/factory.go
2// SharedInformerFactory是个interfaces,所以肯定有具体的实现类
3type SharedInformerFactory interface {
4 // 在informers这个包中又定义了一个SharedInformerFactory,这个主要是包内抽象,所以此处继承了这个接口
5 internalinterfaces.SharedInformerFactory
6 // 这个暂时不知道干啥用,所以我也不介绍他了
7 ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
8 // 等待所有的Informer都已经同步完成,这里同步其实就是遍历调用SharedInformer.HasSynced()
9 // 所以函数需要周期性的调用指导所有的Informer都已经同步完毕
10 WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
11
12 Admissionregistration() admissionregistration.Interface // 返回admissionregistration相关的Informer组
13 Apps() apps.Interface // 返回app相关的Informer组
14 Autoscaling() autoscaling.Interface // 返回autoscaling相关的Informer组
15 Batch() batch.Interface // 返回job相关的Informer组
16 Certificates() certificates.Interface // 返回certificates相关的Informer组
17 Coordination() coordination.Interface // 返回coordination相关的Informer组
18 Core() core.Interface // 返回core相关的Informer组
19 Events() events.Interface // 返回event相关的Informer组
20 Extensions() extensions.Interface // 返回extension相关的Informer组
21 Networking() networking.Interface // 返回networking相关的Informer组
22 Policy() policy.Interface // 返回policy相关的Informer组
23 Rbac() rbac.Interface // 返回rbac相关的Informer组
24 Scheduling() scheduling.Interface // 返回scheduling相关的Informer组
25 Settings() settings.Interface // 返回settings相关的Informer组
26 Storage() storage.Interface // 返回storage相关的Informer组
27}
28
29// 代码源自client-go/informers/internalinterfaces/factory_interfaces.go
30type SharedInformerFactory interface {
31 // 核心逻辑函数,类似于很多类的Run()函数
32 Start(stopCh <-chan struct{})
33 // 这个很关键,通过对象类型,返回SharedIndexInformer,这个SharedIndexInformer管理的就是指定的对象
34 // NewInformerFunc用于当SharedInformerFactory没有这个类型的Informer的时候创建使用
35 InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
36}
37// 创建Informer的函数定义,这个函数需要apiserver的客户端以及同步周期,这个同步周期在SharedInformers反复提到
38type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
39
40
41// 代码源自client-go/informers/factory.go
42type sharedInformerFactory struct {
43 // apiserver的客户端,暂时不用关心怎么实现的,只要知道他能列举和监听资源就可以了
44 client kubernetes.Interface
45 // 哈哈,这样看来每个namesapce需要一个SharedInformerFactory,那cache用namespace建索引还有啥用呢?
46 // 并不是所有的使用者都需要指定namesapce,比如kubectl,他就可以列举所有namespace的资源,所以他没有指定namesapce
47 namespace string
48 // 这是个函数指针,用来调整列举选项的,这个选项用来client列举对象使用
49 tweakListOptions internalinterfaces.TweakListOptionsFunc
50 // 互斥锁
51 lock sync.Mutex
52 // 默认的同步周期,这个在SharedInformer需要用
53 defaultResync time.Duration
54 // 每个类型的Informer有自己自定义的同步周期
55 customResync map[reflect.Type]time.Duration
56 // 每类对象一个Informer,但凡使用SharedInformerFactory构建的Informer同一个类型其实都是同一个Informer
57 informers map[reflect.Type]cache.SharedIndexInformer
58 // 各种Informer启动的标记
59 startedInformers map[reflect.Type]bool
60}
61// 代码源自client-go/tools/cache/shared_informer.go
62// 这是一个通用的构造SharedInformerFactory的接口函数,没有任何其他的选项,只包含了apiserver的client以及同步周期
63func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
64 // 最终是调用NewSharedInformerFactoryWithOptions()实现的,无非没有选项而已
65 return NewSharedInformerFactoryWithOptions(client, defaultResync)
66}
67// 相比于上一个通用的构造函数,这个构造函数增加了namesapce过滤和调整列举选项
68func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
69 // 最终是调用NewSharedInformerFactoryWithOptions()实现的,无非选项是2个
70 // WithNamespace()和WithTweakListOptions()会在后文讲解
71 return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
72}
73// 到了构造SharedInformerFactory核心函数了,其实SharedInformerOption是个有意思的东西
74// 我们写程序喜欢Option是个结构体,但是这种方式的扩展很麻烦,这里面用的是回调函数,这个让我get到新技能了
75func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
76 // 默认只有apiserver的client以及同步周期是需要外部提供的其他的都是可以有默认值的
77 factory := &sharedInformerFactory{
78 client: client,
79 namespace: v1.NamespaceAll,
80 defaultResync: defaultResync,
81 informers: make(map[reflect.Type]cache.SharedIndexInformer),
82 startedInformers: make(map[reflect.Type]bool),
83 customResync: make(map[reflect.Type]time.Duration),
84 }
85
86 //逐一遍历各个选项函数,opt是选项函数,下面面有详细介绍
87 for _, opt := range options {
88 factory = opt(factory)
89 }
90
91 return factory
92}
93// 代码源自client-go/informers/factory.go
94// 这个是SharedInformerFactory构造函数的选项,是一个函数指针,传入的是工厂指针,返回也是工厂指针
95// 很明显,选项函数直接修改工厂对象,然后把修改的对象返回就可以了
96type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory
97// 把每个对象类型的同步周期这个参数转换为SharedInformerOption类型
98func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
99 // 这个实现很简单了,我就不多解释了
100 return func(factory *sharedInformerFactory) *sharedInformerFactory {
101 for k, v := range resyncConfig {
102 factory.customResync[reflect.TypeOf(k)] = v
103 }
104 return factory
105 }
106}
107// 这个选项函数用于使用者自定义client的列举选项的
108func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption {
109 return func(factory *sharedInformerFactory) *sharedInformerFactory {
110 factory.tweakListOptions = tweakListOptions
111 return factory
112 }
113}
114// 这个选项函数用来设置namesapce过滤的
115func WithNamespace(namespace string) SharedInformerOption {
116 return func(factory *sharedInformerFactory) *sharedInformerFactory {
117 factory.namespace = namespace
118 return factory
119 }
120}
121
122// 代码源自client-go/informers/factory.go
123// 其实sharedInformerFactory的Start()函数就是启动所有具体类型的Informer的过程
124// 因为每个类型的Informer都是SharedIndexInformer,需要需要把每个SharedIndexInformer都要启动起来
125func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
126 // 加锁操作
127 f.lock.Lock()
128 defer f.lock.Unlock()
129 // 遍历informers这个map
130 for informerType, informer := range f.informers {
131 // 看看这个Informer是否已经启动过
132 if !f.startedInformers[informerType] {
133 // 如果没启动过,那就启动一个协程执行SharedIndexInformer的Run()函数,我们在分析SharedIndexInformer的时候
134 // 我们知道知道Run()是整个Informer的启动入口点,看了《深入浅出kubernetes之client-go的SharedInformer》
135 // 的同学应该会想Run()是谁调用的呢?这里面应该给你们答案了吧?
136 go informer.Run(stopCh)
137 // 设置Informer已经启动的标记
138 f.startedInformers[informerType] = true
139 }
140 }
141}
142
143// 代码源自client-go/informers/factory.go
144// InformerFor()相当于每个类型Informer的构造函数了,即便具体实现构造的地方是使用者提供的
145// 这个函数需要使用者传入对象类型,因为在sharedInformerFactory里面是按照对象类型组织的Informer
146// 更有趣的是这些Informer不是sharedInformerFactory创建的,需要使用者传入构造函数
147// 这样做既保证了每个类型的Informer只构造一次,同时又保证了具体Informer构造函数的私有化能力
148func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
149 // 加锁操作
150 f.lock.Lock()
151 defer f.lock.Unlock()
152 // 通过反射获取obj的类型
153 informerType := reflect.TypeOf(obj)
154 // 看看这个类型的Informer是否已经创建了?
155 informer, exists := f.informers[informerType]
156 // 如果Informer已经创建,那么就复用这个Informer
157 if exists {
158 return informer
159 }
160 // 获取这个类型定制的同步周期,如果定制的同步周期那就用统一的默认周期
161 resyncPeriod, exists := f.customResync[informerType]
162 if !exists {
163 resyncPeriod = f.defaultResync
164 }
165 // 调用使用者提供构造函数,然后把创建的Informer保存起来
166 informer = newFunc(f.client, resyncPeriod)
167 f.informers[informerType] = informer
168
169 return informer
170}
171// 代码源自client-go/informers/internalinterfaces/factory_interfaces.go
172// 这个函数定义就是具体类型Informer的构造函数,后面会有地方说明如何使用
173type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
例子:PodInformer
1// 代码源自client-go/informers/core/v1/pod.go
2// 这个PodInformer是抽象类,Informer()就是获取SharedIndexInformer的接口函数
3type PodInformer interface {
4 Informer() cache.SharedIndexInformer
5 Lister() v1.PodLister
6}
7// 这个是PodInformer的实现类,看到了没,他需要工厂对象的指针,貌似明细了很多把?
8type podInformer struct {
9 factory internalinterfaces.SharedInformerFactory
10 tweakListOptions internalinterfaces.TweakListOptionsFunc
11 namespace string
12}
13// 这个就是要传入工厂的构造函数了
14func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
15 return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
16}
17// 这个是实现Informer()的地方,看到了把,这里面调用了工厂的InformerFor把自己注册进去
18func (f *podInformer) Informer() cache.SharedIndexInformer {
19 return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
20}
21
22也就是说SharedInformerFactory的使用者使用Core().Pod() 这个接口构造了PodInformer,但是需要调用PodInformer.Informer()才能获取到的SharedIndexInformer,而正是这个接口实现了向工厂注册自己。既然已经涉及到了具体的Informer,我们就开始看看每个都是干啥的吧?
23
各种group之Core
client-go为了方便管理,把Informer分类管理,具体的分类在开篇那个图里面已经展示了
1// 代码源自client-go/informers/factory.go
2func (f *sharedInformerFactory) Core() core.Interface {
3 // 调用了内核包里面的New()函数,详情见下文分析
4 return core.New(f, f.namespace, f.tweakListOptions)
5}
6// 代码源自client-go/informers/core/interface.go
7// Interface又是一个被玩坏的名字,如果没有报名,根本不知道干啥的
8type Interface interface {
9 V1() v1.Interface // 只有V1一个版本
10}
11// 这个是Interface的实现类,从名字上没任何关联吧?其实开发者命名也是挺有意思的,Interface定义的是接口
12// 供外部使用,group也有意义,因为Core确实是内核Informer的分组
13type group struct {
14 // 需要工厂对象的指针
15 factory internalinterfaces.SharedInformerFactory
16 // 这两个变量决定了Core这个分组对于SharedInformerFactory来说只有以下两个选项
17 namespace string
18 tweakListOptions internalinterfaces.TweakListOptionsFunc
19}
20// 构造Interface的接口
21func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
22 // 代码也挺简单的,不多说了
23 return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
24}
25// 实现V1()这个接口的函数
26func (g *group) V1() v1.Interface {
27 // 通过调用v1包的New()函数实现的,下面会有相应代码的分析
28 return v1.New(g.factory, g.namespace, g.tweakListOptions)
29}
30
31// 代码源自client-go/informers/core/v1/interface.go
32// 还是抽象类
33type Interface interface {
34 // 获取ComponentStatusInformer
35 ComponentStatuses() ComponentStatusInformer
36 // 获取ConfigMapInformer
37 ConfigMaps() ConfigMapInformer
38 // 获取EndpointsInformer
39 Endpoints() EndpointsInformer
40 // 获取EventInformer
41 Events() EventInformer
42 // 获取LimitRangeInformer
43 LimitRanges() LimitRangeInformer
44 // 获取NamespaceInformer
45 Namespaces() NamespaceInformer
46 // 获取NodeInformer
47 Nodes() NodeInformer
48 // 获取PersistentVolumeInformer
49 PersistentVolumes() PersistentVolumeInformer
50 // 获取PersistentVolumeClaimInformer
51 PersistentVolumeClaims() PersistentVolumeClaimInformer
52 // 获取PodInformer
53 Pods() PodInformer
54 // 获取PodTemplateInformer
55 PodTemplates() PodTemplateInformer
56 // 获取ReplicationControllerInformer
57 ReplicationControllers() ReplicationControllerInformer
58 // 获取ResourceQuotaInformer
59 ResourceQuotas() ResourceQuotaInformer
60 // 获取SecretInformer
61 Secrets() SecretInformer
62 // 获取ServiceInformer
63 Services() ServiceInformer
64 // 获取ServiceAccountInformer
65 ServiceAccounts() ServiceAccountInformer
66}
67// 这个就是上面抽象类的实现了,这个和Core分组的命名都是挺有意思,分组用group作为实现类名
68// 这个用version作为实现类名,确实这个是V1版本
69type version struct {
70 // 工厂的对象指针
71 factory internalinterfaces.SharedInformerFactory
72 // 两个选项,不多说了,说了好多遍了
73 namespace string
74 tweakListOptions internalinterfaces.TweakListOptionsFunc
75}
76// 这个就是Core分组V1版本的构造函数啦
77func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
78 // 应该好理解吧?
79 return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
80}
Core分组有管理了很多Informer,每一个Informer负责获取相应类型的对象。
Core分组之PodInformer
PodInformer是通过Core分组Pods()创建的
1// 代码源自client-go/informers/core/v1/interface.go
2// 上面我们已经说过了version是v1.Interface的实现
3func (v *version) Pods() PodInformer {
4 // 返回了podInformer的对象,说明podInformer是PodInformer 实现类
5 return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
6}
7
8// 代码源自client-go/informers/core/v1/pod.go
9// PodInformer定义了两个接口,分别为Informer()和Lister(),Informer()用来获取SharedIndexInformer对象
10// Lister()用来获取PodLister对象,这个后面会有说明,当前可以不用关心
11type PodInformer interface {
12 Informer() cache.SharedIndexInformer
13 Lister() v1.PodLister
14}
15// PodInformer的实现类,参数都是上面层层传递下来的,这里不说了
16type podInformer struct {
17 factory internalinterfaces.SharedInformerFactory
18 tweakListOptions internalinterfaces.TweakListOptionsFunc
19 namespace string
20}
21// 这个就是需要传递给SharedInformerFactory的构造函数啦,前面也提到了
22func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
23 // NewFilteredPodInformer下面有代码注释
24 return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
25}
26// 实现了PodInformer.Informer()接口函数
27func (f *podInformer) Informer() cache.SharedIndexInformer {
28 // 此处调用了工厂实现了Informer的创建
29 return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
30}
31// 实现了PodInformer.Lister()接口函数
32func (f *podInformer) Lister() v1.PodLister {
33 return v1.NewPodLister(f.Informer().GetIndexer())
34}
35// 真正创建PodInformer的函数
36func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
37 // 还有谁记得构造SharedIndexInformer需要写啥?自己温习《深入浅出kubernetes之client-go的SharedInformer》
38 return cache.NewSharedIndexInformer(
39 // 需要ListWatch两个函数,就是用apiserver的client实现的,此处不重点解释每个代码什么意思
40 // 读者应该能够看懂是利用client实现了Pod的List和Watch
41 &cache.ListWatch{
42 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
43 if tweakListOptions != nil {
44 tweakListOptions(&options)
45 }
46 return client.CoreV1().Pods(namespace).List(options)
47 },
48 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
49 if tweakListOptions != nil {
50 tweakListOptions(&options)
51 }
52 return client.CoreV1().Pods(namespace).Watch(options)
53 },
54 },
55 // 这个是要传入对象的类型,肯定是Pod了
56 &corev1.Pod{},
57 // 同步周期
58 resyncPeriod,
59 // 对象键的计算函数
60 indexers,
61 )
62}
SharedInformerFactory使用
下面以(Core,v1,podInformer)为例结合client-go中提供的代码进行讲解。代码如下,在调用informers.Core().V1().Pods().Informer()的时候会同时调用informers.InformerFor注册到sharedInformerFactory,后续直接调用informers.Start启动注册的informer。
1// client-go/examples/fake-client/main_test.go
2func TestFakeClient(t *testing.T) {
3 ctx, cancel := context.WithCancel(context.Background())
4 defer cancel()
5
6 // Create the fake client.
7 client := fake.NewSimpleClientset()
8
9 // We will create an informer that writes added pods to a channel.
10 pods := make(chan *v1.Pod, 1)
11 informers := informers.NewSharedInformerFactory(client, 0) //创建一个新的shareInformerFactory
12 podInformer := informers.Core().V1().Pods().Informer() //创建一个podInformer,并调用InformerFor函数进行注册
13 podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
14 AddFunc: func(obj interface{}) {
15 pod := obj.(*v1.Pod)
16 t.Logf("pod added: %s/%s", pod.Namespace, pod.Name)
17 pods <- pod
18 },
19 })
20
21 // Make sure informers are running.
22 informers.Start(ctx.Done()) //启动所有的informer
23 ...
Informer(告密,通知)
包缓存中的基本控制器中定义的一个Informer从Delta Fifo队列中弹出对象。执行此操作的函数是processLoop。这个基本控制器的工作是保存对象以便以后检索,并调用我们的控制器将对象传递给它。
controller机制的基础
- 循环处理object对象
- 从Reflector取出数据,然后将数据给到Indexer去缓存
- 提供对象事件的handler接口
Client-go包中一个相对较为高端的设计在于Informer的设计,我们知道我们可以直接通过Kubernetes API交互,但是考虑一点就是交互的形式,Informer设计为List/Watch的方式。Informer在初始化的时先通过List去从Kubernetes API中取出资源的全部object对象,并同时缓存,然后后面通过Watch的机制去监控资源,这样的话,通过Informer及其缓存,我们就可以直接和Informer交互而不是每次都和Kubernetes API交互。
Informer另外一块内容在于提供了事件handler机制,并会触发回调,这样上层应用如Controller就可以基于回调处理具体业务逻辑。因为Informer通过List、Watch机制可以监控到所有资源的所有事件,因此只要给Informer添加ResourceEventHandler 实例的回调函数实例取实现OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) 和 OnDelete(obj interface{})这三个方法,就可以处理好资源的创建、更新和删除操作
Kubernetes中都是各种controller的实现,各种controller都会用到Informer。
1# tools/cache/controller.go
2
3type ResourceEventHandler interface {
4 OnAdd(obj interface{})
5 OnUpdate(oldObj, newObj interface{})
6 OnDelete(obj interface{})
7}
8
9// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
10// as few of the notification functions as you want while still implementing
11// ResourceEventHandler.
12type ResourceEventHandlerFuncs struct {
13 AddFunc func(obj interface{})
14 UpdateFunc func(oldObj, newObj interface{})
15 DeleteFunc func(obj interface{})
16}
Indexer(索引器)
Indexer为对象提供索引功能。它是在包缓存中的类型索引器中定义的。一个典型的索引用例是基于对象标签创建索引。索引器可以基于多个索引函数维护索引。索引器使用线程安全的数据存储来存储对象及其密钥。包缓存中的类型存储中定义了一个名为MetaNamespaceKeyFunc的默认函数,该函数将对象的键生成为该对象的<namespace>/<name>
组合。
提供object对象的索引,是线程安全的,缓存对象信息
Indexer源码
1# tools/cache/index.go
2// 索引器是一个存储界面,使您可以使用多个索引功能列出对象。
3// 这里有三种字符串。
4// 一个是存储密钥,如存储界面中所定义。
5// 另一种是索引的名称。
6// 第三类字符串是“索引值”,它是由
7// IndexFunc,可以是字段值或从对象计算出的任何其他字符串。
8type Indexer interface {
9 Store // 此处继承了Store这个interface,定义在cliet-go/tool/cache/store.go中
10 // Index returns the stored objects whose set of indexed values
11 // intersects the set of indexed values of the given object, for
12 // the named index
13 Index(indexName string, obj interface{}) ([]interface{}, error)
14 // IndexKeys returns the storage keys of the stored objects whose
15 // set of indexed values for the named index includes the given
16 // indexed value
17 IndexKeys(indexName, indexedValue string) ([]string, error)
18 // ListIndexFuncValues returns all the indexed values of the given index
19 ListIndexFuncValues(indexName string) []string
20 // ByIndex returns the stored objects whose set of indexed values
21 // for the named index includes the given indexed value
22 ByIndex(indexName, indexedValue string) ([]interface{}, error)
23 // GetIndexer return the indexers
24 GetIndexers() Indexers
25
26 // AddIndexers adds more indexers to this store. If you call this after you already have data
27 // in the store, the results are undefined.
28 AddIndexers(newIndexers Indexers) error
29}
30// Indexer在Store基础上扩展了索引能力,那Indexer是如何实现索引的呢?让我们来看看几个非常关键的类型:
31// IndexFunc知道如何计算对象的一组索引值。计算索引的函数,传入对象,输出字符串索引
32type IndexFunc func(obj interface{}) ([]string, error)
33
34// IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc. This is only useful if your index function returns
35// unique values for every object. This conversion can create errors when more than one key is found. You
36// should prefer to make proper key and index functions.
37func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
38 return func(obj interface{}) (string, error) {
39 indexKeys, err := indexFunc(obj)
40 if err != nil {
41 return "", err
42 }
43 if len(indexKeys) > 1 {
44 return "", fmt.Errorf("too many keys: %v", indexKeys)
45 }
46 if len(indexKeys) == 0 {
47 return "", fmt.Errorf("unexpected empty indexKeys")
48 }
49 return indexKeys[0], nil
50 }
51}
52
53const (
54 // NamespaceIndex is the lookup name for the most comment index function, which is to index by the namespace field.
55 NamespaceIndex string = "namespace"
56)
57
58// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
59func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
60 meta, err := meta.Accessor(obj)
61 if err != nil {
62 return []string{""}, fmt.Errorf("object has no meta: %v", err)
63 }
64 return []string{meta.GetNamespace()}, nil
65}
66
67// 每种计算索引的方式会输出多个索引(数组),而多个目标可能会算出相同索引,所以就有了这个类型
68// Index maps the indexed value to a set of keys in the store that match on that value
69type Index map[string]sets.String
70
71// 计算索引的函数有很多,用名字分类
72// Indexers maps a name to a IndexFunc
73type Indexers map[string]IndexFunc
74
75// 由于有多种计算索引的方式,那就又要按照计算索引的方式组织索引
76// Indices maps a name to an Index
77type Indices map[string]Index
定义这些类型的目的。何所谓索引,索引目的就是为了快速查找。比如,我们需要查找某个节点上的所有Pod,那就要Pod按照节点名称排序,对应上面的Index类型就是map[nodename]sets.podname。我们可能有很多种查找方式,这就是Indexers这个类型作用了。下面的图帮助读者理解,不代表真正实现: Indexers和Indices都是按照IndexFunc(名字)分组, 每个IndexFunc输出多个IndexKey,产生相同IndexKey的多个对象存储在一个集合中。注意:上图中不代表Indexers和Indices都指向了同一个数据,只是示意都用相同的IndexFunc的名字。
为了方便后面内容的开展,我们先统一一些概念:
- IndexFunc1.....这些都是索引函数的名称,我们称之为索引类,大概意思就是把索引分类了;
- IndexKey1....这些是同一个对象在同一个索引类中的多个索引键值,我们称为索引键,切记索引键有多个;
- ObjKey1.....这些是对象键,每个对象都有唯一的名称;
有了上面的基础,我们再来看看Indexer与索引相关的接口都定义了哪些?
1 // indexName索引类,obj是对象,计算obj在indexName索引类中的索引键,通过索引键把所有的对象取出来
2 // 基本就是获取符合obj特征的所有对象,所谓的特征就是对象在索引类中的索引键
3 Index(indexName string, obj interface{}) ([]interface{}, error)
4 // indexKey是indexName索引类中一个索引键,函数返回indexKey指定的所有对象键
5 // 这个对象键是Indexer内唯一的,在添加的时候会计算,后面讲具体Indexer实例的会讲解
6 IndexKeys(indexName, indexKey string) ([]string, error)
7 // 获取indexName索引类中的所有索引键
8 ListIndexFuncValues(indexName string) []string
9 // 这个函数和Index类似,只是返回值不是对象键,而是所有对象
10 ByIndex(indexName, indexKey string) ([]interface{}, error)
11 // 返回Indexers
12 GetIndexers() Indexers
13 // 添加Indexers,就是增加更多的索引分类
14 AddIndexers(newIndexers Indexers) error
我相信通过我的注释很多人已经对Indexer有了初步认识,我们再来看看Store这个interface有哪些接口:
1# tools/cache/store.go
2// Store is a generic object storage interface. Reflector knows how to watch a server
3// and update a store. A generic store is provided, which allows Reflector to be used
4// as a local caching system, and an LRU store, which allows Reflector to work like a
5// queue of items yet to be processed.
6//
7// Store makes no assumptions about stored object identity; it is the responsibility
8// of a Store implementation to provide a mechanism to correctly key objects and to
9// define the contract for obtaining objects by some arbitrary key type.
10type Store interface {
11 Add(obj interface{}) error// 添加对象
12 Update(obj interface{}) error// 更新对象
13 Delete(obj interface{}) error// 删除对象
14 List() []interface{}// 列举对象
15 ListKeys() []string// 列举对象键
16 // 返回obj相同对象键的对象,对象键是通过对象计算出来的字符串
17 Get(obj interface{}) (item interface{}, exists bool, err error)
18 // 通过对象键获取对象
19 GetByKey(key string) (item interface{}, exists bool, err error)
20
21 // Replace will delete the contents of the store, using instead the
22 // given list. Store takes ownership of the list, you should not reference
23 // it after calling this function.
24 // 用[]interface{}替换Store存储的所有对象,等同于删除全部原有对象在逐一添加新的对象
25 Replace([]interface{}, string) error
26 Resync() error// 重新同步
27}
从Store的抽象来看,要求每个对象都要有唯一的键,至于键的计算方式就看具体实现了。我们看了半天的各种抽象,是时候讲解一波具体实现了
Indexer实现之cache
cache就是在ThreadSafeStore的再封装
cache是Indexer的一种非常经典的实现,所有的对象缓存在内存中,而且从cache这个类型的名称来看属于包内私有类型,外部无法直接使用,只能通过专用的函数创建。其实cache的定义非常简单,如下所以:
1# tools/cache/store.go
2// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
3// 计算对象键的函数
4type KeyFunc func(obj interface{}) (string, error)
5...
6// cache responsibilities are limited to:
7// 1. Computing keys for objects via keyFunc
8// 2. Invoking methods of a ThreadSafeStorage interface
9type cache struct {
10 // cacheStorage bears the burden of thread safety for the cache
11 cacheStorage ThreadSafeStore// 线程安全的存储
12 // keyFunc is used to make the key for objects stored in and retrieved from items, and
13 // should be deterministic.
14 keyFunc KeyFunc // 计算对象键的函数
15}
16
17var _ Store = &cache{}
18
这里可以看出来cache有一个计算对象键的函数,创建cache对象的时候就要指定这个函数了。
ThreadSafeStore
1# tools/cache/thread_safe_store.go
2// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
3// TL;DR caveats: you must not modify anything returned by Get or List as it will break
4// the indexing feature in addition to not being thread safe.
5//
6// The guarantees of thread safety provided by List/Get are only valid if the caller
7// treats returned items as read-only. For example, a pointer inserted in the store
8// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
9// on the same key and modify the pointer in a non-thread-safe way. Also note that
10// modifying objects stored by the indexers (if any) will *not* automatically lead
11// to a re-index. So it's not a good idea to directly modify the objects returned by
12// Get/List, in general.
13type ThreadSafeStore interface {
14 Add(key string, obj interface{})
15 Update(key string, obj interface{})
16 Delete(key string)
17 Get(key string) (item interface{}, exists bool)
18 List() []interface{}
19 ListKeys() []string
20 Replace(map[string]interface{}, string)
21 Index(indexName string, obj interface{}) ([]interface{}, error)
22 IndexKeys(indexName, indexKey string) ([]string, error)
23 ListIndexFuncValues(name string) []string
24 ByIndex(indexName, indexKey string) ([]interface{}, error)
25 GetIndexers() Indexers
26
27 // AddIndexers adds more indexers to this store. If you call this after you already have data
28 // in the store, the results are undefined.
29 AddIndexers(newIndexers Indexers) error
30 Resync() error
31}
32
33// threadSafeMap implements ThreadSafeStore
34type threadSafeMap struct {
35 lock sync.RWMutex // 读写锁,毕竟读的多写的少,读写锁性能要更好
36 items map[string]interface{} // 存储对象的map,对象键:对象
37
38 // indexers maps a name to an IndexFunc
39 indexers Indexers// 这个不用多解释了把,用于计算索引键的函数map
40 // indices maps a name to an Index
41 indices Indices// 快速索引表,通过索引可以快速找到对象键,然后再从items中取出对象
42}
43
44// 添加对象函数
45func (c *threadSafeMap) Add(key string, obj interface{}) {
46 c.lock.Lock()// 加锁,因为是写操作,所以是全部互斥的那种
47 defer c.lock.Unlock()
48 // 把老的对象取出来
49 oldObject := c.items[key]
50 // 写入新的对象
51 c.items[key] = obj
52 // 由于对象的添加就要更新索引
53 c.updateIndices(oldObject, obj, key)
54}
55// 更新对象函数,和添加对象一模一样,所以就不解释了,为啥Add函数不直接调用Update呢?
56func (c *threadSafeMap) Update(key string, obj interface{}) {
57 c.lock.Lock()
58 defer c.lock.Unlock()
59 oldObject := c.items[key]
60 c.items[key] = obj
61 c.updateIndices(oldObject, obj, key)
62}
63
64// 删除对象
65func (c *threadSafeMap) Delete(key string) {
66 c.lock.Lock()// 加锁,因为是写操作,所以是全部互斥的那种
67 defer c.lock.Unlock()
68 // 判断对象是否存在?
69 if obj, exists := c.items[key]; exists {
70 // 删除对象的索引
71 c.deleteFromIndices(obj, key)
72 // 删除对象本身
73 delete(c.items, key)
74 }
75}
76
77// 获取对象
78func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
79 c.lock.RLock()// 此处只用了读锁
80 defer c.lock.RUnlock()
81 // 利用对象键取出对象
82 item, exists = c.items[key]
83 return item, exists
84}
85// 列举对象
86func (c *threadSafeMap) List() []interface{} {
87 c.lock.RLock()// 此处只用了读锁
88 defer c.lock.RUnlock()
89 // 直接遍历对象map就可以了
90 list := make([]interface{}, 0, len(c.items))
91 for _, item := range c.items {
92 list = append(list, item)
93 }
94 return list
95}
96
97// 列举对象键
98// ListKeys returns a list of all the keys of the objects currently
99// in the threadSafeMap.
100func (c *threadSafeMap) ListKeys() []string {
101 c.lock.RLock()// 此处只用了读锁
102 defer c.lock.RUnlock()
103 // 同样是遍历对象map,但是只输出对象键
104 list := make([]string, 0, len(c.items))
105 for key := range c.items {
106 list = append(list, key)
107 }
108 return list
109}
110// 取代所有对象,相当于重新构造了一遍threadSafeMap
111func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
112 c.lock.Lock()// 此处必须要用全局锁,因为有写操作
113 defer c.lock.Unlock()
114 // 直接覆盖以前的对象
115 c.items = items
116
117 // rebuild any index
118 c.indices = Indices{}// 重建索引
119 for key, item := range c.items {
120 c.updateIndices(nil, item, key)
121 }
122 // 发现没有,resourceVersion此处没有用到,估计是其他的Indexer实现有用
123}
124
125// 这个函数就是通过指定的索引函数计算对象的索引键,然后把索引键的对象全部取出来
126// Index returns a list of items that match on the index function
127// Index is thread-safe so long as you treat all items as immutable
128func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
129 c.lock.RLock()// 只读,所以用读锁
130 defer c.lock.RUnlock()
131// 取出indexName这个分类索引函数
132 indexFunc := c.indexers[indexName]
133 if indexFunc == nil {
134 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
135 }
136
137 // 计算对象的索引键
138 indexKeys, err := indexFunc(obj)
139 if err != nil {
140 return nil, err
141 }
142 // 取出indexName这个分类所有索引
143 index := c.indices[indexName]
144 // 返回对象的对象键的集合
145 var returnKeySet sets.String
146 if len(indexKeys) == 1 {
147 // In majority of cases, there is exactly one value matching.
148 // Optimize the most common path - deduping is not needed here.
149 returnKeySet = index[indexKeys[0]]
150 } else {
151 // Need to de-dupe the return list.
152 // Since multiple keys are allowed, this can happen.
153 returnKeySet = sets.String{}
154 // 遍历刚刚计算出来的所有索引键
155 for _, indexKey := range indexKeys {
156 // 把所有的对象键输出到对象键的集合中
157 for key := range index[indexKey] {
158 returnKeySet.Insert(key)
159 }
160 }
161 }
162 // 通过对象键逐一的把对象取出
163 list := make([]interface{}, 0, returnKeySet.Len())
164 for absoluteKey := range returnKeySet {
165 list = append(list, c.items[absoluteKey])
166 }
167 return list, nil
168}
169
170// 这个函数和上面的函数基本一样,只是索引键不用再计算了,使用者提供
171// ByIndex returns a list of items that match an exact value on the index function
172func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
173 c.lock.RLock()// 同样是读锁
174 defer c.lock.RUnlock()
175 // 判断indexName这个索引分类是否存在
176 indexFunc := c.indexers[indexName]
177 if indexFunc == nil {
178 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
179 }
180 // 取出索引分类的所有索引
181 index := c.indices[indexName]
182 // 再出去索引键的所有对象键
183 set := index[indexKey]
184 // 遍历对象键输出
185 list := make([]interface{}, 0, set.Len())
186 for key := range set {
187 list = append(list, c.items[key])
188 }
189
190 return list, nil
191}
192
193// 你会发现这个函数和ByIndex()基本一样,只是输出的是对象键
194// IndexKeys returns a list of keys that match on the index function.
195// IndexKeys is thread-safe so long as you treat all items as immutable.
196func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
197 c.lock.RLock()// 同样是读锁
198 defer c.lock.RUnlock()
199 // 判断indexName这个索引分类是否存在
200 indexFunc := c.indexers[indexName]
201 if indexFunc == nil {
202 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
203 }
204 // 取出索引分类的所有索引
205 index := c.indices[indexName]
206 // 直接输出索引键内的所有对象键
207 set := index[indexKey]
208 return set.List(), nil
209}
210
211// 这个函数用来获取索引分类内的所有索引键的
212func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
213 c.lock.RLock()// 依然是读锁
214 defer c.lock.RUnlock()
215 // 获取索引分类的所有索引
216 index := c.indices[indexName]
217 // 直接遍历后输出索引键
218 names := make([]string, 0, len(index))
219 for key := range index {
220 names = append(names, key)
221 }
222 return names
223}
224
225func (c *threadSafeMap) GetIndexers() Indexers {
226 return c.indexers
227}
228
229func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
230 c.lock.Lock()
231 defer c.lock.Unlock()
232
233 if len(c.items) > 0 {
234 return fmt.Errorf("cannot add indexers to running index")
235 }
236
237 oldKeys := sets.StringKeySet(c.indexers)
238 newKeys := sets.StringKeySet(newIndexers)
239
240 if oldKeys.HasAny(newKeys.List()...) {
241 return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
242 }
243
244 for k, v := range newIndexers {
245 c.indexers[k] = v
246 }
247 return nil
248}
249
250// 当有对象添加或者更新是,需要更新索引,因为代用该函数的函数已经加锁了,所以这个函数没有加锁操作
251// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
252// updateIndices must be called from a function that already has a lock on the cache
253func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
254 // if we got an old object, we need to remove it before we add it again
255 // 在添加和更新的时候都会获取老对象,如果存在老对象,那么就要删除老对象的索引,后面有说明
256 if oldObj != nil {
257 c.deleteFromIndices(oldObj, key)
258 }
259 // 遍历所有的索引函数,因为要为对象在所有的索引分类中创建索引键
260 for name, indexFunc := range c.indexers {
261 // 计算索引键
262 indexValues, err := indexFunc(newObj)
263 if err != nil {
264 panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
265 }
266 // 获取索引分类的所有索引
267 index := c.indices[name]
268 if index == nil {
269 // 为空说明这个索引分类还没有任何索引
270 index = Index{}
271 c.indices[name] = index
272 }
273 // 遍历对象的索引键,上面刚刚用索引函数计算出来的
274 for _, indexValue := range indexValues {
275 // 找到索引键的对象集合
276 set := index[indexValue]
277 // 为空说明这个索引键下还没有对象
278 if set == nil {
279 // 创建对象键集合
280 set = sets.String{}
281 index[indexValue] = set
282 }
283 // 把对象键添加到集合中
284 set.Insert(key)
285 }
286 }
287}
288// 这个函数用于删除对象的索引的
289// deleteFromIndices removes the object from each of the managed indexes
290// it is intended to be called from a function that already has a lock on the cache
291func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
292 // 遍历索引函数,也就是把所有索引分类
293 for name, indexFunc := range c.indexers {
294 // 计算对象的索引键
295 indexValues, err := indexFunc(obj)
296 if err != nil {
297 panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
298 }
299 // 获取索引分类的所有索引
300 index := c.indices[name]
301 if index == nil {
302 continue
303 }
304 // 遍历对象的索引键
305 for _, indexValue := range indexValues {
306 // 把对象从索引键指定对对象集合删除
307 set := index[indexValue]
308 if set != nil {
309 set.Delete(key)
310
311 // If we don't delete the set when zero, indices with high cardinality
312 // short lived resources can cause memory to increase over time from
313 // unused empty sets. See `kubernetes/kubernetes/issues/84959`.
314 if len(set) == 0 {
315 delete(index, indexValue)
316 }
317 }
318 }
319 }
320}
321
322func (c *threadSafeMap) Resync() error {
323 // Nothing to do
324 return nil
325}
索引键(indexers)和对象键(indices)是两个重要概念,索引键是用于对象快速查找的,经过索引建在map中排序查找会更快;对象键是为对象在存储中的唯一命名的,对象是通过名字+对象的方式存储的。
Indexer保存了来自apiServer的资源。使用listWatch方式来维护资源的增量变化。通过这种方式可以减小对apiServer的访问,减轻apiServer端的压力
cache实现了Indexer接口,但cache是包内私有的(首字母小写),只能通过包内封装的函数进行调用。
可以通过NewStore和NewIndexer初始化cache来返回一个Store或Indexer指针(cache实现了Store和Indexer接口)。NewStore和NewIndexer返回的Store和Indexer接口的数据载体为threadSafeMap,threadSafeMap通过NewThreadSafeStore函数初始化。
1// tools/cache/store.go
2// NewStore returns a Store implemented simply with a map and a lock.
3func NewStore(keyFunc KeyFunc) Store {
4 return &cache{
5 cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
6 keyFunc: keyFunc,
7 }
8}
9
10// NewIndexer returns an Indexer implemented simply with a map and a lock.
11func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
12 return &cache{
13 cacheStorage: NewThreadSafeStore(indexers, Indices{}),
14 keyFunc: keyFunc,
15 }
16}
index.go中给出了索引相关的操作(接口);store.go中给出了与操作存储相关的接口,并提供了一个cache实现,当然也可以实现自行实现Store接口;thread_safe_store.go为cache的私有实现。
client-go的indexer实际操作的还是threadSafeMap中的方法和数据,调用关系如下:
可以通过下图理解threadSafeMap中各种索引之间的关系
以namespace作为索引类型为例来讲,首先从indexers获取计算namespace的indexFunc,然后使用该indexFunc计算出与入参对象相关的所有namespaces。indices中保存了所有namespaces下面的对象键,可以获取特定namespace下面的所有对象键,在items中输入特定的对象键就可以得出特定的对象。indexers用于找出与特定对象相关的资源,如找出某Pod相关的secrets。
默认的indexFunc如下,根据对象的namespace进行分类
1# tools/cache/index.go
2func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
3 meta, err := meta.Accessor(obj)
4 if err != nil {
5 return []string{""}, fmt.Errorf("object has no meta: %v", err)
6 }
7 return []string{meta.GetNamespace()}, nil
8}
cache结构中的keyFunc用于生成objectKey,下面是默认的keyFunc。
1# tools/cache/thread_safe_store.go
2func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
3 if key, ok := obj.(ExplicitKey); ok {
4 return string(key), nil
5 }
6 meta, err := meta.Accessor(obj)
7 if err != nil {
8 return "", fmt.Errorf("object has no meta: %v", err)
9 }
10 if len(meta.GetNamespace()) > 0 {
11 return meta.GetNamespace() + "/" + meta.GetName(), nil
12 }
13 return meta.GetName(), nil
14}
总结
- indexer实际的对象存储在threadSafeMap结构中
- indexers划分了不同的索引类型(indexName,如namespace),并按照索引类型进行索引(indexFunc,如MetaNamespaceIndexFunc),得出符合该对象的索引键(indexKeys,如namespaces),一个对象在一个索引类型中可能有多个索引键。
- indices按照索引类型保存了索引(index,如包含所有namespaces下面的obj),进而可以按照索引键找出特定的对象键(keys,如某个namespace下面的对象键),indices用于快速查找对象
- items按照对象键保存了实际的对象
Custom Controller components
Informer reference
这是对Informer实例的引用,该实例知道如何使用自定义资源对象。您的自定义控制器代码需要创建适当的Informer。
controller需要创建合适的Informer才能通过Informer reference操作资源对象
Indexer reference
这是对Indexer实例的引用,该实例知道如何使用自定义资源对象。您的自定义控制器代码需要创建它。您将使用此参考来检索对象以供以后处理。
controller创建Indexer reference然后去利用索引做相关处理
client-go中的基本控制器提供NewIndexerInformer函数来创建Informer和Indexer。在您的代码中,您可以直接调用此函数,也可以使用工厂方法创建通知程序。
1# kubernetes/client-go/blob/master/examples/workqueue/main.go#L174
2indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
3
4# kubernetes/sample-controller/blob/master/main.go#L61
5kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
6 exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
Resource Event Handlers
这些是回调函数,当Informer想要将一个对象传递到您的controller时,它将由Informer调用。编写这些功能的典型模式是获取调度对象的密钥,并将该密钥排入work queue以进行进一步处理。
Informer会回调这些handlers
Work queue
这是您在控制器代码中创建的队列,用于将对象的交付与其处理分离。编写资源事件处理程序(Resource event handler)函数是为了提取交付的对象的键并将其添加到工作队列中。
Resource Event Handlers被回调后将key写到工作队列,这里的key相当于事件通知,后面根据取出事件后,做后续的处理
workqueue源码
indexer用于保存apiserver的资源信息,而workqueue用于保存informer中的handler处理之后的数据。
通用队列
workqueue的接口定义如下:
1// 代码源自client-go/util/workqueue/queue.go
2// 这是一个interface类型,说明有其他的各种各样的实现
3type Interface interface {
4 Add(item interface{}) // 向队列中添加一个元素,interface{}类型,说明可以添加任何类型的元素
5 Len() int // 队列长度,就是元素的个数
6 Get() (item interface{}, shutdown bool) // 从队列中获取一个元素,双返回值,这个和chan的<-很像,第二个返回值告知队列是否已经关闭了
7 Done(item interface{}) // 告知队列该元素已经处理完了
8 ShutDown() // 关闭队列
9 ShuttingDown() bool // 查询队列是否正在关闭
10}
参见上图可以看到真正处理的元素来自queue,dirty和queue中的元素可能不一致,不一致点来自于当Get一个元素后且Done执行前,此时Get操作会删除dirty中的该元素,如果此时发生了Add正在处理的元素的操作,由于此时dirty中没有该元素且processing中存在该元素,会发生dirty中的元素大于queue中元素的情况。但对某一元素的不一致会在Done完成后消除,即Done函数中会判断该元素是否在dirty中,如果存在则会将该元素append到queue中。总之,dirty中的数据都会被append到queue中,后续queue中的数据会insert到processing中进行处理() dType实现了Interface接口。包含下面几个变量:
- queue:使用数组顺序存储了待处理的元素;
- dirty:使用哈希表存储了需要处理的元素,它包含了queue中的所有元素,用于快速查找元素,dirty中可能包含queue中不存在的元素。dirty可以防止重复添加正在处理的元素;
- processing:使用哈希表保存了正在处理的元素,它不包含queue中的元素,但可能包含dirty中的元素
1// 代码源于client-go/util/workqueue/queue.go
2type Type struct {
3 queue []t // 元素数组
4 dirty set // dirty的元素集合
5 processing set // 正在处理的元素集合
6 cond *sync.Cond // 与pthread_cond_t相同,条件同步
7 shuttingDown bool // 关闭标记
8 metrics queueMetrics // 这个metrics和prometheus的metrics概念相同,此处不做过多说明,知道功能就行
9}
10// 以下的这些类型定义也是够了,对于C/C++程序猿来说不能忍~
11type empty struct{} // 空类型,因为sizeof(struct{})=0
12type t interface{} // 元素类型是泛型
13type set map[t]empty // 用map实现的set,所有的value是空数据就行了
14
15
16// 代码源自client-go/util/workqueue/queue.go
17func (q *Type) Add(item interface{}) {
18 // 和pthread_cond_t不同的是golang的cond自带了互斥锁
19 q.cond.L.Lock()
20 defer q.cond.L.Unlock()
21 // 队列正在关闭,直接返回
22 if q.shuttingDown {
23 return
24 }
25 // 已经标记为脏的数据,也直接返回,因为存储在了脏数据的集合中
26 if q.dirty.has(item) {
27 return
28 }
29 // 告知metrics添加了元素
30 q.metrics.add(item)
31 // 添加到脏数据集合中
32 q.dirty.insert(item)
33 // 元素刚被拿走处理,那就直接返回
34 if q.processing.has(item) {
35 return
36 }
37 // 追加到元素数组的尾部
38 q.queue = append(q.queue, item)
39 // 通知有新元素到了,此时有协程阻塞就会被唤醒
40 q.cond.Signal()
41}
421. 队列关闭了,所以不接受任何数据,上面代码也是这么实现的;
432. 队列中没有该元素,那就直接存储在队列中;
443. 队列中已经有了该元素,这个改如何判断?set类型肯定最快,数组需要遍历效率太低,这也是dirty存在的价值之一,上面的代码也通过dirty判断元素是否存在的;
454. 队列曾经存储过该元素,但是已经被拿走还没有调用Done()时,也就是正在处理中的元素,此时再添加当前的元素应该是最新鲜的,处理中的应该是过时的,也就是脏的,我认为dirty的来源就是这个吧~
46
47// 代码源自client-go/util/workqueue/queue.go
48func (q *Type) Get() (item interface{}, shutdown bool) {
49 // 加锁解锁不解释
50 q.cond.L.Lock()
51 defer q.cond.L.Unlock()
52 // 没有数据,阻塞协程
53 for len(q.queue) == 0 && !q.shuttingDown {
54 q.cond.Wait()
55 }
56 // 协程被激活但还没有数据,说明队列被关闭了,这个和chan一样
57 if len(q.queue) == 0 {
58 return nil, true
59 }
60
61 // 弹出第一个元素,我一直感觉golang的slice[1:]这种操作性能不太高~以后有时间看看代码实现
62 item, q.queue = q.queue[0], q.queue[1:]
63 // 通知metrics元素被取走了
64 q.metrics.get(item)
65 // 从dirty集合中移除,加入到processing集合,经过前面的分析这里就很好理解了
66 q.processing.insert(item)
67 q.dirty.delete(item)
68
69 return item, false
70}
71
72// 代码源自client-go/util/workqueue/queue.go
73func (q *Type) Done(item interface{}) {
74 // 加锁解锁不解释
75 q.cond.L.Lock()
76 defer q.cond.L.Unlock()
77 // 通知metrics元素处理完了
78 q.metrics.done(item)
79 // 从processing集合中删除
80 q.processing.delete(item)
81 // 重点来啦,此处判断脏元素集合,看看处理期间是不是又被添加,如果是那就在放到队列中,完全符合我们的分析
82 if q.dirty.has(item) {
83 q.queue = append(q.queue, item)
84 q.cond.Signal()
85 }
86}
延时队列
延时队列接口继承了queue的Interface接口,仅新增了一个AddAfter方法,它用于在duration时间之后将元素添加到queue中。
1// client-go/util/workqueue/delaying_queue.go
2type DelayingInterface interface {
3 Interface// 继承了通用队列所有接口
4 // AddAfter adds an item to the workqueue after the indicated duration has passed
5 AddAfter(item interface{}, duration time.Duration)// 增加了延迟添加的接口
6}
从延时队列的抽象来看,和通用队列基本一样,只是多了延迟添加的接口, 也就增加了一些机制实现元素的延迟添加,这一点可以从延时队列的实现类型上可以看出: delayingType实现了DelayingInterface接口使用waitingForAddCh来传递需要添加到queue的元素,
1// 代码源自client-go/util/workqueue/delaying_queue.go
2type delayingType struct {
3 Interface // 这参数不出意外是通用队列的实现
4 clock clock.Clock // 时钟,用于获取时间
5 stopCh chan struct{} // 延时就意味着异步,就要有另一个协程处理,所以需要退出信号
6 heartbeat clock.Ticker // 定时器,在没有任何数据操作时可以定时的唤醒处理协程,定义为心跳没毛病
7 waitingForAddCh chan *waitFor // 所有延迟添加的元素封装成waitFor放到chan中
8 metrics retryMetrics // 和通用队列中的metrics功能类似
9}
10//
11type waitFor struct {
12 data t // 元素数据,这个t就是在通用队列中定义的类型interface{}
13 readyAt time.Time // 在什么时间添加到队列中
14 index int // 这是个索引,后面会详细说明
15}
delayingType.waitingForAddCh中的元素如果没有超过延时时间会添加到waitForPriorityQueue中,否则直接加入queue中。
1// 代码源自client-go/util/workqueue/delaying_queue.go
2// waitFor的定义上面有,是需要延时添加的元素都要封装成这个类型
3// waitForPriorityQueue就把需要延迟的元素形成了一个队列,队列按照元素的延时添加的时间(readyAt)从小到大排序
4// 实现的策略就是实现了go/src/container/heap/heap.go中的Interface类型,读者可以自行了解heap
5// 这里只需要知道waitForPriorityQueue这个数组是有序的,排序方式是按照时间从小到大
6type waitForPriorityQueue []*waitFor
7// heap需要实现的接口,告知队列长度
8func (pq waitForPriorityQueue) Len() int {
9 return len(pq)
10}
11// heap需要实现的接口,告知第i个元素是否比第j个元素小
12func (pq waitForPriorityQueue) Less(i, j int) bool {
13 return pq[i].readyAt.Before(pq[j].readyAt) // 此处对比的就是时间,所以排序按照时间排序
14}
15// heap需要实现的接口,实现第i和第j个元素换
16func (pq waitForPriorityQueue) Swap(i, j int) {
17 // 这种语法好牛逼,有没有,C/C++程序猿没法理解~
18 pq[i], pq[j] = pq[j], pq[i]
19 pq[i].index = i // 因为heap没有所以,所以需要自己记录索引,这也是为什么waitFor定义索引参数的原因
20 pq[j].index = j
21}
22// heap需要实现的接口,用于向队列中添加数据
23func (pq *waitForPriorityQueue) Push(x interface{}) {
24 n := len(*pq)
25 item := x.(*waitFor)
26 item.index = n // 记录索引值
27 *pq = append(*pq, item) // 放到了数组尾部
28}
29// heap需要实现的接口,用于从队列中弹出最后一个数据
30func (pq *waitForPriorityQueue) Pop() interface{} {
31 n := len(*pq)
32 item := (*pq)[n-1]
33 item.index = -1
34 *pq = (*pq)[0:(n - 1)] // 缩小数组,去掉了最后一个元素
35 return item
36}
37// 返回第一个元素
38func (pq waitForPriorityQueue) Peek() interface{} {
39 return pq[0]
40}
因为延时队列利用waitForPriorityQueue管理所有延时添加的元素,所有的元素在waitForPriorityQueue中按照时间从小到大排序,对于延时队列的处理就会方便很多了。
延时队列实现逻辑比较简单,需要注意的是waitingForQueue是以heap方式实现的队列,队列的pop和push等操作使用的是heap.pop和heap.push
接下来我们就可以分析延时队列的实现了,因为延时队列集成通用队列,所以这里只对新增的函数做说明:
1// 代码源自client-go/util/workqueue/delaying_queue.go
2func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
3 // 如果队列关闭就直接退出
4 if q.ShuttingDown() {
5 return
6 }
7 // 记录metrics
8 q.metrics.retry()
9 // 不需要延迟,那就直接像通用队列一样添加
10 if duration <= 0 {
11 q.Add(item)
12 return
13 }
14
15 // 把元素封装成waitFor传入chan,切记select没有default,所以可能会被阻塞
16 // 这里面用到了stopChan,因为有阻塞的可能,所以用stopChan可以保证退出
17 select {
18 case <-q.stopCh:
19 case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
20 }
21}
22
23// 代码源自client-go/util/workqueue/delaying_queue.go
24// 这部分就是演示队列的核心代码
25func (q *delayingType) waitingLoop() {
26 defer utilruntime.HandleCrash()
27 // 这个变量后面会用到,当没有元素需要延时添加的时候利用这个变量实现长时间等待
28 never := make(<-chan time.Time)
29 // 构造我们上面提到的有序队列了,并且初始化
30 waitingForQueue := &waitForPriorityQueue{}
31 heap.Init(waitingForQueue)
32 // 这个map是用来避免对象重复添加的,如果重复添加就只更新时间
33 waitingEntryByData := map[t]*waitFor{}
34 // 开始无限循环
35 for {
36 // 队列关闭了,就可以返回了
37 if q.Interface.ShuttingDown() {
38 return
39 }
40 // 获取当前时间
41 now := q.clock.Now()
42 // 有序队列中是否有元素,有人肯定会问还没向有序队列里添加呢判断啥啊?后面会有添加哈
43 for waitingForQueue.Len() > 0 {
44 // Peek函数我们前面注释了,获取第一个元素,注意:不会从队列中取出哦
45 entry := waitingForQueue.Peek().(*waitFor)
46 // 元素指定添加的时间过了么?如果没有过那就跳出循环
47 if entry.readyAt.After(now) {
48 break
49 }
50 // 既然时间已经过了,那就把它从有序队列拿出来放入通用队列中,这里面需要注意几点:
51 // 1.heap.Pop()弹出的是第一个元素,waitingForQueue.Pop()弹出的是最后一个元素
52 // 2.从有序队列把元素弹出,同时要把元素从上面提到的map删除,因为不用再判断重复添加了
53 // 3.此处是唯一一个地方把元素从有序队列移到通用队列,后面主要是等待时间到过程
54 entry = heap.Pop(waitingForQueue).(*waitFor)
55 q.Add(entry.data)
56 delete(waitingEntryByData, entry.data)
57 }
58
59 // 如果有序队列中没有元素,那就不用等一段时间了,也就是永久等下去
60 // 如果有序队列中有元素,那就用第一个元素指定的时间减去当前时间作为等待时间,逻辑挺简单
61 // 有序队列是用时间排序的,后面的元素需要等待的时间更长,所以先处理排序靠前面的元素
62 nextReadyAt := never
63 if waitingForQueue.Len() > 0 {
64 entry := waitingForQueue.Peek().(*waitFor)
65 nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
66 }
67 // 进入各种等待
68 select {
69 // 有退出信号么?
70 case <-q.stopCh:
71 return
72 // 定时器,没过一段时间没有任何数据,那就再执行一次大循环,从理论上讲这个没用,但是这个具备容错能力,避免BUG死等
73 case <-q.heartbeat.C():
74 // 这个就是有序队列里面需要等待时间信号了,时间到就会有信号
75 case <-nextReadyAt:
76 // 这里是从chan中获取元素的,AddAfter()放入chan中的元素
77 case waitEntry := <-q.waitingForAddCh:
78 // 如果时间已经过了就直接放入通用队列,没过就插入到有序队列
79 if waitEntry.readyAt.After(q.clock.Now()) {
80 insert(waitingForQueue, waitingEntryByData, waitEntry)
81 } else {
82 q.Add(waitEntry.data)
83 }
84 // 下面的代码看似有点多,目的就是把chan中的元素一口气全部取干净,注意用了default意味着chan中没有数据就会立刻停止
85 drained := false
86 for !drained {
87 select {
88 case waitEntry := <-q.waitingForAddCh:
89 if waitEntry.readyAt.After(q.clock.Now()) {
90 insert(waitingForQueue, waitingEntryByData, waitEntry)
91 } else {
92 q.Add(waitEntry.data)
93 }
94 default:
95 drained = true
96 }
97 }
98 }
99 }
100}
101// 下面的代码是把元素插入有序队列的实现
102func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
103 // 看看元素是不是被添加过?如果添加过看谁的时间靠后就用谁的时间
104 existing, exists := knownEntries[entry.data]
105 if exists {
106 if existing.readyAt.After(entry.readyAt) {
107 existing.readyAt = entry.readyAt
108 heap.Fix(q, existing.index)
109 }
110
111 return
112 }
113 // 把元素放入有序队列中,并记录在map里面,这个map就是上面那个用于判断对象是否重复添加的map
114 // 注意,这里面调用的是heap.Push,不是waitForPriorityQueue.Push
115 heap.Push(q, entry)
116 knownEntries[entry.data] = entry
117}
到这里延时队列核心代码基本分析完了,其重要的一点就是golang的heap,他辅助实现了元素按时间先后进行排序,这样延时队列就可以一个一个的等待超时添加了。heap的排序算法实现非常有意思
限速队列
限速队列应用非常广泛,比如我们做某些操作失败时希望重试几次,但是立刻重试很有可能还会失败,我们希望延迟一段时间在重试,而且失败次数越多延迟时间越长,这个时候就有限速的概念在里面了。在分析限速队列前,我们需要知道限速器。
限速队列实现了RateLimiter的3个接口,When用于返回元素的重试时间,Forget用于清除元素的重试记录,NumRequeues返回元素的重试次数
限速器RateLimiter
1// 代码源自client-go/util/workqueue/default_rate_limiter.go
2type RateLimiter interface {
3 When(item interface{}) time.Duration // 返回元素需要等待多长时间
4 Forget(item interface{}) // 抛弃该元素,意味着该元素已经被处理了
5 NumRequeues(item interface{}) int // 元素放入队列多少次了
6}
ItemExponentialFailureRateLimiter(default_rate_limiters)
他会根据元素错误次数逐渐累加等待时间,具体实现如下:
ItemExponentialFailureRateLimiter对使用指数退避的方式进行失败重试,当failures增加时,下次重试的时间就变为了baseDelay.Nanoseconds()) * math.Pow(2, float64(exp),maxDelay用于限制重试时间的最大值,当计算的重试时间超过maxDelay时则采用maxDelay
1// 代码源自client-go/util/workqueue/default_rate_limiters.go
2// 限速器的定义
3type ItemExponentialFailureRateLimiter struct {
4 failuresLock sync.Mutex // 互斥锁
5 failures map[interface{}]int // 记录每个元素错误次数,每调用一次When累加一次
6
7 baseDelay time.Duration // 元素延迟基数,算法后面会有说明
8 maxDelay time.Duration // 元素最大的延迟时间
9}
10// 实现限速器的When接口
11func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
12 r.failuresLock.Lock()
13 defer r.failuresLock.Unlock()
14 // 累加错误计数,比较好理解
15 exp := r.failures[item]
16 r.failures[item] = r.failures[item] + 1
17
18 // 通过错误次数计算延迟时间,公式是2^i * baseDelay,按指数递增,符合Exponential名字
19 backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
20 if backoff > math.MaxInt64 {
21 return r.maxDelay
22 }
23 // 计算后的延迟值和最大延迟值二者取最小值
24 calculated := time.Duration(backoff)
25 if calculated > r.maxDelay {
26 return r.maxDelay
27 }
28
29 return calculated
30}
31// 实现限速器的NumRequeues接口,很简单,没什么好说的
32func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
33 r.failuresLock.Lock()
34 defer r.failuresLock.Unlock()
35
36 return r.failures[item]
37}
38// 实现限速器的Forget接口,也很简单,没什么好说的
39func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
40 r.failuresLock.Lock()
41 defer r.failuresLock.Unlock()
42
43 delete(r.failures, item)
44}
ItemFastSlowRateLimiter
ItemFastSlowRateLimiter针对失败次数采用不同的重试时间。当重试次数小于maxFastAttempts时,重试时间为fastDelay,否则我为slowDelay。 ItemFastSlowRateLimiter的限速策略是尝试次数超过阈值用长延迟,否则用短延迟。
1// 代码源自client-go/util/workqueue/default_rate_limiters.go
2// 限速器定义
3type ItemFastSlowRateLimiter struct {
4 failuresLock sync.Mutex // 互斥锁
5 failures map[interface{}]int // 错误次数计数
6
7 maxFastAttempts int // 错误尝试阈值
8 fastDelay time.Duration // 短延迟时间
9 slowDelay time.Duration // 长延迟时间
10}
11// 限速器实现When接口
12func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
13 r.failuresLock.Lock()
14 defer r.failuresLock.Unlock()
15
16 // 累加错误计数
17 r.failures[item] = r.failures[item] + 1
18 // 错误次数超过阈值用长延迟,否则用短延迟
19 if r.failures[item] <= r.maxFastAttempts {
20 return r.fastDelay
21 }
22
23 return r.slowDelay
24}
25// 限速器实现NumRequeues接口,比较简单不多解释
26func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
27 r.failuresLock.Lock()
28 defer r.failuresLock.Unlock()
29
30 return r.failures[item]
31}
32// 限速器实现Forget接口,比较简单不多解释
33func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
34 r.failuresLock.Lock()
35 defer r.failuresLock.Unlock()
36
37 delete(r.failures, item)
38}
MaxOfRateLimiter
MaxOfRateLimiter为一个限速队列列表,它的实现中返回列表中重试时间最长的限速队列的值。 MaxOfRateLimiter是一个非常有意思的限速器,他内部有多个限速器,每次返回最悲观的。何所谓最悲观的,比如内部有三个限速器,When()接口返回的就是三个限速器里面延迟最大的。让我们看看具体实现:
1// 代码源自client-go/util/workqueue/default_rate_limiters.go
2type MaxOfRateLimiter struct {
3 limiters []RateLimiter // 限速器数组,创建该限速器需要提供一个限速器数组
4}
5// 限速器实现When接口
6func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
7 ret := time.Duration(0)
8 // 这里在获取所有限速里面时间最大的
9 for _, limiter := range r.limiters {
10 curr := limiter.When(item)
11 if curr > ret {
12 ret = curr
13 }
14 }
15
16 return ret
17}
18// 限速器实现NumRequeues接口
19func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
20 ret := 0
21 // Requeues也是取最大值
22 for _, limiter := range r.limiters {
23 curr := limiter.NumRequeues(item)
24 if curr > ret {
25 ret = curr
26 }
27 }
28
29 return ret
30}
31// 限速器实现Forget接口
32func (r *MaxOfRateLimiter) Forget(item interface{}) {
33 // 逐一遍历Forget就行了,比较简单
34 for _, limiter := range r.limiters {
35 limiter.Forget(item)
36 }
37}
BucketRateLimiter
使用令牌桶实现一个固定速率的限速器 BucketRateLimiter是利用golang.org.x.time.rate.Limiter实现固定速率(qps)的限速器,至于golang.org.x.time.rate.Limiter的实现原理读者可以自行分析,此处只对BucketRateLimiter做说明。
1// 代码源自client-go/util/workqueue/default_rate_limiters.go
2type BucketRateLimiter struct {
3 *rate.Limiter // 这个就是golang.org.x.time.rate.Limiter
4}
5func (r *BucketRateLimiter) When(item interface{}) time.Duration {
6 return r.Limiter.Reserve().Delay() // 获取延迟,这个延迟会是个相对固定的周期
7}
8func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
9 return 0 // 因为固定频率的,也就不存在重试什么的了
10}
11func (r *BucketRateLimiter) Forget(item interface{}) {
12}
限速队列实现
所有的限速队列实际上就是根据不同的需求,最终提供一个延时时间,在延时时间到后通过AddAfter函数将元素添加添加到队列中。在queue.go中给出了workqueue的基本框架,delaying_queue.go扩展了workqueue的功能,提供了限速的功能,而default_rate_limiters.go提供了多种限速队列,用于给delaying_queue.go中的AddAfter提供延时参数,最后rate_limiting_queue.go给出了使用使用限速队列的入口。
1// 代码源自client-go/util/workqueue/rate_limiting_queue.go
2type RateLimitingInterface interface {
3 DelayingInterface // 继承了延时队列
4 AddRateLimited(item interface{}) // 按照限速方式添加元素的接口
5 Forget(item interface{}) // 丢弃指定元素
6 NumRequeues(item interface{}) int // 查询元素放入队列的次数
7}
8// 这个是限速队列的实现
9type rateLimitingType struct {
10 DelayingInterface // 同样要继承延迟队列
11 rateLimiter RateLimiter // 哈哈,这就对了嘛,加一个限速器就可以了
12}
13// 代码源自client-go/util/workqueue/rate_limitting_queue.go
14func (q *rateLimitingType) AddRateLimited(item interface{}) {
15 // 通过限速器获取延迟时间,然后加入到延时队列
16 q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
17}
18func (q *rateLimitingType) NumRequeues(item interface{}) int {
19 return q.rateLimiter.NumRequeues(item) // 太简单了,不解释了
20}
21func (q *rateLimitingType) Forget(item interface{}) {
22 q.rateLimiter.Forget(item) // 太简单了,不解释了
23}
限速队列的使用
- 使用NewItemExponentialFailureRateLimiter初始化一个限速器
- 使用NewRateLimitingQueue新建一个限速队列,并使用上一步的限速器进行初始化
- 后续就可以使用AddRateLimited添加元素
1// client-go/util/workqueue/rate_limiting_queue_test.go
2func TestRateLimitingQueue(t *testing.T) {
3 limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
4 queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
5 fakeClock := clock.NewFakeClock(time.Now())
6 delayingQueue := &delayingType{
7 Interface: New(),
8 clock: fakeClock,
9 heartbeat: fakeClock.NewTicker(maxWait),
10 stopCh: make(chan struct{}),
11 waitingForAddCh: make(chan *waitFor, 1000),
12 metrics: newRetryMetrics(""),
13 deprecatedMetrics: newDeprecatedRetryMetrics(""),
14 }
15 queue.DelayingInterface = delayingQueue
16
17 queue.AddRateLimited("one")
18 waitEntry := <-delayingQueue.waitingForAddCh
19 if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
20 t.Errorf("expected %v, got %v", e, a)
21 }
22
23 queue.Forget("one")
24 if e, a := 0, queue.NumRequeues("one"); e != a {
25 t.Errorf("expected %v, got %v", e, a)
26 }
27}
Process Item
这是您在代码中创建的用于处理工作队列中项目的功能。可以有一个或多个其他函数进行实际处理。这些函数通常将使用索引器引用(Indexer reference)或列表包装器来检索与键相对应的对象。
从工作队列中取出key后进行后续处理,具体处理可以通过Indexer reference
1# kubernetes/client-go/blob/master/examples/workqueue/main.go#L73
2obj, exists, err := c.indexer.GetByKey(key)