
kubelet source code analysis: syncLoopIteration (Part 3) probeCh


There is no channel actually named probeCh in the code; it is made up of the three probe channels behind livenessManager, readinessManager and startupManager.

Their core implementation lives in the probeManager (prober.Manager).

(figure: probch.png)

probeCh stands for three separate channels, livenessCh, readinessCh and startupCh; the figure merges them into one, but in the code they are three distinct channels.
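To make the relationship concrete, here is a minimal, self-contained sketch of the pattern (a toy model, not the real proberesults package): each result manager caches the latest result per target and publishes changes on its own channel, and the sync loop selects over the three channels.

package main

import "fmt"

// Update is a simplified stand-in for proberesults.Update.
type Update struct {
    PodUID string
    Result bool
}

// resultManager is a toy version of a probe result manager: it caches the
// latest result per pod and publishes changes on its Updates channel.
type resultManager struct {
    cache   map[string]bool
    updates chan Update
}

func newResultManager() *resultManager {
    return &resultManager{cache: map[string]bool{}, updates: make(chan Update, 20)}
}

// Set is what a probe worker would call after each probe run.
func (m *resultManager) Set(podUID string, result bool) {
    if old, ok := m.cache[podUID]; ok && old == result {
        return // unchanged results are not re-published
    }
    m.cache[podUID] = result
    m.updates <- Update{PodUID: podUID, Result: result}
}

func (m *resultManager) Updates() <-chan Update { return m.updates }

func main() {
    liveness, readiness, startup := newResultManager(), newResultManager(), newResultManager()
    liveness.Set("pod-1", false) // a liveness probe just failed

    // The "probeCh" in the figure is really a select over three channels,
    // exactly like the one in syncLoopIteration.
    select {
    case u := <-liveness.Updates():
        fmt.Println("liveness update:", u)
    case u := <-readiness.Updates():
        fmt.Println("readiness update:", u)
    case u := <-startup.Updates():
        fmt.Println("startup update:", u)
    }
}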

ProbeManager

The Manager creates a probe worker for every container that specifies a probe (the AddPod method). These workers periodically probe their assigned containers and cache the results. When the Pod status (PodStatus) needs to be updated, the Manager uses the cached probe results to set each container's Ready state (the UpdatePodStatus method). Note that updating probe parameters is currently not supported.

type Manager interface {
    // AddPod creates new probe workers for every container probe. This should be called for every
    // pod created.
    AddPod(pod *v1.Pod)

    // StopLivenessAndStartup handles stopping liveness and startup probes during termination.
    StopLivenessAndStartup(pod *v1.Pod)

    // RemovePod handles cleaning up the removed pod state, including terminating probe workers and
    // deleting cached results.
    RemovePod(pod *v1.Pod)

    // CleanupPods handles cleaning up pods which should no longer be running.
    // It takes a map of "desired pods" which should not be cleaned up.
    CleanupPods(desiredPods map[types.UID]sets.Empty)

    // UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
    // container based on container running status, cached probe results and worker states.
    UpdatePodStatus(*v1.Pod, *v1.PodStatus)
}
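Behind this interface sits a concrete manager struct that holds the worker map and the three result managers. Abridged below (the exact field set varies a bit across Kubernetes releases); probeKey is the key used by AddPod in the next snippet.

// Abridged sketch of the concrete prober manager state (fields vary by version).
type manager struct {
    // Map of active workers, keyed by pod UID, container name and probe type.
    workers    map[probeKey]*worker
    workerLock sync.RWMutex

    // statusManager provides pod IPs and container IDs for probing.
    statusManager status.Manager

    // Result caches written by the workers; their Updates() channels feed syncLoopIteration.
    livenessManager  results.Manager
    readinessManager results.Manager
    startupManager   results.Manager

    // prober executes the actual probe actions (exec/http/tcp/grpc).
    prober *prober
}

// probeKey uniquely identifies one probe worker.
type probeKey struct {
    podUID        types.UID
    containerName string
    probeType     probeType
}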

The AddPod method iterates over all containers and, for each probe configured on a container, starts a worker; every probe runs in its own goroutine.

func (m *manager) AddPod(pod *v1.Pod) {
    m.workerLock.Lock()
    defer m.workerLock.Unlock()

    key := probeKey{podUID: pod.UID}
    for _, c := range append(pod.Spec.Containers, getRestartableInitContainers(pod)...) {
        key.containerName = c.Name

        if c.StartupProbe != nil {
            key.probeType = startup
            if _, ok := m.workers[key]; ok {
                // a worker already exists for this probe (error log elided)
                return
            }
            w := newWorker(m, startup, pod, c)
            m.workers[key] = w
            go w.run()
        }

        if c.ReadinessProbe != nil {
            key.probeType = readiness
            if _, ok := m.workers[key]; ok {
                // a worker already exists for this probe (error log elided)
                return
            }
            w := newWorker(m, readiness, pod, c)
            m.workers[key] = w
            go w.run()
        }

        if c.LivenessProbe != nil {
            key.probeType = liveness
            if _, ok := m.workers[key]; ok {
                // a worker already exists for this probe (error log elided)
                return
            }
            w := newWorker(m, liveness, pod, c)
            m.workers[key] = w
            go w.run()
        }
    }
}
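Each go w.run() starts a ticker-driven loop around doProbe. The sketch below is a simplified reading of worker.run; the real function also jitters the first probe, listens on a manual trigger channel and cleans up metrics.

// Simplified sketch of a probe worker's loop.
func (w *worker) run() {
    ctx := context.Background()

    // One probe every PeriodSeconds, as configured on the probe spec.
    probeTicker := time.NewTicker(time.Duration(w.spec.PeriodSeconds) * time.Second)
    defer func() {
        probeTicker.Stop()
        // Drop the cached result and deregister this worker on exit.
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
    }()

    // doProbe runs one probe and returns false when the worker should stop
    // (for example, the pod has been terminated).
    for w.doProbe(ctx) {
        select {
        case <-w.stopCh: // stopped via StopLivenessAndStartup / RemovePod
            return
        case <-probeTicker.C: // wait for the next tick
        }
    }
}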

Further down, the probing logic is dispatched according to which probe handler is configured:

func (pb *prober) runProbe(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
    timeout := time.Duration(p.TimeoutSeconds) * time.Second
    switch {
    case p.Exec != nil:
        command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
        return pb.exec.Probe(pb.newExecInContainer(ctx, container, containerID, command, timeout))

    case p.HTTPGet != nil:
        req, err := httpprobe.NewRequestForHTTPGetAction(p.HTTPGet, &container, status.PodIP, "probe")
        if err != nil {
            return probe.Unknown, "", err
        }
        return pb.http.Probe(req, timeout)

    case p.TCPSocket != nil:
        port, err := probe.ResolveContainerPort(p.TCPSocket.Port, &container)
        if err != nil {
            return probe.Unknown, "", err
        }
        host := p.TCPSocket.Host
        if host == "" {
            host = status.PodIP
        }
        return pb.tcp.Probe(host, port, timeout)

    case p.GRPC != nil:
        host := status.PodIP
        service := ""
        if p.GRPC.Service != nil {
            service = *p.GRPC.Service
        }
        return pb.grpc.Probe(host, service, int(p.GRPC.Port), timeout)

    default:
        klog.InfoS("Failed to find probe builder for container", "containerName", container.Name)
        return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
    }
}
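One level up, runProbe is invoked through a small retry wrapper from prober.probe, which picks the probe spec matching the probe type and collapses the outcome into the Success/Failure values that end up in the result managers. A condensed view (event recording and logging elided, details vary by version):

// Condensed view of prober.probe.
func (pb *prober) probe(ctx context.Context, probeType probeType, pod *v1.Pod, status v1.PodStatus,
    container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
    var probeSpec *v1.Probe
    switch probeType {
    case readiness:
        probeSpec = container.ReadinessProbe
    case liveness:
        probeSpec = container.LivenessProbe
    case startup:
        probeSpec = container.StartupProbe
    default:
        return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
    }

    if probeSpec == nil {
        // No probe configured: the container is treated as passing.
        return results.Success, nil
    }

    // runProbeWithRetries calls runProbe up to maxProbeRetries times; the string
    // output is only used for events and logs in the real code.
    result, _, err := pb.runProbeWithRetries(ctx, probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
    if err != nil || (result != probe.Success && result != probe.Warning) {
        // Any error, or a non-Success/Warning result, is reported as a failure.
        return results.Failure, err
    }
    return results.Success, nil
}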

livenessManager, readinessManager, startupManager

The results of all three probes are written from prober.Manager; each probe simply has its own channel. When syncLoopIteration receives an update on one of them, it ends up calling handler.HandlePodSyncs.

func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    // Only the probe channels are shown here; the cases for configCh, plegCh,
    // syncCh and housekeepingCh are elided.
    select {
    case update := <-kl.livenessManager.Updates():
        if update.Result == proberesults.Failure {
            handleProbeSync(kl, update, handler, "liveness", "unhealthy")
        }
    case update := <-kl.readinessManager.Updates():
        ready := update.Result == proberesults.Success
        kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

        status := ""
        if ready {
            status = "ready"
        }
        handleProbeSync(kl, update, handler, "readiness", status)
    case update := <-kl.startupManager.Updates():
        started := update.Result == proberesults.Success
        kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

        status := "unhealthy"
        if started {
            status = "started"
        }
        handleProbeSync(kl, update, handler, "startup", status)
    }
    return true
}
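handleProbeSync itself is small: it re-resolves the pod by UID from the pod manager and hands it to handler.HandlePodSyncs. Roughly (log lines elided):

// Roughly what handleProbeSync does: resolve the pod and trigger a sync for it.
func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
    // The pod carried by the worker is never refreshed after it starts, so always
    // re-resolve it from the pod manager.
    pod, ok := kl.podManager.GetPodByUID(update.PodUID)
    if !ok {
        // Pod is gone; ignore the stale probe update.
        return
    }
    handler.HandlePodSyncs([]*v1.Pod{pod})
}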