avatar

朝花惜拾

Be a Real Engineer

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于
Home Client-go 源码分析之 Informer 机制
文章

Client-go 源码分析之 Informer 机制

Posted 2024-06-16 Updated 2024-11- 27
By Ray Lyu
30~39 min read

Client-go 是 Kubernetes 的 Go 语言官方客户端库,开发者可以基于它来实现与 API Server 的交互。Client-go 的核心功能之一是维护集群中各种资源对象的实时状态,使组件可以直接从本地读取资源对象信息,而不必每次都请求 API Server。Client-go 基于其 Informer 机制实现了这种实时维护,Informer 机制使得 Kubernetes 组件之间无需依赖任何中间件,而是仅通过 HTTP 协议的通信就保证了消息的实时性、可靠性、顺序性。

1. Client 对象

Informer 基于底层 Client 来与 API Server 进行交互,我们先来复习一下 Client 对象。Client-go 支持 4 种客户端对象,其中最基础的是 RestClient,常用的是 ClientSet。

RestClient 直接对应于底层的 RESTful 操作,适用于需要精细化控制请求的场景。

package main

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", "/home/rookie0080/.kube/config")
    if err != nil {
        panic(err)
    }
    config.APIPath = "api"
    config.GroupVersion = &corev1.SchemeGroupVersion
    config.NegotiatedSerializer = scheme.Codecs

    restClient, err := rest.RESTClientFor(config)
    if err != nil {
        panic(err)
    }

    result := &corev1.PodList{}
    err = restClient.Get().Namespace("").Resource("pods").VersionedParams(&metav1.ListOptions{Limit: 500}, scheme.ParameterCodec).Do(context.Background()).Into(result)
    if err != nil {
        panic(err)
    }

    for _, d := range result.Items {
        println(d.Name)
    }
}

实际开发中一般不会直接使用 RestClient,而通常使用基于其封装的 ClientSet。ClientSet 是多个客户端的集合,它封装了一系列对于 Kubernetes 内置资源对象的操作方法。

package main

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", "/home/rookie0080/.kube/config")
    if err != nil {
        panic(err.Error())
    }

    clientSet, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }
    podClient := clientSet.CoreV1().Pods("")
    list, err := podClient.List(context.Background(), metav1.ListOptions{Limit: 500})
    if err != nil {
        panic(err.Error())
    }
    for _, d := range list.Items {
        println(d.Name)
    }
}

不过原生的 ClientSet 对象只能访问 Kubernetes 的内置资源对象,如果需要访问 CRD 对象,需要通过代码生成器另外生成对应的 ClientSet 接口(e.g. Godel-scheduler)。

2. Informer 机制

代码版本为 c415c765,最新提交为 2024 年 6 月。

Infomer 基于 ClientSet 创建,每一种资源都会有对应的 informer,且可以为资源对象的事件注册对应的回调函数。Informer 的基本用法示例如下。

package main

import (
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // 创建clientSet
    config, err := clientcmd.BuildConfigFromFlags("", "/home/rookie0080/.kube/config")
    if err != nil {
        panic(err)
    }
    clientSet, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    stopCh := make(chan struct{})
    defer close(stopCh)

    // informer基于clientSet创建
    sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
    informer := sharedInformers.Core().V1().Pods().Informer()

    // 为Pod资源对象注册event回调函数
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod, ok := obj.(*corev1.Pod)
            if !ok {
                return
            }

            fmt.Printf("add pod %s\n", pod.Name)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            oldPod, ok := oldObj.(*corev1.Pod)
            if !ok {
                return
            }
            newPod, ok := newObj.(*corev1.Pod)
            if !ok {
                return
            }
            fmt.Printf("update pod %s to pod %s\n", oldPod.Name, newPod.Name)
        },
        DeleteFunc: func(obj interface{}) {
            pod, ok := obj.(*corev1.Pod)
            if !ok {
                return
            }
            fmt.Printf("delete pod %s\n", pod.Name)
        },
    })

    informer.Run(stopCh)
}

