/ Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
//用来存储元素的地方,slice结构,保证元素有序
queue []t
// dirty defines all of the items that need to be processed.
//dirty很重要,除了能够去重,还能保证在处理一个元素之前哪怕其被添加多次(并发),但也只会被处理一次
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
//用于标记机制,标记一个元素是否正在被处理
processing set
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
// set是由map构造的
type empty struct{}
type t interface{}
type set map[t]empty
// Get方法
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
//如果队列为空且未关闭,则阻塞
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
//获取第一个元素queue首个元素,并从queue中移除
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
//将插入到processing字段中
q.processing.insert(item)
//将item从dirty字段中移除
q.dirty.delete(item)
return item, false
}
//==========================================================================================
//Add方法
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
//如果dirty中含有item,则返回,防止重复写入
if q.dirty.has(item) {
return
}
q.metrics.add(item)
//将item写入到dirty字段中
q.dirty.insert(item)
//如果processing字段中存在该元素,则直接返回
if q.processing.has(item) {
return
}
//否则,才将其插入到queue尾部
q.queue = append(q.queue, item)
//唤醒其他线程
q.cond.Signal()
}
//==========================================================================================
//Done方法
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
//将处理完的item从processing中移除
q.processing.delete(item)
//判断dirty字段中是否含有item,如果有则将其添加到queue尾部
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
//AddAfter方法插入一个item,并附带duration参数,该参数用于指定元素延迟插入FIFO队列的时间,如果小于0,则直接插入
AddAfter(item interface{}, duration time.Duration)
}
// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
Interface
// clock tracks time for delayed firing
clock clock.Clock
// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// stopOnce guarantees we only signal shutdown a single time
stopOnce sync.Once
// heartbeat ensures we wait no more than maxWait before firing
heartbeat clock.Ticker
// waitingForAddCh is a buffered channel that feeds waitingForAdd
//主要字段,默认初始大小为1000;当插入的元素大于或等于1000时,延迟队列才会处于阻塞状态,
//该字段的数据通过goroutine运行的waitingLoop函数持久运行
waitingForAddCh chan *waitFor
// metrics counts the number of retries
metrics retryMetrics
}
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
//如果延迟时间小于等于0,则直接插入到queue中
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
// 在当前时间增加duration时间,构造waitFor类型放入q.waitingForAddCh中
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
// 延迟队列初始化函数
func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
ret := &delayingType{
Interface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
//goroutine运行waitingLoop函数
go ret.waitingLoop()
return ret
}
// =================================================================================================
//看一下waitingLoop函数核心代码
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
......
//初始化waitForPriorityQueue数组
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
// Add ready entries
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
//如果entry的延迟时间晚于now,说明还需延迟等待
break
}
//否则,将其从waitingForQueue中移出并插入到队列q中,同时删除map waitingEntryByData中的该元素的数据
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// Set up a wait for the first item's readyAt (if one exists)
//从上面循环跳出后,获取waitingForQueue中第一个元素,判断其时间
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
//这两个事件还不知道有什么作用???!!!
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh:
return
case <-q.heartbeat.C():
// continue the loop, which will add ready items
case <-nextReadyAt:
// continue the loop, which will add ready items
case waitEntry := <-q.waitingForAddCh:
//q.waitingForAddCh有数据输出时,获取该数据,并判断该数据是否需要延迟
if waitEntry.readyAt.After(q.clock.Now()) {
//若需要延迟,则插入到waitingForQueue,及waitingEntryByData
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
//否则,插入到队列q中
q.Add(waitEntry.data)
}
// 不知用意,是否全部耗尽?否则循环等待?
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
//统计元素的排队数
failures map[interface{}]int
//控制从fast速率转换到slow速率
maxFastAttempts int
//fast速率
fastDelay time.Duration
//slow速率
slowDelay time.Duration
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
//排队数加1
r.failures[item] = r.failures[item] + 1
//判断排队数是否超过限制fast速率的最大值,如果超过则返回slow速率
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}