kubelet源码分析 syncLoopIteration(二) plegCH
PLEG
全称是 Pod Lifecycle Event Generator
,它主要通过读取 container
的 Event
来更新 Pod
的状态,目前提供了 GenericPLEG
和 EventedPLEG
两种模式,不同点 GenericPLEG
是定时去读取事件,EventedPLEG
是通过主动接收事件的推送。
GenericPLEG
通用 PLEG
配置的是每秒钟读取一次。
genericPlegRelistPeriod = time.Second * 1
调用 Start
方法之后就会进入间隔 1s
执行一次 g.Relist
这个方法中。
// Start launches the relist goroutine once; if the PLEG is already
// running, the call is a no-op.
func (g *GenericPLEG) Start() {
	if !g.isRunning {
		// Relist is invoked every relistDuration.RelistPeriod (1s for the
		// default GenericPLEG) until stopCh is closed.
		go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
	}
}
g.Relist
方法核心执行流程:
- 获取当前机器上的所有
Pod
, 主要通过CRI-API
中定义的ListPodSandbox
和ListContainers
两个方法获取。
// Get all the pods.
podList, err := g.runtime.GetPods(ctx, true)
- 更新当前的
podRecord
,podRecord 中两个对象,一个old
,一个current
.
g.podRecords.setCurrent(pods)
-
所有的新老
pod
的容器组到一起进行遍历。进行新老的对比,如果有event
变化,则记录。
a. 首先根据容器状态转换为
pleg
状态
// convertState maps a container runtime state to the corresponding
// PLEG-internal container state.
func convertState(state kubecontainer.State) plegContainerState {
	switch state {
	case kubecontainer.ContainerStateCreated:
		// kubelet doesn't use the "created" state yet, hence convert it to "unknown".
		return plegContainerUnknown
	case kubecontainer.ContainerStateRunning:
		return plegContainerRunning
	case kubecontainer.ContainerStateExited:
		return plegContainerExited
	case kubecontainer.ContainerStateUnknown:
		return plegContainerUnknown
	default:
		// Any other value indicates a programming error upstream.
		panic(fmt.Sprintf("unrecognized container state: %v", state))
	}
}
b. 之后根据
old
和current
生成的pleg
状态生成PLEGEvent
// generateEvents compares the old and new PLEG states of one container and
// returns the pod lifecycle event(s) implied by the transition.
func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
	switch newState {
	case plegContainerRunning:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
	case plegContainerExited:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
	case plegContainerUnknown:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
	case plegContainerNonExistent:
		// The container disappeared between relists; what we report
		// depends on what was last observed.
		switch oldState {
		case plegContainerExited:
			// We already reported that the container died before.
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
		default:
			// Death was never reported: emit both the death and the removal.
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
		}
	default:
		panic(fmt.Sprintf("unrecognized container state: %v", newState))
	}
}
- 首先根据容器状态转换为
-
会遍历所有
Pod
的事件,然后发往eventChannel
中,之后就会被syncLoopIteration
接收到进行处理。
// Fan the per-pod events out to the PLEG event channel, where
// syncLoopIteration consumes them.
for pid, events := range eventsByPodID {
	...
	for i := range events {
		// Filter out events that are not reliable and no other components use yet.
		if events[i].Type == ContainerChanged {
			continue
		}
		// Non-blocking send: if the channel is full the event is dropped,
		// recorded only via the metric and the error log below.
		select {
		case g.eventChannel <- events[i]:
		default:
			metrics.PLEGDiscardEvents.Inc()
			klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
		}
	}
}
EventedPLEG
事件 PLEG
会创建一个 GenericPLEG
,在接受事件失败次数达到上限之后就会退回到 GenericPLEG
类型,但是它读取一次的时间间隔是 300s
.
调用 Start
方法之后就会创建两个协程一直接收数据,这里和GenericPLEG
的区别还有不需要调用 Watch
方法。
// Start launches the two long-running goroutines of the Evented PLEG:
// one watching the CRI container-event stream, one periodically
// refreshing the global cache.
func (e *EventedPLEG) Start() {
	...
	go wait.Until(e.watchEventsChannel, 0, e.stopCh)
	go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh)
}
e.watchEventsChannel
方法核心执行流程 :
- 首先判断是否超过最大尝试次数,默认是
e.eventedPlegMaxStreamRetries=5
次。如果超过了会回退到GenericPLEG
上。
// watchEventsChannel keeps the CRI container-event stream open, retrying on
// failure; after eventedPlegMaxStreamRetries failed attempts it permanently
// falls back to the Generic PLEG.
func (e *EventedPLEG) watchEventsChannel() {
	go func() {
		numAttempts := 0
		for {
			if numAttempts >= e.eventedPlegMaxStreamRetries {
				// Too many stream failures: give up on evented mode and
				// restart the Generic PLEG with its default relist period.
				e.Stop()
				e.genericPleg.Stop() // Stop the existing Generic PLEG which runs with longer relisting period when Evented PLEG is in use.
				e.Update(e.relistDuration) // Update the relisting period to the default value for the Generic PLEG.
				e.genericPleg.Start()
				break
			}
			// Blocks while the stream is healthy; returns an error when the
			// stream breaks, which counts as one failed attempt.
			err := e.runtimeService.GetContainerEvents(context.Background(), containerEventsResponseCh, func(runtimeapi.RuntimeService_GetContainerEventsClient) {
				metrics.EventedPLEGConn.Inc()
			})
			if err != nil {
				...
				numAttempts++
			}
		}
	}()
}
-
runtimeService.GetContainerEvents
获取容器的Events
, 我们看proto
中的定义就明白,为什么这里可以主动监听这个事件的变化,因为它的返回值是一个 stream
类型。stream
在gRPC
中是一个双向流的过程,可以实现数据的双向传输。
rpc GetContainerEvents(GetEventsRequest) returns (stream ContainerEventResponse) {}
// GetContainerEvents opens the server-streaming GetContainerEvents RPC and
// forwards every received event into containerEventsCh until the stream is
// closed (io.EOF) or a receive error occurs.
func (r *remoteRuntimeService) GetContainerEvents(ctx context.Context, containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error {
	containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(ctx, &runtimeapi.GetEventsRequest{})
	...
	for {
		resp, err := containerEventsStreamingClient.Recv()
		if err == io.EOF {
			// Server closed the stream cleanly.
			r.logErr(err, "container events stream is closed")
			return err
		}
		if err != nil {
			r.logErr(err, "failed to receive streaming container event")
			return err
		}
		if resp != nil {
			// Hand the event off to the channel consumed by the Evented PLEG.
			containerEventsCh <- resp
			r.log(4, "container event received", "resp", resp)
		}
	}
}
- 处理
Event
, 它接收到Event
之后会根据类型发送到GenericPLEG
结构中的eventChannel
chan 中。
// processCRIEvents drains the CRI event channel, updates the pod status
// cache, and decides whether each event should become a PLEG event.
func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeapi.ContainerEventResponse) {
	for event := range containerEventsResponseCh {
		...
		if event.ContainerEventType == runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT {
			// Deleted containers always produce a PLEG event.
			shouldSendPLEGEvent = true
		} else {
			// Other event types are forwarded only when cache.Set returns
			// true — presumably meaning the cached entry actually changed;
			// confirm against the cache implementation.
			if e.cache.Set(podID, status, err, time.Unix(event.GetCreatedAt(), 0)) {
				shouldSendPLEGEvent = true
			}
		}
		if shouldSendPLEGEvent {
			e.processCRIEvent(event)
		}
	}
}
// processCRIEvent translates one CRI container event into the corresponding
// pod lifecycle event(s) and sends them to the event channel.
func (e *EventedPLEG) processCRIEvent(event *runtimeapi.ContainerEventResponse) {
	switch event.ContainerEventType {
	case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
		klog.V(4).InfoS("Received Container Stopped Event", "event", event.String())
	case runtimeapi.ContainerEventType_CONTAINER_CREATED_EVENT:
		// No lifecycle event is sent for creation; the event is only logged.
		klog.V(4).InfoS("Received Container Created Event", "event", event.String())
	case runtimeapi.ContainerEventType_CONTAINER_STARTED_EVENT:
		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerStarted, Data: event.ContainerId})
		klog.V(4).InfoS("Received Container Started Event", "event", event.String())
	case runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT:
		// A deletion is reported as a death followed by a removal.
		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerRemoved, Data: event.ContainerId})
		klog.V(4).InfoS("Received Container Deleted Event", "event", event)
	}
}
syncLoopIteration
下面我们来看在 syncLoopIteration
中是如何处理这个 Event
的。
// syncLoopIteration (excerpt): how a single event arriving on plegCh is
// handled inside the kubelet sync loop.
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case e := <-plegCh:
		// isSyncPodWorthy filters out events that do not require a pod sync
		// (per the surrounding article: ContainerRemoved events).
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
			}
		}
		// Dead containers are cleaned up regardless of whether a sync ran.
		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	}
	return true
}
核心执行流程如下:
- 如果事件不是移除事件(即 event.Type != pleg.ContainerRemoved
),则进入执行,如果Pod
还在PodManager
中,则执行HandlePodSyncs
,否则跳过。 HandlePodSyncs
中主要是往podWorkers
中发送UpdatePod
的SyncPodSync
操作。
- 如果是
e.Type == pleg.ContainerDied
退出事件,则清除容器。