kubelet源码分析 syncLoopIteration(三) probeCh
· 4 min read
在代码中并没有 `probeCh` 这个 chan,它主要由 `livenessManager`、`readinessManager`、`startupManager` 这三个探针的 chan 构成。
它们的核心实现在 `probeManager`(类型为 `prober.Manager`)上。
`probeCh` 实际上代表三个 chan,分别是 `livenessCh`、`readinessCh`、`startupCh`;图中将它们统一画成了一个 `probeCh`,但在代码中是三个独立的通道。
ProbeManager
Manager会为每个指定了探测的容器创建一个探测“worker”(AddPod方法)。这些worker会定期对其分配的容器进行探测,并将结果缓存起来。当需要更新Pod的状态(PodStatus)时,Manager会利用这些缓存的探测结果来设置Pod的Ready状态(UpdatePodStatus方法)。需要注意的是,目前不支持更新探测参数。
`Manager` 会为每个指定了探测的容器创建一个 worker,这些 worker 会定期对其分配的容器进行探测并缓存结果;更新 Pod 状态时会利用这些缓存的探测结果来更新。
// Manager owns the per-container probe workers of the kubelet. A worker is
// created for every probe declared on a container; workers probe
// periodically and cache their results, which are later folded into the
// pod's status.
type Manager interface {
	// AddPod registers probe workers for every container probe of pod.
	// Call it once for each pod that is created.
	AddPod(pod *v1.Pod)

	// UpdatePodStatus sets the Ready state of every container in status,
	// using the container's running state, the cached probe results and the
	// state of the workers.
	UpdatePodStatus(pod *v1.Pod, status *v1.PodStatus)

	// StopLivenessAndStartup stops the liveness and startup probes of a pod
	// that is terminating.
	StopLivenessAndStartup(pod *v1.Pod)

	// RemovePod cleans up state for a removed pod: its probe workers are
	// terminated and its cached results deleted.
	RemovePod(pod *v1.Pod)

	// CleanupPods cleans up every pod that should no longer be running.
	// Pods whose UIDs appear in desiredPods are left untouched.
	CleanupPods(desiredPods map[types.UID]sets.Empty)
}
在 `AddPod` 方法中会遍历所有 container(包括可重启的 init container),然后针对配置的不同探针分别启动 worker,每个探针由一个单独的协程运行。注意:如果发现某个探针的 worker 已经存在,方法会直接 `return`,不再处理后续的探针和容器。
// AddPod starts one probe worker, each in its own goroutine, for every probe
// (startup, readiness, liveness) configured on the pod's regular containers
// and on its restartable init containers. Probes are visited in the order
// startup, readiness, liveness for each container.
//
// If a worker is already registered for any (pod, container, probe type)
// key, AddPod returns immediately without creating workers for the remaining
// probes and containers.
func (m *manager) AddPod(pod *v1.Pod) {
	m.workerLock.Lock()
	defer m.workerLock.Unlock()

	// Copy into a fresh slice rather than append(pod.Spec.Containers, ...):
	// appending directly may write into spare capacity of the pod's backing
	// array, which is shared with every other holder of this pod object.
	initContainers := getRestartableInitContainers(pod)
	containers := make([]v1.Container, 0, len(pod.Spec.Containers)+len(initContainers))
	containers = append(containers, pod.Spec.Containers...)
	containers = append(containers, initContainers...)

	key := probeKey{podUID: pod.UID}
	for _, c := range containers {
		key.containerName = c.Name

		probes := [...]struct {
			kind  probeType
			probe *v1.Probe
		}{
			{startup, c.StartupProbe},
			{readiness, c.ReadinessProbe},
			{liveness, c.LivenessProbe},
		}
		for _, p := range probes {
			if p.probe == nil {
				continue
			}
			key.probeType = p.kind
			if _, exists := m.workers[key]; exists {
				// A worker already exists: treat the pod as already added.
				return
			}
			w := newWorker(m, p.kind, pod, c)
			m.workers[key] = w
			go w.run()
		}
	}
}
往下就是根据配置的探针类型调用探测逻辑:
// runProbe executes probe p once against container and returns the probe
// result, the output captured by the probe handler, and an error.
//
// The first non-nil handler in the order Exec, HTTPGet, TCPSocket, GRPC is
// used. If the HTTP request or the TCP port cannot be built, probe.Unknown is
// returned together with that error. If no handler is set, probe.Unknown and
// a "missing probe handler" error are returned.
//
// The probeType argument is not consulted by this function.
func (pb *prober) runProbe(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
	// All handlers share the same per-probe deadline, configured in seconds.
	timeout := time.Duration(p.TimeoutSeconds) * time.Second

	switch {
	case p.Exec != nil:
		command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
		return pb.exec.Probe(pb.newExecInContainer(ctx, container, containerID, command, timeout))

	case p.HTTPGet != nil:
		req, err := httpprobe.NewRequestForHTTPGetAction(p.HTTPGet, &container, status.PodIP, "probe")
		if err != nil {
			// Without a request nothing was probed, so the outcome is Unknown.
			return probe.Unknown, "", err
		}
		return pb.http.Probe(req, timeout)

	case p.TCPSocket != nil:
		port, err := probe.ResolveContainerPort(p.TCPSocket.Port, &container)
		if err != nil {
			return probe.Unknown, "", err
		}
		// An explicitly configured host takes precedence over the pod IP.
		host := p.TCPSocket.Host
		if host == "" {
			host = status.PodIP
		}
		return pb.tcp.Probe(host, port, timeout)

	case p.GRPC != nil:
		host := status.PodIP
		// Service is optional (*string); a nil value is sent as "".
		service := ""
		if p.GRPC.Service != nil {
			service = *p.GRPC.Service
		}
		return pb.grpc.Probe(host, service, int(p.GRPC.Port), timeout)

	default:
		klog.InfoS("Failed to find probe builder for container", "containerName", container.Name)
		return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
	}
}
`livenessManager`、`readinessManager`、`startupManager` 这三个探针的结果都是由 `prober.Manager` 写入的,每个探针各自对应一个 chan;`syncLoopIteration` 从其中接收到更新后,通过 `handleProbeSync` 触发 `handler.HandlePodSyncs` 操作(其中 liveness 探针只有在结果为 `Failure` 时才会触发)。
// syncLoopIteration (probe-related excerpt) blocks until one of the three
// probe result managers publishes an update, then reacts to it:
//   - liveness:  only a Failure result triggers handleProbeSync, with status
//     "unhealthy";
//   - readiness: the container's ready flag is recorded in the status
//     manager, then handleProbeSync runs with status "ready" or "";
//   - startup:   the container's started flag is recorded in the status
//     manager, then handleProbeSync runs with status "started" or
//     "unhealthy".
//
// It always returns true. The configCh, syncCh, housekeepingCh and plegCh
// parameters are not read in this excerpt.
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case res := <-kl.livenessManager.Updates():
		// A passing liveness probe needs no action; only failures are forwarded.
		if res.Result == proberesults.Failure {
			handleProbeSync(kl, res, handler, "liveness", "unhealthy")
		}

	case res := <-kl.readinessManager.Updates():
		isReady := res.Result == proberesults.Success
		kl.statusManager.SetContainerReadiness(res.PodUID, res.ContainerID, isReady)

		var readinessStatus string // stays empty when the container is not ready
		if isReady {
			readinessStatus = "ready"
		}
		handleProbeSync(kl, res, handler, "readiness", readinessStatus)

	case res := <-kl.startupManager.Updates():
		hasStarted := res.Result == proberesults.Success
		kl.statusManager.SetContainerStartup(res.PodUID, res.ContainerID, hasStarted)

		startupStatus := "started"
		if !hasStarted {
			startupStatus = "unhealthy"
		}
		handleProbeSync(kl, res, handler, "startup", startupStatus)
	}
	return true
}