跳转至

5.Client-go源码分析之ListerWatcher

一 背景

kubernetes所有API对象都存储在etcd中,并只能通过apiserver访问。如果很多客户端频繁的列举全量对象(比如列举所有的Pod),这会造成apiserver不堪重负。

ListerWatcher是Lister和Watcher的结合体,ListerWatcher负责列举全量对象,Watcher负责监视(本文将watch翻译为监视)对象的增量变化。

通过客户端缓存,至在没有任何状态变化的情况下只需要读取本地缓存即可,减少对API-server的压力效率提升显而易见。通过列举全量对象完成本地缓存,而监视增量则是为了及时的将apiserver的状态变化更新到本地缓存。所以,在apiserver与客户端之间绝大部分传输的是对象的增量变化,当然在异常的情况下还是要重新列举一次全量对象。

本文值得客户端本地缓存就是Indexer,client-go不仅实现了缓存,同时还加了索引,进一步提升了检索效率。

二 ListerWatcher

Kubernetes 控制面 (control plane) 的核心是 API 服务器 (API server)。API 服务器负责提供 HTTP API,以供用户,集群中的不同部分和集群外部组件相互通信。控制器也不例外,所有控制器都通过 API 获取集群的当前状态,也通过 API 对集群状态进行修改。

list-watch,作为k8s系统中统一的异步消息传递方式,对系统的性能、数据一致性起到关键性的作用。

值得一提的是,Kubernetes 提供了 watch 机制方便客户端实时获取集群状态,有了这个接口,控制器才得以无延迟(准确地说是低延迟)地对状态变更作出响应。这里指的 "状态变更",就是我们常说的**事件 (event)**。

2.1 EventType

// EventType defines the possible types of events.
type EventType string

const (
    Added    EventType = "ADDED"
    Modified EventType = "MODIFIED"
    Deleted  EventType = "DELETED"
    Bookmark EventType = "BOOKMARK"
    Error    EventType = "ERROR"
)

2.2 ListerWatcher定义

// 复制代码
// client-go/tools/cache/listwatch.go
// Lister is any object that knows how to perform an initial list.
type Lister interface {
    // List should return a list type object; the Items field will be extracted, and the
    // ResourceVersion field will be used to start the watch in the right place.
    List(options metav1.ListOptions) (runtime.Object, error)
}

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
    // Watch should begin a watch at the specified version.
    Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
    Lister
    Watcher
}

2.3 创建ListWatcher对象

// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
    optionsModifier := func(options *metav1.ListOptions) {
        options.FieldSelector = fieldSelector.String()
    }
    return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}

三 小试牛刀

