Skip to main content

Kubelet源码分析:PodAdmit

· 11 min read
Software Developer

之前讲过 Kubelet 结构体中的 syncLoopIteration 方法接收多个 channel 数据源的信息,并调用相应的 handler 处理,其中 configCh 是 kubelet 获取 pod 的数据源,比如通过 informer 从 api 中拉取到一个新的 Pod,对应的 handler 为 HandlePodAdditions。

configCh 的数据来源有三个,其中一个就是 api:通过在 ListOptions 中配置 spec.nodeName 的 FieldSelector,来向 api 获取所有 ns 下指定 nodeName 的 pod。

HandlePodAdditions 接下来通过 canAdmitPod 方法判断该 Pod 是否可以被 kubelet 创建,其通过 kubelet 对象的 admitHandlers 获取注册好的 handler 对象,并逐一调用这些对象的 Admit 方法检查 pod。

// canAdmitPod reports whether pod may be admitted by this kubelet. It invokes
// every registered pod admit handler in sequence; the first handler that
// rejects decides the outcome.
//
// pods are the pods on this kubelet that are running, have already been
// admitted, and are not terminated. On rejection the returned strings are the
// rejecting handler's Reason and Message; on success both are empty.
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
	// TODO: move out of disk check into a pod admitter
	// TODO: out of resource eviction should have a pod admitter call-out
	others := pods
	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		// With in-place vertical scaling enabled, fit is determined from the
		// allocated resource values in the checkpoint store (source of
		// truth), so each pod is copied and the copy refreshed with them.
		others = make([]*v1.Pod, len(pods))
		for i := range pods {
			others[i] = pods[i].DeepCopy()
			kl.updateContainerResourceAllocation(others[i])
		}
	}
	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: others}

	for _, handler := range kl.admitHandlers {
		result := handler.Admit(attrs)
		if result.Admit {
			continue
		}
		klog.InfoS("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message)
		return false, result.Reason, result.Message
	}

	return true, "", ""
}

image.png

kubelet 对象的 admitHandlers 是在如下 NewMainKubelet 方法中注册,总计注册了六个 Admit .

  • Eviction Admit
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
  • System Allowlist Admit
klet.admitHandlers.AddPodAdmitHandler(sysctlsAllowlist)
  • Allocate Resources Admit
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
  • Predicate Admit
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
  • AppArmor Admit
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
  • Shutdown Admit
klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)

Admit 接口

Admit 评估一个 pod 是否可以被接纳。

// PodAdmitHandler is implemented by every check that takes part in pod
// admission.
// NOTE(review): the Admit implementations quoted later in this article take a
// leading context.Context argument, so this declaration presumably comes from
// a different Kubernetes revision — confirm.
type PodAdmitHandler interface {
// Admit evaluates whether the pod described by attrs can be admitted.
Admit(attrs *PodAdmitAttributes) PodAdmitResult
}

Admit 的参数结构如下:

// PodAdmitAttributes is the input passed to a PodAdmitHandler's Admit method.
type PodAdmitAttributes struct {
// Pod is the pod currently being evaluated for admission.
Pod *v1.Pod
// OtherPods are all the pods the kubelet has accepted, excluding the pod
// being evaluated.
OtherPods []*v1.Pod
}

Admit 的返回值如下:

// PodAdmitResult provides the result of a pod admission decision.
type PodAdmitResult struct {
// Admit is true when the pod passed the evaluation.
Admit bool
// Reason is a brief explanation of why the pod could not be admitted.
Reason string
// Message is a brief message explaining why the pod could not be admitted.
Message string
}

Eviction Admit

如果为了节点稳定性而不允许进入,则 Admit 会拒绝该 pod

