Kubelet 源码分析 volumeManager
VolumeManager
运行一组异步循环,根据该节点上调度的 pod
确定需要 attached/mounted/unmounted/detached
哪些卷,并执行此操作。
下面我们来分析 VolumeManager
定义的接口,
type VolumeManager interface {
// Starts the volume manager and all the asynchronous loops that it controls
// 启动这个volume manager和异步循环控制器
Run(ctx context.Context, sourcesReady config.SourcesReady)
// WaitForAttachAndMount 处理指定 pod 中引用的卷并阻塞,直到它们全部连接并挂载(反映在实际状态中)。
// 如果未在 podAttachAndMountTimeout 定义的持续时间内连接并挂载所有卷,则会返回错误。
WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error
// WaitForUnmount 处理指定 pod 中引用的卷并阻塞,直到它们全部卸载(反映在实际状态中)。
// 如果未在 podAttachAndMountTimeout 定义的持续时间内卸载所有卷,则会返回错误。
WaitForUnmount(ctx context.Context, pod *v1.Pod) error
// GetMountedVolumesForPod 返回一个 VolumeMap,其中包含指定 pod 引用的已成功附加和挂载的卷。
// 映射中的键是 OuterVolumeSpecName(即 pod.Spec.Volumes[x].Name)。如 果 pod 没有卷,则返回一个空的 VolumeMap。
GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
// GetPossiblyMountedVolumesForPod 返回一个 VolumeMap,其中包含指定 pod 引用的卷,
// 这些卷要么已成功连接并挂载,要么“不确定”,即卷插件可能正在挂载它们。映射中的键是 OuterVolumeSpecName(即 pod.Spec.Volumes[x].Name)。
// 如果 pod 没有卷,则返回一个空的 VolumeMap。
GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
// GetExtraSupplementalGroupsForPod 返回 Pod 的额外补充组列表。
// 这些额外的补充组来自 Pod 所依赖的持久卷上的注释。
GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
// GetVolumesInUse 返回所有实现 volume.Attacher 接口的卷的列表,这些卷当前根据缓存的实际状态和期望状态处于使用状态。
// 卷一旦添加到期望的状态,即被视为“正在使用”,表明它*应该*连接到此节点并保持“正在使用”状态,直到它从期望的状态和实际的状态中移除,
// 或者它已被卸载(如实际的世界状态所示)。
GetVolumesInUse() []v1.UniqueVolumeName
// 仅当 kubelet 启动后协调器中的实际状态至少同步一次后,ReconcilerStatesHasBeenSynced 才会返回 true,
// 以便可以安全地更新从实际状态检索到的已挂载卷列表。
ReconcilerStatesHasBeenSynced() bool
// 如果给定的卷已附加到此节点,则 VolumeIsAttached 返回 true。
VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
// 将指定卷标记为已在节点的卷状态中成功报告为“正在使用”。
MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
启动 VolumeManager
是在 Kubelet.go
的 Run
方法中:
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
go kl.volumeManager.Run(ctx, kl.sourcesReady)
}
WaitForAttachAndMount
是在 Pod
同步的时候会调用,如果当前同步的 Pod
没有 Attach
和 Mount
成功,则会进行返回继续让 podWorkerLoop
进行执行。
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
...
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
if !wait.Interrupted(err) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
}
return false, err
}
}
WaitForUnmount
是在 Pod
已终止时进行调用,需要等待 Unmount
. 这里会阻塞进行等待,直到成功或者错误,但是在 SyncTerminatedPod
并没有处理返回错误的类型,也就意味着即使 Unmount
失败了也会执行 Pod
删除。
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
...
if err := kl.volumeManager.WaitForUnmount(ctx, pod); err != nil {
return err
}
}
下面我们来分析下 volumeManager
结构体的定义,和具体的 VolumeManager
的实现
type volumeManager struct {
// volumePluginMgr 是用于访问卷插件的卷插件管理器。它必须预先初始化。
volumePluginMgr *volume.VolumePluginMgr
// desireStateOfWorld 是一个数据结构,包含管理器所定义的期望状态:
// 即应附加哪些卷以及哪些 pod 正在引用这些卷。数据结构由 kubelet pod 管理器使用的状态填充器填充。
desiredStateOfWorld cache.DesiredStateOfWorld
// actualStateOfWorld 是一个数据结构,包含管理器所定义的实际状态:
// 即哪些卷连接到此节点以及这些卷挂载到哪些 pod。协调器触发的挂载、分离、挂载和卸载操作成功完成后,将填充该数据结构。
actualStateOfWorld cache.ActualStateOfWorld
// operationExecutor 用于启动异步附加、分离、挂载和卸载操作。
operationExecutor operationexecutor.OperationExecutor
// reconciler runs an asynchronous periodic loop to reconcile the
// desiredStateOfWorld with the actualStateOfWorld by triggering attach,
// detach, mount, and unmount operations using the operationExecutor.
reconciler reconciler.Reconciler
// desireStateOfWorldPopulator 运行一个异步周期循环,使用 kubelet PodManager 填充 desireStateOfWorld。
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
// csiMigratedPluginManager 跟踪插件的 CSI 迁移状态
csiMigratedPluginManager csimigration.PluginManager
// intreeToCSITranslator 将树内卷规范转换为 CSI
intreeToCSITranslator csimigration.InTreeToCSITranslator
}
在调用 volumeManager.run
之后会启动三个协程,分别是 List-Watch
资源 CSIDrivers
, 一个是启动一个 dswp
的协程,去更新 desiredStateOfWorld
和 actualStateOfWorld
, 一个是协调协 程,它会根据当前 Pod
期望的 Volume
状态和实际状态进行处理,决定是需要 Mount
还是 UnMount
.
volumePluginMgr
需要通过注册的 Volume
插件进行初始化,默认注册的插件如下:
func ProbeVolumePlugins(featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
allPlugins := []volume.VolumePlugin{}
allPlugins = append(allPlugins, emptydir.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, hostpath.ProbeVolumePlugins(volume.VolumeConfig{})...)
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(volume.VolumeConfig{})...)
allPlugins = append(allPlugins, secret.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, downwardapi.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, configmap.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, local.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
if featureGate.Enabled(features.ImageVolume) {
allPlugins = append(allPlugins, image.ProbeVolumePlugins()...)
}
return allPlugins, nil
}
每个插件都需要实现 VolumePlugin
接口中的方法,特别是 Init
方法需要在初始化 VolumePluginMgr
时候调用 InitPlugins
循环的把所有注册的插件都初始化。
type VolumePlugin interface {
// Init initializes the plugin. This will be called exactly once
// before any New* calls are made - implementations of plugins may
// depend on this.
Init(host VolumeHost) error
...
// NewMounter creates a new volume.Mounter from an API specification.
// Ownership of the spec pointer in *not* transferred.
// - spec: The v1.Volume spec
// - pod: The enclosing pod
NewMounter(spec *Spec, podRef *v1.Pod) (Mounter, error)
// NewUnmounter creates a new volume.Unmounter from recoverable state.
// - name: The volume name, as per the v1.Volume spec.
// - podUID: The UID of the enclosing pod
NewUnmounter(name string, podUID types.UID) (Unmounter, error)
...
}
总结:
volumeManager
通过 actualStateOfWorld
和 desiredStateOfWorld
来表明当前的 volume
挂载状态和期望的 volume
挂载状态。然后由 desiredStateOfWorldPopulator
维护 desireedStateOfWorld
和 podManager
的一致性;
由 reconcile
维护 actualStateOfWorld
和 desiredStateOfWorld
的一致性及磁盘 volume
挂载和 actualStateOfWorld
的 一致性。通过这些机制,volumeManager
完成了 volume
挂载生命周期的管理。