Kubelet源码分析:PodAdmit
之前讲过 Kubelet
结构体中的 syncLoopIteration
方法接收多个 channel
数据源的信息,并调用相应的 handler
处理,其中 configCh
是 kubelet
获取 pod
的数据源,比如通过 informer
从 api
中拉取到一个新的 Pod
,对应的 handler
为 HandlePodAdditions
.
configCh
的数据来源有三个,其中一个就是 api
,通过配置 spec.nodeName FieldSelector
的 ListOptions
来向 api
获取所有 ns
下指定 nodeName
的 pod
.
HandlePodAdditions
接下来通过 canAdmitPod
方法判断该 Pod
是否可以被 kubelet
创建,其通过 kubelet
对象的 admitHandlers
获取注册好的 handler
对象,并逐一调用这些对象的 Admit
方法检查 pod
.
// canAdmitPod asks every registered admit handler whether pod may be admitted
// by this kubelet. It returns true when all handlers agree; otherwise it
// returns false together with the reason and message of the first rejection.
//
// pods are the kubelet's other pods: running, already confirmed by admission,
// and not terminated.
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
	// TODO: move out of disk check into a pod admitter
	// TODO: out of resource eviction should have a pod admitter call-out
	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}

	// With in-place pod vertical scaling enabled, fit is determined from the
	// allocated resource values in the checkpoint store (source of truth), so
	// each existing pod is copied and refreshed before being handed over.
	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		allocated := make([]*v1.Pod, len(pods))
		for i, existing := range pods {
			snapshot := existing.DeepCopy()
			kl.updateContainerResourceAllocation(snapshot)
			allocated[i] = snapshot
		}
		attrs.OtherPods = allocated
	}

	// Handlers are invoked in sequence; the first one that rejects decides.
	for _, handler := range kl.admitHandlers {
		result := handler.Admit(attrs)
		if result.Admit {
			continue
		}
		klog.InfoS("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message)
		return false, result.Reason, result.Message
	}
	return true, "", ""
}
而 kubelet
对象的 admitHandlers
是在如下 NewMainKubelet
方法中注册,总计注册了六个 Admit
.
- Eviction Admit
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
- Sysctl Allowlist Admit
klet.admitHandlers.AddPodAdmitHandler(sysctlsAllowlist)
- Allocate Resources Admit
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
- Predicate Admit
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
- AppArmor Admit
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
- Shutdown Admit
klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
Admit 接口
Admit
评估一个 pod
是否可以被接纳。
// PodAdmitHandler is implemented by components that take part in pod admission.
type PodAdmitHandler interface {
// Admit evaluates whether the pod described by attrs can be admitted.
Admit(attrs *PodAdmitAttributes) PodAdmitResult
}
Admit
的参数结构如下:
// PodAdmitAttributes is the input handed to a PodAdmitHandler's Admit method.
type PodAdmitAttributes struct {
// Pod is the pod currently being evaluated for admission.
Pod *v1.Pod
// OtherPods are all the other pods the kubelet has accepted, excluding the pod being evaluated.
OtherPods []*v1.Pod
}
Admit
的返回值如下:
// PodAdmitResult provides the result of a pod admission decision.
type PodAdmitResult struct {
// Admit is true when the pod passed the evaluation.
Admit bool
// Reason is a brief, single-word style explanation of why the pod could not be admitted.
Reason string
// Message is a short human-readable message explaining why the pod could not be admitted.
Message string
}
Eviction Admit
如果为了节点稳定性而不允许进入,则 Admit
会拒绝该 pod
。
// Admit rejects a pod when admitting it is not allowed for the sake of node
// stability, based on the node conditions currently recorded by the manager.
func (m *managerImpl) Admit(ctx context.Context, attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	m.RLock()
	defer m.RUnlock()

	admitted := lifecycle.PodAdmitResult{Admit: true}

	// Nothing recorded in nodeConditions means there is nothing to guard against.
	// NOTE(review): the article's author asks whether it is acceptable not to
	// look at the node Ready condition here — confirm.
	//
	// Critical pods are admitted even under resource pressure because they are
	// required for system stability. Per the original note, "critical" covers
	// static pods and pods with priority >= 2000000000 — verify against
	// kubelettypes.IsCriticalPod.
	if len(m.nodeConditions) == 0 || kubelettypes.IsCriticalPod(attrs.Pod) {
		return admitted
	}

	// Conditions other than memory pressure reject all pods. When memory
	// pressure is the only condition, some pods may still get in.
	if hasNodeCondition(m.nodeConditions, v1.NodeMemoryPressure) && len(m.nodeConditions) == 1 {
		// Anything that is not BestEffort is admitted.
		if v1qos.GetPodQOS(attrs.Pod) != v1.PodQOSBestEffort {
			return admitted
		}
		// A BestEffort pod is admitted only if it tolerates the memory
		// pressure taint; other tolerations (e.g. DiskPressure) do not help.
		memoryPressureTaint := &v1.Taint{
			Key:    v1.TaintNodeMemoryPressure,
			Effect: v1.TaintEffectNoSchedule,
		}
		if corev1helpers.TolerationsTolerateTaint(attrs.Pod.Spec.Tolerations, memoryPressureTaint) {
			return admitted
		}
	}

	return lifecycle.PodAdmitResult{
		Admit:   false,
		Reason:  Reason,
		Message: fmt.Sprintf(nodeConditionMessageFmt, m.nodeConditions),
	}
}
Sysctl Allowlist Admit
主要是看用户写的 Sysctls
是否是准入的操作:
// Admit checks every sysctl requested in the pod's security context against
// the allowlist and rejects the pod on the first one that fails validation.
func (w *patternAllowlist) Admit(_ context.Context, attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	spec := &attrs.Pod.Spec
	secCtx := spec.SecurityContext

	// No SecurityContext means no sysctls were requested: admit. An empty
	// Sysctls list falls through the loop below to the same result.
	if secCtx == nil {
		return lifecycle.PodAdmitResult{Admit: true}
	}

	for i := range secCtx.Sysctls {
		err := w.validateSysctl(secCtx.Sysctls[i].Name, spec.HostNetwork, spec.HostIPC)
		if err == nil {
			continue
		}
		return lifecycle.PodAdmitResult{
			Admit:   false,
			Reason:  ForbiddenReason,
			Message: fmt.Sprintf("forbidden sysctl: %v", err),
		}
	}

	return lifecycle.PodAdmitResult{Admit: true}
}
设置的固定 Sysctls
的 Key
, 或者也可以通过配置的 prefix
来进行判定,比如: net.ipv6.neigh.*
这种。
// safeSysctls is the table of sysctls matched by their fixed name.
// Entries that set the kernel field reference a utilkernel version constant —
// presumably the kernel version from which that sysctl is namespaced; confirm
// against the sysctl type definition.
var safeSysctls = []sysctl{
{
name: "kernel.shm_rmid_forced",
}, {
name: "net.ipv4.ip_local_port_range",
}, {
name: "net.ipv4.tcp_syncookies",
}, {
name: "net.ipv4.ping_group_range",
}, {
name: "net.ipv4.ip_unprivileged_port_start",
}, {
name: "net.ipv4.ip_local_reserved_ports",
kernel: utilkernel.IPLocalReservedPortsNamespacedKernelVersion,
}, {
name: "net.ipv4.tcp_keepalive_time",
kernel: utilkernel.TCPKeepAliveTimeNamespacedKernelVersion,
}, {
name: "net.ipv4.tcp_fin_timeout",
kernel: utilkernel.TCPFinTimeoutNamespacedKernelVersion,
},
{
name: "net.ipv4.tcp_keepalive_intvl",
kernel: utilkernel.TCPKeepAliveIntervalNamespacedKernelVersion,
},
{
name: "net.ipv4.tcp_keepalive_probes",
kernel: utilkernel.TCPKeepAliveProbesNamespacedKernelVersion,
},
}
Allocate Resources Admit
主要进行资源分配准入,比如设备插件、cpu、memory等,核心实现在 topologyManager
中:
- 添加设备插件
Allocate
准入校验
// Build the device manager and register it with the topology manager as a hint provider.
// (Handling of err is not shown in this excerpt.)
cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager, tp)
cm.topologyManager.AddHintProvider(cm.deviceManager)
- 添加
CPU
准入校验
// Build the CPU manager from the node configuration and register it with the
// topology manager as a hint provider. (Handling of err is not shown in this excerpt.)
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.CPUManagerPolicy,
nodeConfig.CPUManagerPolicyOptions,
nodeConfig.CPUManagerReconcilePeriod,
machineInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
cm.topologyManager.AddHintProvider(cm.cpuManager)
- 添加
Memory
准入校验
// Build the memory manager from the node configuration and register it with the
// topology manager as a hint provider. (Handling of err is not shown in this excerpt.)
cm.memoryManager, err = memorymanager.NewManager(
nodeConfig.ExperimentalMemoryManagerPolicy,
machineInfo,
cm.GetNodeAllocatableReservation(),
nodeConfig.ExperimentalMemoryManagerReservedMemory,
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
cm.topologyManager.AddHintProvider(cm.memoryManager)
因为 topologyManager
有三种 topology
策略配置, 分别是 none、
pod、
container
. 此处就不展开讲,我们默认按照 none
的维度来讨论。
每个容器(initContainers 和 containers)都需要进行资源准入校验,且每个容器至少需要进行 cpu
和 memory
校验,如果还涉及到 device-plugin
的话也需要校验。
// allocateAlignedResources asks each registered hint provider, in order, to
// allocate resources for the given container of pod. It stops at and returns
// the first allocation error; nil means every provider succeeded.
func (s *scope) allocateAlignedResources(ctx context.Context, pod *v1.Pod, container *v1.Container) error {
	for i := range s.hintProviders {
		if allocErr := s.hintProviders[i].Allocate(ctx, pod, container); allocErr != nil {
			return allocErr
		}
	}
	return nil
}
这里涉及到的具体每个资源分配校验就不再进行展开,核心就是看用户请求的资源能不能分配成功,如果分配失败就会拒绝;
由此可以得出一个结论:Pod
的资源分配发生在 kubelet
接收到 Pod 并进入 syncLoopIteration
时,属于同步分配;此外在 syncPod
的处理逻辑中还会有一次补偿机制。
Predicate Admit
预选 Admit
,主要再次判定调度器预选的数据是否满足。
// Admit re-checks, on the kubelet side, whether the pod in attrs fits this node.
// It fetches the node via getNodeAnyWayFunc, builds a scheduler NodeInfo from
// attrs.OtherPods, updates plugin resources, and runs generalFilter. When the
// filter reports failures, admissionFailureHandler gets a chance to recover;
// if failures remain, the pod is rejected using the first failure as the reason.
func (w *predicateAdmitHandler) Admit(_ context.Context, attrs *PodAdmitAttributes) PodAdmitResult {
node, err := w.getNodeAnyWayFunc()
if err != nil {
klog.ErrorS(err, "Cannot get Node info")
return PodAdmitResult{
Admit: false,
Reason: "InvalidNodeInfo",
Message: "Kubelet cannot get node info.",
}
}
admitPod := attrs.Pod
// Check the node-selector related OS labels against the node; reject the pod if they do not match.
// NOTE(review): the body of this check is elided in this excerpt.
if rejectPodAdmissionBasedOnOSSelector(admitPod, node)
// Check whether the pod.Spec.OS.Name field satisfies the requirement.
// NOTE(review): the body of this check is elided in this excerpt.
if rejectPodAdmissionBasedOnOSField(admitPod)
pods := attrs.OtherPods
nodeInfo := schedulerframework.NewNodeInfo(pods...)
nodeInfo.SetNode(node)
// TODO: Remove this after the SidecarContainers feature gate graduates to GA.
if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
// With the SidecarContainers gate disabled, reject any init container that is restartable
// (i.e. sets a non-default restartPolicy).
for _, c := range admitPod.Spec.InitContainers {
if types.IsRestartableInitContainer(&c) {
message := fmt.Sprintf("Init container %q may not have a non-default restartPolicy", c.Name)
klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "message", message)
return PodAdmitResult{
Admit: false,
Reason: "InitContainerRestartPolicyForbidden",
Message: message,
}
}
}
}
// Make sure the node has enough plugin resources to satisfy what the pod requires.
if err = w.pluginResourceUpdateFunc(nodeInfo, attrs); err != nil {
message := fmt.Sprintf("Update plugin resources failed due to %v, which is unexpected.", err)
klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "message", message)
return PodAdmitResult{
Admit: false,
Reason: "UnexpectedAdmissionError",
Message: message,
}
}
// Remove the requests of the extended resources that are missing in the
// node info. This is required to support cluster-level resources, which
// are extended resources unknown to nodes.
//
// Caveat: If a pod was manually bound to a node (e.g., static pod) where a
// node-level extended resource it requires is not found, then kubelet will
// not fail admission while it should. This issue will be addressed with
// the Resource Class API in the future.
podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo)
reasons := generalFilter(podWithoutMissingExtendedResources, nodeInfo)
fit := len(reasons) == 0
if !fit {
// Give the failure handler a chance to recover; the pod fits only if no
// reasons remain and the handler itself did not fail.
reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
fit = len(reasons) == 0 && err == nil
if err != nil {
message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
klog.InfoS("Failed to admit pod, unexpected error while attempting to recover from admission failure", "pod", klog.KObj(admitPod), "err", err)
return PodAdmitResult{
Admit: fit,
Reason: "UnexpectedAdmissionError",
Message: message,
}
}
}
if !fit {
var reason string
var message string
if len(reasons) == 0 {
message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
klog.InfoS("Failed to admit pod: GeneralPredicates failed due to unknown reason, which is unexpected", "pod", klog.KObj(admitPod))
return PodAdmitResult{
Admit: fit,
Reason: "UnknownReason",
Message: message,
}
}
// If there are failed predicates, we only return the first one as a reason.
r := reasons[0]
switch re := r.(type) {
case *PredicateFailureError:
reason = re.PredicateName
message = re.Error()
klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
case *InsufficientResourceError:
reason = fmt.Sprintf("OutOf%s", re.ResourceName)
message = re.Error()
klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
default:
reason = "UnexpectedPredicateFailureType"
message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "err", message)
}
return PodAdmitResult{
Admit: fit,
Reason: reason,
Message: message,
}
}
return PodAdmitResult{
Admit: true,
}
}
AppArmor Admit
AppArmor 是 Linux 内核安全模块,允许系统管理员通过每个程序的配置文件限制程序的功能。
如果是 Linux
操作系统,则需要进行准入校验
// Admit validates the pod's AppArmor configuration and rejects the pod when
// validation fails. Only pods still in the Pending phase are validated; a pod
// in any other phase (already running or terminated) is admitted as-is.
func (a *appArmorAdmitHandler) Admit(_ context.Context, attrs *PodAdmitAttributes) PodAdmitResult {
	if attrs.Pod.Status.Phase == v1.PodPending {
		if validationErr := a.Validate(attrs.Pod); validationErr != nil {
			return PodAdmitResult{
				Admit:   false,
				Reason:  "AppArmor",
				Message: fmt.Sprintf("Cannot enforce AppArmor: %v", validationErr),
			}
		}
	}
	return PodAdmitResult{Admit: true}
}
Shutdown Admit
节点停机管理,如果 node
已经 shutdown
则拒绝所有的 Pod
.
// Admit rejects every pod while the node is shutting down, which is signalled
// by ShutdownStatus returning a non-nil value; otherwise the pod is admitted.
func (m *managerImpl) Admit(ctx context.Context, attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	if m.ShutdownStatus() == nil {
		return lifecycle.PodAdmitResult{Admit: true}
	}
	return lifecycle.PodAdmitResult{
		Admit:   false,
		Reason:  nodeShutdownNotAdmittedReason,
		Message: nodeShutdownNotAdmittedMessage,
	}
}