// Admit rejects a pod when admitting it is not allowed for the sake of node
// stability.
//
// The pod is admitted when the node has no recorded conditions or when the
// pod is critical. If memory pressure is the node's only condition, the pod is
// also admitted when it is not BestEffort, or when it is BestEffort but
// tolerates the memory-pressure NoSchedule taint. Every other combination is
// rejected with the current node conditions in the message.
func (m *managerImpl) Admit(ctx context.Context, attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	m.RLock()
	defer m.RUnlock()

	accept := lifecycle.PodAdmitResult{Admit: true}

	switch {
	case len(m.nodeConditions) == 0:
		// No node conditions recorded: admit right away.
		// NOTE(review): the article's author asks whether the Ready
		// condition should also be considered here — unresolved.
		return accept
	case kubelettypes.IsCriticalPod(attrs.Pod):
		// Critical pods are admitted even under resource pressure because
		// they are required for system stability. Per the original note this
		// covers static pods and pods with priority >= 2000000000 — confirm
		// against IsCriticalPod.
		return accept
	}

	// Conditions other than memory pressure reject all pods.
	memoryPressureOnly := hasNodeCondition(m.nodeConditions, v1.NodeMemoryPressure) && len(m.nodeConditions) == 1
	if memoryPressureOnly {
		if v1qos.GetPodQOS(attrs.Pod) != v1.PodQOSBestEffort {
			return accept
		}

		// When node has memory pressure, check BestEffort Pod's toleration:
		// admit it if tolerates memory pressure taint, fail for other tolerations, e.g. DiskPressure.
		memoryPressureTaint := v1.Taint{
			Key:    v1.TaintNodeMemoryPressure,
			Effect: v1.TaintEffectNoSchedule,
		}
		if corev1helpers.TolerationsTolerateTaint(attrs.Pod.Spec.Tolerations, &memoryPressureTaint) {
			return accept
		}
	}

	return lifecycle.PodAdmitResult{
		Admit:   false,
		Reason:  Reason,
		Message: fmt.Sprintf(nodeConditionMessageFmt, m.nodeConditions),
	}
}

System Allowlist Admit

主要是看用户写的 Sysctls 是否是准入的操作:

// Admit checks that every sysctl requested in the pod's security context
// passes validateSysctl. A pod with no security context, or with no sysctls
// in it, is admitted without further checks; the first sysctl that fails
// validation causes rejection with ForbiddenReason.
func (w *patternAllowlist) Admit(_ context.Context, attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	spec := &attrs.Pod.Spec

	if sc := spec.SecurityContext; sc != nil {
		// An empty Sysctls list makes this loop a no-op, so the pod falls
		// through to the admit result below.
		for _, requested := range sc.Sysctls {
			err := w.validateSysctl(requested.Name, spec.HostNetwork, spec.HostIPC)
			if err == nil {
				continue
			}
			return lifecycle.PodAdmitResult{
				Admit:   false,
				Reason:  ForbiddenReason,
				Message: fmt.Sprintf("forbidden sysctl: %v", err),
			}
		}
	}

	return lifecycle.PodAdmitResult{Admit: true}
}

允许列表中既包含下面这些固定的 sysctl key,也可以通过配置的前缀(prefix)来进行判定,比如 net.ipv6.neigh.* 这种。

// safeSysctls is the fixed list of sysctl names treated as safe.
// Entries that set kernel carry a kernel-version value; presumably this is
// the minimum kernel version on which that sysctl is namespaced — confirm
// against the utilkernel constants.
var safeSysctls = []sysctl{
	{name: "kernel.shm_rmid_forced"},
	{name: "net.ipv4.ip_local_port_range"},
	{name: "net.ipv4.tcp_syncookies"},
	{name: "net.ipv4.ping_group_range"},
	{name: "net.ipv4.ip_unprivileged_port_start"},
	{
		name:   "net.ipv4.ip_local_reserved_ports",
		kernel: utilkernel.IPLocalReservedPortsNamespacedKernelVersion,
	},
	{
		name:   "net.ipv4.tcp_keepalive_time",
		kernel: utilkernel.TCPKeepAliveTimeNamespacedKernelVersion,
	},
	{
		name:   "net.ipv4.tcp_fin_timeout",
		kernel: utilkernel.TCPFinTimeoutNamespacedKernelVersion,
	},
	{
		name:   "net.ipv4.tcp_keepalive_intvl",
		kernel: utilkernel.TCPKeepAliveIntervalNamespacedKernelVersion,
	},
	{
		name:   "net.ipv4.tcp_keepalive_probes",
		kernel: utilkernel.TCPKeepAliveProbesNamespacedKernelVersion,
	},
}

