
kubelet Source Code Analysis: syncLoopIteration (Part 2): plegCh


PLEG stands for Pod Lifecycle Event Generator. It updates Pod status mainly by consuming container events, and it currently comes in two flavors: GenericPLEG and EventedPLEG. The difference is that GenericPLEG polls for events on a fixed interval, whereas EventedPLEG receives events actively pushed by the container runtime.
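
Both flavors emit the same event type onto the event channel. Below is an abridged sketch of that type (based on pkg/kubelet/pleg/pleg.go; comments paraphrased), so the later code is easier to follow:

type PodLifeCycleEventType string

const (
    ContainerStarted PodLifeCycleEventType = "ContainerStarted"
    ContainerDied    PodLifeCycleEventType = "ContainerDied"
    ContainerRemoved PodLifeCycleEventType = "ContainerRemoved"
    ContainerChanged PodLifeCycleEventType = "ContainerChanged"
    // PodSync is used when a pod-level change cannot be expressed by the container events above.
    PodSync PodLifeCycleEventType = "PodSync"
)

// PodLifecycleEvent describes a change in the state of one container of one pod.
type PodLifecycleEvent struct {
    // The pod ID.
    ID types.UID
    // The type of the event.
    Type PodLifeCycleEventType
    // Accompanying data; for container events this is the container ID.
    Data interface{}
}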


GenericPLEG

The generic PLEG is configured to relist once per second:

genericPlegRelistPeriod    = time.Second * 1

After Start is called, g.Relist is executed every 1s:

func (g *GenericPLEG) Start() {
    if !g.isRunning {
        go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
    }
}
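
The sync loop gets hold of these events through Watch, which simply exposes the event channel (sketch based on pkg/kubelet/pleg/generic.go):

// Watch returns the channel on which Relist publishes its PodLifecycleEvents.
func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
    return g.eventChannel
}

Inside syncLoop the kubelet does plegCh := kl.pleg.Watch() and passes that channel into syncLoopIteration, which is how the events generated below eventually reach the handler.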

The core execution flow of g.Relist:

  1. Get all the Pods on the current node, mainly via the ListPodSandbox and ListContainers methods defined in the CRI API.
// Get all the pods.
podList, err := g.runtime.GetPods(ctx, true)
  2. Update the current podRecords; each podRecord holds two snapshots of a pod, old and current (see the sketch after this list).
g.podRecords.setCurrent(pods)
  3. Merge the containers of all old and new Pods and iterate over them, comparing old against new; whenever a container's state changed, record an event.

    a. First, convert the container state into a PLEG state:
    func convertState(state kubecontainer.State) plegContainerState {
        switch state {
        case kubecontainer.ContainerStateCreated:
            // kubelet doesn't use the "created" state yet, hence convert it to "unknown".
            return plegContainerUnknown
        case kubecontainer.ContainerStateRunning:
            return plegContainerRunning
        case kubecontainer.ContainerStateExited:
            return plegContainerExited
        case kubecontainer.ContainerStateUnknown:
            return plegContainerUnknown
        default:
            panic(fmt.Sprintf("unrecognized container state: %v", state))
        }
    }

    b. Then, from the old and current PLEG states, generate the PodLifecycleEvents:

    func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
        switch newState {
        case plegContainerRunning:
            return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
        case plegContainerExited:
            return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
        case plegContainerUnknown:
            return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
        case plegContainerNonExistent:
            switch oldState {
            case plegContainerExited:
                // We already reported that the container died before.
                return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
            default:
                return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
            }
        default:
            panic(fmt.Sprintf("unrecognized container state: %v", newState))
        }
    }
  4. Iterate over every Pod's events and send them to eventChannel, where syncLoopIteration will receive and handle them.

for pid, events := range eventsByPodID {
    ...
    for i := range events {
        // Filter out events that are not reliable and no other components use yet.
        if events[i].Type == ContainerChanged {
            continue
        }
        select {
        case g.eventChannel <- events[i]:
        default:
            metrics.PLEGDiscardEvents.Inc()
            klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
        }
    }
}
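
The podRecords bookkeeping used in step 2 is simply a map from pod UID to an old/current pair of snapshots; a trimmed-down sketch based on pkg/kubelet/pleg/generic.go:

type podRecord struct {
    old     *kubecontainer.Pod // snapshot from the previous relist
    current *kubecontainer.Pod // snapshot from this relist
}

type podRecords map[types.UID]*podRecord

// setCurrent clears each record's current snapshot and refills it from the
// freshly listed pods; after events have been generated, update() promotes
// current to old so the next relist has something to compare against.
func (pr podRecords) setCurrent(pods []*kubecontainer.Pod) {
    for i := range pr {
        pr[i].current = nil
    }
    for _, pod := range pods {
        if r, ok := pr[pod.ID]; ok {
            r.current = pod
        } else {
            pr[pod.ID] = &podRecord{current: pod}
        }
    }
}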

EventedPLEG

EventedPLEG also creates a GenericPLEG, which keeps running alongside it but with a much longer relist interval (300s). Once the number of failed attempts to receive events reaches the limit, it falls back entirely to that GenericPLEG and restores its default relist period; the relevant defaults are sketched below.
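
These defaults are declared in pkg/kubelet/kubelet.go (values as of recent releases; check them against your version):

// Relist period of the companion Generic PLEG while the Evented PLEG is active,
// and the retry budget for the CRI container event stream.
eventedPlegRelistPeriod     = time.Second * 300
eventedPlegMaxStreamRetries = 5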

Calling Start spawns two goroutines that keep receiving data; another difference from GenericPLEG is that there is no need to call the Watch method here, since the kubelet wires the same event channel into both PLEGs.

func (e *EventedPLEG) Start() {
    ...
    go wait.Until(e.watchEventsChannel, 0, e.stopCh)
    go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh)
}
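
The second goroutine has nothing to do with container events; it merely bumps the pod cache's global timestamp on a fixed period so that consumers waiting on the cache (GetNewerThan) are not starved when no events happen to arrive. A sketch based on pkg/kubelet/pleg/evented.go:

func (e *EventedPLEG) updateGlobalCache() {
    e.cache.UpdateTime(time.Now())
}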

The core execution flow of e.watchEventsChannel:

  1. First, check whether the maximum number of retries has been reached; the default is e.eventedPlegMaxStreamRetries=5. If it has, fall back to GenericPLEG.
func (e *EventedPLEG) watchEventsChannel() {
    go func() {
        numAttempts := 0
        for {
            if numAttempts >= e.eventedPlegMaxStreamRetries {
                e.Stop()
                e.genericPleg.Stop()       // Stop the existing Generic PLEG which runs with longer relisting period when Evented PLEG is in use.
                e.Update(e.relistDuration) // Update the relisting period to the default value for the Generic PLEG.
                e.genericPleg.Start()
                break
            }
            err := e.runtimeService.GetContainerEvents(context.Background(), containerEventsResponseCh, func(runtimeapi.RuntimeService_GetContainerEventsClient) {
                metrics.EventedPLEGConn.Inc()
            })
            if err != nil {
                ...
                numAttempts++
            }
        }
    }()
}
  2. runtimeService.GetContainerEvents fetches the container Events. Looking at the definition in the proto makes it clear why the kubelet can actively watch for these events: the return value is a stream.

    In gRPC, stream marks a streaming RPC; because the response here is declared as a stream, the server can keep pushing messages to the client over a single connection.

rpc GetContainerEvents(GetEventsRequest) returns (stream ContainerEventResponse) {}

func (r *remoteRuntimeService) GetContainerEvents(ctx context.Context, containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error {
    containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(ctx, &runtimeapi.GetEventsRequest{})
    ...
    for {
        resp, err := containerEventsStreamingClient.Recv()
        if err == io.EOF {
            r.logErr(err, "container events stream is closed")
            return err
        }
        if err != nil {
            r.logErr(err, "failed to receive streaming container event")
            return err
        }
        if resp != nil {
            containerEventsCh <- resp
            r.log(4, "container event received", "resp", resp)
        }
    }
}
  3. Process the Event: after receiving one, a PodLifecycleEvent of the matching type is sent to the eventChannel (the same channel GenericPLEG writes to) via sendPodLifecycleEvent, which is sketched after this list.
func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeapi.ContainerEventResponse) {
    for event := range containerEventsResponseCh {
        ...
        if event.ContainerEventType == runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT {
            shouldSendPLEGEvent = true
        } else {
            if e.cache.Set(podID, status, err, time.Unix(event.GetCreatedAt(), 0)) {
                shouldSendPLEGEvent = true
            }
        }

        if shouldSendPLEGEvent {
            e.processCRIEvent(event)
        }
    }
}

func (e *EventedPLEG) processCRIEvent(event *runtimeapi.ContainerEventResponse) {
    switch event.ContainerEventType {
    case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
        e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
        klog.V(4).InfoS("Received Container Stopped Event", "event", event.String())
    case runtimeapi.ContainerEventType_CONTAINER_CREATED_EVENT:
        klog.V(4).InfoS("Received Container Created Event", "event", event.String())
    case runtimeapi.ContainerEventType_CONTAINER_STARTED_EVENT:
        e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerStarted, Data: event.ContainerId})
        klog.V(4).InfoS("Received Container Started Event", "event", event.String())
    case runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT:
        e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
        e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerRemoved, Data: event.ContainerId})
        klog.V(4).InfoS("Received Container Deleted Event", "event", event)
    }
}
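
sendPodLifecycleEvent mirrors the non-blocking send used by GenericPLEG: if the channel is full, the event is dropped and a discard metric is incremented. A sketch based on pkg/kubelet/pleg/evented.go (the metric and log wording may differ across versions):

func (e *EventedPLEG) sendPodLifecycleEvent(event *PodLifecycleEvent) {
    select {
    case e.eventChannel <- event:
    default:
        // Record how many events were discarded because the channel was out of capacity.
        metrics.PLEGDiscardEvents.Inc()
        klog.ErrorS(nil, "Evented PLEG: Event channel is full, discarded pod lifecycle event")
    }
}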

syncLoopIteration

Now let's look at how syncLoopIteration handles these Events.

func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case e := <-plegCh:
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    }
    return true
}

The core execution flow is as follows:

  1. If the event is not a ContainerRemoved event (this is what isSyncPodWorthy checks, sketched after this list), proceed: if the Pod still exists in the podManager, call HandlePodSyncs; otherwise skip it.
  2. HandlePodSyncs mainly dispatches an UpdatePod call with the SyncPodSync update type to the podWorkers.
  3. If e.Type == pleg.ContainerDied, i.e. the container exited, clean up the dead containers in that Pod.
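
For reference, the two helpers mentioned above are small; abridged sketches based on pkg/kubelet/kubelet.go:

// isSyncPodWorthy filters out events that are not worthy of pod syncing.
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
    // ContainerRemoved doesn't affect pod state.
    return event.Type != pleg.ContainerRemoved
}

// HandlePodSyncs hands each pod to its pod worker with a SyncPodSync update.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {
        kl.podWorkers.UpdatePod(UpdatePodOptions{
            Pod:        pod,
            UpdateType: kubetypes.SyncPodSync,
            StartTime:  start,
        })
    }
}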