在Kubernetes系统中,组件之间通过HTTP协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等,Kubernetes则通过使用cient-go的Informer机制达到这一效果。其他组件也是通过Informer机制与Kubernetes API Server进行通信的。
1. Informer机制架构设计
1.1 代码练习
Copy func TestInformer (t * testing . T ) {
config, err := clientcmd. BuildConfigFromFlags ( "" , "F:\\code\\env\\config" )
if err != nil {
panic (err)
// 创建clientset对象
clientSet, err := kubernetes. NewForConfig (config)
if err != nil {
panic (err)
stopCh := make ( chan struct {})
defer close (stopCh)
//resync会周期性的执行List操作,将所有的资源存放在Informer Store中,如果参数为0,则禁止resync操作
sharedInformers := informers. NewSharedInformerFactory (clientSet, time.Minute)
informer := sharedInformers. Core (). V1 (). Pods (). Informer ()
// 为Pod资源添加资源事件回调方法,支持AddFunc、UpdateFunc及DeleteFunc
informer. AddEventHandler ( cache . ResourceEventHandlerFuncs {
AddFunc: func (obj interface {}) {
myObj := obj.( metav1 . Object )
log. Printf ( "New Pod Added to Store: %s " , myObj. GetName ())
UpdateFunc: func (oldObj, newObj interface {}) {
oObj := oldObj.( metav1 . Object )
nObj := newObj.( metav1 . Object )
log. Printf ( " %s Pod Updated to %s " , oObj. GetName (), nObj. GetName ())
DeleteFunc: func (obj interface {}) {
myObj := obj.( metav1 . Object )
log. Printf ( "Pod Deleted from Store: %s " , myObj. GetName ())
informer. Run (stopCh)
1.2 资源Informer
源码路径 :k8s.io\client-go\informers\core\v1\pod.go
Copy // PodInformer provides access to a shared informer and lister for
// Pods.
type PodInformer interface {
Informer () cache . SharedIndexInformer
Lister () v1 . PodLister
Copy nodeInformer := sharedInformers. Node (). V1beta1 (). RuntimeClasses (). Informer ()
1.3 Shared Informer共享机制
Informer也被称为Shared Informer,它是可以共享使用的。若同一资源的Informer被实例化了多次,每个Informer使用一个Reflector,那么会运行过多的相同ListAndWatch ,太多重复的序列化和反序列化操作会导致api-server负载过重。
Shared Informer可以使同一个资源对象共享一个Reflector,这样可以节约很多资源;Shared Infor定义了一个map数据结构,通过map数据结构实现共享Informer机制。
源码路径 :k8s.io\client-go\informers\factory.go
Copy type sharedInformerFactory struct {
informers map [ reflect . Type ] cache . SharedIndexInformer
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f * sharedInformerFactory ) InformerFor (obj runtime . Object , newFunc internalinterfaces . NewInformerFunc ) cache . SharedIndexInformer {
informerType := reflect. TypeOf (obj)
informer, exists := f.informers[informerType]
if exists {
return informer
f.informers[informerType] = informer
return informer
源码路径 :k8s.io\client-go\informers\factory.go
Copy // InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f * sharedInformerFactory ) InformerFor (obj runtime . Object , newFunc internalinterfaces . NewInformerFunc ) cache . SharedIndexInformer {
f.lock. Lock ()
defer f.lock. Unlock ()
informerType := reflect. TypeOf (obj)
informer, exists := f.informers[informerType]
if exists {
return informer
resyncPeriod, exists := f.customResync[informerType]
if ! exists {
resyncPeriod = f.defaultResync
informer = newFunc (f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
源码路径 :k8s.io\client-go\informers\factory.go
Copy // Start initializes all requested informers.
func (f * sharedInformerFactory ) Start (stopCh <-chan struct {}) {
f.lock. Lock ()
defer f.lock. Unlock ()
for informerType, informer := range f.informers {
if ! f.startedInformers[informerType] {
// 创建goroutine运行
go informer. Run (stopCh)
// 运行后将该类型的informer运行状态设置为true
f.startedInformers[informerType] = true
2. Reflector
Informer对kubernetes api sever的资源(内置及CRD)执行监控(Watch)操作,最核心的功能是Reflector。Reflector用于监控指定的kubernetes资源,当监控的资源发生变化时,触发相应的变更事件,并将其资源对象存放在本地缓存DeltaFIFO中。
源码路径 :k8s.io\client-go\tools\cache\reflector.go
Copy // NewReflector creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If
// resyncPeriod is non-zero, then the reflector will periodically
// consult its ShouldResync function to determine whether to invoke
// the Store's Resync operation; `ShouldResync==nil` means always
// "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflector (lw ListerWatcher , expectedType interface {}, store Store , resyncPeriod time . Duration ) * Reflector {
return NewNamedReflector (naming. GetNameFromCallsite (internalPackages ... ), lw, expectedType, store, resyncPeriod)
2.1 获取资源列表数据
源码路径 :k8s.io\client-go\tools\cache\reflector.go
Copy // ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r * Reflector ) ListAndWatch (stopCh <-chan struct {}) error {
if err := func () error {
go func () {
pager := pager. New (pager. SimplePageFunc ( func (opts metav1 . ListOptions ) ( runtime . Object , error ) {
return r.listerWatcher. List (opts)
resourceVersion = listMetaInterface. GetResourceVersion ()
initTrace. Step ( "Resource version extracted" )
items, err := meta. ExtractList (list)
if err := r. syncWith (items, resourceVersion); err != nil {
return fmt. Errorf ( "unable to sync list result: %v " , err)
initTrace. Step ( "SyncWith done" )
r. setLastSyncResourceVersion (resourceVersion)
initTrace. Step ( "Resource version updated" )
return nil
}(); err != nil {
return err
源码路径 :k8s.io\client-go\tools\cache\listwatch.go
Copy // List a set of apiserver resources
func (lw * ListWatch ) List (options metav1 . ListOptions ) ( runtime . Object , error ) {
// ListWatch is used in Reflector, which already supports pagination.
// Don't paginate here to avoid duplication.
return lw. ListFunc (options)
源码路径 :k8s.io\client-go\informers\core\v1\pod.go
Copy func NewFilteredPodInformer (client kubernetes . Interface , namespace string , resyncPeriod time . Duration , indexers cache . Indexers , tweakListOptions internalinterfaces . TweakListOptionsFunc ) cache . SharedIndexInformer {
return cache. NewSharedIndexInformer (
& cache . ListWatch {
ListFunc: func (options metav1 . ListOptions ) ( runtime . Object , error ) {
if tweakListOptions != nil {
tweakListOptions ( & options)
return client. CoreV1 (). Pods (namespace). List (context. TODO (), options)
& corev1 . Pod {},
2.2 监控资源对象
Watch(监控)操作通过HTTP协议与kubernetes api server建立长连接,接受api server发来的资源变更事件,Watch操作的实现机制使用HTTP协议的分块传输编码(Chunked Transfer Encoding)。当client-go调用api-server时,apiserver在Response的HTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码,客户端收到该信息后,便于服务端进行连接,并等待下一个数据块(即资源的事件信息)
源码路径 :k8s.io\client-go\tools\cache\reflector.go
Copy func (r * Reflector ) ListAndWatch (stopCh <-chan struct {}) error {
options = metav1 . ListOptions {
ResourceVersion: resourceVersion,
TimeoutSeconds: & timeoutSeconds,
AllowWatchBookmarks: true ,
w, err := r.listerWatcher. Watch (options)
if err := r. watchHandler (start, w, & resourceVersion, resyncerrc, stopCh); err != nil {
return nil
源码路径 :k8s.io\client-go\informers\core\v1\pod.go
Copy ......
WatchFunc: func (options metav1 . ListOptions ) ( watch . Interface , error ) {
if tweakListOptions != nil {
tweakListOptions ( & options)
return client. CoreV1 (). Pods (namespace). Watch (context. TODO (), options)
源码路径 :k8s.io\client-go\tools\cache\reflector.go
Copy // watchHandler watches w and keeps *resourceVersion up to date.
func (r * Reflector ) watchHandler (start time . Time , w watch . Interface , resourceVersion * string , errc chan error , stopCh <-chan struct {}) error {
for {
select {
case event, ok := <- w. ResultChan ():
switch event.Type {
case watch.Added:
err := r.store. Add (event.Object)
case watch.Modified:
err := r.store. Update (event.Object)
case watch.Deleted:
err := r.store. Delete (event.Object)
default :
* resourceVersion = newResourceVersion
r. setLastSyncResourceVersion (newResourceVersion)
eventCount ++
3. DeltaFIFO
源码路径 :k8s.io\client-go\tools\cache\delta_fifo.go
Copy type DeltaFIFO struct {
// `items` maps keys to Deltas.
// `queue` maintains FIFO order of keys for consumption in Pop().
// We maintain the property that keys in the `items` and `queue` are
// strictly 1:1 mapping, and that all Deltas in `items` should have
// at least one Delta.
items map [ string ] Deltas
queue [] string
type Deltas [] Delta
3.1 生产者方法
代码,最终调用了f.queueActionLocked(Added, obj)
源码路径 :k8s.io\client-go\tools\cache\delta_fifo.go
Copy // Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f * DeltaFIFO ) Add (obj interface {}) error {
f.lock. Lock ()
defer f.lock. Unlock ()
f.populated = true
return f. queueActionLocked (Added, obj)
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f * DeltaFIFO ) queueActionLocked (actionType DeltaType , obj interface {}) error {
id, err := f. KeyOf (obj)
if err != nil {
return KeyError {obj, err}
// 将actionType和资源对象构造成Delta,添加到istems中
newDeltas := append (f.items[id], Delta {actionType, obj})
newDeltas = dedupDeltas (newDeltas)
if len (newDeltas) > 0 {
if _, exists := f.items[id]; ! exists {
f.queue = append (f.queue, id)
f.items[id] = newDeltas
f.cond. Broadcast ()
} else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// But if somehow it ever does return an empty list, then
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete (f.items, id)
return nil
3.2 消费者方法
源码路径 :k8s.io\client-go\tools\cache\delta_fifo.go
Copy func (f * DeltaFIFO ) Pop (process PopProcessFunc ) ( interface {}, error ) {
f.lock. Lock ()
defer f.lock. Unlock ()
for {
for len (f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil , ErrFIFOClosed
f.cond. Wait ()
id := f.queue[ 0 ]
f.queue = f.queue[ 1 :]
if f.initialPopulationCount > 0 {
f.initialPopulationCount --
item, ok := f.items[id]
if ! ok {
// Item may have been deleted subsequently.
delete (f.items, id)
err := process (item)
if e, ok := err.( ErrRequeue ); ok {
f. addIfNotPresent (id, item)
err = e.Err
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
源码路径 :k8s.io\client-go\tools\cache\shared_informer.go
Copy func (s * sharedIndexInformer ) HandleDeltas (obj interface {}) error {
s.blockDeltas. Lock ()
defer s.blockDeltas. Unlock ()
// from oldest to newest
for _, d := range obj.( Deltas ) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector. AddObject (d.Object)
if old, exists, err := s.indexer. Get (d.Object); err == nil && exists {
if err := s.indexer. Update (d.Object); err != nil {
return err
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta. Accessor (d.Object); err == nil {
if oldAccessor, err := meta. Accessor (old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor. GetResourceVersion () == oldAccessor. GetResourceVersion ()
s.processor. distribute ( updateNotification {oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer. Add (d.Object); err != nil {
return err
s.processor. distribute ( addNotification {newObj: d.Object}, false )
case Deleted:
if err := s.indexer. Delete (d.Object); err != nil {
return err
s.processor. distribute ( deleteNotification {oldObj: d.Object}, false )
return nil
3.3 Resync机制
Copy **源码路径**:k8s.io\client-go\tools\cache\reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
for {
select {
case <-resyncCh:
case <-stopCh:
case <-cancelCh:
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
resyncCh, cleanup = r.resyncChan()
Copy **源码路径**:k8s.io\client-go\tools\cache\delta_fifo.go
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key)
id, err := f.KeyOf(obj)
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
return nil
通过调用f.queueActionLocked(Sync, obj)
4. Indexer
ThreadSafeMap是实现并发安全的存储,Indexer在ThreadSafeMap的基础上进行封装,它继承了与ThreadSafeMap相关的操作方法并实现了Indexer Func等功能,例如Index、IndexKeys、GetIndexers等方法,这些方法为ThreadSafeMap提供了索引功能。
4.1 ThreadSafeMap并发安全存储
源码路径 :k8s.io\client-go\tools\cache\thread_safe_store.go
Copy // threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync . RWMutex
items map [ string ] interface {}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
4.2 Indexer索引器
Copy //UserIndexFunc 是一个索引器函数
func UserIndexFunc (obj interface {}) ([] string , error ) {
pod := obj.( * corev1 . Pod )
userString := pod.Annotations[ "users" ]
return strings. Split (userString, "," ), nil
func TestIndexer (t * testing . T ) {
index := cache. NewIndexer (cache.MetaNamespaceKeyFunc, cache . Indexers { "byUser" : UserIndexFunc})
pod1 := & corev1 . Pod {ObjectMeta: metav1 . ObjectMeta {Name: "one" , Annotations: map [ string ] string { "users" : "ernie,bert" }}}
pod2 := & corev1 . Pod {ObjectMeta: metav1 . ObjectMeta {Name: "two" , Annotations: map [ string ] string { "users" : "bert,oscar" }}}
pod3 := & corev1 . Pod {ObjectMeta: metav1 . ObjectMeta {Name: "tre" , Annotations: map [ string ] string { "users" : "ernie,elmo" }}}
index. Add (pod1)
index. Add (pod2)
index. Add (pod3)
erniePods, err := index. ByIndex ( "byUser" , "ernie" )
if err != nil {
panic (err)
for _, erniePod := range erniePods {
fmt. Println (erniePod.( * corev1 . Pod ).Name)
源码路径 :k8s.io\client-go\tools\cache\index.go
Copy // Indexers maps a name to a IndexFunc
type Indexers map [ string ] IndexFunc
// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func (obj interface {}) ([] string , error )
// Indices maps a name to an Index
type Indices map [ string ] Index
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map [ string ] sets . String
4.3 Indexer索引器核心实现
源码路径 :k8s.io\client-go\tools\cache\thread_safe_store.go
Copy // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c * threadSafeMap ) ByIndex (indexName, indexedValue string ) ([] interface {}, error ) {
indexFunc := c.indexers[indexName]
index := c.indices[indexName]
set := index[indexedValue]
list := make ([] interface {}, 0 , set. Len ())
for key := range set {
list = append (list, c.items[key])
return list, nil
5. 总结
5.1 创建工厂SharedInformerFactory
首先需要通过informers.NewSharedInformerFactory(clientSet, time.Minute)
源码路径 :k8s.io\client-go\informers\factory.go
Copy // NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions (client kubernetes . Interface , defaultResync time . Duration , options ... SharedInformerOption ) SharedInformerFactory {
factory := & sharedInformerFactory {
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make ( map [ reflect . Type ] cache . SharedIndexInformer ),
startedInformers: make ( map [ reflect . Type ] bool ),
customResync: make ( map [ reflect . Type ] time . Duration ),
// Apply all options
for _, opt := range options {
factory = opt (factory)
return factory
5.2 通过工厂创建具体资源的Informer
源码路径 :k8s.io\client-go\informers\core\v1\pod.go
Copy func (f * podInformer ) Informer () cache . SharedIndexInformer {
return f.factory. InformerFor ( & corev1 . Pod {}, f.defaultInformer)
func (f * podInformer ) defaultInformer (client kubernetes . Interface , resyncPeriod time . Duration ) cache . SharedIndexInformer {
return NewFilteredPodInformer (client, f.namespace, resyncPeriod, cache . Indexers {cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
函数调用了f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
源码路径 :k8s.io\client-go\informers\factory.go
Copy // InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f * sharedInformerFactory ) InformerFor (obj runtime . Object , newFunc internalinterfaces . NewInformerFunc ) cache . SharedIndexInformer {
f.lock. Lock ()
defer f.lock. Unlock ()
informerType := reflect. TypeOf (obj)
informer, exists := f.informers[informerType]
if exists {
return informer
resyncPeriod, exists := f.customResync[informerType]
if ! exists {
resyncPeriod = f.defaultResync
informer = newFunc (f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
源码路径 :k8s.io\client-go\informers\core\v1\pod.go
Copy func NewFilteredPodInformer (client kubernetes . Interface , namespace string , resyncPeriod time . Duration , indexers cache . Indexers , tweakListOptions internalinterfaces . TweakListOptionsFunc ) cache . SharedIndexInformer {
return cache. NewSharedIndexInformer (
& cache . ListWatch {
ListFunc: func (options metav1 . ListOptions ) ( runtime . Object , error ) {
if tweakListOptions != nil {
tweakListOptions ( & options)
return client. CoreV1 (). Pods (namespace). List (context. TODO (), options)
WatchFunc: func (options metav1 . ListOptions ) ( watch . Interface , error ) {
if tweakListOptions != nil {
tweakListOptions ( & options)
return client. CoreV1 (). Pods (namespace). Watch (context. TODO (), options)
& corev1 . Pod {},
源码路径 :k8s.io\client-go\tools\cache\shared_informer.go
Copy func NewSharedIndexInformer (lw ListerWatcher , exampleObject runtime . Object , defaultEventHandlerResyncPeriod time . Duration , indexers Indexers ) SharedIndexInformer {
realClock := & clock . RealClock {}
sharedIndexInformer := & sharedIndexInformer {
processor: & sharedProcessor {clock: realClock},
indexer: NewIndexer (DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector (fmt. Sprintf ( " %T " , exampleObject)),
clock: realClock,
return sharedIndexInformer
5.3 执行informer.Run
源码路径 :k8s.io\client-go\tools\cache\controller.go
Copy // `*controller` implements Controller
type controller struct {
config Config
reflector * Reflector
reflectorMutex sync . RWMutex
clock clock . Clock
源码路径 :k8s.io\client-go\tools\cache\shared_informer.go
Copy func (s * sharedIndexInformer ) Run (stopCh <-chan struct {}) {
defer utilruntime. HandleCrash ()
fifo := NewDeltaFIFOWithOptions ( DeltaFIFOOptions {
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true ,
cfg := & Config {
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false ,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
func () {
s.startedLock. Lock ()
defer s.startedLock. Unlock ()
// 在这里进行cotroller的初始化操作
s.controller = New (cfg)
s.controller.( * controller ).clock = s.clock
s.started = true
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make ( chan struct {})
var wg wait . Group
defer wg. Wait () // Wait for Processor to stop
defer close (processorStopCh) // Tell Processor to stop
wg. StartWithChannel (processorStopCh, s.cacheMutationDetector.Run)
wg. StartWithChannel (processorStopCh, s.processor.run)
defer func () {
s.startedLock. Lock ()
defer s.startedLock. Unlock ()
s.stopped = true // Don't want any new listeners
s.controller. Run (stopCh)
源码路径 :k8s.io\client-go\tools\cache\controller.go
Copy func (c * controller ) Run (stopCh <-chan struct {}) {
defer utilruntime. HandleCrash ()
go func () {
<- stopCh
c.config.Queue. Close ()
r := NewReflector (
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
c.reflectorMutex. Lock ()
c.reflector = r
c.reflectorMutex. Unlock ()
var wg wait . Group
defer wg. Wait ()
wg. StartWithChannel (stopCh, r.Run)
wait. Until (c.processLoop, time.Second, stopCh)
源码路径 :k8s.io\client-go\tools\cache\reflector.go
Copy // Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r * Reflector ) Run (stopCh <-chan struct {}) {
klog. V ( 2 ). Infof ( "Starting reflector %s ( %s ) from %s " , r.expectedTypeName, r.resyncPeriod, r.name)
wait. BackoffUntil ( func () {
if err := r. ListAndWatch (stopCh); err != nil {
r. watchErrorHandler (r, err)
}, r.backoffManager, true , stopCh)
klog. V ( 2 ). Infof ( "Stopping reflector %s ( %s ) from %s " , r.expectedTypeName, r.resyncPeriod, r.name)
源码路径 :k8s.io\client-go\tools\cache\controller.go
Copy func (c * controller ) processLoop () {
for {
obj, err := c.config.Queue. Pop ( PopProcessFunc (c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue. AddIfNotPresent (obj)