Allocate Resources Admit

主要进行资源分配准入,比如设备插件、cpu、memory等,核心实现在 topologyManager 中:

  • 添加设备插件 Allocate 准入校验
cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager, tp)
cm.topologyManager.AddHintProvider(cm.deviceManager)
  • 添加 CPU 准入校验
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.CPUManagerPolicy,
nodeConfig.CPUManagerPolicyOptions,
nodeConfig.CPUManagerReconcilePeriod,
machineInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
cm.topologyManager.AddHintProvider(cm.cpuManager)
  • 添加 Memory 准入校验
cm.memoryManager, err = memorymanager.NewManager(
nodeConfig.ExperimentalMemoryManagerPolicy,
machineInfo,
cm.GetNodeAllocatableReservation(),
nodeConfig.ExperimentalMemoryManagerReservedMemory,
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
cm.topologyManager.AddHintProvider(cm.memoryManager)

因为 topologyManager 有三种 topology 策略配置,分别是 none、pod、container,此处就不展开讲,我们默认按照 none 的维度来讨论。

每个容器(initContainers 和 containers)都需要进行资源准入校验:每个容器最少需要进行 cpu、memory 校验,如果还涉及到 device-plugin 的话也需要校验。

// allocateAlignedResources asks each registered hint provider, in order, to
// allocate resources for the given container of pod. It stops at and returns
// the first error; nil means every provider succeeded.
func (s *scope) allocateAlignedResources(ctx context.Context, pod *v1.Pod, container *v1.Container) error {
	for i := range s.hintProviders {
		if err := s.hintProviders[i].Allocate(ctx, pod, container); err != nil {
			return err
		}
	}
	return nil
}

这里涉及到的具体每个资源分配校验就不再进行展开,核心就是看用户请求的资源能不能分配成功,如果分配失败就会拒绝;

得出一个结论:Pod 的资源分配是在 kubelet 接收到 Pod、进入 syncLoopIteration 之后同步进行的;其次,在 syncPod 的处理逻辑中还会有一次补偿机制。

Predicate Admit

预选 Admit ,主要再次判定调度器预选的数据是否满足。

func (w *predicateAdmitHandler) Admit(_ context.Context, attrs *PodAdmitAttributes) PodAdmitResult {
node, err := w.getNodeAnyWayFunc()
if err != nil {
klog.ErrorS(err, "Cannot get Node info")
return PodAdmitResult{
Admit: false,
Reason: "InvalidNodeInfo",
Message: "Kubelet cannot get node info.",
}
}
admitPod := attrs.Pod

// node select 相关的这些label,进行判定是否满足,如果不满足,就进行拒绝。
if rejectPodAdmissionBasedOnOSSelector(admitPod, node)
// 判定 pod.Spec.OS.Name 这个字段是否满足要求
if rejectPodAdmissionBasedOnOSField(admitPod)

pods := attrs.OtherPods
nodeInfo := schedulerframework.NewNodeInfo(pods...)
nodeInfo.SetNode(node)

// TODO: Remove this after the SidecarContainers feature gate graduates to GA.
if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
// 如果没有开启Sidecar容器,就检查init容器的重启策略是否设置
for _, c := range admitPod.Spec.InitContainers {
if types.IsRestartableInitContainer(&c) {
message := fmt.Sprintf("Init container %q may not have a non-default restartPolicy", c.Name)
klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "message", message)
return PodAdmitResult{
Admit: false,
Reason: "InitContainerRestartPolicyForbidden",
Message: message,
}
}
}
}

