Informer机制

在Kubernetes系统中,组件之间通过HTTP协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等,Kubernetes则通过使用cient-go的Informer机制达到这一效果。其他组件也是通过Informer机制与Kubernetes API Server进行通信的。

1. Informer机制架构设计

Informer运行原理图:

由图中所示,Informer架构设计有多个核心组件:

  • Reflector:用于监控(Watch)指定的Kubernetes资源,当监控的资源发生变化时触发响应的变更事件(例如Added事件、Updated事件和Deleted事件),并将其资源对象存放到本地缓存DeltaFIFO中。

  • DeltaFIFO:可以分开理解。FIFO是一个先进先出的队列,拥有队列操作的基本方法(Add,Update,Delete,List,Pop,Close等);Delta是一个资源对象存储,可以保存资源对象的操作类型(Added、Updated、Deleted、Sync等)

  • Indexer:是client-go用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer。Indexer与Etcd集群中的数据完全一致。client-go可以很方便的从本地存储中读取相应的资源对象数据,无须每次从远程Etcd集群读取,减轻了api-server及Etcd集群的压力。

1.1 代码练习

1.2 资源Informer

kubernetes上的每一个资源都实现了Informer机制,每一个Informer上都会实现Informer和Lister方法,下面是PodInformer的接口,

源码路径:k8s.io\client-go\informers\core\v1\pod.go

定义不同资源的Informer,允许监控不同资源的事件,例如监听Node资源对象的informer如下:

1.3 Shared Informer共享机制

Informer也被称为Shared Informer,它是可以共享使用的。若同一资源的Informer被实例化了多次,每个Informer使用一个Reflector,那么会运行过多的相同ListAndWatch ,太多重复的序列化和反序列化操作会导致api-server负载过重。

Shared Informer可以使同一个资源对象共享一个Reflector,这样可以节约很多资源;Shared Infor定义了一个map数据结构,通过map数据结构实现共享Informer机制。

源码路径:k8s.io\client-go\informers\factory.go

sharedInformerFactory中的informers字段中存储了资源类型和对应于sharedindexInformer的映射关系,此关系是通过调用InformerFor函数实现,在添加过程中,如果已经存在同类型的Informer,则直接返回,不在继续添加。代码如下:

源码路径:k8s.io\client-go\informers\factory.go

最后,通过Informer的Start方法使f.informers中的每个informer通过goroutine持久运行,并将其运行状态设置为true:

源码路径:k8s.io\client-go\informers\factory.go

2. Reflector

Informer对kubernetes api sever的资源(内置及CRD)执行监控(Watch)操作,最核心的功能是Reflector。Reflector用于监控指定的kubernetes资源,当监控的资源发生变化时,触发相应的变更事件,并将其资源对象存放在本地缓存DeltaFIFO中。

通过NewReflector进行对象实例化,实例化过程中须传入ListerWatcher数据接口对象,它拥有List和Watch方法,用于获取及监控资源列表。只要是实现了List和Watch方法的对象都可以成为ListerWatcher。Reflector对象通过Run函数启动监控并处理监控事件。

源码路径:k8s.io\client-go\tools\cache\reflector.go

ListAndWatch函数实现分为两部分:①获取资源列表数据;②监控资源对象

2.1 获取资源列表数据

ListAndWatch中List在第一次运行时会获取该资源下所有的对象数据并将其存储至DeltaFIFIO中。以练习代码为例,流程图如下:

源码路径:k8s.io\client-go\tools\cache\reflector.go

而r.listerWatcher.List(opts)函数实际调用的时NewReflector实例化对象时传入的ListerWatcher对象的ListFunc函数;代码如下:

源码路径:k8s.io\client-go\tools\cache\listwatch.go

然后查看PodInformer创建时传入的ListFunc代码如下:

源码路径:k8s.io\client-go\informers\core\v1\pod.go

2.2 监控资源对象

Watch(监控)操作通过HTTP协议与kubernetes api server建立长连接,接受api server发来的资源变更事件,Watch操作的实现机制使用HTTP协议的分块传输编码(Chunked Transfer Encoding)。当client-go调用api-server时,apiserver在Response的HTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码,客户端收到该信息后,便于服务端进行连接,并等待下一个数据块(即资源的事件信息)

源码路径:k8s.io\client-go\tools\cache\reflector.go

其中,r.listerWatcher.Watch(options)跟上文的ListFunc类似,在创建Informer的时候将WatchFunc函数作为参数传入。代码如下:

