controller-runtime 实现分区 WorkQueue
目前统一在 client-go 中实现了三个 Work Queue。包括通用队列、限速队列、延时队列。源码统一都在 staging/src/k8s.io/client-go 包下面。
我们最近遇到一个需求,即希望能并发协调资源,又希望一组相关的资源在一个协程里面处理;这就有点类似于 Kafka 中的分区队列;既可以并发又可以保证一组资源在同一个线程中串行处理。
我的案例是因为我需要计算节点维度的资源,但是资源都是通过Pod触发的,所以我需要保证同一个 Node 的资源在一个协程内进行计算,否则并发会较难处理。
设计
我通过自定义了一个 Shard Queue,来实现类似的逻辑,每个 Pod 在进入 Queue 之前通过计算一个 Hash 值来决定需要放入那个子队列。
核心结构定义如下,
- 主要是定义了一个限速队列 shards 数组,通过计算出的 Hash 值对并发数进行求余得出具体应该在那个队列。
- HashFn 就是通过请求参数 reconcile.Request 和 分区数量进行计算,得出一个 hash 值。
- CurrentShard 是用来记录分配协程到分片ID的最新信息。
- GoroutineSharding 是用来记录 goroutineID 和分片 ID的映射关系。
type CacheKey[T reconcile.Request] func(T reconcile.Request, shardCount int) int
type TypedShardedQueueConfig[T reconcile.Request] struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
// shards is queue arrau
shards []workqueue.TypedRateLimitingInterface[reconcile.Request]
// CacheKey is get key hash value
HashFn CacheKey[reconcile.Request]
// CurrentShard is record current goroutineID assign shard
CurrentShard int
// GoroutineSharding is record goroutineID map to shard index
GoroutineSharding map[int64]int
//
shardCount int
// mu is lock
mu sync.RWMutex
}
我们先看之前的限速队列是如何实现并发的,在之前只有一个 Work Queue,然后如果设置了并发,会启动多个协程,安全的从 Work Queue 中取出元素,所以协程 Get 的时候是没办法和分区队列对应上。
HashFn
这是用户需要实现的一个函数,不同的资源可能计算 Hash 的逻辑不一样,我这里以 Pod 为例。核心是在函数内部需要使用 Client 进行 Get 获取完整的资源。
func (r *CustomController) Hash(value reconcile.Request, shardCount int) int {
var pod corev1.Pod
if err := r.Client.Get(context.Background(), client.ObjectKey{Namespace: value.Namespace, Name: value.Name}, &pod); err != nil {
return 0
}
if pod.Spec.NodeName == "" {
return 0
}
h := fnv.New32a()
h.Write([]byte(pod.Spec.NodeName))
if shardCount > 0 {
return int(h.Sum32() % uint32(shardCount))
}
return 0
}
那我们为什么可以在这里执行 Get,并获取到完整的 Object 呢,核心原理就在 client-go 的流程里面;我们可以看到第5步就把对象放入到 Store 中了,在第7步中才是进行 Work Queue 操作;所以在这里已经可以获取到对象了。
Get 方法
解决办法就是第一个协程调用 Get 的时候,看 GoroutineSharding 结构中是否保存了当前 GoroutineID 对应的 Shard Index,如果没有则存储,如果有则直接使用。
这样就可以实现把队友协程绑定到队友的分片上了。
func (sq *TypedShardedQueueConfig[T]) Get() (T, bool) {
goroutineID := GetGoroutineID()
if goroutineID == -1 {
panic("goroutine ID is invelida")
}
var zero T
sq.mu.RLock()
if shads, ok := sq.GoroutineSharding[goroutineID]; ok {
sq.mu.RUnlock()
if item, shutdown := sq.shards[shads].Get(); !shutdown {
return T(item), shutdown
}
return zero, true
}
sq.mu.RUnlock()
sq.mu.Lock()
sq.GoroutineSharding[goroutineID] = sq.CurrentShard
sq.CurrentShard += 1
sq.mu.Unlock()
sq.mu.RLock()
if shads, ok := sq.GoroutineSharding[goroutineID]; ok {
sq.mu.RUnlock()
if item, shutdown := sq.shards[shads].Get(); !shutdown {
return T(item), shutdown
}
return zero, true
}
return zero, true
}
Add
添加逻辑比较简单,只需要通过 HashFn 计 算出队友的分区 index,然后把元素 item 放入到对应到子 Work queue 中即可。
func (sq *TypedShardedQueueConfig[T]) Add(item T) {
hashKey := sq.HashFn(reconcile.Request(item), sq.shardCount)
klog.V(5).InfoS("Add", "hashKey", hashKey)
sq.shards[hashKey].Add(reconcile.Request(item))
}
其它的方法都比较简单,这里就不一一介绍了,完整的代码可以查看 GitHub:undefined