Skip to main content

Kubelet 源码分析 volumeManager

· 8 min read
Softwore Developer

VolumeManager 运行一组异步循环,根据该节点上调度的 pod 确定需要 attached/mounted/unmounted/detached 哪些卷,并执行此操作。

下面我们来分析 VolumeManager 定义的接口,

type VolumeManager interface {
// Starts the volume manager and all the asynchronous loops that it controls
// 启动这个volume manager和异步循环控制器
Run(ctx context.Context, sourcesReady config.SourcesReady)

// WaitForAttachAndMount 处理指定 pod 中引用的卷并阻塞,直到它们全部连接并挂载(反映在实际状态中)。
// 如果未在 podAttachAndMountTimeout 定义的持续时间内连接并挂载所有卷,则会返回错误。
WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error

// WaitForUnmount 处理指定 pod 中引用的卷并阻塞,直到它们全部卸载(反映在实际状态中)。
// 如果未在 podAttachAndMountTimeout 定义的持续时间内卸载所有卷,则会返回错误。
WaitForUnmount(ctx context.Context, pod *v1.Pod) error

// GetMountedVolumesForPod 返回一个 VolumeMap,其中包含指定 pod 引用的已成功附加和挂载的卷。
// 映射中的键是 OuterVolumeSpecName(即 pod.Spec.Volumes[x].Name)。如果 pod 没有卷,则返回一个空的 VolumeMap。
GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

// GetPossiblyMountedVolumesForPod 返回一个 VolumeMap,其中包含指定 pod 引用的卷,
// 这些卷要么已成功连接并挂载,要么“不确定”,即卷插件可能正在挂载它们。映射中的键是 OuterVolumeSpecName(即 pod.Spec.Volumes[x].Name)。
// 如果 pod 没有卷,则返回一个空的 VolumeMap。
GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

// GetExtraSupplementalGroupsForPod 返回 Pod 的额外补充组列表。
// 这些额外的补充组来自 Pod 所依赖的持久卷上的注释。
GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64

// GetVolumesInUse 返回所有实现 volume.Attacher 接口的卷的列表,这些卷当前根据缓存的实际状态和期望状态处于使用状态。
// 卷一旦添加到期望的状态,即被视为“正在使用”,表明它*应该*连接到此节点并保持“正在使用”状态,直到它从期望的状态和实际的状态中移除,
// 或者它已被卸载(如实际的世界状态所示)。
GetVolumesInUse() []v1.UniqueVolumeName

// 仅当 kubelet 启动后协调器中的实际状态至少同步一次后,ReconcilerStatesHasBeenSynced 才会返回 true,
// 以便可以安全地更新从实际状态检索到的已挂载卷列表。
ReconcilerStatesHasBeenSynced() bool

// 如果给定的卷已附加到此节点,则 VolumeIsAttached 返回 true。
VolumeIsAttached(volumeName v1.UniqueVolumeName) bool

// 将指定卷标记为已在节点的卷状态中成功报告为“正在使用”。
MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}

启动 VolumeManager 是在 Kubelet.goRun 方法中:

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
go kl.volumeManager.Run(ctx, kl.sourcesReady)
}

WaitForAttachAndMount 是在 Pod 同步的时候会调用,如果当前同步的 Pod 没有 AttachMount 成功,则会进行返回继续让 podWorkerLoop 进行执行。

func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
...
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
if !wait.Interrupted(err) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
}
return false, err
}
}

WaitForUnmount 是在 Pod 已终止时进行调用,需要等待 Unmount . 这里会阻塞进行等待,直到成功或者错误,但是在 SyncTerminatedPod 并没有处理返回错误的类型,也就意味着即使 Unmount 失败了也会执行 Pod 删除。

func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
...
if err := kl.volumeManager.WaitForUnmount(ctx, pod); err != nil {
return err
}
}

下面我们来分析下 volumeManager 结构体的定义,和具体的 VolumeManager 的实现

