ListWatch 事件监听机制

发布时间: 更新时间: 总字数:407 阅读时间:1m 作者: IP上海 分享 网址

Kubernetes 中 ListWatch 机制通过 HTTP 长连接(chunked 流式传输,也支持 websocket)监听相关事件

Informer

kubernetes informer

图片来源

说明:

  • Informer 从 kube-apiserver 获取事件数据
    • List 接口获取全量事件
    • Watch 监听增量事件,通过用户的回调函数 AddFunc、UpdateFunc、DeleteFunc 处理事件
  • 流程
    • Reflector 将资源对象的事件添加进 Delta FIFO queue
    • Informer 将 Delta FIFO queue 中的对象数据添加到本地 Local Store 中,并分发给 resourceEventHandler
    • 使用 workqueue 处理业务逻辑,其实现 3 种队列
      • FIFO队列
      • 延时队列
      • 限速队列

示例

Golang

package main

import (
  "fmt"
  "time"

  "k8s.io/api/apps/v1"
  "k8s.io/apimachinery/pkg/labels"
  "k8s.io/client-go/informers"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/tools/cache"
  "k8s.io/klog/v2"
  "sigs.k8s.io/controller-runtime/pkg/client/config"
)

func AddFunc(obj interface{}) {
  deployment := obj.(*v1.Deployment)
  klog.Infof("add a deployment: %#v", deployment)
}

// UpdateFunc is the informer "update" callback. Both arguments are asserted
// to be *v1.Deployment (old first, then new; a mismatch panics) and the pair
// is logged in Go-syntax form.
func UpdateFunc(oldObj, newObj interface{}) {
  before, after := oldObj.(*v1.Deployment), newObj.(*v1.Deployment)
  klog.Infof("update a oldDeployment: %#v, newDeployment: %#v", before, after)
}

// DeleteFunc is the informer "delete" callback. It logs the name of the
// deleted Deployment.
//
// obj is normally a *v1.Deployment, but when the informer missed the actual
// delete event (for example while the watch was disconnected) it delivers a
// cache.DeletedFinalStateUnknown tombstone that wraps the last known state.
// Both forms are handled; anything else is logged as an error instead of
// panicking on a failed type assertion.
func DeleteFunc(obj interface{}) {
  deployment, ok := obj.(*v1.Deployment)
  if !ok {
    tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
    if !isTombstone {
      klog.Errorf("delete: unexpected object type %T", obj)
      return
    }
    // The tombstone carries the last state the cache knew about.
    deployment, ok = tombstone.Obj.(*v1.Deployment)
    if !ok {
      klog.Errorf("delete: tombstone contained unexpected object type %T", tombstone.Obj)
      return
    }
  }
  klog.Infof("delete a deployment: %#v", deployment.Name)
}

// main wires up a shared informer for Deployments, registers the
// AddFunc/UpdateFunc/DeleteFunc handlers, prints the Deployments of the
// "default" namespace from the synced cache, and then blocks so the handlers
// keep receiving events.
//
// Any setup failure is logged together with its cause and terminates the
// process with a non-zero exit status.
func main() {
  conf, err := config.GetConfig()
  if err != nil {
    klog.Exitf("load kubernetes client config: %v", err)
  }

  clientSet, err := kubernetes.NewForConfig(conf)
  if err != nil {
    klog.Exitf("create clientSet: %v", err)
  }

  // The factory is created with a 30s default resync period.
  factory := informers.NewSharedInformerFactory(clientSet, 30*time.Second)
  deployments := factory.Apps().V1().Deployments()

  // Requesting the informer and lister before Start registers the
  // Deployment informer with the factory.
  deploymentInformer := deployments.Informer()
  deploymentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    AddFunc,
    UpdateFunc: UpdateFunc,
    DeleteFunc: DeleteFunc,
  })
  deploymentLister := deployments.Lister()

  stopCh := make(chan struct{})
  defer close(stopCh)

  // start Informer List and Watch
  factory.Start(stopCh)

  // wait for all cache sync; a false entry means that informer never synced,
  // so listing from its cache would return incomplete data.
  for informerType, synced := range factory.WaitForCacheSync(stopCh) {
    if !synced {
      klog.Exitf("cache for %v failed to sync", informerType)
    }
  }

  list, err := deploymentLister.Deployments("default").List(labels.Everything())
  if err != nil {
    klog.Exitf("list deployments in namespace default: %v", err)
  }

  // print Deployment List
  for index, dp := range list {
    fmt.Printf("%d: %#v\n", index, dp)
  }

  // Nothing closes stopCh before this point, so this receive blocks and keeps
  // the process alive while the event handlers run.
  <-stopCh
}

// test case:
// kubectl create deployment nginx --image=nginx

shell

  • ListWatch namespaces 变化接口
curl -i http://172.20.0.81:8080/api/v1/watch/namespaces?watch=yes

k8s api nginx 代理

  • cat k8s.conf
    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''      close;
    }

    server {
        listen 8081;

        location / {
            proxy_pass https://172.20.0.81:6443;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;

            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }
    }

访问:

curl -i http://172.20.0.81:8081/api/v1/watch/pods?watch=yes
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数