Informer 是可以被共享的,使用同一类资源的 Informer 会共享一个 Reflector。因此尽量不要实例化多个相同资源的 Informer,避免给 API Server 造成负担。

2.1. 架构

Informer 机制的原理如下图所示。

  • Reflector:从 API Server 监听集群中发生的资源对象的变更事件,并将事件写入 DeltaFIFO 中。

    • DeltaFIFO:临时存储资源对象的变更事件,Reflector 中的消费工作流会异步处理其中的事件。

  • Indexer:自带索引功能的本地存储,Indexer 与 Etcd 中的数据完全保持一致。

2.2. Reflector

Reflector 模块的核心函数是 ListAndWatch,该函数执行一次初始 list,然后调用 watch 方法进行持续的监听。watchList 是流式地进行 list,主要是为了减少 API Server 侧的资源占用(暂不深究),Client-go 默认会走常规的 list 方法。

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    // ...
    if useWatchList { // stream list
        w, err = r.watchList(stopCh)
        // ...
    }

    if fallbackToList { // normal list
        err = r.list(stopCh)
    // ...
    }

    // ...
    go r.startResync(stopCh, cancelCh, resyncerrc)
    return r.watch(w, stopCh, resyncerrc)
}

list 方法会更新 Reflector 的 lastSyncResourceVersion 字段记录下此次获取的资源列表的 ResourceVerision,后续 watch 时会携带这个版本号,告知 API Server 这边关心的是该版本号之后的事件。并且在 watch 到新的事件时还会继续更新 lastSyncResourceVersion 变量。通过添加日志进行观察,发现 list 操作得到的资源列表整体的 ResoureVersion 并不直接等于所有资源对象的 ResourceVersion 的最大值,但总是比它们都要大。资源对象本身的 ResourceVersion 也并不是每次增加 1,而是一个不固定的数。

func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
    // ...
    for {
        // ...
        if w == nil {
            timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
            options := metav1.ListOptions{
                // 携带最新的资源版本号
                ResourceVersion: r.LastSyncResourceVersion(),
                // ...
            }

            // Informer调用资源对应的底层client(clientSet->restClient)的Watch方法,该方法执行watch请求(长连接)并返回一个channel,
            // informer通过监听该channel实时地获取相关event。
            w, err = r.listerWatcher.Watch(options)
            // ...
        }

        // 监听channel,实时获取资源对象的event并处理
        err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
        // ...
    }
}

ResourceVersion 和 lastSyncResourceVersion 相关的日志信息如下。

上面的 watchHandler 函数会实时获取资源对象的 event 进行处理,并将对应的 event 写入 DeltaFIFO 队列中。

2.3. DeltaFIFO

DeltaFIFO 意即“存放变化情况的先进先出队列”,它的核心数据结构与示意图如下。

type DeltaFIFO struct {
    // ...
    // `items` maps a key to a Deltas.
    // Each such Deltas has at least one Delta.
    items map[string]Deltas

    // `queue` maintains FIFO order of keys for consumption in Pop().
    // There are no duplicates in `queue`.
    // A key is in `queue` if and only if it is in `items`.
    queue []string
    // ...
}

DeltaFIFO 在写侧的关键函数是 queueActionInternalLocked,读侧的函数则是 Pop。

func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    // ...

    oldDeltas := f.items[id]
    newDeltas := append(oldDeltas, Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } 
    // ...
    return nil
}

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.closed {
                return nil, ErrFIFOClosed
            }

            f.cond.Wait()
        }
        isInInitialList := !f.hasSynced_locked()
        id := f.queue[0]
        f.queue = f.queue[1:]
        depth := len(f.queue)
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // This should never happen
            klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
            continue
        }
        delete(f.items, id)
        // ...
        err := process(item, isInInitialList)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}

