跳转至

x-Informer实战之持久化K8s事件至ElasticSearch

一 前言

在系列文章中详细讲解了Informer的相关知识,本届番外获取K8s的事件,将其存储到Elasticsearch,可以利用inforrmer机制回去到应用的时间进行外部持久化存储,或者进行过滤分类展示,或进行数据应用分析告警等。

二 ES部署

为了测试简单,采用Docker启动es。

docker run --name es01 -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e  "ES_JAVA_OPTS=-Xms512m -Xmx512m" elasticsearch:latest

# 使用head客户端链接
docker pull mobz/elasticsearch-head:5
# 启动header 容器
docker run -d --name my-es_admin -p 9100:9100 mobz/elasticsearch-head:5

启动后,es正常,为了更方便操作es,使用head插件来,插件链接异常,需要修改es配置,并重启容器

# 进入容器
$ docker exec -it es01 /bin/bash

# 进入容器后设置参数
# http.cors.enabled: true
# http.cors.allow-origin: "*"
echo 'http.cors.enabled: true' >> config/elasticsearch.yml
echo 'http.cors.allow-origin: "*"' >> config/elasticsearch.yml
# 设置完成,退出后重启容器
docker restart es01

修改配置重启后,可以已经可以通过head组件正常链接es集群

image-20210927143317543

创建索引:

三 代码

var client *elastic.Client
var host = "http://127.0.0.1:9200/"

//初始化
func init() {
    errorlog := log.New(os.Stdout, "APP", log.LstdFlags)
    var err error
    // 这个地方有个小坑 不加上elastic.SetSniff(false) 会连接不上
    client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetErrorLog(errorlog), elastic.SetURL(host))
    if err != nil {
        panic(err)
    }
    info, code, err := client.Ping(host).Do(context.Background())
    if err != nil {
        panic(err)
    }
    fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)

    esversion, err := client.ElasticsearchVersion(host)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Elasticsearch version %s\n", esversion)

}

func Must(err error) {
    if err != nil {
        panic(err)
    }
}

func main() {
    rand.Seed(time.Now().UnixNano())
    config, err := clientcmd.BuildConfigFromFlags("", "/Users/xuel/.kube/config")
    Must(err)

    clientset, err := kubernetes.NewForConfig(config)
    Must(err)
    sharedInformers := informers.NewSharedInformerFactory(clientset, 0)
    stopChan := make(chan struct{})
    defer close(stopChan)

  // 在此使用event informer,
    eventInformer := sharedInformers.Events().V1beta1().Events().Informer()
    addChan := make(chan v1beta1.Event)
    deleteChan := make(chan v1beta1.Event)
    eventInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            unstructObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
            Must(err)
            event := &v1beta1.Event{}
            err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj, event)
            Must(err)
            addChan <- *event
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
        },
        DeleteFunc: func(obj interface{}) {
        },
    }, 0)

    go func() {
        for {
            select {
            case event := <-addChan:
                str, err := json.Marshal(&event)
                Must(err)
                fmt.Printf("插入k8s事件内容:%s", string(str))
                esinsert(str)
                break
            case <-deleteChan:
                break
            }
        }
    }()
    eventInformer.Run(stopChan)
}

func esinsert(str []byte) {
    index := "k8s_informer"
    dbtype := "doc"

    put1, err := client.Index().
        Index(index).
        Type(dbtype).
        Id("1").BodyString(string(str)).
        Do(context.Background())
    if err != nil {
        fmt.Println("Insert es error: %s", err)
    }
    fmt.Println("insert success", put1)
}

四 测试

插入数据后使用es查询:

curl  -H "Content-Type: application/json" -XGET 'http://127.0.0.1:9200/k8s_informer/doc/_search?pretty' -d '{"query":{"match_all":{}}}'

触发k8s事件,会自动记录下来

其他

可以利用inforrmer机制回去到应用的时间进行外部持久化存储,或者进行过滤分类进行图像话展示,或进行数据应用分析告警等。

在本示例中仅仅使用了event 事件,当然你也可以使用其他事件,且仅关注了addfunc,你也可以关注update/delete等操作。