/ Type is a work queue (see the packagecomment).typeTypestruct {// queue defines the order in which we will work on items. Every// element of queue should be in the dirty set and not in the// processing set.//用来存储元素的地方,slice结构,保证元素有序 queue []t// dirty defines all of the items that need to be processed.//dirty很重要,除了能够去重,还能保证在处理一个元素之前哪怕其被添加多次(并发),但也只会被处理一次 dirty set// Things that are currently being processed are in the processing set.// These things may be simultaneously in the dirty set. When we finish// processing something and remove it from this set, we'll check if// it's in the dirty set, and if so, add it to the queue.//用于标记机制,标记一个元素是否正在被处理 processing set cond *sync.Cond shuttingDown bool metrics queueMetrics unfinishedWorkUpdatePeriod time.Duration clock clock.Clock}// set是由map构造的typeemptystruct{}typetinterface{}typesetmap[t]empty
// Get方法// Get blocks until it can return an item to be processed. If shutdown = true,// the caller should end their goroutine. You must call Done with item when you// have finished processing it.func (q *Type) Get() (item interface{}, shutdown bool) { q.cond.L.Lock()defer q.cond.L.Unlock()//如果队列为空且未关闭,则阻塞forlen(q.queue) ==0&&!q.shuttingDown { q.cond.Wait() }iflen(q.queue) ==0 {// We must be shutting down.returnnil, true }//获取第一个元素queue首个元素,并从queue中移除 item, q.queue = q.queue[0], q.queue[1:] q.metrics.get(item)//将插入到processing字段中 q.processing.insert(item)//将item从dirty字段中移除 q.dirty.delete(item)return item, false}//==========================================================================================//Add方法// Add marks item as needing processing.func (q *Type) Add(item interface{}) { q.cond.L.Lock()defer q.cond.L.Unlock()if q.shuttingDown {return }//如果dirty中含有item,则返回,防止重复写入if q.dirty.has(item) {return } q.metrics.add(item)//将item写入到dirty字段中 q.dirty.insert(item)//如果processing字段中存在该元素,则直接返回if q.processing.has(item) {return }//否则,才将其插入到queue尾部 q.queue =append(q.queue, item)//唤醒其他线程 q.cond.Signal()}//==========================================================================================//Done方法// Done marks item as done processing, and if it has been marked as dirty again// while it was being processed, it will be re-added to the queue for// re-processing.func (q *Type) Done(item interface{}) { q.cond.L.Lock()defer q.cond.L.Unlock() q.metrics.done(item)//将处理完的item从processing中移除 q.processing.delete(item)//判断dirty字段中是否含有item,如果有则将其添加到queue尾部if q.dirty.has(item) { q.queue =append(q.queue, item) q.cond.Signal() }}
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to// requeue items after failures without ending up in a hot-loop.typeDelayingInterfaceinterface {Interface// AddAfter adds an item to the workqueue after the indicated duration has passed//AddAfter方法插入一个item,并附带duration参数,该参数用于指定元素延迟插入FIFO队列的时间,如果小于0,则直接插入AddAfter(item interface{}, duration time.Duration)}// delayingType wraps an Interface and provides delayed re-enquingtypedelayingTypestruct {Interface// clock tracks time for delayed firing clock clock.Clock// stopCh lets us signal a shutdown to the waiting loop stopCh chanstruct{}// stopOnce guarantees we only signal shutdown a single time stopOnce sync.Once// heartbeat ensures we wait no more than maxWait before firing heartbeat clock.Ticker// waitingForAddCh is a buffered channel that feeds waitingForAdd//主要字段,默认初始大小为1000;当插入的元素大于或等于1000时,延迟队列才会处于阻塞状态,//该字段的数据通过goroutine运行的waitingLoop函数持久运行 waitingForAddCh chan*waitFor// metrics counts the number of retries metrics retryMetrics}
typeRateLimiterinterface {// When gets an item and gets to decide how long that item should waitWhen(item interface{}) time.Duration// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing// or for success, we'll stop tracking itForget(item interface{})// NumRequeues returns back how many failures the item has hadNumRequeues(item interface{}) int}
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after thattypeItemFastSlowRateLimiterstruct { failuresLock sync.Mutex//统计元素的排队数 failures map[interface{}]int//控制从fast速率转换到slow速率 maxFastAttempts int//fast速率 fastDelay time.Duration//slow速率 slowDelay time.Duration}func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock()defer r.failuresLock.Unlock()//排队数加1 r.failures[item] = r.failures[item] +1//判断排队数是否超过限制fast速率的最大值,如果超过则返回slow速率if r.failures[item] <= r.maxFastAttempts {return r.fastDelay }return r.slowDelay}
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponentialfuncDefaultControllerRateLimiter() RateLimiter {returnNewMaxOfRateLimiter(NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, )}