源码路径:k8s.io\client-go\informers\core\v1\pod.go

r.watchHandler用于处理资源的变更事件。当触发Added等事件时,将对应资源对象更新到本地缓存DeltaFIFO中并更新ResourceVersion资源版本号。代码如下:

源码路径:k8s.io\client-go\tools\cache\reflector.go

3. DeltaFIFO

DeltaFIFO可以分开理解,FIFO是一个先进先出队列,Delta是一个资源对象存储,可以保存资源对象的操作类型。其结构代码如下:

源码路径:k8s.io\client-go\tools\cache\delta_fifo.go

DeltaFIFO与其他队列最大的不同是,它会保留所有关于资源对象的操作类型,队列中会存在拥有不同操作类型的同一个资源对象,消费者在处理该资源对象时能够了解该资源对象所发生的事情。queue字段存储资源对象的key,该key是通过KeyOf函数计算而来。items字段通过map数据结构的方式存储,value存储的是对象的Delta数组。结构如下图所示:

3.1 生产者方法

DeltarFIFO队列中的资源对象在触发的事件中都调用了queueActionLocked函数,它是DeltaFIFO实现的关键,代码如下:

根据之前代码继续查看r.store.Add(event.Object)代码,最终调用了f.queueActionLocked(Added, obj)方法:

源码路径:k8s.io\client-go\tools\cache\delta_fifo.go

3.2 消费者方法

Pop方法作为消费者方法使用,从DeltaFIFO的头部取出最早进入队列中的资源对象数据。Pop方法需要传入process函数,用于接收并处理对象的回调方法,代码如下:

源码路径:k8s.io\client-go\tools\cache\delta_fifo.go

controller的processLoop方法负责从DeltaFIFO队列中取出数据传给process回调函数;查看informer调用时的process函数,根据informer.Run()可知,初始化回调函数为HandleDeltas

源码路径:k8s.io\client-go\tools\cache\shared_informer.go

HandleDeltas函数作为process回调函数,当资源对象的操作类型为Added、Updated、Deleted时,将该资源对象存储至Indexer(并发安全存储),并通过distribute函数将资源对象分发至SharedInformer。在开始的代码练习中,我们通过informer.AddEventHandler函数添加了对资源事件的处理函数,distribute函数则将资源对象分发到该事件处理函数中。

3.3 Resync机制

Resync机制会将Indexer本地存储中的资源对象同步到DeltaFIFO中,并将这些资源对象设置为Sync的操作类型,Resync函数在Reflector中定时执行,执行周期由NewRelector函数传入的resyncPeriod参数设定。

f.knownObjects.GetByKey(key)是Indexer本地存储对象,通过该对象可以获取client-go目前存储的所有资源对象,Indexer对象在NewDeltaFIFO函数实例化DeltaFIFO对象时传入。syncKeyLocked通过调用f.queueActionLocked(Sync, obj)函数实现Resync。

4. Indexer

Indexer是client-go用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer。【Indexer中的数据与Etcd集群中的数据保持完全一致。】client-go可以很方便的从本地存储中读取资源,而不需要从远程的Etcd中读取,减轻了apiserver和etcd集群的压力。

下图为Indexer的存储结构图:

ThreadSafeMap是实现并发安全的存储,Indexer在ThreadSafeMap的基础上进行封装,它继承了与ThreadSafeMap相关的操作方法并实现了Indexer Func等功能,例如Index、IndexKeys、GetIndexers等方法,这些方法为ThreadSafeMap提供了索引功能。

4.1 ThreadSafeMap并发安全存储

ThreadSafeMap是一个内存中的存储,其中的数据并不会写入本地磁盘中,每次的增删改查操作都会加锁,以保证数据的一致性。ThreadSafeMap将资源对象数据存储一个map数据结构中,其结构代码示例如下:

源码路径:k8s.io\client-go\tools\cache\thread_safe_store.go

items字段中存储的是资源对象数据,其中items的key通过keyFunc函数计算得到,计算默认使用MetaNamespaceKeyFunc函数,该函数根据资源对象计算出/格式的key,如果资源对象的为空,则作为key,而items的value用于存储资源对象。

4.2 Indexer索引器

每次增删改ThreadSafeMap数据时,都会使用updateIndices或deleteFromIndices函数变更Indexer。Indexer被设计为可以自定义索引函数,它有4个非常重要的数据结构,分别是Indices、Index、Indexers及IndexFunc。

