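controller-runtime is a set of Go libraries for building Kubernetes controllers. It is used by Kubebuilder and Operator SDK.
Introduction
- It lets third-party developers avoid working directly with the complex client-go implementation.
- The code architecture of kubernetes-sigs/controller-runtime is as follows:
[Figure: kubernetes-sigs/controller-runtime code architecture]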

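Managers
Every controller and webhook is ultimately run by a Manager (pkg/manager). The Manager is responsible for running controllers and webhooks, setting up common dependencies such as the shared cache (implemented with Informers) and clients, and managing leader election (pkg/leaderelection). Managers are usually configured to shut controllers down gracefully when the pod terminates, by wiring up a signal handler (pkg/manager/signals).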
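Controllers
Controllers (pkg/controller) use events (pkg/event) to eventually trigger reconcile requests. They can be constructed manually, but are usually built with a Builder (pkg/builder), which simplifies wiring event sources (pkg/source), such as changes to Kubernetes API objects, to event handlers (pkg/handler), such as "enqueue a reconcile request for the object's owner". Predicates (pkg/predicate) can be used to filter which events actually trigger reconciles. There are pre-written utilities for the common cases, and interfaces and helpers for advanced cases.
PS: A Controller registers an event handler with the Informer on one side, and on the other side takes items off the queue and calls the user's Reconciler function.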
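The flow from event to reconcile request can be sketched with a toy model. Everything below (`Event`, `Request`, `Predicate`, `Handler`, `Queue`, `dispatch`) is a hypothetical, simplified stand-in written with the standard library only; the real types in pkg/event, pkg/predicate, pkg/handler and the work queue are richer. The sketch shows three ideas: predicates drop events early, handlers map an event to zero or more requests, and the queue deduplicates requests for the same object.

```go
package main

import "fmt"

// Event is a simplified change notification for one object.
type Event struct {
	Namespace, Name string
	Owner           string // name of the owning object, empty if none
}

// Request identifies the object to reconcile.
type Request struct{ Namespace, Name string }

// Predicate decides whether an event should be processed at all.
type Predicate func(Event) bool

// Handler maps an event to the requests that should be enqueued.
type Handler func(Event) []Request

// Queue keeps pending requests in order and drops duplicates.
type Queue struct {
	pending map[Request]bool
	order   []Request
}

func NewQueue() *Queue { return &Queue{pending: map[Request]bool{}} }

// Add enqueues r unless an identical request is already waiting.
func (q *Queue) Add(r Request) {
	if q.pending[r] {
		return
	}
	q.pending[r] = true
	q.order = append(q.order, r)
}

// Get removes and returns the oldest pending request, if any.
func (q *Queue) Get() (Request, bool) {
	if len(q.order) == 0 {
		return Request{}, false
	}
	r := q.order[0]
	q.order = q.order[1:]
	delete(q.pending, r)
	return r, true
}

// dispatch runs the predicates and, if all pass, enqueues the handler's requests.
func dispatch(q *Queue, ev Event, h Handler, preds ...Predicate) {
	for _, p := range preds {
		if !p(ev) {
			return
		}
	}
	for _, r := range h(ev) {
		q.Add(r)
	}
}

// enqueueObject requests a reconcile of the object the event is about.
func enqueueObject(ev Event) []Request {
	return []Request{{Namespace: ev.Namespace, Name: ev.Name}}
}

// enqueueOwner requests a reconcile of the object's owner, if it has one.
func enqueueOwner(ev Event) []Request {
	if ev.Owner == "" {
		return nil
	}
	return []Request{{Namespace: ev.Namespace, Name: ev.Owner}}
}

// inNamespace builds a predicate that only accepts events from ns.
func inNamespace(ns string) Predicate {
	return func(ev Event) bool { return ev.Namespace == ns }
}

func main() {
	q := NewQueue()
	onlyDefault := inNamespace("default")

	// A ReplicaSet changed: enqueue the ReplicaSet itself.
	dispatch(q, Event{Namespace: "default", Name: "web"}, enqueueObject, onlyDefault)
	// One of its Pods changed: enqueue the owner; the duplicate request is dropped.
	dispatch(q, Event{Namespace: "default", Name: "web-abc12", Owner: "web"}, enqueueOwner, onlyDefault)
	// An event from another namespace is filtered out by the predicate.
	dispatch(q, Event{Namespace: "kube-system", Name: "dns"}, enqueueObject, onlyDefault)

	for {
		req, ok := q.Get()
		if !ok {
			break
		}
		fmt.Printf("reconcile %s/%s\n", req.Namespace, req.Name)
	}
}
```

Because the request carries only a namespace and a name, several events about the same object collapse into one reconcile, which is why a Reconciler should read current state instead of relying on the event that triggered it.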
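Reconcilers
Controller logic is implemented in Reconcilers (pkg/reconcile). A Reconciler implements a function that takes a reconcile Request containing the name and namespace of the object to reconcile, reconciles the object, and returns a Result or an error indicating whether the request should be requeued for another round of processing.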
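Clients and Caches
Reconcilers use Clients (pkg/client) to access API objects. The default client provided by the Manager reads from a local shared cache (pkg/cache) and writes directly to the API server, but clients can also be constructed that talk only to the API server, without the cache. The cache is automatically populated with watched objects, and is also populated on demand when other structured objects are requested. The default split client does not promise to invalidate the cache on writes (nor does it promise sequential create/get coherence), so code should not assume that a Get immediately after a Create or Update returns the updated resource. Caches can also have indexes, which are created through a FieldIndexer (pkg/client) obtained from the Manager. Indexes make it quick and easy to look up all objects that have certain fields set. Reconcilers can also retrieve event recorders (pkg/recorder) from the Manager in order to emit events.
PS: The Cache sets up Informers that connect to the API server and watch resources, and delivers the watched objects to the registered event handlers, which push requests onto the queue.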
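The read-after-write caveat is easy to miss, so here is a toy model of it. `apiServer`, `splitClient`, and `sync` are hypothetical names, and the maps stand in for real storage; the only point is that writes go to the server while reads come from a cache that is refreshed separately (in reality by watch events).

```go
package main

import "fmt"

// apiServer is a toy stand-in for the API server's storage.
type apiServer struct{ objects map[string]string }

// splitClient writes to the server but reads from a local cache.
type splitClient struct {
	server *apiServer
	cache  map[string]string // refreshed only by sync, standing in for watch events
}

func newSplitClient(s *apiServer) *splitClient {
	return &splitClient{server: s, cache: map[string]string{}}
}

// Update writes straight to the API server and does not touch the cache.
func (c *splitClient) Update(key, value string) { c.server.objects[key] = value }

// Get reads from the local cache only.
func (c *splitClient) Get(key string) (string, bool) {
	v, ok := c.cache[key]
	return v, ok
}

// sync copies the server state into the cache, as watch events eventually would.
func (c *splitClient) sync() {
	for k, v := range c.server.objects {
		c.cache[k] = v
	}
}

func main() {
	c := newSplitClient(&apiServer{objects: map[string]string{}})

	c.Update("default/web", "replicas=3")
	_, found := c.Get("default/web")
	fmt.Println("found right after write:", found)

	c.sync()
	v, found := c.Get("default/web")
	fmt.Println("found after cache sync:", found, v)
}
```

In controller-runtime itself, a Reconciler that truly needs a fresh read can use the reader returned by the Manager's `GetAPIReader()`, which bypasses the cache; the usual approach, though, is to write reconcile logic that tolerates a slightly stale view and simply runs again when the cache catches up.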
Schemes
Clients, caches, and many other things in Kubernetes use Schemes (pkg/scheme) to associate Go types with Kubernetes API Kinds (Group-Version-Kinds, to be specific).
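The idea of a Scheme can be illustrated with a small registry built on `reflect`. This is a simplified sketch, not the real runtime.Scheme: `GVK`, `Scheme`, `AddKnownType`, `KindFor`, and the toy `Deployment` type are names chosen for illustration, and the Kind is assumed to equal the Go type name.

```go
package main

import (
	"fmt"
	"reflect"
)

// GVK is a Group-Version-Kind triple.
type GVK struct{ Group, Version, Kind string }

// Scheme maps Go struct types to GVKs and back.
type Scheme struct {
	typeToGVK map[reflect.Type]GVK
	gvkToType map[GVK]reflect.Type
}

func NewScheme() *Scheme {
	return &Scheme{
		typeToGVK: map[reflect.Type]GVK{},
		gvkToType: map[GVK]reflect.Type{},
	}
}

// structType returns the struct type behind obj, unwrapping one pointer level.
func structType(obj any) reflect.Type {
	t := reflect.TypeOf(obj)
	if t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	return t
}

// AddKnownType registers obj's type under group/version, using the Go type
// name as the Kind.
func (s *Scheme) AddKnownType(group, version string, obj any) {
	t := structType(obj)
	gvk := GVK{Group: group, Version: version, Kind: t.Name()}
	s.typeToGVK[t] = gvk
	s.gvkToType[gvk] = t
}

// KindFor looks up the GVK registered for obj's Go type.
func (s *Scheme) KindFor(obj any) (GVK, bool) {
	gvk, ok := s.typeToGVK[structType(obj)]
	return gvk, ok
}

// New creates an empty object (as a pointer) for a registered GVK.
func (s *Scheme) New(gvk GVK) (any, bool) {
	t, ok := s.gvkToType[gvk]
	if !ok {
		return nil, false
	}
	return reflect.New(t).Interface(), true
}

// Deployment is a toy type standing in for a real API type.
type Deployment struct{ Replicas int }

func main() {
	s := NewScheme()
	s.AddKnownType("apps", "v1", &Deployment{})

	gvk, _ := s.KindFor(&Deployment{})
	fmt.Printf("%+v\n", gvk)

	obj, _ := s.New(gvk)
	fmt.Printf("%T\n", obj)
}
```

The two directions serve different callers: a client needs the GVK of a typed object to build its request, while a decoder needs an empty Go value for the GVK found in the incoming data.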
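Webhooks
Similarly, webhooks (pkg/webhook/admission) can be implemented directly, but are usually constructed with a builder (pkg/webhook/admission/builder). They are run through a server (pkg/webhook) that is managed by the Manager.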
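Logging and Metrics
Logging (pkg/log) in controller-runtime is done via structured logs, using a set of log interfaces called logr (https://pkg.go.dev/github.com/go-logr/logr). While controller-runtime provides easy setup for Zap (https://go.uber.org/zap, pkg/log/zap), you can provide any logr implementation as the base logger for controller-runtime.
Metrics (pkg/metrics) provided by controller-runtime are registered into a controller-runtime-specific Prometheus metrics registry. The Manager can serve these metrics through an HTTP endpoint, and additional metrics can be registered to this registry as usual.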
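Examples
- Key methods
  - The sigs.k8s.io/controller-runtime package exposes its entry points as variables, including:
    - GetConfigOrDie: creates a *rest.Config for talking to the Kubernetes apiserver, and exits the process if that fails
    - GetConfig
      - The kubeconfig can be set with the --kubeconfig flag; otherwise the KUBECONFIG environment variable, the in-cluster config, and finally $HOME/.kube/config are tried
    - NewControllerManagedBy
    - NewWebhookManagedBy
    - NewManager, and others
  - builder
    - ControllerManagedBy returns a new controller builder that will be started by the provided Manager
    - Complete builds the Application Controller
    - For defines the type of object being reconciled, and configures ControllerManagedBy to respond to create/delete/update events by reconciling the object. This is equivalent to calling Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})
    - Watches defines a type of object to watch, and configures ControllerManagedBy to respond to create/delete/update events by reconciling the object with the given EventHandler
    - WatchesMetadata is the same as Watches, but watches only the metadata
      - This is useful when watching many objects, very large objects, or objects for which only the GVK is known but not the structure
- Optimization tips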
Reconcile usage
// https://github.com/kubernetes-sigs/controller-runtime/blob/v0.17.3/example_test.go#L47
package main

import (
	"context"
	"fmt"
	"os"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// This example creates a simple application Controller that is configured for ReplicaSets and Pods.
//
// * Create a new application for ReplicaSets that manages Pods owned by the ReplicaSet and calls into
// ReplicaSetReconciler.
//
// * Start the application.
func main() {
	log := ctrl.Log.WithName("builder-examples")

	manager, err := ctrl.NewManager(
		ctrl.GetConfigOrDie(),
		ctrl.Options{}, // arguments for creating a new Manager, like Scheme, Cache, Client, Election
	)
	if err != nil {
		log.Error(err, "could not create manager")
		os.Exit(1)
	}

	err = ctrl.
		NewControllerManagedBy(manager). // Create the Controller
		For(&appsv1.ReplicaSet{}).       // ReplicaSet is the Application API
		Owns(&corev1.Pod{}).             // ReplicaSet owns Pods created by it
		Complete(&ReplicaSetReconciler{Client: manager.GetClient()})
	if err != nil {
		log.Error(err, "could not create controller")
		os.Exit(1)
	}

	if err := manager.Start(ctrl.SetupSignalHandler()); err != nil {
		log.Error(err, "could not start manager")
		os.Exit(1)
	}
}

// ReplicaSetReconciler is a simple Controller example implementation.
type ReplicaSetReconciler struct {
	client.Client
}

// Implement the business logic:
// This function will be called when there is a change to a ReplicaSet or a Pod with an OwnerReference
// to a ReplicaSet.
//
// * Read the ReplicaSet
// * Read the Pods
// * Set a Label on the ReplicaSet with the Pod count.
func (a *ReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Read the ReplicaSet
	rs := &appsv1.ReplicaSet{}
	err := a.Get(ctx, req.NamespacedName, rs)
	if err != nil {
		return ctrl.Result{}, err
	}

	// List the Pods matching the PodTemplate Labels
	pods := &corev1.PodList{}
	err = a.List(ctx, pods, client.InNamespace(req.Namespace), client.MatchingLabels(rs.Spec.Template.Labels))
	if err != nil {
		return ctrl.Result{}, err
	}

	// Update the ReplicaSet. Labels is nil when the object has no labels yet,
	// and writing to a nil map would panic.
	if rs.Labels == nil {
		rs.Labels = map[string]string{}
	}
	rs.Labels["pod-count"] = fmt.Sprintf("%v", len(pods.Items))
	err = a.Update(ctx, rs)
	if err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

EventHandler usage
// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.0/pkg/controller#example-New
// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/handler#example-Funcs
package main

import (
	"context"
	"os"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func main() {
	log := ctrl.Log.WithName("builder-examples")

	manager, err := ctrl.NewManager(
		ctrl.GetConfigOrDie(),
		ctrl.Options{}, // arguments for creating a new Manager, like Scheme, Cache, Client, Election
	)
	if err != nil {
		log.Error(err, "could not create manager")
		os.Exit(1)
	}

	c, err := controller.New("pod-controller", manager, controller.Options{
		Reconciler: reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
			// Your business logic to implement the API by creating, updating, deleting objects goes here.
			return reconcile.Result{}, nil
		}),
	})
	if err != nil {
		log.Error(err, "unable to create pod-controller")
		os.Exit(1)
	}

	// c is a controller.Controller. Every event type enqueues a request for the Pod it concerns.
	err = c.Watch(
		source.Kind(manager.GetCache(), &corev1.Pod{}),
		handler.Funcs{
			CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
					Name:      e.Object.GetName(),
					Namespace: e.Object.GetNamespace(),
				}})
			},
			UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
					Name:      e.ObjectNew.GetName(),
					Namespace: e.ObjectNew.GetNamespace(),
				}})
			},
			DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
					Name:      e.Object.GetName(),
					Namespace: e.Object.GetNamespace(),
				}})
			},
			GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
					Name:      e.Object.GetName(),
					Namespace: e.Object.GetNamespace(),
				}})
			},
		},
	)
	if err != nil {
		// handle it
		panic(err)
	}

	if err := manager.Start(ctrl.SetupSignalHandler()); err != nil {
		log.Error(err, "could not start manager")
		os.Exit(1)
	}
}

Cache selector usage
// https://github.com/kubernetes-sigs/controller-runtime/blob/v0.17.3/example_test.go#L47
// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/manager#example-New-LimitToNamespaces
package main

import (
	"context"
	"fmt"
	"os"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// This example creates a simple application Controller that is configured for ReplicaSets and Pods.
//
// * Create a new application for ReplicaSets that manages Pods owned by the ReplicaSet and calls into
// ReplicaSetReconciler.
//
// * Start the application.
func main() {
	log := ctrl.Log.WithName("builder-examples")

	manager, err := ctrl.NewManager(
		ctrl.GetConfigOrDie(),
		ctrl.Options{
			// Restrict the shared cache to the listed namespaces.
			NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
				opts.DefaultNamespaces = map[string]cache.Config{
					"namespace1": {},
					"namespace2": {},
				}
				return cache.New(config, opts)
			},
		}, // arguments for creating a new Manager, like Scheme, Cache, Client, Election
	)
	if err != nil {
		log.Error(err, "could not create manager")
		os.Exit(1)
	}

	err = ctrl.
		NewControllerManagedBy(manager). // Create the Controller
		For(&appsv1.ReplicaSet{}).       // ReplicaSet is the Application API
		Owns(&corev1.Pod{}).             // ReplicaSet owns Pods created by it
		Complete(&ReplicaSetReconciler{Client: manager.GetClient()})
	if err != nil {
		log.Error(err, "could not create controller")
		os.Exit(1)
	}

	if err := manager.Start(ctrl.SetupSignalHandler()); err != nil {
		log.Error(err, "could not start manager")
		os.Exit(1)
	}
}

// ReplicaSetReconciler is a simple Controller example implementation.
type ReplicaSetReconciler struct {
	client.Client
}

// Implement the business logic:
// This function will be called when there is a change to a ReplicaSet or a Pod with an OwnerReference
// to a ReplicaSet.
//
// * Read the ReplicaSet
// * Read the Pods
// * Set a Label on the ReplicaSet with the Pod count.
func (a *ReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Read the ReplicaSet
	rs := &appsv1.ReplicaSet{}
	err := a.Get(ctx, req.NamespacedName, rs)
	if err != nil {
		return ctrl.Result{}, err
	}

	// List the Pods matching the PodTemplate Labels
	pods := &corev1.PodList{}
	err = a.List(ctx, pods, client.InNamespace(req.Namespace), client.MatchingLabels(rs.Spec.Template.Labels))
	if err != nil {
		return ctrl.Result{}, err
	}

	// Update the ReplicaSet. Labels is nil when the object has no labels yet,
	// and writing to a nil map would panic.
	if rs.Labels == nil {
		rs.Labels = map[string]string{}
	}
	rs.Labels["pod-count"] = fmt.Sprintf("%v", len(pods.Items))
	err = a.Update(ctx, rs)
	if err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

Custom Handler usage
// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3#example-package-CustomHandler
package main

import (
	"context"
	"encoding/json"
	"os"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type ExampleCRDWithConfigMapRef struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	ConfigMapRef      corev1.LocalObjectReference `json:"configMapRef"`
}

// deepCopyInto copies in to out through a JSON round trip.
// This is for presentation only; real code should generate DeepCopy methods
// with the controller-gen CLI tool.
func deepCopyInto(in, out any) {
	argBytes, err := json.Marshal(in)
	if err != nil {
		panic(err)
	}
	if err := json.Unmarshal(argBytes, out); err != nil {
		panic(err)
	}
}

// DeepCopyObject implements client.Object.
func (in *ExampleCRDWithConfigMapRef) DeepCopyObject() runtime.Object {
	out := &ExampleCRDWithConfigMapRef{}
	deepCopyInto(in, out)
	return out
}

type ExampleCRDWithConfigMapRefList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []ExampleCRDWithConfigMapRef `json:"items"`
}

// DeepCopyObject implements client.ObjectList.
func (in *ExampleCRDWithConfigMapRefList) DeepCopyObject() runtime.Object {
	out := &ExampleCRDWithConfigMapRefList{}
	deepCopyInto(in, out)
	return out
}

func main() {
	log := ctrl.Log.WithName("builder-examples")

	manager, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		log.Error(err, "could not create manager")
		os.Exit(1)
	}

	err = ctrl.
		NewControllerManagedBy(manager).
		For(&ExampleCRDWithConfigMapRef{}).
		Watches(&corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, cm client.Object) []ctrl.Request {
			// map a change from referenced configMap to ExampleCRDWithConfigMapRef, which causes its re-reconcile
			crList := &ExampleCRDWithConfigMapRefList{}
			if err := manager.GetClient().List(ctx, crList); err != nil {
				manager.GetLogger().Error(err, "while listing ExampleCRDWithConfigMapRefs")
				return nil
			}

			reqs := make([]ctrl.Request, 0, len(crList.Items))
			for _, item := range crList.Items {
				if item.ConfigMapRef.Name == cm.GetName() {
					reqs = append(reqs, ctrl.Request{
						NamespacedName: types.NamespacedName{
							Namespace: item.GetNamespace(),
							Name:      item.GetName(),
						},
					})
				}
			}

			return reqs
		})).
		Complete(reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
			// Your business logic to implement the API by creating, updating, deleting objects goes here.
			return reconcile.Result{}, nil
		}))
	if err != nil {
		log.Error(err, "could not create controller")
		os.Exit(1)
	}

	if err := manager.Start(ctrl.SetupSignalHandler()); err != nil {
		log.Error(err, "could not start manager")
		os.Exit(1)
	}
}

LimitToNamespaces usage
// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/manager#example-New-LimitToNamespaces
package main

import (
	"os"

	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// NB: don't call SetLogger in init(), or else you'll mess up logging in the main suite.
var log = logf.Log.WithName("manager-examples")

func main() {
	cfg, err := config.GetConfig()
	if err != nil {
		log.Error(err, "unable to get kubeconfig")
		os.Exit(1)
	}

	mgr, err := manager.New(cfg, manager.Options{
		// Restrict the shared cache to the listed namespaces.
		NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
			opts.DefaultNamespaces = map[string]cache.Config{
				"namespace1": {},
				"namespace2": {},
			}
			return cache.New(config, opts)
		},
	})
	if err != nil {
		log.Error(err, "unable to set up manager")
		os.Exit(1)
	}
	log.Info("created manager", "manager", mgr)
}

LeaderElection usage
// https://github.com/kubernetes-sigs/controller-runtime/blob/v0.17.3/example_test.go#L172
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	log := ctrl.Log.WithName("builder-examples")

	// These durations only matter when leader election is enabled through
	// Options.LeaderElection and Options.LeaderElectionID, which this example
	// leaves unset.
	leaseDuration := 100 * time.Second
	renewDeadline := 80 * time.Second
	retryPeriod := 20 * time.Second
	manager, err := ctrl.NewManager(
		ctrl.GetConfigOrDie(),
		ctrl.Options{
			LeaseDuration: &leaseDuration,
			RenewDeadline: &renewDeadline,
			RetryPeriod:   &retryPeriod,
		})
	if err != nil {
		log.Error(err, "could not create manager")
		os.Exit(1)
	}

	err = ctrl.
		NewControllerManagedBy(manager). // Create the Controller
		For(&appsv1.ReplicaSet{}).       // ReplicaSet is the Application API
		Owns(&corev1.Pod{}).             // ReplicaSet owns Pods created by it
		Complete(&ReplicaSetReconciler{Client: manager.GetClient()})
	if err != nil {
		log.Error(err, "could not create controller")
		os.Exit(1)
	}

	if err := manager.Start(ctrl.SetupSignalHandler()); err != nil {
		log.Error(err, "could not start manager")
		os.Exit(1)
	}
}

// ReplicaSetReconciler is a simple Controller example implementation.
type ReplicaSetReconciler struct {
	client.Client
}

// Implement the business logic:
// This function will be called when there is a change to a ReplicaSet or a Pod with an OwnerReference
// to a ReplicaSet.
//
// * Read the ReplicaSet
// * Read the Pods
// * Set a Label on the ReplicaSet with the Pod count.
func (a *ReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Read the ReplicaSet
	rs := &appsv1.ReplicaSet{}
	err := a.Get(ctx, req.NamespacedName, rs)
	if err != nil {
		return ctrl.Result{}, err
	}

	// List the Pods matching the PodTemplate Labels
	pods := &corev1.PodList{}
	err = a.List(ctx, pods, client.InNamespace(req.Namespace), client.MatchingLabels(rs.Spec.Template.Labels))
	if err != nil {
		return ctrl.Result{}, err
	}

	// Update the ReplicaSet. Labels is nil when the object has no labels yet,
	// and writing to a nil map would panic.
	if rs.Labels == nil {
		rs.Labels = map[string]string{}
	}
	rs.Labels["pod-count"] = fmt.Sprintf("%v", len(pods.Items))
	err = a.Update(ctx, rs)
	if err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

Webhook usage
// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/builder#example-WebhookBuilder
package main

import (
	"os"

	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"

	examplegroup "sigs.k8s.io/controller-runtime/examples/crd/pkg"
)

func main() {
	log := logf.Log.WithName("webhookbuilder-example")

	mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
	if err != nil {
		log.Error(err, "could not create manager")
		os.Exit(1)
	}

	err = builder.
		WebhookManagedBy(mgr).         // Create the WebhookManagedBy
		For(&examplegroup.ChaosPod{}). // ChaosPod is a CRD.
		Complete()
	if err != nil {
		log.Error(err, "could not create webhook")
		os.Exit(1)
	}

	if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
		log.Error(err, "could not start manager")
		os.Exit(1)
	}
}