Client-go 源码分析之 Informer 机制
Client-go 是 Kubernetes 的 Go 语言官方客户端库,开发者可以基于它来实现与 API Server 的交互。Client-go 的核心功能之一是维护集群中各种资源对象的实时状态,使组件可以直接从本地读取资源对象信息,而不必每次都请求 API Server。Client-go 基于其 Informer 机制实现了这种实时维护,Informer 机制使得 Kubernetes 组件之间无需依赖任何中间件,而是仅通过 HTTP 协议的通信就保证了消息的实时性、可靠性、顺序性。
1. Client 对象
Informer 基于底层 Client 来与 API Server 进行交互,我们先来复习一下 Client 对象。Client-go 支持 4 种客户端对象,其中最基础的是 RestClient,常用的是 ClientSet。
RestClient 直接对应于底层的 RESTful 操作,适用于需要精细化控制请求的场景。
package main
import (
"context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/home/rookie0080/.kube/config")
if err != nil {
panic(err)
}
config.APIPath = "api"
config.GroupVersion = &corev1.SchemeGroupVersion
config.NegotiatedSerializer = scheme.Codecs
restClient, err := rest.RESTClientFor(config)
if err != nil {
panic(err)
}
result := &corev1.PodList{}
err = restClient.Get().Namespace("").Resource("pods").VersionedParams(&metav1.ListOptions{Limit: 500}, scheme.ParameterCodec).Do(context.Background()).Into(result)
if err != nil {
panic(err)
}
for _, d := range result.Items {
println(d.Name)
}
}
实际开发中一般不会直接使用 RestClient,而通常使用基于其封装的 ClientSet。ClientSet 是多个客户端的集合,它封装了一系列对于 Kubernetes 内置资源对象的操作方法。
package main
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/home/rookie0080/.kube/config")
if err != nil {
panic(err.Error())
}
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
podClient := clientSet.CoreV1().Pods("")
list, err := podClient.List(context.Background(), metav1.ListOptions{Limit: 500})
if err != nil {
panic(err.Error())
}
for _, d := range list.Items {
println(d.Name)
}
}
不过原生的 ClientSet 对象只能访问 Kubernetes 的内置资源对象,如果需要访问 CRD 对象,需要通过代码生成器另外生成对应的 ClientSet 接口(e.g. Godel-scheduler)。
2. Informer 机制
代码版本为 c415c765,最新提交为 2024 年 6 月。
Infomer 基于 ClientSet 创建,每一种资源都会有对应的 informer,且可以为资源对象的事件注册对应的回调函数。Informer 的基本用法示例如下。
package main
import (
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 创建clientSet
config, err := clientcmd.BuildConfigFromFlags("", "/home/rookie0080/.kube/config")
if err != nil {
panic(err)
}
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
stopCh := make(chan struct{})
defer close(stopCh)
// informer基于clientSet创建
sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
informer := sharedInformers.Core().V1().Pods().Informer()
// 为Pod资源对象注册event回调函数
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}
fmt.Printf("add pod %s\n", pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*corev1.Pod)
if !ok {
return
}
newPod, ok := newObj.(*corev1.Pod)
if !ok {
return
}
fmt.Printf("update pod %s to pod %s\n", oldPod.Name, newPod.Name)
},
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}
fmt.Printf("delete pod %s\n", pod.Name)
},
})
informer.Run(stopCh)
}
Informer 是可以被共享的,使用同一类资源的 Informer 会共享一个 Reflector。因此尽量不要实例化多个相同资源的 Informer,避免给 API Server 造成负担。
2.1. 架构
Informer 机制的原理如下图所示。
Reflector:从 API Server 监听集群中发生的资源对象的变更事件,并将事件写入 DeltaFIFO 中。
DeltaFIFO:临时存储资源对象的变更事件,Reflector 中的消费工作流会异步处理其中的事件。
Indexer:自带索引功能的本地存储,Indexer 与 Etcd 中的数据完全保持一致。
2.2. Reflector
Reflector 模块的核心函数是 ListAndWatch,该函数执行一次初始 list,然后调用 watch 方法进行持续的监听。watchList 是流式地进行 list,主要是为了减少 API Server 侧的资源占用(暂不深究),Client-go 默认会走常规的 list 方法。
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// ...
if useWatchList { // stream list
w, err = r.watchList(stopCh)
// ...
}
if fallbackToList { // normal list
err = r.list(stopCh)
// ...
}
// ...
go r.startResync(stopCh, cancelCh, resyncerrc)
return r.watch(w, stopCh, resyncerrc)
}
list 方法会更新 Reflector 的 lastSyncResourceVersion 字段记录下此次获取的资源列表的 ResourceVerision,后续 watch 时会携带这个版本号,告知 API Server 这边关心的是该版本号之后的事件。并且在 watch 到新的事件时还会继续更新 lastSyncResourceVersion 变量。通过添加日志进行观察,发现 list 操作得到的资源列表整体的 ResoureVersion 并不直接等于所有资源对象的 ResourceVersion 的最大值,但总是比它们都要大。资源对象本身的 ResourceVersion 也并不是每次增加 1,而是一个不固定的数。
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
// ...
for {
// ...
if w == nil {
timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
// 携带最新的资源版本号
ResourceVersion: r.LastSyncResourceVersion(),
// ...
}
// Informer调用资源对应的底层client(clientSet->restClient)的Watch方法,该方法执行watch请求(长连接)并返回一个channel,
// informer通过监听该channel实时地获取相关event。
w, err = r.listerWatcher.Watch(options)
// ...
}
// 监听channel,实时获取资源对象的event并处理
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
// ...
}
}
ResourceVersion 和 lastSyncResourceVersion 相关的日志信息如下。
上面的 watchHandler 函数会实时获取资源对象的 event 进行处理,并将对应的 event 写入 DeltaFIFO 队列中。
2.3. DeltaFIFO
DeltaFIFO 意即“存放变化情况的先进先出队列”,它的核心数据结构与示意图如下。
type DeltaFIFO struct {
// ...
// `items` maps a key to a Deltas.
// Each such Deltas has at least one Delta.
items map[string]Deltas
// `queue` maintains FIFO order of keys for consumption in Pop().
// There are no duplicates in `queue`.
// A key is in `queue` if and only if it is in `items`.
queue []string
// ...
}
DeltaFIFO 在写侧的关键函数是 queueActionInternalLocked,读侧的函数则是 Pop。
func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// ...
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
}
// ...
return nil
}
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
// ...
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
这里有一个问题:由于 DeltaFIFO 中的事件是按照 key 聚合存储的,若事件到达的顺序是 add a、add b、delete a,则由于实际 a 的事件会被集中处理,那么看起来实际的处理顺序会是 add a、delete a、add b。是否 client-go 并不会保证事件处理的顺序与事件到来的顺序一致?如果是,这会导致什么问题?
process 函数最终会调用 processDeltas,其中主要有两件事:1)把资源对象更新到 Client-go 的本地存储中;2)分发事件,触发用户注册的回调函数。
func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
deltas Deltas,
isInInitialList bool,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
// 更新 indexer
if err := clientState.Update(obj); err != nil {
return err
}
// 触发回调函数
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj, isInInitialList)
}
case Deleted:
// 更新 indexer
if err := clientState.Delete(obj); err != nil {
return err
}
// 触发回调函数
handler.OnDelete(obj)
}
}
return nil
}
2.4. Indexer
Indexer 在代码中是一个 Store 接口,它定义了如下方法。
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
实现 Store 接口的核心数据结构是 threadSafeMap,它主要就是一个 map。除了 Store 定义的方法之外,threadSafeMap 还定义了 indexer 相关的方法,它们与本地存储的索引机制有关。索引通常是基于对象的某个字段(e.g. label)构建的,这样就可以快速查找具有特定字段值的所有对象。
// `*cache` implements Indexer in terms of a ThreadSafeStore and an
// associated KeyFunc.
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
}
type ThreadSafeStore interface {
// 和 Store 相同的方法...
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
AddIndexers(newIndexers Indexers) error
// ...
}
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// index implements the indexing functionality
index *storeIndex
}
3. 参考资料
《Kubernetes 源码剖析》