Informer机制
在Kubernetes系统中,组件之间通过HTTP协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等,Kubernetes则通过使用cient-go的Informer机制达到这一效果。其他组件也是通过Informer机制与Kubernetes API Server进行通信的。
1. Informer机制架构设计
Informer运行原理图:
由图中所示,Informer架构设计有多个核心组件:
Reflector:用于监控(Watch)指定的Kubernetes资源,当监控的资源发生变化时触发响应的变更事件(例如Added事件、Updated事件和Deleted事件),并将其资源对象存放到本地缓存DeltaFIFO中。
DeltaFIFO:可以分开理解。FIFO是一个先进先出的队列,拥有队列操作的基本方法(Add,Update,Delete,List,Pop,Close等);Delta是一个资源对象存储,可以保存资源对象的操作类型(Added、Updated、Deleted、Sync等)
Indexer:是client-go用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer。Indexer与Etcd集群中的数据完全一致。client-go可以很方便的从本地存储中读取相应的资源对象数据,无须每次从远程Etcd集群读取,减轻了api-server及Etcd集群的压力。
1.1 代码练习
1.2 资源Informer
kubernetes上的每一个资源都实现了Informer机制,每一个Informer上都会实现Informer和Lister方法,下面是PodInformer的接口,
源码路径:k8s.io\client-go\informers\core\v1\pod.go
定义不同资源的Informer,允许监控不同资源的事件,例如监听Node资源对象的informer如下:
1.3 Shared Informer共享机制
Informer也被称为Shared Informer,它是可以共享使用的。若同一资源的Informer被实例化了多次,每个Informer使用一个Reflector,那么会运行过多的相同ListAndWatch ,太多重复的序列化和反序列化操作会导致api-server负载过重。
Shared Informer可以使同一个资源对象共享一个Reflector,这样可以节约很多资源;Shared Infor定义了一个map数据结构,通过map数据结构实现共享Informer机制。
源码路径:k8s.io\client-go\informers\factory.go
sharedInformerFactory中的informers字段中存储了资源类型和对应于sharedindexInformer的映射关系,此关系是通过调用InformerFor函数实现,在添加过程中,如果已经存在同类型的Informer,则直接返回,不在继续添加。代码如下:
源码路径:k8s.io\client-go\informers\factory.go
最后,通过Informer的Start方法使f.informers中的每个informer通过goroutine持久运行,并将其运行状态设置为true:
源码路径:k8s.io\client-go\informers\factory.go
2. Reflector
Informer对kubernetes api sever的资源(内置及CRD)执行监控(Watch)操作,最核心的功能是Reflector。Reflector用于监控指定的kubernetes资源,当监控的资源发生变化时,触发相应的变更事件,并将其资源对象存放在本地缓存DeltaFIFO中。
通过NewReflector进行对象实例化,实例化过程中须传入ListerWatcher数据接口对象,它拥有List和Watch方法,用于获取及监控资源列表。只要是实现了List和Watch方法的对象都可以成为ListerWatcher。Reflector对象通过Run函数启动监控并处理监控事件。
源码路径:k8s.io\client-go\tools\cache\reflector.go
ListAndWatch函数实现分为两部分:①获取资源列表数据;②监控资源对象
2.1 获取资源列表数据
ListAndWatch中List在第一次运行时会获取该资源下所有的对象数据并将其存储至DeltaFIFIO中。以练习代码为例,流程图如下:
源码路径:k8s.io\client-go\tools\cache\reflector.go
而r.listerWatcher.List(opts)函数实际调用的时NewReflector实例化对象时传入的ListerWatcher对象的ListFunc函数;代码如下:
源码路径:k8s.io\client-go\tools\cache\listwatch.go
然后查看PodInformer创建时传入的ListFunc代码如下:
源码路径:k8s.io\client-go\informers\core\v1\pod.go
2.2 监控资源对象
Watch(监控)操作通过HTTP协议与kubernetes api server建立长连接,接受api server发来的资源变更事件,Watch操作的实现机制使用HTTP协议的分块传输编码(Chunked Transfer Encoding)。当client-go调用api-server时,apiserver在Response的HTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码,客户端收到该信息后,便于服务端进行连接,并等待下一个数据块(即资源的事件信息)
源码路径:k8s.io\client-go\tools\cache\reflector.go
其中,r.listerWatcher.Watch(options)跟上文的ListFunc类似,在创建Informer的时候将WatchFunc函数作为参数传入。代码如下:
源码路径:k8s.io\client-go\informers\core\v1\pod.go
r.watchHandler用于处理资源的变更事件。当触发Added等事件时,将对应资源对象更新到本地缓存DeltaFIFO中并更新ResourceVersion资源版本号。代码如下:
源码路径:k8s.io\client-go\tools\cache\reflector.go
3. DeltaFIFO
DeltaFIFO可以分开理解,FIFO是一个先进先出队列,Delta是一个资源对象存储,可以保存资源对象的操作类型。其结构代码如下:
源码路径:k8s.io\client-go\tools\cache\delta_fifo.go
DeltaFIFO与其他队列最大的不同是,它会保留所有关于资源对象的操作类型,队列中会存在拥有不同操作类型的同一个资源对象,消费者在处理该资源对象时能够了解该资源对象所发生的事情。queue字段存储资源对象的key,该key是通过KeyOf函数计算而来。items字段通过map数据结构的方式存储,value存储的是对象的Delta数组。结构如下图所示:
3.1 生产者方法
DeltarFIFO队列中的资源对象在触发的事件中都调用了queueActionLocked函数,它是DeltaFIFO实现的关键,代码如下:
根据之前代码继续查看r.store.Add(event.Object)代码,最终调用了f.queueActionLocked(Added, obj)方法:
源码路径:k8s.io\client-go\tools\cache\delta_fifo.go
3.2 消费者方法
Pop方法作为消费者方法使用,从DeltaFIFO的头部取出最早进入队列中的资源对象数据。Pop方法需要传入process函数,用于接收并处理对象的回调方法,代码如下:
源码路径:k8s.io\client-go\tools\cache\delta_fifo.go
controller的processLoop方法负责从DeltaFIFO队列中取出数据传给process回调函数;查看informer调用时的process函数,根据informer.Run()可知,初始化回调函数为HandleDeltas
源码路径:k8s.io\client-go\tools\cache\shared_informer.go
HandleDeltas函数作为process回调函数,当资源对象的操作类型为Added、Updated、Deleted时,将该资源对象存储至Indexer(并发安全存储),并通过distribute函数将资源对象分发至SharedInformer。在开始的代码练习中,我们通过informer.AddEventHandler函数添加了对资源事件的处理函数,distribute函数则将资源对象分发到该事件处理函数中。
3.3 Resync机制
Resync机制会将Indexer本地存储中的资源对象同步到DeltaFIFO中,并将这些资源对象设置为Sync的操作类型,Resync函数在Reflector中定时执行,执行周期由NewRelector函数传入的resyncPeriod参数设定。
f.knownObjects.GetByKey(key)是Indexer本地存储对象,通过该对象可以获取client-go目前存储的所有资源对象,Indexer对象在NewDeltaFIFO函数实例化DeltaFIFO对象时传入。syncKeyLocked通过调用f.queueActionLocked(Sync, obj)函数实现Resync。
4. Indexer
Indexer是client-go用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer。【Indexer中的数据与Etcd集群中的数据保持完全一致。】client-go可以很方便的从本地存储中读取资源,而不需要从远程的Etcd中读取,减轻了apiserver和etcd集群的压力。
下图为Indexer的存储结构图:
ThreadSafeMap是实现并发安全的存储,Indexer在ThreadSafeMap的基础上进行封装,它继承了与ThreadSafeMap相关的操作方法并实现了Indexer Func等功能,例如Index、IndexKeys、GetIndexers等方法,这些方法为ThreadSafeMap提供了索引功能。
4.1 ThreadSafeMap并发安全存储
ThreadSafeMap是一个内存中的存储,其中的数据并不会写入本地磁盘中,每次的增删改查操作都会加锁,以保证数据的一致性。ThreadSafeMap将资源对象数据存储一个map数据结构中,其结构代码示例如下:
源码路径:k8s.io\client-go\tools\cache\thread_safe_store.go
items字段中存储的是资源对象数据,其中items的key通过keyFunc函数计算得到,计算默认使用MetaNamespaceKeyFunc函数,该函数根据资源对象计算出/格式的key,如果资源对象的为空,则作为key,而items的value用于存储资源对象。
4.2 Indexer索引器
每次增删改ThreadSafeMap数据时,都会使用updateIndices或deleteFromIndices函数变更Indexer。Indexer被设计为可以自定义索引函数,它有4个非常重要的数据结构,分别是Indices、Index、Indexers及IndexFunc。
通过一个代码示例理解下Indexer,代码如下:
上述代码中涉及了4个重要的数据结构,分别是Indexers,IndexFunc,Indices,Index;数据结构如下:
源码路径:k8s.io\client-go\tools\cache\index.go
其中,各个数据结构介绍如下:
Indexers:存储索引器,key为索引器名称,value为索引器实现的函数。
IndexFunc:索引器函数,定义一个接收资源对象,返回检索列表的函数。
Indices:存储缓存器,key为缓存器名称(在代码示例中,缓存器命名与索引器命名相对应),value为缓存数据。
Index:存储缓存数据,其结构为K/V。
4.3 Indexer索引器核心实现
index.ByIndex函数通过执行索引器函数得到索引结果,代码如下:
源码路径:k8s.io\client-go\tools\cache\thread_safe_store.go
Index中的缓存数据为Set集合数据结构,Set本质与Slice相同,但Set中不存在相同元素,而Go标准库没有提供Set数据结构,Go语言中的map结构类型是不能存在相同key的,所以kubernetes将map结构类型的key作为Set数据结构,实现Set去重特性。
Indexer的执行流程如下:
5. 总结
由Informer的架构图可知,Informer中包含Controller和Indexer,而Controller又包含Reflector和DeltaFIFO,所以总结下,创建Informer时这些组件的初始化流程,大体分为三布:
5.1 创建工厂SharedInformerFactory
首先需要通过informers.NewSharedInformerFactory(clientSet, time.Minute)创建sharedInformers工厂对象,看下创建的代码,通过调用NewSharedInformerFactoryWithOptions函数,返回sharedInformerFactory对象,代码如下:
源码路径:k8s.io\client-go\informers\factory.go
上述代码中将传入的client,defaultResync及其他参数进行默认初始化。
5.2 通过工厂创建具体资源的Informer
继续通过代码sharedInformers.Core().V1().Pods().Informer()生成具体资源的Informer,这里以Pods为例,查看Informer()代码如下:
源码路径:k8s.io\client-go\informers\core\v1\pod.go
Informer()函数调用了f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer),而f.defaultInformer函数则作为参数进行传入,继续查看InformerFor()函数代码:
源码路径:k8s.io\client-go\informers\factory.go
通过注释可知,继续初始化调用defaultInformer函数,由上代码可知defaultInformer函数,调用了NewFilteredPodInformer函数,其中传入了默认的indexer对象,NewFilteredPodInformer函数代码如下:
源码路径:k8s.io\client-go\informers\core\v1\pod.go
从代码可知,该函数通过NewSharedIndexInformer函数初始化listerWatcher及通过传入的indexers初始化indexer;indexers的初始化通过NewSharedIndexInformer函数中的NewIndexer进行,代码如下:
源码路径:k8s.io\client-go\tools\cache\shared_informer.go
5.3 执行informer.Run
已经了解,sharedIndexInformer中包含controller类型结构,下面看下Controller的类型定义:
源码路径:k8s.io\client-go\tools\cache\controller.go
可以看到,Reflector是在controller中被定义的,除此之外还有一些例如config配置及其他时钟,同步的设置。
通过前两步已经完成了listerWatcher和Indexer的初始化工作,继续执行informer.Run()运行informer;查看其代码:
源码路径:k8s.io\client-go\tools\cache\shared_informer.go
由上代码可知,在informer.Run方法中,执行了以下操作:
初始化DeltaFIFO对象
初始化Controller数据结构中的Config对象(包括:①将之前初始化到sharedIndexInformer中的lw,重新赋值,下传到config中;②初始化process回调函数为
HandleDeltas)传入config对象初始化cotroller操作
执行
s.controller.Run
由于,我们只梳理初始化流程,此时DeltaFIFO已经被初始化,继续查看s.controller.Run代码:
源码路径:k8s.io\client-go\tools\cache\controller.go
在上述代码中,看到了Reflector的初始化及将其赋值给controller对象中,并将r.Run作为参数传入运行。最后调用c.processLoop函数;代码见下:
源码路径:k8s.io\client-go\tools\cache\reflector.go
processLoop函数代码为:
源码路径:k8s.io\client-go\tools\cache\controller.go
至此,整个informer的初始化流程分析结束。
Last updated
Was this helpful?