Skip to main content

Kubelet源码分析 Configmap/Secret volume

· 12 min read
Softwore Developer

Pod 经常会出现将 Configmap/Secret 挂载到 Pod 中,今天我们来分析 Kubelet 是如何实现这个能力,特别是在 Configmap/Secret 内容发送改变、被删除等操作的时候如何影响 Pod 内挂载好的文件。

ConfigMap的Manager

Configmap 管理这块代码实现主要在 pkg/kubelet/configmap 目录下,提供了一个统一的管理接口 Manager ,下面我们来看下它定义的方法:

type Manager interface {
// Get configmap by configmap namespace and name.
GetConfigMap(namespace, name string) (*v1.ConfigMap, error)

// RegisterPod registers all configmaps from a given pod.
RegisterPod(pod *v1.Pod)

// UnregisterPod unregisters configmaps from a given pod that are not
// used by any other registered pod.
UnregisterPod(pod *v1.Pod)
}

这个 Manager 接口会根据配置的 ConfigMapAndSecretChangeDetectionStrategy 类型来决定使用那种方式管理,它提供了三种,分别是:

  • GET : 不会缓存,直接从 apiserver 出获取。
  • Cache : 会通过 apiserver 直接获取对象,并通过 TTL 缓存起来。
  • Watch: 会通过 list-watch 的方式监视资源的变更,并通过 TTL 缓存起来。

下图描述了对这三个方法调用的上层函数:

Secret 管理这块代码实现主要在 pkg/kubelet/secret 目录下,提供了一个统一的管理接口 Manager ,下面我们来看下它定义的方法:

它的定义和使用的原理都差不多。

下面我们来分析它是如何保存到宿主机上的,如何自动更新到容器中的;

首先我们要清楚容器里面的文件不是 kubelet 挂载到容器内部,是由容器运行时如 Containerd 这些组件在启动容器的时候 Mount 到容器内部的。

Kubelet 这边能做到就是修改 runtime spec 中的内容, 就是描述容器运行时启动时的 json 文件。

所以热更新这里面涉及到两个东西,一个是把修改到 ETCD 中的 Configmap 或者 Secret 文件更新到容器所在的节点;第二个就是把更新到节点上的文件更新到容器里面。

热更新到Node

这块主要是 VolumeManagerConfigMap Manager 协调处理的结果,但是最终实现更新还是由 SyncPod 触发的。

![]https://firebasestorage.googleapis.com/v0/b/notiondiagram.appspot.com/o/diagram%2Fm5smUNzLG7NDZmSPY82qhFvWnsu1%2Fcf250449-1337-455b-9459-55962081d92f.svg?alt=media&token=e5434e4c-3173-42b0-90ec-2e91600e4796

从这个流程上可以看出即使 ConfigMap 更新了也不会马上更新到宿主机上,主要还是受 SyncPod 的周期影响,每执行一次 SyncPod 就会让 reconciler 有一次更新的机会。

WaitForAttachAndMount 中主要就是把 processedPods 中的一个处理标志改为 false . MapKeyPod UID.

func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
...
vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
}

func (dswp *desiredStateOfWorldPopulator) markPodProcessingFailed(
podName volumetypes.UniquePodName) {
dswp.pods.Lock()
dswp.pods.processedPods[podName] = false
dswp.pods.Unlock()
}

reconciler.Run 中定时进行协调,代码中配置的是 100ms 且不可修改,它会不断遍历 desiredStateOfWorldactualStateOfWorld ,然后执行一系列的 attach/detachmount(remount)/unmount等操作。

func (rc *reconciler) Run(stopCh <-chan struct{}) {
rc.reconstructVolumes()
klog.InfoS("Reconciler: start to sync state")
wait.Until(rc.reconcile, 100ms, stopCh)
}

func (rc *reconciler) reconcile() {
...
// Next we mount required volumes. This function could also trigger
// attach if kubelet is responsible for attaching volumes.
// If underlying PVC was resized while in-use then this function also handles volume
// resizing.
rc.mountOrAttachVolumes()
}

接下来,我们挂载所需的卷。如果 kubelet 负责 Attach 卷,则此函数还可以触发 Attach 。如果在使用过程中调整了底层 PVC 的大小,则此函数还会处理卷大小调整。

VolumesToMount 会被 DesiredStateOfWorldPopulator.Run 方法改变,这个方法执行过程会被 processedPods 影响,它主要找出 desiredStateOfWorld 期望处理的 Volume 和实际 actualStateOfWorld 状态的 Volume

