在Kubernetes系统中,组件之间通过HTTP协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等,Kubernetes则通过使用cient-go的Informer机制达到这一效果。其他组件也是通过Informer机制与Kubernetes API Server进行通信的。
1. Informer机制架构设计
Informer运行原理图:
由图中所示,Informer架构设计有多个核心组件:
Reflector:用于监控(Watch)指定的Kubernetes资源,当监控的资源发生变化时触发响应的变更事件(例如Added事件、Updated事件和Deleted事件),并将其资源对象存放到本地缓存DeltaFIFO中。
DeltaFIFO:可以分开理解。FIFO是一个先进先出的队列,拥有队列操作的基本方法(Add,Update,Delete,List,Pop,Close等);Delta是一个资源对象存储,可以保存资源对象的操作类型(Added、Updated、Deleted、Sync等)
Indexer:是client-go用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer。Indexer与Etcd集群中的数据完全一致。client-go可以很方便的从本地存储中读取相应的资源对象数据,无须每次从远程Etcd集群读取,减轻了api-server及Etcd集群的压力。
1.1 代码练习
Copy func TestInformer (t * testing . T ) {
config, err := clientcmd. BuildConfigFromFlags ( "" , "F:\\code\\env\\config" )
if err != nil {
panic (err)
}
// 创建clientset对象
clientSet, err := kubernetes. NewForConfig (config)
if err != nil {
panic (err)
}
//创建stopCh对象,用于在程序进程退出之前通知Informer退出,因为Informer是一个持久运行的goroutine
stopCh := make ( chan struct {})
defer close (stopCh)
//实例化ShareInformer对象,一个参数是clientset,另一个是time.Minute用于设置多久进行一次resync(重新同步)
//resync会周期性的执行List操作,将所有的资源存放在Informer Store中,如果参数为0,则禁止resync操作
sharedInformers := informers. NewSharedInformerFactory (clientSet, time.Minute)
//得到具体Pod资源的informer对象
informer := sharedInformers. Core (). V1 (). Pods (). Informer ()
// 为Pod资源添加资源事件回调方法,支持AddFunc、UpdateFunc及DeleteFunc
informer. AddEventHandler ( cache . ResourceEventHandlerFuncs {
AddFunc: func (obj interface {}) {
//在正常情况下,kubernetes其他组件在使用Informer机制时触发资源事件回调方法,将资源对象推送到WorkQueue或其他队列中,
//这里是直接输出触发的资源事件
myObj := obj.( metav1 . Object )
log. Printf ( "New Pod Added to Store: %s " , myObj. GetName ())
},
UpdateFunc: func (oldObj, newObj interface {}) {
oObj := oldObj.( metav1 . Object )
nObj := newObj.( metav1 . Object )
log. Printf ( " %s Pod Updated to %s " , oObj. GetName (), nObj. GetName ())
},
DeleteFunc: func (obj interface {}) {
myObj := obj.( metav1 . Object )
log. Printf ( "Pod Deleted from Store: %s " , myObj. GetName ())
},
})
//通过Run函数运行当前Informer,内部为Pod资源类型创建Informer
informer. Run (stopCh)
}
1.2 资源Informer
kubernetes上的每一个资源都实现了Informer机制,每一个Informer上都会实现Informer和Lister方法,下面是PodInformer的接口,
源码路径 :k8s.io\client-go\informers\core\v1\pod.go
Copy // PodInformer provides access to a shared informer and lister for
// Pods.
type PodInformer interface {
Informer () cache . SharedIndexInformer
Lister () v1 . PodLister
}
定义不同资源的Informer,允许监控不同资源的事件,例如监听Node资源对象的informer如下:
Copy nodeInformer := sharedInformers. Node (). V1beta1 (). RuntimeClasses (). Informer ()
1.3 Shared Informer共享机制
Informer也被称为Shared Informer,它是可以共享使用的。若同一资源的Informer被实例化了多次,每个Informer使用一个Reflector,那么会运行过多的相同ListAndWatch ,太多重复的序列化和反序列化操作会导致api-server负载过重。
Shared Informer可以使同一个资源对象共享一个Reflector,这样可以节约很多资源;Shared Infor定义了一个map数据结构,通过map数据结构实现共享Informer机制。
源码路径 :k8s.io\client-go\informers\factory.go
Copy type sharedInformerFactory struct {
......
informers map [ reflect . Type ] cache . SharedIndexInformer
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
......
informerType := reflect. TypeOf (obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
......
f.informers[informerType] = informer
return informer
}
sharedInformerFactory中的informers字段中存储了资源类型和对应于sharedindexInformer的映射关系,此关系是通过调用InformerFor函数实现,在添加过程中,如果已经存在同类型的Informer,则直接返回,不在继续添加。代码如下:
源码路径 :k8s.io\client-go\informers\factory.go
Copy // InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock. Lock ()
defer f.lock. Unlock ()
informerType := reflect. TypeOf (obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if ! exists {
resyncPeriod = f.defaultResync
}
informer = newFunc (f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
最后,通过Informer的Start方法使f.informers中的每个informer通过goroutine持久运行,并将其运行状态设置为true:
源码路径 :k8s.io\client-go\informers\factory.go
Copy // Start initializes all requested informers.
func (f * sharedInformerFactory ) Start (stopCh <-chan struct {}) {
f.lock. Lock ()
defer f.lock. Unlock ()
for informerType, informer := range f.informers {
if ! f.startedInformers[informerType] {
// 创建goroutine运行
go informer. Run (stopCh)
// 运行后将该类型的informer运行状态设置为true
f.startedInformers[informerType] = true
}
}
}
2. Reflector
Informer对kubernetes api sever的资源(内置及CRD)执行监控(Watch)操作,最核心的功能是Reflector。Reflector用于监控指定的kubernetes资源,当监控的资源发生变化时,触发相应的变更事件,并将其资源对象存放在本地缓存DeltaFIFO中。
通过NewReflector进行对象实例化,实例化过程中须传入ListerWatcher数据接口对象,它拥有List和Watch方法,用于获取及监控资源列表。只要是实现了List和Watch方法的对象都可以成为ListerWatcher。Reflector对象通过Run函数启动监控并处理监控事件。
源码路径 :k8s.io\client-go\tools\cache\reflector.go
Copy // NewReflector creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If
// resyncPeriod is non-zero, then the reflector will periodically
// consult its ShouldResync function to determine whether to invoke
// the Store's Resync operation; `ShouldResync==nil` means always
// "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflector (lw ListerWatcher , expectedType interface {}, store Store , resyncPeriod time . Duration ) * Reflector {
return NewNamedReflector (naming. GetNameFromCallsite (internalPackages ... ), lw, expectedType, store, resyncPeriod)
}
ListAndWatch函数实现分为两部分:①获取资源列表数据;②监控资源对象
2.1 获取资源列表数据
ListAndWatch中List在第一次运行时会获取该资源下所有的对象数据并将其存储至DeltaFIFIO中。以练习代码为例,流程图如下:
源码路径 :k8s.io\client-go\tools\cache\reflector.go
Copy // ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r * Reflector ) ListAndWatch (stopCh <-chan struct {}) error {
......
if err := func () error {
......
go func () {
......
pager := pager. New (pager. SimplePageFunc ( func (opts metav1 . ListOptions ) ( runtime . Object , error ) {
return r.listerWatcher. List (opts)
}))
......
}
......
resourceVersion = listMetaInterface. GetResourceVersion ()
initTrace. Step ( "Resource version extracted" )
items, err := meta. ExtractList (list)
......
if err := r. syncWith (items, resourceVersion); err != nil {
return fmt. Errorf ( "unable to sync list result: %v " , err)
}
initTrace. Step ( "SyncWith done" )
r. setLastSyncResourceVersion (resourceVersion)
initTrace. Step ( "Resource version updated" )
return nil
}(); err != nil {
return err
}
......
}
而r.listerWatcher.List(opts)函数实际调用的时NewReflector实例化对象时传入的ListerWatcher对象的ListFunc函数;代码如下:
源码路径 :k8s.io\client-go\tools\cache\listwatch.go
Copy // List a set of apiserver resources
func (lw * ListWatch ) List (options metav1 . ListOptions ) ( runtime . Object , error ) {
// ListWatch is used in Reflector, which already supports pagination.
// Don't paginate here to avoid duplication.
return lw. ListFunc (options)
}
然后查看PodInformer创建时传入的ListFunc代码如下:
源码路径 :k8s.io\client-go\informers\core\v1\pod.go
Copy func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache. NewSharedIndexInformer (
& cache . ListWatch {
ListFunc: func (options metav1 . ListOptions ) ( runtime . Object , error ) {
if tweakListOptions != nil {
tweakListOptions ( & options)
}
return client. CoreV1 (). Pods (namespace). List (context. TODO (), options)
},
......
},
& corev1 . Pod {},
resyncPeriod,
indexers,
)
}
2.2 监控资源对象
Watch(监控)操作通过HTTP协议与kubernetes api server建立长连接,接受api server发来的资源变更事件,Watch操作的实现机制使用HTTP协议的分块传输编码(Chunked Transfer Encoding)。当client-go调用api-server时,apiserver在Response的HTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码,客户端收到该信息后,便于服务端进行连接,并等待下一个数据块(即资源的事件信息)
源码路径 :k8s.io\client-go\tools\cache\reflector.go
Copy func (r * Reflector ) ListAndWatch (stopCh <-chan struct {}) error {
......
options = metav1 . ListOptions {
ResourceVersion: resourceVersion,
TimeoutSeconds: & timeoutSeconds,
AllowWatchBookmarks: true ,
}
w, err := r.listerWatcher. Watch (options)
......
if err := r. watchHandler (start, w, & resourceVersion, resyncerrc, stopCh); err != nil {
......
return nil
}
}
}
其中,r.listerWatcher.Watch(options)跟上文的ListFunc类似,在创建Informer的时候将WatchFunc函数作为参数传入。代码如下:
源码路径 :k8s.io\client-go\informers\core\v1\pod.go
Copy ......
WatchFunc: func (options metav1 . ListOptions ) ( watch . Interface , error ) {
if tweakListOptions != nil {
tweakListOptions ( & options)
}
return client. CoreV1 (). Pods (namespace). Watch (context. TODO (), options)
},
......
r.watchHandler用于处理资源的变更事件。当触发Added等事件时,将对应资源对象更新到本地缓存DeltaFIFO中并更新ResourceVersion资源版本号。代码如下:
源码路径 :k8s.io\client-go\tools\cache\reflector.go
Copy // watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
......
for {
select {
......
case event, ok := <- w. ResultChan ():
......
switch event.Type {
case watch.Added:
err := r.store. Add (event.Object)
......
case watch.Modified:
err := r.store. Update (event.Object)
......
case watch.Deleted:
err := r.store. Delete (event.Object)
......
default :
......
* resourceVersion = newResourceVersion
r. setLastSyncResourceVersion (newResourceVersion)
eventCount ++
}
}
......
}
3. DeltaFIFO
DeltaFIFO可以分开理解,FIFO是一个先进先出队列,Delta是一个资源对象存储,可以保存资源对象的操作类型。其结构代码如下:
源码路径 :k8s.io\client-go\tools\cache\delta_fifo.go
Copy type DeltaFIFO struct {
......
// `items` maps keys to Deltas.
// `queue` maintains FIFO order of keys for consumption in Pop().
// We maintain the property that keys in the `items` and `queue` are
// strictly 1:1 mapping, and that all Deltas in `items` should have
// at least one Delta.
items map [ string ] Deltas
queue [] string
......
}
type Deltas [] Delta
DeltaFIFO与其他队列最大的不同是,它会保留所有关于资源对象的操作类型,队列中会存在拥有不同操作类型的同一个资源对象,消费者在处理该资源对象时能够了解该资源对象所发生的事情。queue字段存储资源对象的key,该key是通过KeyOf函数计算而来。items字段通过map数据结构的方式存储,value存储的是对象的Delta数组。结构如下图所示:
3.1 生产者方法
DeltarFIFO队列中的资源对象在触发的事件中都调用了queueActionLocked函数,它是DeltaFIFO实现的关键,代码如下:
根据之前代码继续查看r.store.Add(event.Object)
代码,最终调用了f.queueActionLocked(Added, obj)
方法:
源码路径 :k8s.io\client-go\tools\cache\delta_fifo.go
Copy // Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f * DeltaFIFO ) Add (obj interface {}) error {
f.lock. Lock ()
defer f.lock. Unlock ()
f.populated = true
return f. queueActionLocked (Added, obj)
}
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f * DeltaFIFO ) queueActionLocked (actionType DeltaType , obj interface {}) error {
//计算资源对象的key
id, err := f. KeyOf (obj)
if err != nil {
return KeyError {obj, err}
}
// 将actionType和资源对象构造成Delta,添加到istems中
newDeltas := append (f.items[id], Delta {actionType, obj})
//通过dedupDeltas函数合并去重(针对连续两次删除操作进行合并)
newDeltas = dedupDeltas (newDeltas)
if len (newDeltas) > 0 {
if _, exists := f.items[id]; ! exists {
f.queue = append (f.queue, id)
}
//更新构造后的Delta
f.items[id] = newDeltas
//通过cond.Broadcast通知所有消费者解除阻塞
f.cond. Broadcast ()
} else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// But if somehow it ever does return an empty list, then
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete (f.items, id)
}
return nil
}
3.2 消费者方法
Pop方法作为消费者方法使用,从DeltaFIFO的头部取出最早进入队列中的资源对象数据。Pop方法需要传入process函数,用于接收并处理对象的回调方法,代码如下:
源码路径 :k8s.io\client-go\tools\cache\delta_fifo.go
Copy func (f * DeltaFIFO ) Pop (process PopProcessFunc ) ( interface {}, error ) {
f.lock. Lock ()
defer f.lock. Unlock ()
for {
for len (f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil , ErrFIFOClosed
}
//如果队列中没有数据,将阻塞等待,直到收到cond.Broadcast,说明有数据添加
f.cond. Wait ()
}
id := f.queue[ 0 ]
f.queue = f.queue[ 1 :]
if f.initialPopulationCount > 0 {
f.initialPopulationCount --
}
//通过id获取到item
item, ok := f.items[id]
if ! ok {
// Item may have been deleted subsequently.
continue
}
//从队列中移除
delete (f.items, id)
//将该对象传入process回调函数中,由上层消费者处理
err := process (item)
if e, ok := err.( ErrRequeue ); ok {
//如果回调函数处理出错,则该对象重新存入队列
f. addIfNotPresent (id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
controller的processLoop方法负责从DeltaFIFO队列中取出数据传给process回调函数;查看informer调用时的process函数,根据informer.Run()
可知,初始化回调函数为HandleDeltas
源码路径 :k8s.io\client-go\tools\cache\shared_informer.go
Copy func (s * sharedIndexInformer ) HandleDeltas (obj interface {}) error {
s.blockDeltas. Lock ()
defer s.blockDeltas. Unlock ()
// from oldest to newest
for _, d := range obj.( Deltas ) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector. AddObject (d.Object)
if old, exists, err := s.indexer. Get (d.Object); err == nil && exists {
if err := s.indexer. Update (d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta. Accessor (d.Object); err == nil {
if oldAccessor, err := meta. Accessor (old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor. GetResourceVersion () == oldAccessor. GetResourceVersion ()
}
}
}
s.processor. distribute ( updateNotification {oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer. Add (d.Object); err != nil {
return err
}
s.processor. distribute ( addNotification {newObj: d.Object}, false )
}
case Deleted:
if err := s.indexer. Delete (d.Object); err != nil {
return err
}
s.processor. distribute ( deleteNotification {oldObj: d.Object}, false )
}
}
return nil
}
HandleDeltas函数作为process回调函数,当资源对象的操作类型为Added、Updated、Deleted时,将该资源对象存储至Indexer(并发安全存储),并通过distribute函数将资源对象分发至SharedInformer。在开始的代码练习中,我们通过informer.AddEventHandler函数添加了对资源事件的处理函数,distribute函数则将资源对象分发到该事件处理函数中。
3.3 Resync机制
Resync机制会将Indexer本地存储中的资源对象同步到DeltaFIFO中,并将这些资源对象设置为Sync的操作类型,Resync函数在Reflector中定时执行,执行周期由NewRelector函数传入的resyncPeriod参数设定。
Copy **源码路径**:k8s.io\client-go\tools\cache\reflector.go
```go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
......
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
//执行Resync操作
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
......
}
Copy **源码路径**:k8s.io\client-go\tools\cache\delta_fifo.go
```go
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key)
.......
id, err := f.KeyOf(obj)
......
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
f.knownObjects.GetByKey(key)
是Indexer本地存储对象,通过该对象可以获取client-go目前存储的所有资源对象,Indexer对象在NewDeltaFIFO
函数实例化DeltaFIFO对象时传入。syncKeyLocked
通过调用f.queueActionLocked(Sync, obj)
函数实现Resync。
4. Indexer
Indexer是client-go用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer。【Indexer中的数据与Etcd集群中的数据保持完全一致。】client-go可以很方便的从本地存储中读取资源,而不需要从远程的Etcd中读取,减轻了apiserver和etcd集群的压力。
下图为Indexer的存储结构图:
ThreadSafeMap是实现并发安全的存储,Indexer在ThreadSafeMap的基础上进行封装,它继承了与ThreadSafeMap相关的操作方法并实现了Indexer Func等功能,例如Index、IndexKeys、GetIndexers等方法,这些方法为ThreadSafeMap提供了索引功能。
4.1 ThreadSafeMap并发安全存储
ThreadSafeMap是一个内存中的存储,其中的数据并不会写入本地磁盘中,每次的增删改查操作都会加锁,以保证数据的一致性。ThreadSafeMap将资源对象数据存储一个map数据结构中,其结构代码示例如下:
源码路径 :k8s.io\client-go\tools\cache\thread_safe_store.go
Copy // threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync . RWMutex
items map [ string ] interface {}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
items字段中存储的是资源对象数据,其中items的key通过keyFunc函数计算得到,计算默认使用MetaNamespaceKeyFunc函数,该函数根据资源对象计算出/格式的key,如果资源对象的为空,则作为key,而items的value用于存储资源对象。
4.2 Indexer索引器
每次增删改ThreadSafeMap数据时,都会使用updateIndices或deleteFromIndices函数变更Indexer。Indexer被设计为可以自定义索引函数,它有4个非常重要的数据结构,分别是Indices、Index、Indexers及IndexFunc。
通过一个代码示例理解下Indexer,代码如下:
Copy //UserIndexFunc 是一个索引器函数
func UserIndexFunc (obj interface {}) ([] string , error ) {
pod := obj.( * corev1 . Pod )
userString := pod.Annotations[ "users" ]
return strings. Split (userString, "," ), nil
}
func TestIndexer (t * testing . T ) {
//实例化Indexer对象,
//第一个参数为KeyFunc,用于计算资源对象的key,默认使用cache.MetaNamespaceKeyFunc
//第二个参数是cache.Indexers,用于定义索引器,其中key为索引器的名称(byUser),value为索引器
index := cache. NewIndexer (cache.MetaNamespaceKeyFunc, cache . Indexers { "byUser" : UserIndexFunc})
//创建三个pod资源对象
pod1 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
pod2 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
pod3 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}
//添加3个Pod资源对象
index. Add (pod1)
index. Add (pod2)
index. Add (pod3)
//通过index.Byindex函数查询byUser索引器下匹配ernie字段的Pod列表
erniePods, err := index. ByIndex ( "byUser" , "ernie" )
if err != nil {
panic (err)
}
for _, erniePod := range erniePods {
fmt. Println (erniePod.( * corev1 . Pod ).Name)
}
}
上述代码中涉及了4个重要的数据结构,分别是Indexers,IndexFunc,Indices,Index;数据结构如下:
源码路径 :k8s.io\client-go\tools\cache\index.go
Copy // Indexers maps a name to a IndexFunc
type Indexers map [ string ] IndexFunc
// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func (obj interface {}) ([] string , error )
// Indices maps a name to an Index
type Indices map [ string ] Index
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map [ string ] sets . String
其中,各个数据结构介绍如下:
Indexers:存储索引器,key为索引器名称,value为索引器实现的函数。
IndexFunc:索引器函数,定义一个接收资源对象,返回检索列表的函数。
Indices:存储缓存器,key为缓存器名称(在代码示例中,缓存器命名与索引器命名相对应),value为缓存数据。
4.3 Indexer索引器核心实现
index.ByIndex函数通过执行索引器函数得到索引结果,代码如下:
源码路径 :k8s.io\client-go\tools\cache\thread_safe_store.go
Copy // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c * threadSafeMap ) ByIndex (indexName, indexedValue string ) ([] interface {}, error ) {
......
//根据indexName获取定义的indexFunc函数
indexFunc := c.indexers[indexName]
......
//从indices中查找指定的缓存器函数
index := c.indices[indexName]
//根据需要检索的indexKey从缓存数据中查找数据并返回
set := index[indexedValue]
list := make ([] interface {}, 0 , set. Len ())
for key := range set {
list = append (list, c.items[key])
}
return list, nil
}
Index中的缓存数据为Set集合数据结构,Set本质与Slice相同,但Set中不存在相同元素,而Go标准库没有提供Set数据结构,Go语言中的map结构类型是不能存在相同key的,所以kubernetes将map结构类型的key作为Set数据结构,实现Set去重特性。
Indexer的执行流程如下:
5. 总结
由Informer的架构图可知,Informer中包含Controller和Indexer,而Controller又包含Reflector和DeltaFIFO,所以总结下,创建Informer时这些组件的初始化流程,大体分为三布:
5.1 创建工厂SharedInformerFactory
首先需要通过informers.NewSharedInformerFactory(clientSet, time.Minute)
创建sharedInformers工厂对象,看下创建的代码,通过调用NewSharedInformerFactoryWithOptions
函数,返回sharedInformerFactory对象,代码如下:
源码路径 :k8s.io\client-go\informers\factory.go
Copy // NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := & sharedInformerFactory {
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make ( map [ reflect . Type ] cache . SharedIndexInformer ),
startedInformers: make ( map [ reflect . Type ] bool ),
customResync: make ( map [ reflect . Type ] time . Duration ),
}
// Apply all options
for _, opt := range options {
factory = opt (factory)
}
return factory
}
上述代码中将传入的client,defaultResync及其他参数进行默认初始化。
5.2 通过工厂创建具体资源的Informer
继续通过代码sharedInformers.Core().V1().Pods().Informer()
生成具体资源的Informer,这里以Pods为例,查看Informer()
代码如下:
源码路径 :k8s.io\client-go\informers\core\v1\pod.go
Copy func (f * podInformer ) Informer () cache . SharedIndexInformer {
return f.factory. InformerFor ( & corev1 . Pod {}, f.defaultInformer)
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
Informer()
函数调用了f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
,而f.defaultInformer
函数则作为参数进行传入,继续查看InformerFor()
函数代码:
源码路径 :k8s.io\client-go\informers\factory.go
Copy // InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock. Lock ()
defer f.lock. Unlock ()
//获取对象类型作为informers存储的key
informerType := reflect. TypeOf (obj)
//如果存在就直接返回
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if ! exists {
resyncPeriod = f.defaultResync
}
//若不存在,则利用刚刚传入的defaultInformer函数进行初始化
informer = newFunc (f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
通过注释可知,继续初始化调用defaultInformer
函数,由上代码可知defaultInformer
函数,调用了NewFilteredPodInformer
函数,其中传入了默认的indexer对象,NewFilteredPodInformer
函数代码如下:
源码路径 :k8s.io\client-go\informers\core\v1\pod.go
Copy func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache. NewSharedIndexInformer (
& cache . ListWatch {
ListFunc: func (options metav1 . ListOptions ) ( runtime . Object , error ) {
if tweakListOptions != nil {
tweakListOptions ( & options)
}
return client. CoreV1 (). Pods (namespace). List (context. TODO (), options)
},
WatchFunc: func (options metav1 . ListOptions ) ( watch . Interface , error ) {
if tweakListOptions != nil {
tweakListOptions ( & options)
}
return client. CoreV1 (). Pods (namespace). Watch (context. TODO (), options)
},
},
& corev1 . Pod {},
resyncPeriod,
indexers,
)
}
从代码可知,该函数通过NewSharedIndexInformer
函数初始化listerWatcher
及通过传入的indexers初始化indexer;indexers的初始化通过NewSharedIndexInformer
函数中的NewIndexer
进行,代码如下:
源码路径 :k8s.io\client-go\tools\cache\shared_informer.go
Copy func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := & clock . RealClock {}
sharedIndexInformer := & sharedIndexInformer {
processor: & sharedProcessor {clock: realClock},
indexer: NewIndexer (DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector (fmt. Sprintf ( " %T " , exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
5.3 执行informer.Run
已经了解,sharedIndexInformer中包含controller类型结构,下面看下Controller的类型定义:
源码路径 :k8s.io\client-go\tools\cache\controller.go
Copy // `*controller` implements Controller
type controller struct {
config Config
reflector * Reflector
reflectorMutex sync . RWMutex
clock clock . Clock
}
可以看到,Reflector是在controller中被定义的,除此之外还有一些例如config配置及其他时钟,同步的设置。
通过前两步已经完成了listerWatcher
和Indexer
的初始化工作,继续执行informer.Run()
运行informer;查看其代码:
源码路径 :k8s.io\client-go\tools\cache\shared_informer.go
Copy func (s * sharedIndexInformer ) Run (stopCh <-chan struct {}) {
defer utilruntime. HandleCrash ()
//初始化DeltaFIFO对象
fifo := NewDeltaFIFOWithOptions ( DeltaFIFOOptions {
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true ,
})
//初始化Controller数据结构中的Config对象
cfg := & Config {
Queue: fifo,
//将之前初始化到sharedIndexInformer中的lw,重新赋值,下传到config中
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false ,
ShouldResync: s.processor.shouldResync,
//这里初始化process回调函数为HandleDeltas
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func () {
s.startedLock. Lock ()
defer s.startedLock. Unlock ()
// 在这里进行cotroller的初始化操作
s.controller = New (cfg)
s.controller.( * controller ).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make ( chan struct {})
var wg wait . Group
defer wg. Wait () // Wait for Processor to stop
defer close (processorStopCh) // Tell Processor to stop
wg. StartWithChannel (processorStopCh, s.cacheMutationDetector.Run)
wg. StartWithChannel (processorStopCh, s.processor.run)
defer func () {
s.startedLock. Lock ()
defer s.startedLock. Unlock ()
s.stopped = true // Don't want any new listeners
}()
//执行controller.Run将其运行
s.controller. Run (stopCh)
}
由上代码可知,在informer.Run
方法中,执行了以下操作:
初始化Controller数据结构中的Config对象(包括:①将之前初始化到sharedIndexInformer中的lw,重新赋值,下传到config中;②初始化process回调函数为HandleDeltas
)
由于,我们只梳理初始化流程,此时DeltaFIFO已经被初始化,继续查看s.controller.Run
代码:
源码路径 :k8s.io\client-go\tools\cache\controller.go
Copy func (c * controller ) Run (stopCh <-chan struct {}) {
defer utilruntime. HandleCrash ()
go func () {
<- stopCh
c.config.Queue. Close ()
}()
//初始化Reflector对象
r := NewReflector (
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex. Lock ()
//将完成初始化的Reflector对象r赋值给c.reflector
c.reflector = r
c.reflectorMutex. Unlock ()
var wg wait . Group
defer wg. Wait ()
//r,Run将该函数作为参数传入运行
wg. StartWithChannel (stopCh, r.Run)
wait. Until (c.processLoop, time.Second, stopCh)
}
在上述代码中,看到了Reflector的初始化及将其赋值给controller对象中,并将r.Run作为参数传入运行。最后调用c.processLoop
函数;代码见下:
源码路径 :k8s.io\client-go\tools\cache\reflector.go
Copy // Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r * Reflector ) Run (stopCh <-chan struct {}) {
klog. V ( 2 ). Infof ( "Starting reflector %s ( %s ) from %s " , r.expectedTypeName, r.resyncPeriod, r.name)
wait. BackoffUntil ( func () {
if err := r. ListAndWatch (stopCh); err != nil {
r. watchErrorHandler (r, err)
}
}, r.backoffManager, true , stopCh)
klog. V ( 2 ). Infof ( "Stopping reflector %s ( %s ) from %s " , r.expectedTypeName, r.resyncPeriod, r.name)
}
processLoop函数代码为:
源码路径 :k8s.io\client-go\tools\cache\controller.go
Copy func (c * controller ) processLoop () {
for {
obj, err := c.config.Queue. Pop ( PopProcessFunc (c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue. AddIfNotPresent (obj)
}
}
}
}
至此,整个informer的初始化流程分析结束。