diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 504a285..60f88cc 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -27,17 +27,29 @@ jobs: go mod tidy go vet ./... make test + gotestsum --junitfile=tmp/test-results/junit.xml --format=standard-verbose -- -coverprofile=coverage.out -covermode=atomic ./... - name: Upload coverage report if: always() uses: actions/upload-artifact@v4 with: name: coverage-report - path: coverage.txt + path: coverage.out - name: Upload coverage report if: always() uses: actions/upload-artifact@v4 with: name: junit.xml - path: tmp/test-results/junit.xml + path: tmp/test-results/junit.xml + + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} + + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.gitignore b/.gitignore index 05073c9..212eb99 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,5 @@ go.work *.swp *.swo *~ + +tmp/** diff --git a/.golangci.yml b/.golangci.yml index a5bd7f5..7ad88b4 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -40,6 +40,7 @@ linters: - dupl - lll - errcheck + - unparam paths: - third_party$ - builtin$ diff --git a/Makefile b/Makefile index b5594f1..4ad760c 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ IMG ?= quay.io/ullbergm/object-lease-controller:latest run: build - ./bin/lease-controller -group startpunkt.ullberg.us -kind Application -version v1alpha2 -leader-elect -leader-elect-namespace default + ./bin/lease-controller -group startpunkt.ullberg.us -kind Application -version v1alpha2 -leader-elect -leader-elect-namespace default -opt-in-label-key "object-lease-controller.ullberg.us/enabled" -opt-in-label-value true tidy: go mod tidy @@ -13,7 +13,7 @@ vet: go vet ./... test: tidy fmt vet - go test ./... -timeout 30s + go test ./... -race -coverprofile=coverage.out build: tidy fmt vet test go build -o bin/lease-controller ./cmd/main.go diff --git a/README.md b/README.md index 0e8e59b..c7d6418 100644 --- a/README.md +++ b/README.md @@ -76,20 +76,41 @@ You can specify the time in hours, minutes, days, weeks, etc. | `3h` | 3 hours | | `10s` | 10 seconds | -#### object-lease-controller.ullberg.us/extended-at +### object-lease-controller.ullberg.us/lease-start + +RFC3339 UTC timestamp. Single source of truth for when the lease started. + +Controller behavior: + +* If `ttl` exists and `lease-start` is missing or invalid, the controller sets `lease-start` to now. +* To extend a lease, delete `lease-start`. The controller sets it to now on the next reconcile. +* You can set `lease-start` explicitly to backdate or align with an external clock. + +Examples: -If you set this value, the lease is calculated from this time rather from the creation time. ```bash -kubectl annotate pod test object-lease-controller.ullberg.us/extended-at=2026-06-11T20:48:11Z +# Extend now by resetting the start +kubectl annotate pod test object-lease-controller.ullberg.us/lease-start- --overwrite + +# Set a specific start time +kubectl annotate pod test object-lease-controller.ullberg.us/lease-start=2025-01-01T12:00:00Z --overwrite ``` -#### object-lease-controller.ullberg.us/expire-at +### object-lease-controller.ullberg.us/expire-at -This annotation is updated by the controller to show when the object will expire. This is meant to be used by systems that display information about objects in the environment. +Set by the controller. RFC3339 UTC timestamp for when the object will expire. Safe for dashboards to read. -#### object-lease-controller.ullberg.us/lease-status +### object-lease-controller.ullberg.us/lease-status -This annotation is updated by the controller to indicate status or issues with the annotations. This is meant to be human readable information. +Set by the controller. Human readable status or validation errors. + +### Removing TTL + +Remove `ttl` to stop lease management. The controller clears `lease-start`, `expire-at`, and `lease-status`. + +```bash +kubectl annotate pod test object-lease-controller.ullberg.us/ttl- +``` ## Example Use Cases - Automatically manage leases for custom resources (e.g., Applications, Databases, Services) @@ -112,10 +133,11 @@ cd object-lease-operator make run ``` -## Optimizations / Features +## Behavior summary -Here are some design decisions and optimizations: -- Controller instances are only managing a single GVK, providing separation of duty and scaling. -- Controllers use dedicated ServiceAccounts with permissions limited to the GVK they are managing. -- Reconcile loop knows when the object is going to expire and will not read the status from the object until it has expired or it has been updated. -- TTL can be added after the object has been created, in that case the expiration is based on the time the TTL annotation was added, rather than the object creation time. +* Add `ttl` to start management. Controller sets `lease-start` if missing. +* Delete `lease-start` to extend from now. +* Optionally set `lease-start` to a specific RFC3339 UTC time. +* Delete `ttl` to stop management. Controller removes lease annotations. +* Reconcile filters only react to changes in `ttl` and `lease-start`. +* The controller computes `expire-at` from `lease-start + ttl` and requeues until expiry. diff --git a/cmd/main.go b/cmd/main.go index c57953a..3f41c6a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,6 +1,7 @@ package main import ( + "expvar" "flag" "fmt" "net/http" @@ -16,7 +17,8 @@ import ( metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - "object-lease-controller/pkg/leasewatcher" + controllers "object-lease-controller/pkg/controllers" + "object-lease-controller/pkg/util" ) var ( @@ -27,14 +29,19 @@ func main() { ctrl.SetLogger(zap.New()) var group, version, kind string + var optInLabelKey, optInLabelValue string flag.StringVar(&group, "group", "", "Kubernetes API group (e.g., \"apps\")") flag.StringVar(&version, "version", "", "Kubernetes API version (e.g., \"v1\")") flag.StringVar(&kind, "kind", "", "Kubernetes Kind (e.g., \"ConfigMap\")") - var metricsAddr, probeAddr string + flag.StringVar(&optInLabelKey, "opt-in-label-key", "", "The label key to opt-in namespaces") + flag.StringVar(&optInLabelValue, "opt-in-label-value", "", "The label value to opt-in namespaces") + + var metricsAddr, probeAddr, pprofAddr string flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metrics endpoint binds to. "+ "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") + flag.StringVar(&pprofAddr, "pprof-bind-address", ":6060", "pprof address") var enableLeaderElection bool var leaderElectionNamespace string @@ -57,6 +64,13 @@ func main() { if kind == "" { kind = os.Getenv("LEASE_GVK_KIND") } + if optInLabelKey == "" { + optInLabelKey = os.Getenv("LEASE_OPT_IN_LABEL_KEY") + } + if optInLabelValue == "" { + optInLabelValue = os.Getenv("LEASE_OPT_IN_LABEL_VALUE") + } + if !enableLeaderElection { if val := os.Getenv("LEASE_LEADER_ELECTION"); val != "" { var err error @@ -107,7 +121,7 @@ func main() { BindAddress: metricsAddr, } - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + mgrOpts := ctrl.Options{ Scheme: scheme, LeaderElection: enableLeaderElection, LeaderElectionID: leaderElectionID, @@ -115,25 +129,60 @@ func main() { LeaderElectionReleaseOnCancel: true, Metrics: metricsServerOptions, HealthProbeBindAddress: probeAddr, - }) + } + + if pprofAddr != "" { + mgrOpts.PprofBindAddress = pprofAddr + } + + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), mgrOpts) if err != nil { setupLog.Error(err, "unable to start manager") panic(err) } // Create a LeaseWatcher for the specified GVK - lw := &leasewatcher.LeaseWatcher{ + lw := &controllers.LeaseWatcher{ Client: mgr.GetClient(), GVK: gvk, Recorder: mgr.GetEventRecorderFor(leaderElectionID), } + if optInLabelKey != "" && optInLabelValue != "" { + tracker := util.NewNamespaceTracker() + + nw := &controllers.NamespaceReconciler{ + Client: mgr.GetClient(), + Recorder: mgr.GetEventRecorderFor(leaderElectionID), + LabelKey: optInLabelKey, + LabelValue: optInLabelValue, + Tracker: tracker, + } + + // Register NamespaceReconciler with the manager + if err := nw.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "GVK", gvk) + panic(err) + } + + lw.Tracker = tracker + } + // Register the LeaseWatcher with the manager if err := lw.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "GVK", gvk) panic(err) } + // Add metrics server expvar handler + if metricsAddr != "" { + setupLog.Info("Adding /debug/vars to metrics", "address", metricsAddr) + if err := mgr.AddMetricsServerExtraHandler("/debug/vars", expvar.Handler()); err != nil { + setupLog.Error(err, "unable to set up metrics server extra handler") + os.Exit(1) + } + } + // Health check: verify we can talk to the Kubernetes API healthCheck := func(req *http.Request) error { ctx := req.Context() diff --git a/go.mod b/go.mod index 2d02afd..2232f3f 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.0 toolchain go1.24.5 require ( + github.com/go-logr/logr v1.4.2 k8s.io/api v0.33.0 k8s.io/apimachinery v0.33.0 k8s.io/client-go v0.33.0 @@ -19,7 +20,6 @@ require ( github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect - github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect diff --git a/pkg/leasewatcher/leasewatcher.go b/pkg/controllers/lease_controller.go similarity index 61% rename from pkg/leasewatcher/leasewatcher.go rename to pkg/controllers/lease_controller.go index 311b65d..90c2931 100644 --- a/pkg/leasewatcher/leasewatcher.go +++ b/pkg/controllers/lease_controller.go @@ -1,4 +1,4 @@ -package leasewatcher +package controllers import ( "context" @@ -23,15 +23,21 @@ import ( // Lease annotation keys const ( AnnTTL = "object-lease-controller.ullberg.us/ttl" - AnnExtendedAt = "object-lease-controller.ullberg.us/extended-at" + AnnLeaseStart = "object-lease-controller.ullberg.us/lease-start" // RFC3339 UTC AnnExpireAt = "object-lease-controller.ullberg.us/expire-at" AnnStatus = "object-lease-controller.ullberg.us/lease-status" ) +type clientProvider interface { + GetClient() client.Client +} + type LeaseWatcher struct { client.Client - GVK schema.GroupVersionKind - Recorder record.EventRecorder + GVK schema.GroupVersionKind + Tracker *util.NamespaceTracker + Recorder record.EventRecorder + eventChan chan util.NamespaceChangeEvent } var ( @@ -41,7 +47,7 @@ var ( // Only trigger reconcile when relevant annotations change func leaseRelevantAnns(u *unstructured.Unstructured) map[string]string { anns := u.GetAnnotations() - keys := []string{AnnTTL, AnnExtendedAt} + keys := []string{AnnTTL, AnnLeaseStart} result := map[string]string{} for _, k := range keys { if v, ok := anns[k]; ok { @@ -79,6 +85,22 @@ func (r *LeaseWatcher) Reconcile(ctx context.Context, req controller_runtime.Req log := logger.FromContext(ctx).WithValues("GVK", r.GVK) log.Info("reconciling lease") + // Filter by tracker namespaces + if r.Tracker != nil { + namespaces := r.Tracker.ListNamespaces() + found := false + for _, ns := range namespaces { + if req.Namespace == ns { + found = true + break + } + } + if !found { + log.Info("namespace not tracked, skipping", "namespace", req.Namespace) + return controller_runtime.Result{}, nil + } + } + obj := &unstructured.Unstructured{} obj.SetGroupVersionKind(r.GVK) if err := r.Get(ctx, req.NamespacedName, obj); err != nil { @@ -89,10 +111,16 @@ func (r *LeaseWatcher) Reconcile(ctx context.Context, req controller_runtime.Req log.Error(err, "failed to get object") return controller_runtime.Result{}, client.IgnoreNotFound(err) } + anns := obj.GetAnnotations() - if anns == nil || anns[AnnTTL] == "" { + if anns == nil { + anns = map[string]string{} + } + + // If TTL missing, clean up all lease annotations and return + if anns[AnnTTL] == "" { cleaned := false - for _, k := range []string{AnnExpireAt, AnnStatus, AnnExtendedAt} { + for _, k := range []string{AnnExpireAt, AnnStatus, AnnLeaseStart} { if _, exists := anns[k]; exists { delete(anns, k) cleaned = true @@ -103,72 +131,47 @@ func (r *LeaseWatcher) Reconcile(ctx context.Context, req controller_runtime.Req base := obj.DeepCopy() obj.SetAnnotations(anns) _ = r.Patch(ctx, obj, client.MergeFrom(base)) - - // Record the event if r.Recorder != nil { - r.Recorder.Event(obj, "Normal", "LeaseAnnotationsCleaned", "Removed lease-related annotations as TTL is missing") + r.Recorder.Event(obj, "Normal", "LeaseAnnotationsCleaned", "Removed lease annotations because TTL is missing") } } return controller_runtime.Result{}, nil } - // Lease logic - var start time.Time now := time.Now().UTC() - // Only set AnnExtendedAt if TTL is added after creation (i.e., object is not new and AnnExpireAt is missing) - ct := obj.GetCreationTimestamp() - isNew := ct.IsZero() || ct.UTC().Add(10*time.Second).After(now) // treat as new if just created (10s window) - if anns[AnnTTL] != "" && anns[AnnExpireAt] == "" && anns[AnnExtendedAt] == "" { - if !isNew { - log.Info("Lease TTL added after creation, setting extended-at", "name", obj.GetName()) - anns[AnnExtendedAt] = now.Format(time.RFC3339) - r.updateAnnotations(ctx, obj, map[string]string{ - AnnExtendedAt: anns[AnnExtendedAt], - }) - start = now - // Record the event - if r.Recorder != nil { - r.Recorder.Event(obj, "Normal", "LeaseAdded", "Lease has been added to an existing object.") - } + // Ensure lease-start exists. If missing or invalid, set to now. + start := now + if v, ok := anns[AnnLeaseStart]; ok && v != "" { + if t, err := time.Parse(time.RFC3339, v); err == nil { + start = t.UTC() } else { - // Record the event + // reset invalid value + anns[AnnLeaseStart] = now.Format(time.RFC3339) + r.updateAnnotations(ctx, obj, map[string]string{AnnLeaseStart: anns[AnnLeaseStart]}) if r.Recorder != nil { - r.Recorder.Event(obj, "Normal", "LeaseAdded", "Lease was added when the object was created.") + r.Recorder.Event(obj, "Warning", "LeaseStartReset", "Invalid lease-start, reset to now") } } - } else if ext, ok := anns[AnnExtendedAt]; ok && ext != "" { - t, err := time.Parse(time.RFC3339, ext) - if err == nil { - start = t.UTC() - } - - // Record the event + } else { + anns[AnnLeaseStart] = now.Format(time.RFC3339) + r.updateAnnotations(ctx, obj, map[string]string{AnnLeaseStart: anns[AnnLeaseStart]}) if r.Recorder != nil { - r.Recorder.Event(obj, "Normal", "LeaseExtended", "Lease was extended.") - } - } - if start.IsZero() { - if ct.IsZero() { - start = now - } else { - start = ct.UTC() + r.Recorder.Event(obj, "Normal", "LeaseStarted", "Lease started") } + start = now } ttl, err := util.ParseFlexibleDuration(anns[AnnTTL]) if err != nil { message := fmt.Sprintf("Invalid TTL: %v", err) - r.updateAnnotations(ctx, obj, map[string]string{ - AnnStatus: message, - }) - - // Record the event + r.updateAnnotations(ctx, obj, map[string]string{AnnStatus: message}) if r.Recorder != nil { r.Recorder.Event(obj, "Warning", "InvalidTTL", message) } return controller_runtime.Result{}, nil } + expireAt := start.Add(ttl) if now.After(expireAt) { @@ -177,21 +180,15 @@ func (r *LeaseWatcher) Reconcile(ctx context.Context, req controller_runtime.Req AnnExpireAt: expireAt.Format(time.RFC3339), AnnStatus: leaseStatus, }) - log.Info("Deleting object due to expired lease", "name", obj.GetName()) _ = r.Delete(ctx, obj) - - // Record the event if r.Recorder != nil { r.Recorder.Event(obj, "Normal", "LeaseExpired", leaseStatus) } - - // Return empty result to stop further processing return controller_runtime.Result{}, nil } leaseStatus := fmt.Sprintf("Lease active. Expires at %s UTC.", expireAt.Format(time.RFC3339)) - r.updateAnnotations(ctx, obj, map[string]string{ AnnExpireAt: expireAt.Format(time.RFC3339), AnnStatus: leaseStatus, @@ -215,6 +212,14 @@ func (r *LeaseWatcher) updateAnnotations(ctx context.Context, obj *unstructured. func (r *LeaseWatcher) SetupWithManager(mgr manager.Manager) error { setupLog.Info("Setting up LeaseWatcher", "GVK", r.GVK) + + // Set up tracker event listener + if r.Tracker != nil { + r.eventChan = make(chan util.NamespaceChangeEvent, 10) + r.Tracker.RegisterListener(r.eventChan) + go r.handleNamespaceEvents(mgr) + } + return controller_runtime.NewControllerManagedBy(mgr). For(&unstructured.Unstructured{Object: map[string]interface{}{ "apiVersion": fmt.Sprintf("%s/%s", r.GVK.Group, r.GVK.Version), @@ -222,3 +227,38 @@ func (r *LeaseWatcher) SetupWithManager(mgr manager.Manager) error { }}, builder.WithPredicates(OnlyWithTTLAnnotation)). Complete(r) } + +// handleNamespaceEvents listens for tracker events and triggers reconciliation for new namespaces +func (r *LeaseWatcher) handleNamespaceEvents(mgr clientProvider) { + for evt := range r.eventChan { + if evt.Change == util.NamespaceAdded { + k8sClient := mgr.GetClient() + list := &unstructured.UnstructuredList{} + list.SetGroupVersionKind(schema.GroupVersionKind{ + Group: r.GVK.Group, + Version: r.GVK.Version, + Kind: r.GVK.Kind, + }) + opts := &client.ListOptions{Namespace: evt.Namespace} + err := k8sClient.List(context.Background(), list, opts) + if err == nil { + for _, obj := range list.Items { + anns := obj.GetAnnotations() + if anns != nil { + if _, has := anns[AnnTTL]; has { + req := controller_runtime.Request{ + NamespacedName: client.ObjectKeyFromObject(&obj), + } + go func(req controller_runtime.Request) { + ctx := context.Background() + if _, err := r.Reconcile(ctx, req); err != nil { + logger.FromContext(ctx).Error(err, "Reconcile failed", "object", req.NamespacedName) + } + }(req) + } + } + } + } + } + } +} diff --git a/pkg/controllers/lease_controller_test.go b/pkg/controllers/lease_controller_test.go new file mode 100644 index 0000000..d603de1 --- /dev/null +++ b/pkg/controllers/lease_controller_test.go @@ -0,0 +1,740 @@ +package controllers + +import ( + "context" + "object-lease-controller/pkg/util" + "reflect" + "strings" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + controller_runtime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/event" +) + +// helpers + +type stubMgr struct{ c client.Client } + +func (s stubMgr) GetClient() client.Client { return s.c } +func makeObj(anns map[string]string) *unstructured.Unstructured { + u := &unstructured.Unstructured{} + u.SetAnnotations(anns) + return u +} + +func setMeta(u *unstructured.Unstructured, gvk schema.GroupVersionKind, ns, name string) { + u.SetGroupVersionKind(gvk) + u.SetNamespace(ns) + u.SetName(name) +} + +func newWatcher(t *testing.T, gvk schema.GroupVersionKind, objs ...client.Object) (*LeaseWatcher, client.Client, *runtime.Scheme) { + t.Helper() + scheme := runtime.NewScheme() + // Register object and list types for fake client Get/List + scheme.AddKnownTypeWithName(gvk, &unstructured.Unstructured{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{ + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind + "List", + }, &unstructured.UnstructuredList{}) + + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() + return &LeaseWatcher{Client: cl, GVK: gvk}, cl, scheme +} + +func waitUntil(t *testing.T, timeout time.Duration, cond func() bool) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if cond() { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("condition not met within %v", timeout) +} + +func get(t *testing.T, cl client.Client, gvk schema.GroupVersionKind, ns, name string) *unstructured.Unstructured { + t.Helper() + out := &unstructured.Unstructured{} + out.SetGroupVersionKind(gvk) + if err := cl.Get(context.Background(), types.NamespacedName{Namespace: ns, Name: name}, out); err != nil { + t.Fatalf("get error: %v", err) + } + return out +} + +// existing predicate tests retained + +func TestLeaseRelevantAnns(t *testing.T) { + u := makeObj(map[string]string{ + AnnTTL: "1h", + "other": "ignore", + }) + got := leaseRelevantAnns(u) + want := map[string]string{ + AnnTTL: "1h", + } + if !reflect.DeepEqual(got, want) { + t.Errorf("leaseRelevantAnns = %v, want %v", got, want) + } + + u2 := makeObj(map[string]string{ + AnnTTL: "30m", + AnnLeaseStart: "2025-01-01T00:00:00Z", + "x": "y", + }) + got2 := leaseRelevantAnns(u2) + want2 := map[string]string{ + AnnTTL: "30m", + AnnLeaseStart: "2025-01-01T00:00:00Z", + } + if !reflect.DeepEqual(got2, want2) { + t.Errorf("leaseRelevantAnns = %v, want %v", got2, want2) + } + + u3 := makeObj(map[string]string{"foo": "bar"}) + got3 := leaseRelevantAnns(u3) + if len(got3) != 0 { + t.Errorf("leaseRelevantAnns(no anns) = %v, want empty", got3) + } +} + +func TestOnlyWithTTLAnnotation_Create(t *testing.T) { + tests := []struct { + name string + anns map[string]string + want bool + }{ + {"has TTL", map[string]string{AnnTTL: "5m"}, true}, + {"has lease-start but no TTL", map[string]string{AnnLeaseStart: "2025-01-01T00:00:00Z"}, false}, + {"no TTL", map[string]string{"foo": "bar"}, false}, + } + + for _, tt := range tests { + u := makeObj(tt.anns) + ev := event.CreateEvent{Object: u} + if got := OnlyWithTTLAnnotation.CreateFunc(ev); got != tt.want { + t.Errorf("CreateFunc(%q) = %v, want %v", tt.name, got, tt.want) + } + } +} + +func TestOnlyWithTTLAnnotation_Update(t *testing.T) { + baseOld := makeObj(map[string]string{AnnTTL: "1h", AnnLeaseStart: "2025-01-01T00:00:00Z"}) + baseNewSame := makeObj(map[string]string{AnnTTL: "1h", AnnLeaseStart: "2025-01-01T00:00:00Z", "other": "x"}) + changedTTL := makeObj(map[string]string{AnnTTL: "2h", AnnLeaseStart: "2025-01-01T00:00:00Z"}) + changedLeaseStart := makeObj(map[string]string{AnnTTL: "1h", AnnLeaseStart: "2025-01-01T01:00:00Z"}) + leaseStartAdded := makeObj(map[string]string{AnnTTL: "1h", AnnLeaseStart: "2025-01-01T00:00:00Z"}) + leaseStartRemoved := makeObj(map[string]string{AnnTTL: "1h"}) + noAnns := makeObj(nil) + + tests := []struct { + name string + oldObj *unstructured.Unstructured + newObj *unstructured.Unstructured + want bool + }{ + {"TTL changed", baseOld, changedTTL, true}, + {"LeaseStart changed", baseOld, changedLeaseStart, true}, + {"LeaseStart added", leaseStartRemoved, leaseStartAdded, true}, + {"LeaseStart removed", baseOld, leaseStartRemoved, true}, + {"Untracked annotation changed", baseOld, baseNewSame, false}, + {"TTL removed", baseOld, noAnns, true}, + {"Neither has TTL", noAnns, noAnns, false}, + } + + for _, tt := range tests { + ev := event.UpdateEvent{ObjectOld: tt.oldObj, ObjectNew: tt.newObj} + if got := OnlyWithTTLAnnotation.UpdateFunc(ev); got != tt.want { + t.Errorf("UpdateFunc(%s) = %v, want %v", tt.name, got, tt.want) + } + } + + // wrong types + evBad := event.UpdateEvent{ + ObjectOld: &corev1.Pod{}, + ObjectNew: &corev1.Pod{}, + } + if OnlyWithTTLAnnotation.UpdateFunc(evBad) { + t.Errorf("UpdateFunc(wrong types) = true, want false") + } + +} + +func TestOnlyWithTTLAnnotation_Delete_Generic(t *testing.T) { + if OnlyWithTTLAnnotation.DeleteFunc(event.DeleteEvent{}) { + t.Error("DeleteFunc always false") + } + if OnlyWithTTLAnnotation.GenericFunc(event.GenericEvent{}) { + t.Error("GenericFunc always false") + } +} + +// TTL change updates expire-at +func TestReconcile_UpdatesExpireAtWhenTTLChanges(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + start := time.Now().UTC().Add(2 * time.Hour).Truncate(time.Second) + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "default", "cm1") + obj.SetAnnotations(map[string]string{ + AnnTTL: "1h", + AnnLeaseStart: start.Format(time.RFC3339), + }) + + r, cl, _ := newWatcher(t, gvk, obj) + + _, err := r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm1"}}) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + got := get(t, cl, gvk, "default", "cm1") + exp1 := start.Add(1 * time.Hour).Format(time.RFC3339) + if got.GetAnnotations()[AnnExpireAt] != exp1 { + t.Fatalf("expire-at after TTL=1h = %q, want %q", got.GetAnnotations()[AnnExpireAt], exp1) + } + + // change TTL to 2h + anns := got.GetAnnotations() + anns[AnnTTL] = "2h" + got.SetAnnotations(anns) + if err := cl.Update(ctx, got); err != nil { + t.Fatalf("update error: %v", err) + } + + _, err = r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm1"}}) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + got2 := get(t, cl, gvk, "default", "cm1") + exp2 := start.Add(2 * time.Hour).Format(time.RFC3339) + if got2.GetAnnotations()[AnnExpireAt] != exp2 { + t.Fatalf("expire-at after TTL=2h = %q, want %q", got2.GetAnnotations()[AnnExpireAt], exp2) + } +} + +// Auto-start when lease-start missing +func TestReconcile_AutoStartSetsLeaseStartAndExpire(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "default", "cm2") + obj.SetAnnotations(map[string]string{ + AnnTTL: "5m", + }) + + r, cl, _ := newWatcher(t, gvk, obj) + + before := time.Now().UTC() + _, err := r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm2"}}) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + got := get(t, cl, gvk, "default", "cm2") + + ls, err := time.Parse(time.RFC3339, got.GetAnnotations()[AnnLeaseStart]) + if err != nil { + t.Fatalf("lease-start parse error: %v", err) + } + after := time.Now().UTC() + if ls.Before(before.Add(-2*time.Second)) || ls.After(after.Add(2*time.Second)) { + t.Fatalf("lease-start not within now window: %v vs [%v,%v]", ls, before, after) + } + exp := ls.Add(5 * time.Minute).Format(time.RFC3339) + if got.GetAnnotations()[AnnExpireAt] != exp { + t.Fatalf("expire-at = %q, want %q", got.GetAnnotations()[AnnExpireAt], exp) + } +} + +// Invalid lease-start resets and updates expire +func TestReconcile_InvalidLeaseStartResetsAndUpdatesExpire(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "default", "cm3") + obj.SetAnnotations(map[string]string{ + AnnTTL: "10m", + AnnLeaseStart: "not-a-time", + }) + + r, cl, _ := newWatcher(t, gvk, obj) + + _, err := r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm3"}}) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + got := get(t, cl, gvk, "default", "cm3") + ls, err := time.Parse(time.RFC3339, got.GetAnnotations()[AnnLeaseStart]) + if err != nil { + t.Fatalf("lease-start not reset to RFC3339: %v", err) + } + exp := ls.Add(10 * time.Minute).Format(time.RFC3339) + if got.GetAnnotations()[AnnExpireAt] != exp { + t.Fatalf("expire-at = %q, want %q", got.GetAnnotations()[AnnExpireAt], exp) + } +} + +// Invalid TTL writes status, no expire, no delete +func TestReconcile_InvalidTTL_StatusOnly_NoExpire_NoDelete(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "default", "cm4") + obj.SetAnnotations(map[string]string{ + AnnTTL: "totally-wrong", + }) + + r, cl, _ := newWatcher(t, gvk, obj) + + _, err := r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm4"}}) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + got := get(t, cl, gvk, "default", "cm4") + anns := got.GetAnnotations() + if anns[AnnExpireAt] != "" { + t.Fatalf("expire-at should be empty, got %q", anns[AnnExpireAt]) + } + if !strings.Contains(anns[AnnStatus], "Invalid TTL") { + t.Fatalf("lease-status should mention Invalid TTL, got %q", anns[AnnStatus]) + } +} + +// TTL removal clears lease annotations +func TestReconcile_RemoveTTL_CleansLeaseAnnotations(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "default", "cm5") + obj.SetAnnotations(map[string]string{ + AnnTTL: "1h", + AnnLeaseStart: time.Now().UTC().Format(time.RFC3339), + AnnExpireAt: time.Now().UTC().Add(1 * time.Hour).Format(time.RFC3339), + AnnStatus: "ok", + }) + + r, cl, _ := newWatcher(t, gvk, obj) + + // remove TTL + cur := get(t, cl, gvk, "default", "cm5") + anns := cur.GetAnnotations() + delete(anns, AnnTTL) + cur.SetAnnotations(anns) + if err := cl.Update(ctx, cur); err != nil { + t.Fatalf("update error: %v", err) + } + + _, err := r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm5"}}) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + got := get(t, cl, gvk, "default", "cm5") + out := got.GetAnnotations() + if _, ok := out[AnnLeaseStart]; ok { + t.Fatalf("lease-start should be cleaned") + } + if _, ok := out[AnnExpireAt]; ok { + t.Fatalf("expire-at should be cleaned") + } + if _, ok := out[AnnStatus]; ok { + t.Fatalf("lease-status should be cleaned") + } +} + +// Extend by deleting lease-start +func TestReconcile_ExtendByDeletingLeaseStart(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + start := time.Now().UTC().Add(-1 * time.Minute).Truncate(time.Second) + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "default", "cm6") + obj.SetAnnotations(map[string]string{ + AnnTTL: "10m", + AnnLeaseStart: start.Format(time.RFC3339), + }) + + r, cl, _ := newWatcher(t, gvk, obj) + + _, _ = r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm6"}}) + got := get(t, cl, gvk, "default", "cm6") + oldExp, _ := time.Parse(time.RFC3339, got.GetAnnotations()[AnnExpireAt]) + + anns := got.GetAnnotations() + delete(anns, AnnLeaseStart) + got.SetAnnotations(anns) + if err := cl.Update(ctx, got); err != nil { + t.Fatalf("update error: %v", err) + } + + _, _ = r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm6"}}) + got2 := get(t, cl, gvk, "default", "cm6") + newExp, _ := time.Parse(time.RFC3339, got2.GetAnnotations()[AnnExpireAt]) + if !newExp.After(oldExp) { + t.Fatalf("new expire-at %v should be after old %v", newExp, oldExp) + } +} + +// Expired deletes the object +func TestReconcile_ExpiredDeletesObject(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "default", "cm7") + obj.SetAnnotations(map[string]string{ + AnnTTL: "1s", + AnnLeaseStart: time.Now().UTC().Add(-2 * time.Hour).Format(time.RFC3339), + }) + + r, cl, _ := newWatcher(t, gvk, obj) + + _, _ = r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm7"}}) + + out := &unstructured.Unstructured{} + out.SetGroupVersionKind(gvk) + err := cl.Get(ctx, types.NamespacedName{Namespace: "default", Name: "cm7"}, out) + if err == nil || !apierrors.IsNotFound(err) { + t.Fatalf("expected NotFound after delete, got %v", err) + } +} + +// Idempotency: no change on repeated reconcile +func TestReconcile_Idempotent_NoChange(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + start := time.Now().UTC().Add(30 * time.Minute).Truncate(time.Second) + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "default", "cm8") + obj.SetAnnotations(map[string]string{ + AnnTTL: "1h", + AnnLeaseStart: start.Format(time.RFC3339), + }) + + r, cl, _ := newWatcher(t, gvk, obj) + + _, _ = r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm8"}}) + a1 := get(t, cl, gvk, "default", "cm8").GetAnnotations() + + time.Sleep(10 * time.Millisecond) // small delay + _, _ = r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm8"}}) + a2 := get(t, cl, gvk, "default", "cm8").GetAnnotations() + + if !reflect.DeepEqual(a1, a2) { + t.Fatalf("annotations changed between reconciles: %v vs %v", a1, a2) + } +} + +// RequeueAfter ≈ expire-at - now +func TestReconcile_RequeueAfterApproxTTL(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + start := time.Now().UTC().Add(10 * time.Second).Truncate(time.Second) + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "default", "cm9") + obj.SetAnnotations(map[string]string{ + AnnTTL: "1h", + AnnLeaseStart: start.Format(time.RFC3339), + }) + + r, cl, _ := newWatcher(t, gvk, obj) + + before := time.Now().UTC() + res, err := r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm9"}}) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + got := get(t, cl, gvk, "default", "cm9") + exp, _ := time.Parse(time.RFC3339, got.GetAnnotations()[AnnExpireAt]) + + // expected ≈ exp - now + elapsed := time.Since(before) + expected := time.Until(exp) + + if res.RequeueAfter <= 0 { + t.Fatalf("RequeueAfter should be positive, got %v", res.RequeueAfter) + } + // allow small drift + diff := res.RequeueAfter - expected + if diff < -3*time.Second || diff > (elapsed+3*time.Second) { + t.Fatalf("RequeueAfter %v not close to expected %v (diff %v)", res.RequeueAfter, expected, diff) + } +} + +// NotFound returns no error +func TestReconcile_NotFound_NoError(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + r, _, _ := newWatcher(t, gvk /* no objects */) + + _, err := r.Reconcile(ctx, controller_runtime.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "does-not-exist"}}) + if err != nil { + t.Fatalf("expected nil error on not found, got %v", err) + } +} + +// Predicate guardrail: unrelated annotation change does not trigger +func TestPredicate_UnrelatedAnnotationChangeDoesNotTrigger(t *testing.T) { + oldObj := makeObj(map[string]string{AnnTTL: "1h"}) + newObj := makeObj(map[string]string{AnnTTL: "1h", "unrelated": "x"}) + ev := event.UpdateEvent{ObjectOld: oldObj, ObjectNew: newObj} + if OnlyWithTTLAnnotation.UpdateFunc(ev) { + t.Fatalf("unrelated annotation change should not trigger") + } +} + +// helper: like newWatcher but with a tracker +func newWatcherWithTracker(t *testing.T, gvk schema.GroupVersionKind, tr *util.NamespaceTracker, objs ...client.Object) (*LeaseWatcher, client.Client) { + t.Helper() + r, cl, _ := newWatcher(t, gvk, objs...) + r.Tracker = tr + return r, cl +} + +func TestReconcile_SkipsUntrackedNamespace(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + // object lives in ns-a + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "ns-a", "cm-ns-a") + start := time.Now().UTC().Add(30 * time.Minute).Truncate(time.Second) + obj.SetAnnotations(map[string]string{ + AnnTTL: "1h", + AnnLeaseStart: start.Format(time.RFC3339), + }) + + // tracker only allows ns-b + tr := util.NewNamespaceTracker() + tr.AddNamespace("ns-b") + + r, cl := newWatcherWithTracker(t, gvk, tr, obj) + + // reconcile for ns-a should be skipped, so expire-at must remain empty + _, err := r.Reconcile(ctx, controller_runtime.Request{ + NamespacedName: types.NamespacedName{Namespace: "ns-a", Name: "cm-ns-a"}, + }) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + + got := get(t, cl, gvk, "ns-a", "cm-ns-a") + if v := got.GetAnnotations()[AnnExpireAt]; v != "" { + t.Fatalf("expire-at set for untracked namespace: %q", v) + } +} + +func TestReconcile_ProcessesTrackedNamespace(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + // object lives in ns-b + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "ns-b", "cm-ns-b") + start := time.Now().UTC().Add(30 * time.Minute).Truncate(time.Second) + obj.SetAnnotations(map[string]string{ + AnnTTL: "45m", + AnnLeaseStart: start.Format(time.RFC3339), + }) + + tr := util.NewNamespaceTracker() + tr.AddNamespace("ns-b") + + r, cl := newWatcherWithTracker(t, gvk, tr, obj) + + _, err := r.Reconcile(ctx, controller_runtime.Request{ + NamespacedName: types.NamespacedName{Namespace: "ns-b", Name: "cm-ns-b"}, + }) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + + got := get(t, cl, gvk, "ns-b", "cm-ns-b") + wantExp := start.Add(45 * time.Minute).Format(time.RFC3339) + if got.GetAnnotations()[AnnExpireAt] != wantExp { + t.Fatalf("expire-at = %q, want %q", got.GetAnnotations()[AnnExpireAt], wantExp) + } +} + +func TestReconcile_NoTrackerProcessesAllNamespaces(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + // object in any namespace should be processed if tracker is nil + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "random-ns", "cm-any") + start := time.Now().UTC().Add(15 * time.Minute).Truncate(time.Second) + obj.SetAnnotations(map[string]string{ + AnnTTL: "30m", + AnnLeaseStart: start.Format(time.RFC3339), + }) + + r, cl, _ := newWatcher(t, gvk, obj) // Tracker nil + + _, err := r.Reconcile(ctx, controller_runtime.Request{ + NamespacedName: types.NamespacedName{Namespace: "random-ns", Name: "cm-any"}, + }) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + + got := get(t, cl, gvk, "random-ns", "cm-any") + wantExp := start.Add(30 * time.Minute).Format(time.RFC3339) + if got.GetAnnotations()[AnnExpireAt] != wantExp { + t.Fatalf("expire-at = %q, want %q", got.GetAnnotations()[AnnExpireAt], wantExp) + } +} + +func TestReconcile_NamespaceBecomesTrackedLater(t *testing.T) { + ctx := context.Background() + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "late-ns", "cm-late") + start := time.Now().UTC().Add(20 * time.Minute).Truncate(time.Second) + obj.SetAnnotations(map[string]string{ + AnnTTL: "40m", + AnnLeaseStart: start.Format(time.RFC3339), + }) + + tr := util.NewNamespaceTracker() // initially empty + r, cl := newWatcherWithTracker(t, gvk, tr, obj) + + // first reconcile should skip + _, err := r.Reconcile(ctx, controller_runtime.Request{ + NamespacedName: types.NamespacedName{Namespace: "late-ns", Name: "cm-late"}, + }) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + if v := get(t, cl, gvk, "late-ns", "cm-late").GetAnnotations()[AnnExpireAt]; v != "" { + t.Fatalf("expire-at should be empty before namespace is tracked, got %q", v) + } + + // now track the namespace and reconcile again + tr.AddNamespace("late-ns") + _, err = r.Reconcile(ctx, controller_runtime.Request{ + NamespacedName: types.NamespacedName{Namespace: "late-ns", Name: "cm-late"}, + }) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + wantExp := start.Add(40 * time.Minute).Format(time.RFC3339) + if v := get(t, cl, gvk, "late-ns", "cm-late").GetAnnotations()[AnnExpireAt]; v != wantExp { + t.Fatalf("expire-at = %q, want %q", v, wantExp) + } +} + +func TestHandleNamespaceEvents_ProcessesAddedNamespace(t *testing.T) { + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + withTTL := &unstructured.Unstructured{} + setMeta(withTTL, gvk, "ns-a", "cm-ttl") + withTTL.SetAnnotations(map[string]string{AnnTTL: "30m"}) + + withoutTTL := &unstructured.Unstructured{} + setMeta(withoutTTL, gvk, "ns-a", "cm-no-ttl") + + nsB := &unstructured.Unstructured{} + setMeta(nsB, gvk, "ns-b", "cm-b") + nsB.SetAnnotations(map[string]string{AnnTTL: "15m"}) + + r, cl, _ := newWatcher(t, gvk, withTTL, withoutTTL, nsB) + + r.eventChan = make(chan util.NamespaceChangeEvent, 1) + go r.handleNamespaceEvents(stubMgr{c: cl}) + + r.eventChan <- util.NamespaceChangeEvent{Namespace: "ns-a", Change: util.NamespaceAdded} + close(r.eventChan) + + waitUntil(t, 2*time.Second, func() bool { + obj := get(t, cl, gvk, "ns-a", "cm-ttl") + ls := obj.GetAnnotations()[AnnLeaseStart] + ea := obj.GetAnnotations()[AnnExpireAt] + if ls == "" || ea == "" { + return false + } + start, err1 := time.Parse(time.RFC3339, ls) + exp, err2 := time.Parse(time.RFC3339, ea) + return err1 == nil && err2 == nil && exp.Equal(start.Add(30*time.Minute)) + }) + + if v := get(t, cl, gvk, "ns-a", "cm-no-ttl").GetAnnotations()[AnnExpireAt]; v != "" { + t.Fatalf("unexpected expire-at for object without TTL: %q", v) + } + if v := get(t, cl, gvk, "ns-b", "cm-b").GetAnnotations()[AnnExpireAt]; v != "" { + t.Fatalf("ns-b object should not be processed by ns-a event, got %q", v) + } +} + +func TestHandleNamespaceEvents_IgnoresNonAddedEvents(t *testing.T) { + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + obj := &unstructured.Unstructured{} + setMeta(obj, gvk, "ns-x", "cm-x") + obj.SetAnnotations(map[string]string{AnnTTL: "10m"}) + + r, cl, _ := newWatcher(t, gvk, obj) + + r.eventChan = make(chan util.NamespaceChangeEvent, 1) + go r.handleNamespaceEvents(stubMgr{c: cl}) + + r.eventChan <- util.NamespaceChangeEvent{Namespace: "ns-x", Change: util.NamespaceRemoved} + close(r.eventChan) + + time.Sleep(50 * time.Millisecond) + if v := get(t, cl, gvk, "ns-x", "cm-x").GetAnnotations()[AnnExpireAt]; v != "" { + t.Fatalf("non-Added event should not process objects, got expire-at=%q", v) + } +} + +func TestHandleNamespaceEvents_ProcessesMultipleObjectsInAddedNamespace(t *testing.T) { + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + a1 := &unstructured.Unstructured{} + setMeta(a1, gvk, "ns-y", "a1") + a1.SetAnnotations(map[string]string{AnnTTL: "5m"}) + + a2 := &unstructured.Unstructured{} + setMeta(a2, gvk, "ns-y", "a2") + a2.SetAnnotations(map[string]string{AnnTTL: "1h"}) + + a3 := &unstructured.Unstructured{} // no TTL, should be ignored + setMeta(a3, gvk, "ns-y", "a3") + a3.SetAnnotations(map[string]string{}) + + r, cl, _ := newWatcher(t, gvk, a1, a2, a3) + r.eventChan = make(chan util.NamespaceChangeEvent, 1) + go r.handleNamespaceEvents(stubMgr{c: cl}) + + r.eventChan <- util.NamespaceChangeEvent{Namespace: "ns-y", Change: util.NamespaceAdded} + close(r.eventChan) + + waitUntil(t, 2*time.Second, func() bool { + g1 := get(t, cl, gvk, "ns-y", "a1").GetAnnotations()[AnnExpireAt] + g2 := get(t, cl, gvk, "ns-y", "a2").GetAnnotations()[AnnExpireAt] + return g1 != "" && g2 != "" + }) + if v := get(t, cl, gvk, "ns-y", "a3").GetAnnotations()[AnnExpireAt]; v != "" { + t.Fatalf("object without TTL should be ignored, got expire-at=%q", v) + } +} diff --git a/pkg/controllers/namespace_controller.go b/pkg/controllers/namespace_controller.go new file mode 100644 index 0000000..0476250 --- /dev/null +++ b/pkg/controllers/namespace_controller.go @@ -0,0 +1,62 @@ +package controllers + +import ( + "context" + + "object-lease-controller/pkg/util" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + logger "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +type NamespaceReconciler struct { + client.Client + Log logr.Logger + LabelKey string + LabelValue string + Tracker *util.NamespaceTracker + Recorder record.EventRecorder +} + +func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error { + return builder.ControllerManagedBy(mgr). + For(&corev1.Namespace{}). + WithEventFilter(predicate.Or( + predicate.LabelChangedPredicate{}, + predicate.GenerationChangedPredicate{}, + )). + Complete(r) +} + +func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + var ns corev1.Namespace + log := logger.FromContext(ctx).WithValues("namespace", req.Name) + + if err := r.Get(ctx, req.NamespacedName, &ns); err != nil { + if apierrors.IsNotFound(err) { + log.V(2).Info("Namespace not found, not tracking", "namespace", req.Name) + r.Tracker.RemoveNamespace(req.Name) + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + if ns.Labels[r.LabelKey] == r.LabelValue { + log.Info("Namespace label matches, tracking") + r.Tracker.AddNamespace(req.Name) + } else { + if _, exists := ns.Labels[r.LabelKey]; exists { + log.V(2).Info("Namespace label exists but does not match, not tracking", "labelKey", r.LabelKey, "labelValue", r.LabelValue) + } else { + log.V(2).Info("Namespace label does not exist, not tracking", "labelKey", r.LabelKey) + } + r.Tracker.RemoveNamespace(req.Name) + } + return ctrl.Result{}, nil +} diff --git a/pkg/controllers/namespace_controller_test.go b/pkg/controllers/namespace_controller_test.go new file mode 100644 index 0000000..6bb0ebf --- /dev/null +++ b/pkg/controllers/namespace_controller_test.go @@ -0,0 +1,210 @@ +// pkg/controllers/namespace_controller_test.go +package controllers + +import ( + "context" + "errors" + "testing" + + "object-lease-controller/pkg/util" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + crclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func newScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + if err := corev1.AddToScheme(s); err != nil { + t.Fatal(err) + } + return s +} + +func newReq(name string) ctrl.Request { + return ctrl.Request{NamespacedName: types.NamespacedName{Name: name}} +} + +// erroringClient wraps a client and forces Get to return an error +type erroringClient struct { + client crclient.Client + err error +} + +func (e *erroringClient) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) { + return e.client.GroupVersionKindFor(obj) +} +func (e *erroringClient) IsObjectNamespaced(obj runtime.Object) (bool, error) { + return e.client.IsObjectNamespaced(obj) +} +func (e *erroringClient) Get(ctx context.Context, key crclient.ObjectKey, obj crclient.Object, opts ...crclient.GetOption) error { + return e.err +} +func (e *erroringClient) Create(ctx context.Context, obj crclient.Object, opts ...crclient.CreateOption) error { + return e.client.Create(ctx, obj, opts...) +} +func (e *erroringClient) Delete(ctx context.Context, obj crclient.Object, opts ...crclient.DeleteOption) error { + return e.client.Delete(ctx, obj, opts...) +} +func (e *erroringClient) Update(ctx context.Context, obj crclient.Object, opts ...crclient.UpdateOption) error { + return e.client.Update(ctx, obj, opts...) +} +func (e *erroringClient) Patch(ctx context.Context, obj crclient.Object, patch crclient.Patch, opts ...crclient.PatchOption) error { + return e.client.Patch(ctx, obj, patch, opts...) +} +func (e *erroringClient) DeleteAllOf(ctx context.Context, obj crclient.Object, opts ...crclient.DeleteAllOfOption) error { + return e.client.DeleteAllOf(ctx, obj, opts...) +} +func (e *erroringClient) List(ctx context.Context, list crclient.ObjectList, opts ...crclient.ListOption) error { + return e.client.List(ctx, list, opts...) +} +func (e *erroringClient) Status() crclient.SubResourceWriter { return e.client.Status() } +func (e *erroringClient) SubResource(s string) crclient.SubResourceClient { + return e.client.SubResource(s) +} +func (e *erroringClient) Scheme() *runtime.Scheme { return e.client.Scheme() } +func (e *erroringClient) RESTMapper() meta.RESTMapper { return e.client.RESTMapper() } + +func TestReconcile_AddsNamespaceOnMatchingLabel(t *testing.T) { + scheme := newScheme(t) + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "optin", + Labels: map[string]string{"watch/enabled": "true"}, + }, + } + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(ns).Build() + + tracker := util.NewNamespaceTracker() + + r := &NamespaceReconciler{ + Client: cl, + LabelKey: "watch/enabled", + LabelValue: "true", + Tracker: tracker, + } + + _, err := r.Reconcile(context.Background(), newReq("optin")) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + + if !tracker.TrackingNamespace("optin") { + t.Fatalf("expected tracker to contain namespace optin") + } +} + +func TestReconcile_RemovesNamespaceOnMismatchedLabel(t *testing.T) { + scheme := newScheme(t) + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mismatch", + Labels: map[string]string{"watch/enabled": "false"}, + }, + } + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(ns).Build() + + tracker := util.NewNamespaceTracker() + tracker.AddNamespace("mismatch") + + r := &NamespaceReconciler{ + Client: cl, + LabelKey: "watch/enabled", + LabelValue: "true", + Tracker: tracker, + } + + _, err := r.Reconcile(context.Background(), newReq("mismatch")) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + + if tracker.TrackingNamespace("mismatch") { + t.Fatalf("expected tracker to not contain namespace mismatch") + } +} + +func TestReconcile_RemoveWhenLabelMissing(t *testing.T) { + scheme := newScheme(t) + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nolabel", + }, + } + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(ns).Build() + + tracker := util.NewNamespaceTracker() + tracker.AddNamespace("nolabel") // ensure it stays tracked + + r := &NamespaceReconciler{ + Client: cl, + LabelKey: "watch/enabled", + LabelValue: "true", + Tracker: tracker, + } + + _, err := r.Reconcile(context.Background(), newReq("nolabel")) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + + if tracker.TrackingNamespace("nolabel") { + t.Fatalf("expected tracker to not contain namespace nolabel") + } +} + +func TestReconcile_NotFoundRemovesTracking(t *testing.T) { + scheme := newScheme(t) + cl := fake.NewClientBuilder().WithScheme(scheme).Build() // no objects + + tracker := util.NewNamespaceTracker() + tracker.AddNamespace("ghost") + + r := &NamespaceReconciler{ + Client: cl, + LabelKey: "watch/enabled", + LabelValue: "true", + Tracker: tracker, + } + + _, err := r.Reconcile(context.Background(), newReq("ghost")) + if err != nil { + t.Fatalf("reconcile error: %v", err) + } + + if tracker.TrackingNamespace("ghost") { + t.Fatalf("expected tracker to remove namespace ghost after NotFound") + } +} + +func TestReconcile_ClientErrorBubblesUp(t *testing.T) { + scheme := newScheme(t) + base := fake.NewClientBuilder().WithScheme(scheme).Build() + cl := &erroringClient{client: base, err: errors.New("boom")} + + tracker := util.NewNamespaceTracker() + tracker.AddNamespace("ns") + + r := &NamespaceReconciler{ + Client: cl, + LabelKey: "watch/enabled", + LabelValue: "true", + Tracker: tracker, + } + + _, err := r.Reconcile(context.Background(), newReq("ns")) + if err == nil { + t.Fatalf("expected error, got nil") + } + + if !tracker.TrackingNamespace("ns") { + t.Fatalf("expected tracker to still contain namespace ns on error") + } +} diff --git a/pkg/leasewatcher/leasewatcher_test.go b/pkg/leasewatcher/leasewatcher_test.go deleted file mode 100644 index 24f0e1a..0000000 --- a/pkg/leasewatcher/leasewatcher_test.go +++ /dev/null @@ -1,103 +0,0 @@ -package leasewatcher - -import ( - "reflect" - "testing" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "sigs.k8s.io/controller-runtime/pkg/event" -) - -func makeObj(anns map[string]string) *unstructured.Unstructured { - u := &unstructured.Unstructured{} - u.SetAnnotations(anns) - return u -} - -func TestLeaseRelevantAnns(t *testing.T) { - u := makeObj(map[string]string{ - AnnTTL: "1h", - AnnExtendedAt: "foo", - "other": "ignore", - }) - got := leaseRelevantAnns(u) - want := map[string]string{ - AnnTTL: "1h", - AnnExtendedAt: "foo", - } - if !reflect.DeepEqual(got, want) { - t.Errorf("leaseRelevantAnns = %v, want %v", got, want) - } - - u2 := makeObj(map[string]string{"foo": "bar"}) - got2 := leaseRelevantAnns(u2) - if len(got2) != 0 { - t.Errorf("leaseRelevantAnns(no anns) = %v, want empty", got2) - } -} - -func TestOnlyWithTTLAnnotation_Create(t *testing.T) { - tests := []struct { - name string - anns map[string]string - want bool - }{ - {"has TTL", map[string]string{AnnTTL: "5m"}, true}, - {"no TTL", map[string]string{"foo": "bar"}, false}, - } - - for _, tt := range tests { - u := makeObj(tt.anns) - ev := event.CreateEvent{Object: u} - if got := OnlyWithTTLAnnotation.CreateFunc(ev); got != tt.want { - t.Errorf("CreateFunc(%q) = %v, want %v", tt.name, got, tt.want) - } - } -} - -func TestOnlyWithTTLAnnotation_Update(t *testing.T) { - baseOld := makeObj(map[string]string{AnnTTL: "1h"}) - baseNewSame := makeObj(map[string]string{AnnTTL: "1h", "other": "x"}) - changedTTL := makeObj(map[string]string{AnnTTL: "2h"}) - addExtended := makeObj(map[string]string{AnnTTL: "1h", AnnExtendedAt: "now"}) - noAnns := makeObj(nil) - - tests := []struct { - name string - oldObj *unstructured.Unstructured - newObj *unstructured.Unstructured - want bool - }{ - {"TTL changed", baseOld, changedTTL, true}, - {"ExtendedAt added", baseOld, addExtended, true}, - {"Untracked annotation changed", baseOld, baseNewSame, false}, - {"TTL removed", baseOld, noAnns, true}, - {"Neither has TTL", noAnns, noAnns, false}, - } - - for _, tt := range tests { - ev := event.UpdateEvent{ObjectOld: tt.oldObj, ObjectNew: tt.newObj} - if got := OnlyWithTTLAnnotation.UpdateFunc(ev); got != tt.want { - t.Errorf("UpdateFunc(%s) = %v, want %v", tt.name, got, tt.want) - } - } - - // wrong-type case - evBad := event.UpdateEvent{ - ObjectOld: &corev1.Pod{}, - ObjectNew: &corev1.Pod{}, - } - if OnlyWithTTLAnnotation.UpdateFunc(evBad) { - t.Errorf("UpdateFunc(wrong types) = true, want false") - } -} - -func TestOnlyWithTTLAnnotation_Delete_Generic(t *testing.T) { - if OnlyWithTTLAnnotation.DeleteFunc(event.DeleteEvent{}) { - t.Error("DeleteFunc always false") - } - if OnlyWithTTLAnnotation.GenericFunc(event.GenericEvent{}) { - t.Error("GenericFunc always false") - } -} diff --git a/pkg/util/tracker.go b/pkg/util/tracker.go new file mode 100644 index 0000000..71989df --- /dev/null +++ b/pkg/util/tracker.go @@ -0,0 +1,90 @@ +package util + +import ( + "log" + "sync" +) + +// NamespaceChangeType represents the type of change in the tracker +type NamespaceChangeType int + +const ( + NamespaceAdded NamespaceChangeType = iota + NamespaceRemoved +) + +// NamespaceChangeEvent represents a change event for namespaces +type NamespaceChangeEvent struct { + Namespace string + Change NamespaceChangeType +} + +// NamespaceTracker tracks namespaces and notifies listeners on changes +type NamespaceTracker struct { + mu sync.RWMutex + namespaces map[string]struct{} + listeners []chan NamespaceChangeEvent +} + +func NewNamespaceTracker() *NamespaceTracker { + return &NamespaceTracker{ + namespaces: make(map[string]struct{}), + listeners: make([]chan NamespaceChangeEvent, 0), + } +} + +// AddNamespace adds a namespace and notifies listeners +func (t *NamespaceTracker) AddNamespace(ns string) { + t.mu.Lock() + defer t.mu.Unlock() + if _, exists := t.namespaces[ns]; !exists { + t.namespaces[ns] = struct{}{} + t.notifyListeners(NamespaceChangeEvent{Namespace: ns, Change: NamespaceAdded}) + } +} + +// RemoveNamespace removes a namespace and notifies listeners +func (t *NamespaceTracker) RemoveNamespace(ns string) { + t.mu.Lock() + defer t.mu.Unlock() + if _, exists := t.namespaces[ns]; exists { + delete(t.namespaces, ns) + t.notifyListeners(NamespaceChangeEvent{Namespace: ns, Change: NamespaceRemoved}) + } +} + +// ListNamespaces returns a slice of tracked namespaces +func (t *NamespaceTracker) ListNamespaces() []string { + t.mu.RLock() + defer t.mu.RUnlock() + ns := make([]string, 0, len(t.namespaces)) + for k := range t.namespaces { + ns = append(ns, k) + } + return ns +} + +// TrackingNamespace returns true if the namespace is being tracked +func (t *NamespaceTracker) TrackingNamespace(ns string) bool { + t.mu.RLock() + defer t.mu.RUnlock() + _, exists := t.namespaces[ns] + return exists +} + +// RegisterListener registers a channel to receive change events +func (t *NamespaceTracker) RegisterListener(ch chan NamespaceChangeEvent) { + t.mu.Lock() + defer t.mu.Unlock() + t.listeners = append(t.listeners, ch) +} + +func (t *NamespaceTracker) notifyListeners(event NamespaceChangeEvent) { + for _, ch := range t.listeners { + select { + case ch <- event: + default: + log.Printf("NamespaceTracker: dropped event for namespace %q (change: %v) because listener channel is full", event.Namespace, event.Change) + } + } +}