通过一个代码示例理解下Indexer,代码如下:

上述代码中涉及了4个重要的数据结构,分别是Indexers,IndexFunc,Indices,Index;数据结构如下:

源码路径:k8s.io\client-go\tools\cache\index.go

其中,各个数据结构介绍如下:

  • Indexers:存储索引器,key为索引器名称,value为索引器实现的函数。

  • IndexFunc:索引器函数,定义一个接收资源对象,返回检索列表的函数。

  • Indices:存储缓存器,key为缓存器名称(在代码示例中,缓存器命名与索引器命名相对应),value为缓存数据。

  • Index:存储缓存数据,其结构为K/V。

4.3 Indexer索引器核心实现

index.ByIndex函数通过执行索引器函数得到索引结果,代码如下:

源码路径:k8s.io\client-go\tools\cache\thread_safe_store.go

Index中的缓存数据为Set集合数据结构,Set本质与Slice相同,但Set中不存在相同元素,而Go标准库没有提供Set数据结构,Go语言中的map结构类型是不能存在相同key的,所以kubernetes将map结构类型的key作为Set数据结构,实现Set去重特性。

Indexer的执行流程如下:

5. 总结

由Informer的架构图可知,Informer中包含Controller和Indexer,而Controller又包含Reflector和DeltaFIFO,所以总结下,创建Informer时这些组件的初始化流程,大体分为三布:

5.1 创建工厂SharedInformerFactory

首先需要通过informers.NewSharedInformerFactory(clientSet, time.Minute)创建sharedInformers工厂对象,看下创建的代码,通过调用NewSharedInformerFactoryWithOptions函数,返回sharedInformerFactory对象,代码如下:

源码路径:k8s.io\client-go\informers\factory.go

上述代码中将传入的client,defaultResync及其他参数进行默认初始化。

5.2 通过工厂创建具体资源的Informer

继续通过代码sharedInformers.Core().V1().Pods().Informer()生成具体资源的Informer,这里以Pods为例,查看Informer()代码如下:

源码路径:k8s.io\client-go\informers\core\v1\pod.go

Informer()函数调用了f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer),而f.defaultInformer函数则作为参数进行传入,继续查看InformerFor()函数代码:

源码路径:k8s.io\client-go\informers\factory.go

通过注释可知,继续初始化调用defaultInformer函数,由上代码可知defaultInformer函数,调用了NewFilteredPodInformer函数,其中传入了默认的indexer对象,NewFilteredPodInformer函数代码如下:

源码路径:k8s.io\client-go\informers\core\v1\pod.go

从代码可知,该函数通过NewSharedIndexInformer函数初始化listerWatcher及通过传入的indexers初始化indexer;indexers的初始化通过NewSharedIndexInformer函数中的NewIndexer进行,代码如下:

源码路径:k8s.io\client-go\tools\cache\shared_informer.go

5.3 执行informer.Run

已经了解,sharedIndexInformer中包含controller类型结构,下面看下Controller的类型定义:

源码路径:k8s.io\client-go\tools\cache\controller.go

可以看到,Reflector是在controller中被定义的,除此之外还有一些例如config配置及其他时钟,同步的设置。

通过前两步已经完成了listerWatcherIndexer的初始化工作,继续执行informer.Run()运行informer;查看其代码:

源码路径:k8s.io\client-go\tools\cache\shared_informer.go

由上代码可知,在informer.Run方法中,执行了以下操作:

  • 初始化DeltaFIFO对象

  • 初始化Controller数据结构中的Config对象(包括:①将之前初始化到sharedIndexInformer中的lw,重新赋值,下传到config中;②初始化process回调函数为HandleDeltas

  • 传入config对象初始化cotroller操作

  • 执行s.controller.Run

由于,我们只梳理初始化流程,此时DeltaFIFO已经被初始化,继续查看s.controller.Run代码:

源码路径:k8s.io\client-go\tools\cache\controller.go

在上述代码中,看到了Reflector的初始化及将其赋值给controller对象中,并将r.Run作为参数传入运行。最后调用c.processLoop函数;代码见下:

源码路径:k8s.io\client-go\tools\cache\reflector.go

processLoop函数代码为:

源码路径:k8s.io\client-go\tools\cache\controller.go

至此,整个informer的初始化流程分析结束。

Last updated

Was this helpful?