Kubernetes 中 List-Watch 机制通过 HTTP 长连接(chunked 分块流式传输,也支持 WebSocket)监听相关事件
图片来源
说明:
- Informer 从 kube-apiserver 获取事件数据
- List 接口获取全量事件
- Watch 监听增量事件,通过用户注册的回调函数 AddFunc、UpdateFunc、DeleteFunc 处理事件
- 流程: Reflector 将资源对象的事件添加进 Delta FIFO queue 中;Informer 将 Delta FIFO queue 中的对象数据添加到本地 Local Store 中,并交给 resourceEventHandler 处理
- 使用 workqueue 处理业务逻辑,其实现 3 种队列
示例
Golang
package main
import (
"fmt"
"time"
"k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)
func AddFunc(obj interface{}) {
deployment := obj.(*v1.Deployment)
klog.Infof("add a deployment: %#v", deployment)
}
// UpdateFunc is the informer's update handler: it logs the previous and the
// current state of a Deployment. Both arguments are asserted to be
// *v1.Deployment (oldObj first, then newObj); a mismatch panics.
func UpdateFunc(oldObj, newObj interface{}) {
	before, after := oldObj.(*v1.Deployment), newObj.(*v1.Deployment)
	klog.Infof("update a oldDeployment: %#v, newDeployment: %#v", before, after)
}
// DeleteFunc is the informer's delete handler: it logs the name of the
// Deployment that was removed.
//
// A delete notification does not always carry the object itself. When the
// watch missed the actual delete event, client-go passes a
// cache.DeletedFinalStateUnknown tombstone that wraps the last known state, so
// an unchecked assertion to *v1.Deployment would panic. The tombstone is
// unwrapped here, and anything that is still not a Deployment is reported
// instead of crashing the handler.
func DeleteFunc(obj interface{}) {
	deployment, ok := obj.(*v1.Deployment)
	if !ok {
		tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
		if !isTombstone {
			klog.Errorf("delete: unexpected object type %T", obj)
			return
		}
		deployment, ok = tombstone.Obj.(*v1.Deployment)
		if !ok {
			klog.Errorf("delete: tombstone holds unexpected object type %T", tombstone.Obj)
			return
		}
	}
	klog.Infof("delete a deployment: %#v", deployment.Name)
}
// main wires up a shared Deployment informer and keeps it running.
//
// Steps: load the cluster config, build a clientset, register the
// Add/Update/Delete handlers on the Deployment informer, start the factory,
// wait for the local cache to sync, print the Deployments found in the
// "default" namespace, then block so the handlers keep receiving events.
// Every setup failure is logged with its underlying error and terminates the
// process with a non-zero status via klog.Exitf.
func main() {
	conf, err := config.GetConfig()
	if err != nil {
		// Surface the real cause instead of assuming the kubeconfig file is missing.
		klog.Exitf("load kubeconfig: %v", err)
	}
	clientSet, err := kubernetes.NewForConfig(conf)
	if err != nil {
		klog.Exitf("create clientset: %v", err)
	}

	// The second argument is the factory's default resync period.
	factory := informers.NewSharedInformerFactory(clientSet, 30*time.Second)
	deploymentInformer := factory.Apps().V1().Deployments().Informer()
	deploymentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    AddFunc,
		UpdateFunc: UpdateFunc,
		DeleteFunc: DeleteFunc,
	})
	deploymentLister := factory.Apps().V1().Deployments().Lister()

	// stopCh is never closed while main runs: the informers are meant to live
	// until the process is terminated (e.g. Ctrl-C).
	stopCh := make(chan struct{})

	// Start the informers (List + Watch).
	factory.Start(stopCh)

	// Wait for every started informer's cache to sync before reading from the lister.
	for informerType, synced := range factory.WaitForCacheSync(stopCh) {
		if !synced {
			klog.Exitf("cache for %v did not sync", informerType)
		}
	}

	deployments, err := deploymentLister.Deployments("default").List(labels.Everything())
	if err != nil {
		klog.Exitf("list deployments in namespace %q: %v", "default", err)
	}
	// Print the Deployment list served from the local cache.
	for index, dp := range deployments {
		fmt.Printf("%d: %#v\n", index, dp)
	}

	// Block forever so the event handlers keep running.
	<-stopCh
}
// test case:
// kubectl create deployment nginx --image=nginx
shell
- ListWatch namespaces 变化接口
curl -i http://172.20.0.81:8080/api/v1/watch/namespaces?watch=yes
k8s api nginx 代理
# Derive the outgoing Connection header value from the client's Upgrade header:
# "upgrade" when the client sent an Upgrade header, "close" when it is empty.
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
# Reverse proxy listening on port 8081 that forwards every request to the
# upstream at https://172.20.0.81:6443.
server {
listen 8081;
location / {
proxy_pass https://172.20.0.81:6443;
# Use HTTP/1.1 towards the upstream and pass the Upgrade/Connection headers
# through so protocol-upgrade requests reach the upstream intact.
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
# Forward the original Host and the client address information.
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
访问:
curl -i http://172.20.0.81:8081/api/v1/watch/pods?watch=yes