Skip to main content

kubelet源码分析 syncLoopIteration(一) podUpdateCh

· 5 min read
Softwore Developer

podUpdateCh 主要是负责接收 Pod 的数据,分别从文件、URL、Kube-APIServer 三个地方来,下面将详细分析这个 chan 的读写流程。

Podsource 定义了四种表现方式,在代码 pod_update.go

// These constants identify the sources of pods.
const (
// Filesource idenitified updates from a file.
FileSource = "file"
// HTTPSource identifies updates from querying a web page.
HTTPSource = "http"
// ApiserverSource identifies updates from Kubernetes API Server.
ApiserverSource = "api"
// AllSource identifies updates from all sources.
AllSource = "*"
)

a

File

file Watch 功能只在 linux 操作系统上支持,其它操作系统都不支持。

会把 fsnotify 监听的文件事件转换为 Pod 的操作类型;

func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error {
var eventType podEventType
switch {
case (e.Op & fsnotify.Create) > 0:
eventType = podAdd
case (e.Op & fsnotify.Write) > 0:
eventType = podModify
case (e.Op & fsnotify.Chmod) > 0:
eventType = podModify
case (e.Op & fsnotify.Remove) > 0:
eventType = podDelete
case (e.Op & fsnotify.Rename) > 0:
eventType = podDelete
default:
// Ignore rest events
return nil
}

s.watchEvents <- &watchEvent{e.Name, eventType}
return nil
}

然后根据监听到的文件名的变化,来读取 Pod 的内容,之后更新 store 对象;这里是令我奇怪的,我以为它会往 chan 里面发送一个事件。

func (s *sourceFile) consumeWatchEvent(e *watchEvent) error {
switch e.eventType {
case podAdd, podModify:
pod, _ := s.extractFromFile(e.fileName)
return s.store.Add(pod)
case podDelete:
if objKey, keyExist := s.fileKeyMapping[e.fileName]; keyExist {
pod, podExist, err := s.store.GetByKey(objKey)
if err = s.store.Delete(pod); err != nil {
return fmt.Errorf("failed to remove deleted pod from cache: %v", err)
}
...
}
}
return nil
}

除了 Watch 文件变化之后,它还有一个 time.Ticker 每隔 10s 会执行一遍全量读取目录下 Pod 的操作;

func (s *sourceFile) run() {
listTicker := time.NewTicker(s.period) #默认配置是10s
go func() {
// 启动之后先读取一遍所有的
if err := s.listConfig(); err != nil {
klog.ErrorS(err, "Unable to read config path", "path", s.path)
}
for {
select {
case <-listTicker.C:
if err := s.listConfig(); err != nil {
klog.ErrorS(err, "Unable to read config path", "path", s.path)
}
case e := <-s.watchEvents:
if err := s.consumeWatchEvent(e); err != nil {
klog.ErrorS(err, "Unable to process watch event")
}
}
}
}()
s.startWatch()
}

上面有一个疑问:这里是令我奇怪的,我以为它会往 chan 里面发送一个事件。发现它的 send 是写在 store.Add、store.Delete 里面的,它会在每次触发这个函数的时候,把存储起来的 pod 全量的发送到 chan 里面. undelta_store.go#45

func (u *UndeltaStore) Add(obj interface{}) error {
if err := u.Store.Add(obj); err != nil {
return err
}
u.PushFunc(u.Store.List())
return nil
}
func (u *UndeltaStore) Update(obj interface{}) error {
...
u.PushFunc(u.Store.List())
return nil
}
func (u *UndeltaStore) Delete(obj interface{}) error {
...
u.PushFunc(u.Store.List())
return nil
}

URL

URL 的方式是通过用户配置的外部服务地址,然后每 20s 去访问一次外部服务。

func (s *sourceURL) run() {
if err := s.extractFromURL(); err != nil {
...
}
}

extractFromURL 会通过访问 HTTP GET 的方式解析返回值,返回当个对象或者是数组都可以解析成功,解析成功之后也是会发送到 merge 中进行合并处理。

API

API 的方式是通过 list-watch 去读取 kube-apiserver 的数据,并且通过 field 进行过滤当前的节点。

lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))

传递的 store 对象里面有 send 的功能,和 file 处理逻辑一样,一个 pod 发生变更,就会要发送所有的 pod 去执行 merge 操作。

r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)

Merge

merge 是为了把从不同的 source 过来的 pod 进一步合并,避免出现重复、冲突的操作,最终得到一个一致的操作,最终合并后会得到各种事件的结果:

adds, updates, deletes, removes, reconciles := s.merge(source, change)

合并的难点在于判断是不是要更新,添加、删除这种场景都比较简单,之前不存在就添加,之前存在,现在不存在就删除,但是要不要更新是比较难判断的,里面有一个 checkAndUpdatePod 方法。

func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {

1. 检查Spec、Labels、DeletionTimestamp、DeletionGracePeriodSeconds、Annotations这些是否发生变更
2. 如果都没有发生变更判断Status是否发生变更,如果有则更新,没有则不更新。
3. 如果DeletionTimestamp不为空,则需要删除
....
}