type volumeManager struct {

// volumePluginMgr 是用于访问卷插件的卷插件管理器。它必须预先初始化。
volumePluginMgr *volume.VolumePluginMgr

// desireStateOfWorld 是一个数据结构,包含管理器所定义的期望状态:
// 即应附加哪些卷以及哪些 pod 正在引用这些卷。数据结构由 kubelet pod 管理器使用的状态填充器填充。
desiredStateOfWorld cache.DesiredStateOfWorld

// actualStateOfWorld 是一个数据结构,包含管理器所定义的实际状态:
// 即哪些卷连接到此节点以及这些卷挂载到哪些 pod。协调器触发的挂载、分离、挂载和卸载操作成功完成后,将填充该数据结构。
actualStateOfWorld cache.ActualStateOfWorld

// operationExecutor 用于启动异步附加、分离、挂载和卸载操作。
operationExecutor operationexecutor.OperationExecutor

// reconciler runs an asynchronous periodic loop to reconcile the
// desiredStateOfWorld with the actualStateOfWorld by triggering attach,
// detach, mount, and unmount operations using the operationExecutor.
reconciler reconciler.Reconciler

// desireStateOfWorldPopulator 运行一个异步周期循环,使用 kubelet PodManager 填充 desireStateOfWorld。
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator

// csiMigratedPluginManager 跟踪插件的 CSI 迁移状态
csiMigratedPluginManager csimigration.PluginManager

// intreeToCSITranslator 将树内卷规范转换为 CSI
intreeToCSITranslator csimigration.InTreeToCSITranslator
}

在调用 volumeManager.run 之后会启动三个协程,分别是 List-Watch 资源 CSIDrivers , 一个是启动一个 dswp 的协程,去更新 desiredStateOfWorldactualStateOfWorld , 一个是协调协程,它会根据当前 Pod 期望的 Volume 状态和实际状态进行处理,决定是需要 Mount 还是 UnMount.

volumePluginMgr 需要通过注册的 Volume 插件进行初始化,默认注册的插件如下:

func ProbeVolumePlugins(featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
allPlugins := []volume.VolumePlugin{}
allPlugins = append(allPlugins, emptydir.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, hostpath.ProbeVolumePlugins(volume.VolumeConfig{})...)
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(volume.VolumeConfig{})...)
allPlugins = append(allPlugins, secret.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, downwardapi.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, configmap.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, local.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
if featureGate.Enabled(features.ImageVolume) {
allPlugins = append(allPlugins, image.ProbeVolumePlugins()...)
}
return allPlugins, nil
}

每个插件都需要实现 VolumePlugin 接口中的方法,特别是 Init 方法需要在初始化 VolumePluginMgr 时候调用 InitPlugins 循环的把所有注册的插件都初始化。

type VolumePlugin interface {
// Init initializes the plugin. This will be called exactly once
// before any New* calls are made - implementations of plugins may
// depend on this.
Init(host VolumeHost) error
...
// NewMounter creates a new volume.Mounter from an API specification.
// Ownership of the spec pointer in *not* transferred.
// - spec: The v1.Volume spec
// - pod: The enclosing pod
NewMounter(spec *Spec, podRef *v1.Pod) (Mounter, error)

// NewUnmounter creates a new volume.Unmounter from recoverable state.
// - name: The volume name, as per the v1.Volume spec.
// - podUID: The UID of the enclosing pod
NewUnmounter(name string, podUID types.UID) (Unmounter, error)
...
}

总结:

volumeManager 通过 actualStateOfWorlddesiredStateOfWorld 来表明当前的 volume 挂载状态和期望的 volume 挂载状态。然后由 desiredStateOfWorldPopulator 维护 desireedStateOfWorldpodManager 的一致性;

reconcile 维护 actualStateOfWorlddesiredStateOfWorld 的一致性及磁盘 volume 挂载和 actualStateOfWorld 的一致性。通过这些机制,volumeManager 完成了 volume 挂载生命周期的管理。