Skip to main content

kubelet源码分析 podManager和podWorkers、workQueue

· 7 min read
Softwore Developer

Kubelet 源码分析工作原理中介绍过, Kubelet 中有两个核心的数据结构字段, podManagerpodWorkers, 现在我们来一探究竟。

podManagerpodWorkers 里面保存的 Pod 内容可能不一样,比如:

  • 强制删除 Pod 时,会把 podManager 中的记录删除掉,但是**podWorkers** 中的 Pod 会执行正常的删除流程;此时就出现 podManager 不存在**,podWorkers** 存在的问题**。**
  • 新创建的 Pod 会先保存到 podManager 中,启动时再保存到 podWorkers 中。

它们要做的就是无限趋近于一样。

MirrorPod

PodManager 中存在 podByFullNamemirrorPodByFullName 的两个字段,分别保存 Pod 信息和 MirrorPod 的。

	podByFullName       map[string]*v1.Pod
mirrorPodByFullName map[string]*v1.Pod

pod 来源分为 file http apiserver 三个 source , 其中 apiserver 中的 pod 称为普通pod,其他来源的 pod 称为 static pod, 为了管理 pod 方便, kubelet 会在 apiserver 上为static pod 生成对应的 pod,这类型 pod 称为 mirror pod .

即可以理解是这个 pod 的替身, 基本上跟 static pod一模一样,只有 uid 不一样和 annotations 里多了 kubernetes.io/config.mirror .

mirror pod 是保存在 apiserver 中的 static pod 的一个镜像,但是操作 mirror podstatic pod 的影响是不一样的。

image.png

如果要删除 static pod ,比如是从文件创建的,则需要删除文件才能删除这个 static podmirror pod .

static pod移除流程:

  1. podConfig 感知到 static 文件的移除,触发 SyncLoop REMOVE
  2. podWoker 执行 syncTerminatingPod(执行停止容器和 sandbox
  3. PLEG 感知到 sanboxcontainer 的退出
  4. podWoker 执行 syncTerminatedPod(移除 cgroup,更新 mirror podstatus,等待pod 的 volume umount 完成)
  5. 感知到 mirror podstatus 更新
  6. housekeeping 触发,执行清理工作( podWorker 的移除 mirror pod 的删除 pod volume 目录移除)
  7. 感知到 mirror pod 的删除和从 apiserver上移除
  8. garbageCollector 执行 sandbox 和容器的清理,并移除 pod 日志目录

image.png

podManager

podManager 主要是存储应该在 kubelet 上运行的 Pod 的集合,它包括 admitted podsmirror pods .

podManager 的数据由 kubelet 写入,主要是接收不同来源 API URL Static Fiel ,最后写入到 podUpdateCh 这个 chan.

我们把 Manager 这个接口的方法按功能划分

  • GetPodByFullName(podFullName string) (*v1.Pod, bool)
    GetPodByName(namespace, name string) (*v1.Pod, bool)
    GetPodByUID(types.UID) (*v1.Pod, bool)
    GetPodByMirrorPod(*v1.Pod) (*v1.Pod, bool)
    GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
    GetPodAndMirrorPod(*v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool)
    GetPods() []*v1.Pod
    GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string)
    TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
    GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
  • SetPods(pods []*v1.Pod) #已经未使用了,不分析
    AddPod(pod *v1.Pod)
    UpdatePod(pod *v1.Pod)
    RemovePod(pod *v1.Pod)

我们主要来分析下写的这几个操作流程:

a

它从 PodUpdate 接收到操作之后会按照 PodOperation 类型来区分操作方法,目前使用了五种操作方法,最终对应到 podManager 上有三个操作方法。

主执行上述四种方法的时候会执行 podWorkersUpdatePod 方法,就是调用不同的 UpdatePodOptions 操作类型,分别对应四种 Option , 其中需要根据这个 Pod 是否是 MirrorPod 来决定它的操作类型。

a

podWorkers

podWorkers 简单说就是通过控制 Pod 的生命周期来更新 Pod 的真实状态,每个 Pod 会有一个协程执行 podWorkerLoop

我们来分析下 PodWorkers 的方法:

type PodWorkers interface {
// 通过 podworker pod 发送改变,然后pod会按照FIFO的顺序进入自己的协程处理,podUID是协程的key,
// 然后进入 syncPod 方法会直到被kubelet驱逐或者是标记deleted、Succeeded、Failed,一旦发生驱逐或者delete
// syncTerminatingPod 方法将被调用,后续的UpdatePod调用都将被忽略,直到它成功,终止的pod不会被启动。
UpdatePod(options UpdatePodOptions)
}

UpdatePodOptions 结构体分析:

type UpdatePodOptions struct {
//更新类型(create, update, sync, kill).
UpdateType kubetypes.SyncPodType
// Pod to update. Required.
Pod *v1.Pod
// MirrorPod is the mirror pod if Pod is a static pod. Optional when UpdateType
// is kill or terminated.
MirrorPod *v1.Pod
// RunningPod 不再存在的运行时 pod,只有执行PodCleanup的时候才会不为空
RunningPod *kubecontainer.Pod
// KillPodOptions 用于覆盖 pod 的默认终止行为或在操作完成后更新 pod 状态。
KillPodOptions *KillPodOptions
}

我们来分析 podWorkers 的结构体:

type podWorkers struct {
//
podUpdates map[types.UID]chan struct{}
// WorkQueue 允许使用时间戳对项目进行排队。如果时间戳已过期,则项目将被视为已准备好进行处理。
workQueue queue.WorkQueue

// 这个函数名字就叫podSyncer
podSyncer podSyncer

}

a

syncPod 是整个 kubelet 的核心,pod 的创建与删除都由它来处理。由于 syncPod() 涉及到太多的 kubelet 中功能包,所以等 volumeManager, podManager 等介绍完毕再详细分析。现在只需知道 syncPod() 会处理传进来的 pod,维护物理机上的 podetcd 中的 pod 的一致性。

存在一个疑问:Pod 是被夺协程并行的处理的,那针对资源分配是否会存在竞争关系。这个留到后面去分析。

workQueue

workQueu 是一个允许使用时间戳对项目进行排队的 Queue , 如果时间戳已过期,则项目将被视为已准备好进行处理。

WorkQueue 只包括两个核心的方法, GetWork() 返回的是所有已经准备好的 Worker .

type WorkQueue interface {
// GetWork dequeues and returns all ready items.
GetWork() []types.UID
// Enqueue inserts a new item or overwrites an existing item.
Enqueue(item types.UID, delay time.Duration)
}

basicWorkQueue 的内部结构是一个 Map: key 是一个 UID, value 时一个 Time.

type basicWorkQueue struct {
clock clock.Clock
lock sync.Mutex
queue map[types.UID]time.Time
}