The code in this article is based on client-go release-14.0 (my understanding is limited; discussion is welcome and corrections are appreciated).

workqueue refers to the util/workqueue package in the client-go project. The package provides three main queues: the basic queue, the delaying queue, and the rate limiting queue. Each queue builds on the implementation of the previous one, adding new functionality layer by layer. We will unpack them in the order Queue, DelayingQueue, RateLimitingQueue and see how each one is implemented.
```go
type empty struct{}
type t interface{}
type set map[t]empty

func (s set) has(item t) bool {
	_, exists := s[item]
	return exists
}

func (s set) insert(item t) {
	s[item] = empty{}
}

func (s set) delete(item t) {
	delete(s, item)
}
```
Using the empty struct as the map value turns a map into a set:

- has(item t): check whether a key exists, using the map's two-value lookup
- insert(item t): add an element by storing struct{}{} under the key
- delete(item t): remove a key by calling the built-in delete() function

A short usage sketch follows.
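A minimal standalone sketch of how this set behaves; it re-declares the same unexported types in a throwaway program, and the key value is made up:

```go
package main

import "fmt"

type empty struct{}
type t interface{}
type set map[t]empty

func (s set) has(item t) bool { _, ok := s[item]; return ok }
func (s set) insert(item t)   { s[item] = empty{} }
func (s set) delete(item t)   { delete(s, item) }

func main() {
	s := set{}
	s.insert("pod-a")
	s.insert("pod-a") // inserting the same key again is a no-op
	fmt.Println(s.has("pod-a"), len(s)) // true 1
	s.delete("pod-a")
	fmt.Println(s.has("pod-a")) // false
}
```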
```go
type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShuttingDown() bool
}
```
- Add(…): add an element
- Len(): number of elements
- Get(): fetch an element; the second return value works like a channel receive and indicates whether the queue has been shut down
- Done(…): mark an element as finished processing
- ShutDown(): shut the queue down
- ShuttingDown(): whether the queue is shutting down

A typical worker loop built on this interface is sketched below.
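A minimal sketch of the usual producer/worker pattern on top of this interface; workqueue.New() is the real constructor, while the item names and processItem are made up for illustration:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processItem is a hypothetical handler; a real controller would sync an object here.
func processItem(item interface{}) {
	fmt.Println("processing", item)
}

func main() {
	q := workqueue.New()

	// Producer side: enqueue some work, then signal shutdown.
	q.Add("pod-a")
	q.Add("pod-b")
	q.ShutDown() // already-queued items are still handed out before Get reports shutdown

	// Worker side: the canonical Get / process / Done loop.
	for {
		item, shutdown := q.Get()
		if shutdown {
			return // queue is shut down and drained, stop the worker
		}
		processItem(item)
		q.Done(item) // always mark the item done, or it can never be re-queued
	}
}
```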
```go
// Type is a work queue (see the package comment).
type Type struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	queue []t

	// dirty defines all of the items that need to be processed.
	dirty set

	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set

	cond *sync.Cond

	shuttingDown bool

	metrics queueMetrics

	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.Clock
}
```
Main fields:

- queue: defines the order in which items are processed; every element in it should also be in the dirty set, and must not be in the processing set
- dirty: marks all items that still need to be processed
- processing: items currently being processed; when processing finishes we check whether the item is also in dirty, and if so, add it back to queue
- cond: condition variable used for locking and for waking up waiting goroutines
- shuttingDown: whether the queue is shutting down
```go
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return
	}
	if q.dirty.has(item) {
		return
	}

	q.metrics.add(item)

	q.dirty.insert(item)
	if q.processing.has(item) {
		return
	}

	q.queue = append(q.queue, item)
	q.cond.Signal()
}
```
The Add function enqueues an element. Its flow is:

- take the lock
- return if the queue is shutting down
- return if the element is already in dirty (de-duplication)
- record the element in metrics and add it to dirty
- return if the element is already in processing (the element is being processed right now and another Add for it arrived, so it is not appended to queue yet; Done will re-queue it later)
- append the element to queue
- signal one goroutine blocked in Get

The de-duplication is illustrated right after this list.
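A small sketch of the de-duplication that falls out of the dirty set; workqueue.New() is the real constructor, the item value is made up:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.New()

	// Adding the same key twice before anyone calls Get stores it only once,
	// because the second Add finds the key in the dirty set and returns early.
	q.Add("pod-a")
	q.Add("pod-a")
	fmt.Println(q.Len()) // 1
}
```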
```go
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

	item, q.queue = q.queue[0], q.queue[1:]

	q.metrics.get(item)

	q.processing.insert(item)
	q.dirty.delete(item)

	return item, false
}
```
The Get function pops an element from queue, meaning the element is now being processed; dirty is kept consistent with queue, so the element is removed from dirty as well. Its flow is:

- take the lock
- wait while queue is empty and the queue has not been shut down; if queue is still empty afterwards, we must be shutting down, so return (nil, true)
- pop the first element from queue
- add the element to processing to mark it as in flight
- remove the element from dirty, keeping it consistent with queue
```go
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	}
}
```
The Done function marks the element as finished and removes it from processing. Its flow is:

- take the lock
- remove the element from processing
- check whether the element is in dirty; if so, append it to queue again (why is this check needed? Because while itemA is being processed but not yet Done, another Add(itemA) may arrive. In that case Add only puts it into dirty and returns without touching queue, so Done has to re-queue it here.)

The sketch below walks through exactly that scenario.
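A minimal sketch of the re-queue behavior described above, using the real workqueue.New() constructor with a made-up item:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.New()

	q.Add("pod-a")
	item, _ := q.Get() // "pod-a" moves from queue+dirty into processing

	// While the item is in flight, another Add for the same key arrives:
	// it only lands in the dirty set, queue stays empty.
	q.Add("pod-a")
	fmt.Println(q.Len()) // 0

	// Done sees the item in dirty and puts it back onto the queue.
	q.Done(item)
	fmt.Println(q.Len()) // 1
}
```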
```go
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}

// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
	Interface

	// clock tracks time for delayed firing
	clock clock.Clock

	// stopCh lets us signal a shutdown to the waiting loop
	stopCh chan struct{}
	// stopOnce guarantees we only signal shutdown a single time
	stopOnce sync.Once

	// heartbeat ensures we wait no more than maxWait before firing
	heartbeat clock.Ticker

	// waitingForAddCh is a buffered channel that feeds waitingForAdd
	waitingForAddCh chan *waitFor

	// metrics counts the number of retries
	metrics retryMetrics
}
```
Main fields:

- Interface: the generic queue described above, embedded so the delaying queue inherits all of its behavior
- clock: clock used to read the current time
- stopCh: delaying implies asynchrony, so a separate goroutine handles the waiting, and it needs a stop signal
- stopOnce: guarantees the shutdown signal from ShutDown() is sent only once
- heartbeat: a ticker that wakes the waiting goroutine periodically even when there is no other activity
- waitingForAddCh: every delayed add is wrapped in a waitFor and sent on this channel
```go
// waitFor holds the data to add and the time it should be added
type waitFor struct {
	data    t
	readyAt time.Time
	// index in the priority queue (heap)
	index int
}

// waitForPriorityQueue implements a priority queue for waitFor items.
//
// waitForPriorityQueue implements heap.Interface. The item occurring next in
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
// Peek returns this minimum item at index 0. Pop returns the minimum item after
// it has been removed from the queue and placed at index Len()-1 by
// container/heap. Push adds an item at index Len(), and container/heap
// percolates it into the correct location.
type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int {
	return len(pq)
}
func (pq waitForPriorityQueue) Less(i, j int) bool {
	return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

// Push adds an item to the queue. Push should not be called directly; instead,
// use `heap.Push`.
func (pq *waitForPriorityQueue) Push(x interface{}) {
	n := len(*pq)
	item := x.(*waitFor)
	item.index = n
	*pq = append(*pq, item)
}

// Pop removes an item from the queue. Pop should not be called directly;
// instead, use `heap.Pop`.
func (pq *waitForPriorityQueue) Pop() interface{} {
	n := len(*pq)
	item := (*pq)[n-1]
	item.index = -1
	*pq = (*pq)[0:(n - 1)]
	return item
}

// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
	return pq[0]
}
```
Main fields of waitFor:

- data: the item that will eventually be added to the queue
- readyAt: the time at which the item should be added
- index: the item's index within the heap

Related functions:

- Len(): required by heap.Interface; returns the queue length
- Less(): required by heap.Interface; reports whether element i is smaller than element j (flipping the comparison would turn this min-heap into a max-heap)
- Swap(): required by heap.Interface; swaps elements i and j and fixes up the index fields we maintain ourselves
- Push(): required by heap.Interface; appends an item to the queue
- Pop(): required by heap.Interface; removes and returns the last element (container/heap has already moved the minimum there)
- Peek(): returns the first element (the smallest readyAt) without removing it

A small sketch of how container/heap drives these methods follows.
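To see the min-heap ordering in action, here is a standalone sketch that re-declares a stripped-down copy of waitFor and waitForPriorityQueue (the real types are unexported) and pushes items through container/heap; the item names and times are made up:

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

type waitFor struct {
	data    interface{}
	readyAt time.Time
	index   int
}

type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int           { return len(pq) }
func (pq waitForPriorityQueue) Less(i, j int) bool { return pq[i].readyAt.Before(pq[j].readyAt) }
func (pq waitForPriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index, pq[j].index = i, j
}
func (pq *waitForPriorityQueue) Push(x interface{}) { *pq = append(*pq, x.(*waitFor)) }
func (pq *waitForPriorityQueue) Pop() interface{} {
	old := *pq
	item := old[len(old)-1]
	*pq = old[:len(old)-1]
	return item
}

func main() {
	now := time.Now()
	pq := &waitForPriorityQueue{}
	heap.Init(pq)

	// Push in arbitrary order; the heap keeps the smallest readyAt at the root.
	heap.Push(pq, &waitFor{data: "pod-b", readyAt: now.Add(2 * time.Second)})
	heap.Push(pq, &waitFor{data: "pod-a", readyAt: now.Add(1 * time.Second)})
	heap.Push(pq, &waitFor{data: "pod-c", readyAt: now.Add(3 * time.Second)})

	for pq.Len() > 0 {
		fmt.Println(heap.Pop(pq).(*waitFor).data) // pod-a, pod-b, pod-c
	}
}
```

Back in the package, waitingLoop below is the goroutine that drains waitingForAddCh and moves ready items from this heap into the underlying queue.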
```go
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
	defer utilruntime.HandleCrash()

	// Make a placeholder channel to use when there are no items in our list
	never := make(<-chan time.Time)

	// Make a timer that expires when the item at the head of the waiting queue is ready
	var nextReadyAtTimer clock.Timer

	waitingForQueue := &waitForPriorityQueue{}
	heap.Init(waitingForQueue)

	waitingEntryByData := map[t]*waitFor{}

	for {
		if q.Interface.ShuttingDown() {
			return
		}

		now := q.clock.Now()

		// Add ready entries
		for waitingForQueue.Len() > 0 {
			entry := waitingForQueue.Peek().(*waitFor)
			if entry.readyAt.After(now) {
				break
			}

			entry = heap.Pop(waitingForQueue).(*waitFor)
			q.Add(entry.data)
			delete(waitingEntryByData, entry.data)
		}

		// Set up a wait for the first item's readyAt (if one exists)
		nextReadyAt := never
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
				nextReadyAtTimer.Stop()
			}
			entry := waitingForQueue.Peek().(*waitFor)
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
			nextReadyAt = nextReadyAtTimer.C()
		}

		select {
		case <-q.stopCh:
			return

		case <-q.heartbeat.C():
			// continue the loop, which will add ready items

		case <-nextReadyAt:
			// continue the loop, which will add ready items

		case waitEntry := <-q.waitingForAddCh:
			if waitEntry.readyAt.After(q.clock.Now()) {
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
				q.Add(waitEntry.data)
			}

			drained := false
			for !drained {
				select {
				case waitEntry := <-q.waitingForAddCh:
					if waitEntry.readyAt.After(q.clock.Now()) {
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
						q.Add(waitEntry.data)
					}
				default:
					drained = true
				}
			}
		}
	}
}
```
```go
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {
		return
	}

	q.metrics.retry()

	// immediately add things with no delay
	if duration <= 0 {
		q.Add(item)
		return
	}

	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
	}
}

// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
	// if the entry already exists, update the time only if it would cause the item to be queued sooner
	existing, exists := knownEntries[entry.data]
	if exists {
		if existing.readyAt.After(entry.readyAt) {
			existing.readyAt = entry.readyAt
			heap.Fix(q, existing.index)
		}
		return
	}

	heap.Push(q, entry)
	knownEntries[entry.data] = entry
}
```
- The core of the delaying queue: waitingLoop keeps a min-heap of items ordered by the time they should be added; once an item's readyAt has passed, it is moved into the underlying queue via Add
- Checking whether that time has arrived is not just a busy for loop; the loop blocks in a select on the heartbeat ticker, a timer for the next readyAt, the stop channel, and the channel of newly delayed items
- When processing an object fails, the delaying-queue idea lets us retry a little later, since retrying immediately would almost certainly fail again

A usage sketch of this retry pattern follows.
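A minimal sketch of the retry-after-failure pattern described above; workqueue.NewDelayingQueue() is the real constructor, while syncPod and the item are hypothetical:

```go
package main

import (
	"errors"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// syncPod is a hypothetical reconcile function that can fail.
func syncPod(item interface{}) error {
	return errors.New("apiserver temporarily unavailable")
}

func main() {
	q := workqueue.NewDelayingQueue()
	q.Add("pod-a")

	for i := 0; i < 3; i++ {
		item, shutdown := q.Get()
		if shutdown {
			return
		}
		if err := syncPod(item); err != nil {
			// Instead of hot-looping, come back to this item one second later.
			q.AddAfter(item, 1*time.Second)
		}
		q.Done(item)
	}
}
```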
```go
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	DelayingInterface

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}

// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
	DelayingInterface

	rateLimiter RateLimiter
}
```
Main definitions:

- DelayingInterface: the delaying queue described above, embedded
- AddRateLimited(): add an element to the queue after whatever delay the rate limiter prescribes
- Forget(): when an item has been processed successfully, or we decide to stop retrying it, tell the rate limiter to stop tracking it and clear its retry count
- NumRequeues(): how many times the item has been re-queued
- rateLimiter: the rate limiter that decides how long each item must wait before being added

As you can see, the rate limiting queue is structurally just a delaying queue: with the delaying queue the caller decides how long each element is delayed, while with the rate limiting queue the rate limiter makes that decision. A typical controller-style usage sketch follows.
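A minimal sketch of the canonical controller retry pattern on top of RateLimitingInterface; NewRateLimitingQueue and DefaultControllerRateLimiter are real client-go constructors, while syncHandler, the item key, and maxRetries are made up for illustration:

```go
package main

import (
	"errors"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

const maxRetries = 5 // hypothetical retry budget

// syncHandler is a hypothetical reconcile function.
func syncHandler(key interface{}) error {
	return errors.New("transient error")
}

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	q.Add("default/pod-a")

	for {
		key, shutdown := q.Get()
		if shutdown {
			return
		}

		err := syncHandler(key)
		switch {
		case err == nil:
			q.Forget(key) // success: clear the item's failure history
		case q.NumRequeues(key) < maxRetries:
			q.AddRateLimited(key) // retry with a back-off chosen by the rate limiter
		default:
			q.Forget(key) // give up: stop tracking the item
			fmt.Println("dropping", key, "after too many retries")
			q.ShutDown()
		}
		q.Done(key)
	}
}
```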
```go
type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
	// or for success, we'll stop tracking it
	Forget(item interface{})
	// NumRequeues returns back how many failures the item has had
	NumRequeues(item interface{}) int
}
```
Main methods:

- When(): given an item, decide how long it should wait before its next retry, based on its failure history
- Forget(): when an item has been processed successfully, or we decide to stop retrying it, stop tracking it and clear its retry count
- NumRequeues(): how many times the item has failed / been re-queued

This interface has five implementations:

- BucketRateLimiter, a token bucket rate limiter
- ItemExponentialFailureRateLimiter, an exponential back-off rate limiter
- ItemFastSlowRateLimiter, a fast-then-slow rate limiter
- MaxOfRateLimiter, a combining rate limiter
- WithMaxWaitRateLimiter, a maximum-wait rate limiter (removed in later releases, see below)
```go
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
	*rate.Limiter
}

var _ RateLimiter = &BucketRateLimiter{}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
	return r.Limiter.Reserve().Delay()
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
	return 0
}

func (r *BucketRateLimiter) Forget(item interface{}) {
}
```
This rate limiter is implemented with rate.Limiter from the golang.org/x/time/rate package. When instantiating a BucketRateLimiter you might pass in, say, rate.NewLimiter(rate.Limit(10), 100), meaning the token bucket holds at most 100 tokens and refills at 10 tokens per second. Because the limiter treats every item the same no matter how many times it has been seen, NumRequeues and Forget carry no meaning here and are effectively no-ops.
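A small sketch of constructing and querying such a limiter; BucketRateLimiter and rate.NewLimiter are real, and the parameters are just the example values from the text:

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Bucket of 100 tokens, refilled at 10 tokens per second.
	limiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}

	// The first 100 calls consume buffered tokens and return ~0 delay; after
	// that, reservations stack up and each call is pushed back another ~100ms.
	for i := 0; i < 102; i++ {
		d := limiter.When("pod-a")
		if i >= 99 {
			fmt.Printf("call %d: wait %v\n", i, d)
		}
	}
}
```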
```go
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	baseDelay time.Duration
	maxDelay  time.Duration
}

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	exp := r.failures[item]
	r.failures[item] = r.failures[item] + 1

	// The backoff is capped such that 'calculated' value never overflows.
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}

	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}
```
Exponential means just that: as the name suggests, the more times an item has failed, the longer it is delayed, and the delay grows exponentially as baseDelay * 2^n, capped at maxDelay.
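A short sketch of the resulting back-off sequence; NewItemExponentialFailureRateLimiter is the real constructor, and 5ms / 1000s are the values client-go's own default controller rate limiter uses:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	limiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)

	// Each call counts as one more failure of the same item:
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, ... until the 1000s cap is hit.
	for i := 0; i < 6; i++ {
		fmt.Println(limiter.When("pod-a"))
	}

	fmt.Println(limiter.NumRequeues("pod-a")) // 6
	limiter.Forget("pod-a")
	fmt.Println(limiter.NumRequeues("pod-a")) // 0
}
```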
```go
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	maxFastAttempts int
	fastDelay       time.Duration
	slowDelay       time.Duration
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	r.failures[item] = r.failures[item] + 1

	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}

	return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}
```
Main fields of the fast-slow rate limiter:

- maxFastAttempts: how many attempts use the fast retry interval
- fastDelay: the fast retry interval
- slowDelay: the slow retry interval

In other words, retry quickly at first and slowly later: define a threshold, and once the number of attempts exceeds it, switch to the slow interval. A small sketch follows.
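A small sketch using the real NewItemFastSlowRateLimiter constructor; the 5ms / 10s / 3 values are made up:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// 3 fast attempts at 5ms, everything after that at 10s.
	limiter := workqueue.NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)

	for i := 0; i < 5; i++ {
		fmt.Println(limiter.When("pod-a")) // 5ms, 5ms, 5ms, 10s, 10s
	}
}
```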
```go
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
	limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
		curr := limiter.When(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
	ret := 0
	for _, limiter := range r.limiters {
		curr := limiter.NumRequeues(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
	for _, limiter := range r.limiters {
		limiter.Forget(item)
	}
}
```
The combining rate limiter holds several rate limiters internally and returns the longest (worst-case) delay among them.
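client-go's own DefaultControllerRateLimiter is the best-known use of this combinator: it takes the max of a per-item exponential limiter and an overall token bucket. The sketch below reproduces that combination; NewMaxOfRateLimiter and the other constructors are real, and the printed item is made up:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Equivalent to workqueue.DefaultControllerRateLimiter():
	// per-item exponential back-off (5ms .. 1000s) combined with a
	// 10 qps / burst 100 token bucket that applies to the queue as a whole.
	limiter := workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)

	for i := 0; i < 3; i++ {
		// Early on the exponential delay (5ms, 10ms, 20ms) dominates the
		// near-zero bucket delay, so the max is the exponential value.
		fmt.Println(limiter.When("pod-a"))
	}
}
```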
```go
type WithMaxWaitRateLimiter struct {
	limiter  RateLimiter   // the wrapped rate limiter
	maxDelay time.Duration // the maximum delay
}

func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
	return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
}

func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
	delay := w.limiter.When(item)
	if delay > w.maxDelay {
		return w.maxDelay // the wrapped limiter asked for more than the cap, so return the cap
	}

	return delay
}
```
The maximum-wait rate limiter simply wraps another rate limiter with a maximum delay: if the wrapped limiter asks for more than maxDelay, maxDelay is returned instead. This keeps the delay bounded; otherwise an object that has failed many times would see its back-off grow larger and larger. In client-go v0.27+ (Kubernetes 1.27 and later), WithMaxWaitRateLimiter has been removed. A small sketch for versions that still ship it follows.
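A tiny sketch of wrapping the exponential limiter so its back-off can never exceed 30 seconds, for client-go versions that still include this type; the parameter values are made up:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	limiter := workqueue.NewWithMaxWaitRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 1000*time.Second),
		30*time.Second, // cap every delay at 30s
	)

	// Exponential growth: 1s, 2s, 4s, 8s, 16s, then clamped to 30s from the 6th call on.
	for i := 0; i < 7; i++ {
		fmt.Println(limiter.When("pod-a"))
	}
}
```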
- workqueue:https://github.com/kubernetes/client-go/tree/release-14.0/util/workqueue
- workqueue analysis: https://github.com/zoux86/learning-k8s-source-code/blob/master/k8s/client-go/8.%20client-go%E7%9A%84workqueue%E8%AF%A6%E8%A7%A3.md