$ kubectl proxy
# 进行listwatch default名称空间下的pods
$ curl "127.0.0.1:8001/api/v1/namespaces/default/pods?watch=1"
# 创建pod进行观察
$ kubectl run nginx --image=nginx
{"type":"ADDED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx","namespace":"default","uid":"8d0548ce-fb67-4b71-93ec-59ad67b429d9","resourceVersion":"2925331","creationTimestamp":"2022-01-20T07:32:22Z","labels":{"run":"nginx"},"managedFields":[{"manager":"kubectl-run","operation":"Update","apiVersion":"v1","time":"2022-01-20T07:32:22Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:labels":{".":{},"f:run":{}}},"f:spec":{"f:containers":{"k:{\"name\":\"nginx\"}":{".":{},"f:image":{},"f:imagePullPolicy":{},"f:name":{},"f:resources":{},"f:terminationMessagePath":{},"f:terminationMessagePolicy":{}}},"f:dnsPolicy":{},"f:enableServiceLinks":{},"f:restartPolicy":{},"f:schedulerName":{},"f:securityContext":{},"f:terminationGracePeriodSeconds":{}}}}]},"spec":{"volumes":[{"name":"kube-api-access-nc5v8","projected":{"sources":[{"serviceAccountToken":{"expirationSeconds":3607,"path":"token"}},{"configMap":{"name":"kube-root-ca.crt","items":[{"key":"ca.crt","path":"ca.crt"}]}},{"downwardAPI":{"items":[{"path":"namespace","fieldRef":{"apiVersion":"v1","fieldPath":"metadata.namespace"}}]}}],"defaultMode":420}}],"containers":[{"name":"nginx","image":"nginx","resources":{},"volumeMounts":[{"name":"kube-api-access-nc5v8","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","securityContext":{},"schedulerName":"default-scheduler","tolerations":[{"key":"node.kubernetes.io/not-ready","operator":"Exists","effect":"NoExecute","tolerationSeconds":300},{"key":"node.kubernetes.io/unreachable","operator":"Exists","effect":"NoExecute","tolerationSeconds":300}],"priority":0,"enableServiceLinks":true,"preemptionPolicy":"PreemptLowerPriority"},"status":{"phase":"Pending","qosClass":"BestEffort"}}}

四 代码实现

编写代码对default名称空间下的configmap进行list watch。

package main

import (
    "fmt"
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
    "os"
    "os/signal"
    "path/filepath"
)

func Must(e interface{}) {
    if e != nil {
        panic(e)
    }
}

func InitClientSet() (*kubernetes.Clientset, error) {
    kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
    restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        return nil, err
    }
    return kubernetes.NewForConfig(restConfig)
}

func InitListerWatcher(clientSet *kubernetes.Clientset, resource, namespace string, fieldSelector fields.Selector) cache.ListerWatcher {
    restClient := clientSet.CoreV1().RESTClient()
    return cache.NewListWatchFromClient(restClient, resource, namespace, fieldSelector)
}

func main() {
    clientSet, err := InitClientSet()
    if err != nil {
        panic(err)
    }

    // 什么常量
    resource := "configmaps"
    namespace := "default"

    configMapListerWatcher := InitListerWatcher(clientSet, resource, namespace, fields.Everything())

    // 1. list操作
    listObj, err := configMapListerWatcher.List(metav1.ListOptions{})

    // meta 包封装了一些处理 runtime.Object 对象的方法,屏蔽了反射和类型转换的过程,
    // 提取出的 items 类型为 []runtime.Object
    items, err := meta.ExtractList(listObj)
    if err != nil {
        Must(err)
    }
    fmt.Println("list result:")
    for _, item := range items {
        configmaps, ok := item.(*v1.ConfigMap)
        if !ok {
            return
        }
        fmt.Printf("namespace: %s, resource name:%s\n", configmaps.Namespace, configmaps.Name)
    }

    // 2. watch 操作
    listMetaInterface, err := meta.ListAccessor(listObj)
    if err != nil {
        Must(err)
    }
    resourceVersion := listMetaInterface.GetResourceVersion()

    watchObj, err := configMapListerWatcher.Watch(metav1.ListOptions{
        ResourceVersion: resourceVersion,
    })

    // 接收信号
    stopCh := make(chan os.Signal)
    signal.Notify(stopCh, os.Interrupt)
    fmt.Println("Start watching...")
    for {
        select {
        case <-stopCh:
            fmt.Println("exit")
            return
        case event, ok := <-watchObj.ResultChan():
            if !ok {
                fmt.Println("Broken channel")
                break
            }
            configmaps, ok := event.Object.(*v1.ConfigMap)
            if !ok {
                return
            }
            fmt.Printf("eventType: %s, watch obj:%s\n", event.Type, configmaps.Name)
        }
    }
}

进行创建configmap测试

五 总结

  • ListerWatcher就是为SharedIndexInformer列举全量对象、监视对象增量变化设计的接口,实现就是Clientset的List和Watch函数;
  • SharedIndexInformer利用ListerWatcher实现了本地缓存与apiserver之间的状态一致性;
  • 不仅可以提升客户端访问API对象的效率,同时可以将对象的增量变化回调给使用者;
  • 从原理上讲,可以用etcd的clientv3.Client实现ListerWatcher,SharedIndexInformer同步etcd的对象,这样一些简单的醒目就可以复用SharedIndexInformer了,毕竟不是所有的项目都需要一个apiserver;

参考资料