// 确保节点具有足够的插件资源,以满足 Pod 所需的资源
if err = w.pluginResourceUpdateFunc(nodeInfo, attrs); err != nil {
message := fmt.Sprintf("Update plugin resources failed due to %v, which is unexpected.", err)
klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "message", message)
return PodAdmitResult{
Admit: false,
Reason: "UnexpectedAdmissionError",
Message: message,
}
}

// Remove the requests of the extended resources that are missing in the
// node info. This is required to support cluster-level resources, which
// are extended resources unknown to nodes.
//
// Caveat: If a pod was manually bound to a node (e.g., static pod) where a
// node-level extended resource it requires is not found, then kubelet will
// not fail admission while it should. This issue will be addressed with
// the Resource Class API in the future.
podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo)

reasons := generalFilter(podWithoutMissingExtendedResources, nodeInfo)
fit := len(reasons) == 0
if !fit {
reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
fit = len(reasons) == 0 && err == nil
if err != nil {
message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
klog.InfoS("Failed to admit pod, unexpected error while attempting to recover from admission failure", "pod", klog.KObj(admitPod), "err", err)
return PodAdmitResult{
Admit: fit,
Reason: "UnexpectedAdmissionError",
Message: message,
}
}
}
if !fit {
var reason string
var message string
if len(reasons) == 0 {
message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
klog.InfoS("Failed to admit pod: GeneralPredicates failed due to unknown reason, which is unexpected", "pod", klog.KObj(admitPod))
return PodAdmitResult{
Admit: fit,
Reason: "UnknownReason",
Message: message,
}
}
// If there are failed predicates, we only return the first one as a reason.
r := reasons[0]
switch re := r.(type) {
case *PredicateFailureError:
reason = re.PredicateName
message = re.Error()
klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
case *InsufficientResourceError:
reason = fmt.Sprintf("OutOf%s", re.ResourceName)
message = re.Error()
klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
default:
reason = "UnexpectedPredicateFailureType"
message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "err", message)
}
return PodAdmitResult{
Admit: fit,
Reason: reason,
Message: message,
}
}
return PodAdmitResult{
Admit: true,
}
}

AppArmor Admit

AppArmor 是 Linux 内核安全模块,允许系统管理员通过每个程序的配置文件限制程序的功能。

如果是 Linux 操作系统,则需要进行准入校验

// Admit validates the pod's AppArmor configuration. Only pods in the Pending
// phase are validated; a pod in any other phase (already running or
// terminated) is admitted without a check. A validation error rejects the pod
// with reason "AppArmor".
func (a *appArmorAdmitHandler) Admit(_ context.Context, attrs *PodAdmitAttributes) PodAdmitResult {
	pending := attrs.Pod.Status.Phase == v1.PodPending
	if !pending {
		return PodAdmitResult{Admit: true}
	}

	if err := a.Validate(attrs.Pod); err != nil {
		return PodAdmitResult{
			Admit:   false,
			Reason:  "AppArmor",
			Message: fmt.Sprintf("Cannot enforce AppArmor: %v", err),
		}
	}
	return PodAdmitResult{Admit: true}
}

Shutdown Admit

节点停机管理,如果 node 已经 shutdown 则拒绝所有的 Pod.

// Admit rejects every pod while the node is shutting down, which is the case
// whenever ShutdownStatus returns a non-nil value. Otherwise the pod is
// admitted.
func (m *managerImpl) Admit(ctx context.Context, attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	if m.ShutdownStatus() == nil {
		return lifecycle.PodAdmitResult{Admit: true}
	}

	return lifecycle.PodAdmitResult{
		Admit:   false,
		Reason:  nodeShutdownNotAdmittedReason,
		Message: nodeShutdownNotAdmittedMessage,
	}
}