之后调用 PodExistsInVolume 函数,根据返回的 err 来决定后续流程,如果是更新 Mounte 流程就进入 mountAttachedVolumes 方法中。

func (rc *reconciler) mountOrAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.DesiredPersistentVolumeSize, volumeToMount.SELinuxLabel)
volumeToMount.DevicePath = devicePath
if cache.IsSELinuxMountMismatchError(err) {
// The volume is mounted, but with an unexpected SELinux context.
// It will get unmounted in unmountVolumes / unmountDetachDevices and
// then removed from actualStateOfWorld.
rc.desiredStateOfWorld.AddErrorToPod(volumeToMount.PodName, err.Error())
continue
} else if cache.IsVolumeNotAttachedError(err) {
rc.waitForVolumeAttach(volumeToMount)
} else if !volMounted || cache.IsRemountRequiredError(err) {
rc.mountAttachedVolumes(volumeToMount, err)
} else if cache.IsFSResizeRequiredError(err) {
fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
}
}
}

这里面创建了一个 NestedPendingOperations 执行器,它会启动一个协程调用 generatedOperations.Run 方法来处理 Mount.

func (grm *nestedPendingOperations) Run(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName,
generatedOperations volumetypes.GeneratedOperations) error {

opExists, previousOpIndex := grm.isOperationExists(opKey)
if opExists {
//已经存在操作,
} else {
// Create a new operation
grm.operations = append(grm.operations,
operation{
key: opKey,
operationPending: true,
operationName: generatedOperations.OperationName,
expBackoff: exponentialbackoff.ExponentialBackoff{},
})
}
// 启动协程
go func() (eventErr, detailedErr error) {
// Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(opKey, &detailedErr)
return generatedOperations.Run()
}()

return nil
}

GeneratedOperations 的结构体中定义了三个函数, OperationFunc 这是操作调用函数, EventRecorderFunc 事件记录函数, CompleteFunc 完成函数。

type GeneratedOperations struct {
// Name of operation - could be used for resetting shared exponential backoff
OperationName string
OperationFunc func() (context OperationContext)
EventRecorderFunc func(*error)
CompleteFunc func(CompleteFuncParam)
}
func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
var context OperationContext
if o.CompleteFunc != nil {
c := CompleteFuncParam{
Err: &context.DetailedErr,
Migrated: &context.Migrated,
}
defer o.CompleteFunc(c)
}
if o.EventRecorderFunc != nil {
defer o.EventRecorderFunc(&eventErr)
}
// Handle panic, if any, from operationFunc()
defer runtime.RecoverFromPanic(&detailedErr)

context = o.OperationFunc()
return context.EventErr, context.DetailedErr
}

下面我们看下这三个函数是怎么定义的:

  • eventRecorderFunc : 主要就是把 Mount 的错误信息发送到 Pod 的事件记录中去。
eventRecorderFunc := func(err *error) {
if *err != nil {
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, (*err).Error())
}
}
  • CompleteFunc: 主要就是记录一些 Metric
func OperationCompleteHook(plugin, operationName string) func(types.CompleteFuncParam) {
requestTime := time.Now()
opComplete := func(c types.CompleteFuncParam) {
timeTaken := time.Since(requestTime).Seconds()
// Create metric with operation name and plugin name
status := statusSuccess
if *c.Err != nil {
// TODO: Establish well-known error codes to be able to distinguish
// user configuration errors from system errors.
status = statusFailUnknown
}
migrated := false
if c.Migrated != nil {
migrated = *c.Migrated
}
StorageOperationMetric.WithLabelValues(plugin, operationName, status, strconv.FormatBool(migrated)).Observe(timeTaken)
}
return opComplete
}
  • OperationFunc : 这个函数里面逻辑很复杂,下面会删除很多不重要的代码,核心的就是调用 VolumeSetUp 方法。
mountVolumeFunc := func() volumetypes.OperationContext {
// Get mounter plugin
volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
...
volumeMounter, newMounterErr := volumePlugin.NewMounter(
volumeToMount.VolumeSpec,
volumeToMount.Pod)
...
// Execute mount
mountErr := volumeMounter.SetUp(volume.MounterArgs{
FsUser: util.FsUserFrom(volumeToMount.Pod),
FsGroup: fsGroup,
DesiredSize: volumeToMount.DesiredSizeLimit,
FSGroupChangePolicy: fsGroupChangePolicy,
SELinuxLabel: volumeToMount.SELinuxLabel,
})
...
return volumetypes.NewOperationContext(nil, nil, migrated)
}