这里有一个问题:由于 DeltaFIFO 中的事件是按照 key 聚合存储的,若事件到达的顺序是 add a、add b、delete a,则由于实际 a 的事件会被集中处理,那么看起来实际的处理顺序会是 add a、delete a、add b。是否 client-go 并不会保证事件处理的顺序与事件到来的顺序一致?如果是,这会导致什么问题?

process 函数最终会调用 processDeltas,其中主要有两件事:1)把资源对象更新到 Client-go 的本地存储中;2)分发事件,触发用户注册的回调函数。


func processDeltas(
    // Object which receives event notifications from the given deltas
    handler ResourceEventHandler,
    clientState Store,
    deltas Deltas,
    isInInitialList bool,
) error {
    // from oldest to newest
    for _, d := range deltas {
        obj := d.Object

        switch d.Type {
        case Sync, Replaced, Added, Updated:
            if old, exists, err := clientState.Get(obj); err == nil && exists {
                // 更新 indexer
                if err := clientState.Update(obj); err != nil {
                    return err
                }
                // 触发回调函数
                handler.OnUpdate(old, obj)
            } else {
                if err := clientState.Add(obj); err != nil {
                    return err
                }
                handler.OnAdd(obj, isInInitialList)
            }
        case Deleted:
            // 更新 indexer
            if err := clientState.Delete(obj); err != nil {
                return err
            }
            // 触发回调函数
            handler.OnDelete(obj)
        }
    }
    return nil
}

2.4. Indexer

Indexer 在代码中是一个 Store 接口,它定义了如下方法。

type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    ListKeys() []string
    Get(obj interface{}) (item interface{}, exists bool, err error)
    GetByKey(key string) (item interface{}, exists bool, err error)
    Replace([]interface{}, string) error
    Resync() error
}

实现 Store 接口的核心数据结构是 threadSafeMap,它主要就是一个 map。除了 Store 定义的方法之外,threadSafeMap 还定义了 indexer 相关的方法,它们与本地存储的索引机制有关。索引通常是基于对象的某个字段(e.g. label)构建的,这样就可以快速查找具有特定字段值的所有对象。

// `*cache` implements Indexer in terms of a ThreadSafeStore and an
// associated KeyFunc.
type cache struct {
    // cacheStorage bears the burden of thread safety for the cache
    cacheStorage ThreadSafeStore
    // keyFunc is used to make the key for objects stored in and retrieved from items, and
    // should be deterministic.
    keyFunc KeyFunc
}

type ThreadSafeStore interface {
    // 和 Store 相同的方法...
    
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexedValue string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
    AddIndexers(newIndexers Indexers) error
    // ...
}

type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // index implements the indexing functionality
    index *storeIndex
}

3. 参考资料

  1. 《Kubernetes 源码剖析》

Kubernetes
Kubernetes
License:  CC BY 4.0
Share

Further Reading

Jun 16, 2024

Client-go 源码分析之 Informer 机制

Client-go 是 Kubernetes 的 Go 语言官方客户端库,开发者可以基于它来实现与 API Server 的交互。Client-go 的核心功能之一是维护集群中各种资源对象的实时状态,使组件可以直接从本地读取资源对象信息,而不必每次都请求 API Server。Client-go 基

Jan 28, 2024

Kube-scheduler 源码分析之调度队列

源码基于 v1.27 子队列 调度器的 SchedulingQueue 实现是一个 PriorityQueue 结构体,其中有三个子队列。 ActiveQ(heap):存放就绪的 pod,调度流程会从中取出 pod 进行调度; BackOffQ(heap):存放调度失败的 pod,这里的 pod 各

OLDER

CSAPP Ch2. 信息的表示和处理

NEWER

Go 语言基础

Recently Updated

  • 6.824 Lab1: MapReduce
  • 服务架构演进小结
  • ChineseChess 程序:Minimax 算法与 Alpha-Beta 剪枝
  • 2024年终总结
  • 初识 RPC 与 REST

Trending Tags

算法 架构 分布式系统 Golang Linux 系统编程 Kubernetes 搜索

Contents

©2025 朝花惜拾. Some rights reserved.

Using the Halo theme Chirpy