Kubelet源码分析 Configmap/Secret volume
在 Pod
经常会出现将 Configmap/Secret
挂载到 Pod
中,今天我们来分析 Kubelet
是如何实现这个能力,特别是在 Configmap/Secret
内容发送改变、被删除等操作的时候如何影响 Pod
内挂载好的文件。
ConfigMap的Manager
Configmap
管理这块代码实现主要在 pkg/kubelet/configmap
目录下,提供了一个统一的管理接口 Manager
,下面我们来看下它定义的方法:
type Manager interface {
// Get configmap by configmap namespace and name.
GetConfigMap(namespace, name string) (*v1.ConfigMap, error)
// RegisterPod registers all configmaps from a given pod.
RegisterPod(pod *v1.Pod)
// UnregisterPod unregisters configmaps from a given pod that are not
// used by any other registered pod.
UnregisterPod(pod *v1.Pod)
}
这个 Manager
接口会根据配置的 ConfigMapAndSecretChangeDetectionStrategy
类型来决定使用那种方式管理,它提供了三种,分别是:
GET
: 不会缓存,直接从apiserver
出获取。Cache
: 会通过apiserver
直接获取对象,并通过TTL
缓存起来。Watch
: 会通过list-watch
的方式监视资源的变更,并通过TTL
缓存起来。
下图描述了对这三个方法调用的上层函数:
Secret
管理这块代码实现主要在 pkg/kubelet/secret
目录下,提供了一个统一的管理接口 Manager
,下面我们来看下它定义的方法:
它的定义和使用的原理都差不多。
下面我们来分析它是如何保存到宿主机上的,如何自动更新到容器中的;
首先我们要清楚容器里面的文件不是 kubelet
挂载到容器内部,是由容器运行时如 Containerd
这些组件在启动容器的时候 Mount
到容器内部的。
Kubelet
这边能做到就是修改 runtime spec
中的内容, 就是描述容器运行时启动时的 json
文件。
所以热更新这里面涉及到两个东西,一个是把修改到 ETCD
中的 Configmap
或者 Secret
文件更新到容器所在的节点;第二个就是把更新到节点上的文件更新到容器里面。
热更新到Node
这块主要是 VolumeManager
和 ConfigMap Manager
协调处理的结果,但是最终实现更新还是由 SyncPod
触发的。
从这个流程上可以看出即使 ConfigMap
更新了也不会马上更新到宿主机上,主要还是受 SyncPod
的周期影响,每执行一次 SyncPod
就会让 reconciler
有一次更新的机会。
在 WaitForAttachAndMount
中主要就是把 processedPods
中的一个处理标志改为 false
. Map
的 Key
是 Pod UID
.
func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
...
vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
}
func (dswp *desiredStateOfWorldPopulator) markPodProcessingFailed(
podName volumetypes.UniquePodName) {
dswp.pods.Lock()
dswp.pods.processedPods[podName] = false
dswp.pods.Unlock()
}
在 reconciler.Run
中定时进行协调,代码中配置的是 100ms
且不可修改,它会不断遍历 desiredStateOfWorld
和 actualStateOfWorld
,然后执行一系列的 attach/detach
和 mount(remount)/unmount
等操作。
func (rc *reconciler) Run(stopCh <-chan struct{}) {
rc.reconstructVolumes()
klog.InfoS("Reconciler: start to sync state")
wait.Until(rc.reconcile, 100ms, stopCh)
}
func (rc *reconciler) reconcile() {
...
// Next we mount required volumes. This function could also trigger
// attach if kubelet is responsible for attaching volumes.
// If underlying PVC was resized while in-use then this function also handles volume
// resizing.
rc.mountOrAttachVolumes()
}
接下来,我们挂载所需的卷。如果 kubelet
负责 Attach
卷,则此函数还可以触发 Attach
。如果在使用过程中调整了底层 PVC
的大小,则此函数还会处理卷大小调整。
VolumesToMount
会被 DesiredStateOfWorldPopulator.Run
方法改变,这个方法执行过程会被 processedPods
影响,它主要找出 desiredStateOfWorld
期望处理的 Volume
和实际 actualStateOfWorld
状态的 Volume
。
之后调用 PodExistsInVolume
函数,根据返回的 err
来决定后续流程,如果是更新 Mounte
流程就进入 mountAttachedVolumes
方法中。
func (rc *reconciler) mountOrAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.DesiredPersistentVolumeSize, volumeToMount.SELinuxLabel)
volumeToMount.DevicePath = devicePath
if cache.IsSELinuxMountMismatchError(err) {
// The volume is mounted, but with an unexpected SELinux context.
// It will get unmounted in unmountVolumes / unmountDetachDevices and
// then removed from actualStateOfWorld.
rc.desiredStateOfWorld.AddErrorToPod(volumeToMount.PodName, err.Error())
continue
} else if cache.IsVolumeNotAttachedError(err) {
rc.waitForVolumeAttach(volumeToMount)
} else if !volMounted || cache.IsRemountRequiredError(err) {
rc.mountAttachedVolumes(volumeToMount, err)
} else if cache.IsFSResizeRequiredError(err) {
fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
}
}
}
这里面创建了一个 NestedPendingOperations
执行器,它会启动一个协程调用 generatedOperations.Run
方法来处理 Mount
.
func (grm *nestedPendingOperations) Run(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName,
nodeName types.NodeName,
generatedOperations volumetypes.GeneratedOperations) error {
opExists, previousOpIndex := grm.isOperationExists(opKey)
if opExists {
//已经存在操作,
} else {
// Create a new operation
grm.operations = append(grm.operations,
operation{
key: opKey,
operationPending: true,
operationName: generatedOperations.OperationName,
expBackoff: exponentialbackoff.ExponentialBackoff{},
})
}
// 启动协程
go func() (eventErr, detailedErr error) {
// Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(opKey, &detailedErr)
return generatedOperations.Run()
}()
return nil
}
在 GeneratedOperations
的结构体中定义了三个函数, OperationFunc
这是操作调用函数, EventRecorderFunc
事件记录函数, CompleteFunc
完成函数。
type GeneratedOperations struct {
// Name of operation - could be used for resetting shared exponential backoff
OperationName string
OperationFunc func() (context OperationContext)
EventRecorderFunc func(*error)
CompleteFunc func(CompleteFuncParam)
}
func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
var context OperationContext
if o.CompleteFunc != nil {
c := CompleteFuncParam{
Err: &context.DetailedErr,
Migrated: &context.Migrated,
}
defer o.CompleteFunc(c)
}
if o.EventRecorderFunc != nil {
defer o.EventRecorderFunc(&eventErr)
}
// Handle panic, if any, from operationFunc()
defer runtime.RecoverFromPanic(&detailedErr)
context = o.OperationFunc()
return context.EventErr, context.DetailedErr
}
下面我们看下这三个函数是怎么定义的:
eventRecorderFunc
: 主要就是把Mount
的错误信息发送到Pod
的事件记录中去。
eventRecorderFunc := func(err *error) {
if *err != nil {
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, (*err).Error())
}
}
CompleteFunc
: 主要就是记录一些Metric
func OperationCompleteHook(plugin, operationName string) func(types.CompleteFuncParam) {
requestTime := time.Now()
opComplete := func(c types.CompleteFuncParam) {
timeTaken := time.Since(requestTime).Seconds()
// Create metric with operation name and plugin name
status := statusSuccess
if *c.Err != nil {
// TODO: Establish well-known error codes to be able to distinguish
// user configuration errors from system errors.
status = statusFailUnknown
}
migrated := false
if c.Migrated != nil {
migrated = *c.Migrated
}
StorageOperationMetric.WithLabelValues(plugin, operationName, status, strconv.FormatBool(migrated)).Observe(timeTaken)
}
return opComplete
}
OperationFunc
: 这个函数里面逻辑很复杂,下面会删除很多不重要的代码,核心的就是调用Volume
的SetUp
方法。
mountVolumeFunc := func() volumetypes.OperationContext {
// Get mounter plugin
volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
...
volumeMounter, newMounterErr := volumePlugin.NewMounter(
volumeToMount.VolumeSpec,
volumeToMount.Pod)
...
// Execute mount
mountErr := volumeMounter.SetUp(volume.MounterArgs{
FsUser: util.FsUserFrom(volumeToMount.Pod),
FsGroup: fsGroup,
DesiredSize: volumeToMount.DesiredSizeLimit,
FSGroupChangePolicy: fsGroupChangePolicy,
SELinuxLabel: volumeToMount.SELinuxLabel,
})
...
return volumetypes.NewOperationContext(nil, nil, migrated)
}
SetUp
这个方法每个存储类型都有实现,这里我们重点看下 Configmap
是如何实现的;
在讨论实现之前我们先分析一下它存储在宿主机上什么位置?代码主要由下面的函数实现,它主要是由各种值组合而成,最后得到的存储路径是:
/var/lib/kubelet/pods/<pod的UUI>/volumes/kubernetes.io~configmap/<卷名>
func (sv *configMapVolume) GetPath() string {
return sv.plugin.host.GetPodVolumeDir(sv.podUID, utilstrings.EscapeQualifiedName(configMapPluginName), sv.volName)
}
最后我们再来看 SetUpAt
的实现
func (b *configMapVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
klog.V(3).Infof("Setting up volume %v for pod %v at %v", b.volName, b.pod.UID, dir)
configMap, err := b.getConfigMap(b.pod.Namespace, b.source.Name)
...
totalBytes := totalBytes(configMap)
...
payload, err := MakePayload(b.source.Items, configMap, b.source.DefaultMode, optional)
if err != nil {
return err
}
setupSuccess := false
defer func() {
if !setupSuccess {
// setup 失败就清理目录
unmounter, unmountCreateErr := b.plugin.NewUnmounter(b.volName, b.podUID)
tearDownErr := unmounter.TearDown()
}
}()
writerContext := fmt.Sprintf("pod %v/%v volume %v", b.pod.Namespace, b.pod.Name, b.volName)
// 这里创建了一个原子 Writer 对象,可以保证修改宿主机上的configmap文件是原子的。
writer, err := volumeutil.NewAtomicWriter(dir, writerContext)
setPerms := func(_ string) error {
// This may be the first time writing and new files get created outside the timestamp subdirectory:
// change the permissions on the whole volume and not only in the timestamp directory.
return volume.SetVolumeOwnership(b, dir, mounterArgs.FsGroup, nil /*fsGroupChangePolicy*/, volumeutil.FSGroupCompleteHook(b.plugin, nil))
}
err = writer.Write(payload, setPerms)
setupSuccess = true
return nil
}
AtomicWriter
的实现代码路径 atomic_writer.go
, 这里面保证了原子更新 Configmap
; 这里不再赘述具体实现,详细可以看代码注释;这里简单总结一下;
- 在
/var/lib/kubelet/pods/<pod的UUI>/volumes/kubernetes.io~configmap/<卷名>
目录下看到的文件是软链到..data/
目录下的真实文件; Write
在写的时候会写入到一个..data_tmp
目录下;- 然后执行
os.Rename
操作,把..data_tmp
重命名为..data
; 因为这个动作是原子的,所以能保证Mount
的Configmap
的文件是原子更新的。
只要更新到宿主机的 mount
目录下之后,容器 内部就会自动获取最新的文件内容。
热更新到容器
把宿主机上的文件热更新到容器内部这是容器运行时具备的能力,这里先不深入赘述;下面我们以一个例子来说明这一功能。
按上图来看我们启动一个容器,把宿主机上的 /tmp/test.json
文件挂载到容器的 /config/test.json
文件上;
然后我们通过修改宿主机上的文件可以观察到容器内部的文件自动发送了改变。
/tmp/test.json
$ cat /tmp/test.json
{
"name":"test"
}
- 启动容器
$ nerdctl run -it --mount type=bind,source=/tmp/test.json,target=/config/test.json docker.io/library/ubuntu:22.04 /bin/bash
$ cat /config/test.json
{
"name":"test"
}
- 修改宿主机上的文件
$ vim /tmp/test.json
{
"name":"abc"
}
- 再次查看容器中文件
$ cat /config/test.json
{
"name":"abc"
}
上面只是以 ConfigMap
为例讲了原理,但是 Secret
也是一样的实现;
ConfigMap 在Env使用
ConfigMap
和 Secret
除了可以在 Volume
中使用,还可以在 Env
中使用,当时区别就是在 Env
中使用不能实现热更新。
它在生成运行时的 Option
的时候就会读取 ConfigMap
的值替换到 Env
变量中去,具体的看 makeEnvironmentVariables
这个方法就清楚了。