SetUp这个方法每个存储类型都有实现,这里我们重点看下 Configmap 是如何实现的;

在讨论实现之前我们先分析一下它存储在宿主机上什么位置?代码主要由下面的函数实现,它主要是由各种值组合而成,最后得到的存储路径是:

/var/lib/kubelet/pods/<pod的UUI>/volumes/kubernetes.io~configmap/<卷名>

func (sv *configMapVolume) GetPath() string {
return sv.plugin.host.GetPodVolumeDir(sv.podUID, utilstrings.EscapeQualifiedName(configMapPluginName), sv.volName)
}

最后我们再来看 SetUpAt 的实现

func (b *configMapVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
klog.V(3).Infof("Setting up volume %v for pod %v at %v", b.volName, b.pod.UID, dir)


configMap, err := b.getConfigMap(b.pod.Namespace, b.source.Name)
...
totalBytes := totalBytes(configMap)
...
payload, err := MakePayload(b.source.Items, configMap, b.source.DefaultMode, optional)
if err != nil {
return err
}

setupSuccess := false
defer func() {
if !setupSuccess {
// setup 失败就清理目录
unmounter, unmountCreateErr := b.plugin.NewUnmounter(b.volName, b.podUID)
tearDownErr := unmounter.TearDown()

}
}()

writerContext := fmt.Sprintf("pod %v/%v volume %v", b.pod.Namespace, b.pod.Name, b.volName)
// 这里创建了一个原子 Writer 对象,可以保证修改宿主机上的configmap文件是原子的。
writer, err := volumeutil.NewAtomicWriter(dir, writerContext)
setPerms := func(_ string) error {
// This may be the first time writing and new files get created outside the timestamp subdirectory:
// change the permissions on the whole volume and not only in the timestamp directory.
return volume.SetVolumeOwnership(b, dir, mounterArgs.FsGroup, nil /*fsGroupChangePolicy*/, volumeutil.FSGroupCompleteHook(b.plugin, nil))
}
err = writer.Write(payload, setPerms)
setupSuccess = true
return nil
}

AtomicWriter 的实现代码路径 atomic_writer.go , 这里面保证了原子更新 Configmap ; 这里不再赘述具体实现,详细可以看代码注释;这里简单总结一下;

  1. /var/lib/kubelet/pods/<pod的UUI>/volumes/kubernetes.io~configmap/<卷名> 目录下看到的文件是软链到 ..data/ 目录下的真实文件;
  2. Write 在写的时候会写入到一个 ..data_tmp 目录下;
  3. 然后执行 os.Rename 操作,把 ..data_tmp 重命名为 ..data ; 因为这个动作是原子的,所以能保证 MountConfigmap 的文件是原子更新的。

只要更新到宿主机的 mount 目录下之后,容器内部就会自动获取最新的文件内容。

热更新到容器

把宿主机上的文件热更新到容器内部这是容器运行时具备的能力,这里先不深入赘述;下面我们以一个例子来说明这一功能。

按上图来看我们启动一个容器,把宿主机上的 /tmp/test.json 文件挂载到容器的 /config/test.json 文件上;

然后我们通过修改宿主机上的文件可以观察到容器内部的文件自动发送了改变。

  • /tmp/test.json
$ cat /tmp/test.json
{
"name":"test"
}
  • 启动容器
$ nerdctl run -it --mount type=bind,source=/tmp/test.json,target=/config/test.json docker.io/library/ubuntu:22.04 /bin/bash
$ cat /config/test.json
{
"name":"test"
}
  • 修改宿主机上的文件
$ vim /tmp/test.json
{
"name":"abc"
}
  • 再次查看容器中文件
$ cat /config/test.json
{
"name":"abc"
}

上面只是以 ConfigMap 为例讲了原理,但是 Secret 也是一样的实现;

ConfigMap 在Env使用

ConfigMapSecret 除了可以在 Volume 中使用,还可以在 Env 中使用,当时区别就是在 Env 中使用不能实现热更新。

它在生成运行时的 Option 的时候就会读取 ConfigMap 的值替换到 Env 变量中去,具体的看 makeEnvironmentVariables 这个方法就清楚了。