Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Dockerfile.cross
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
coverage.txt
coverage.html

# Go workspace file
go.work
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ vet: ## Vet Go code
.PHONY: test
test: tidy fmt vet ## Run tests with coverage
go test ./... -race -coverprofile=coverage.out
go tool cover -html=coverage.out -o=coverage.html

.PHONY: build
build: tidy fmt vet test ## Build the binary
Expand Down
280 changes: 179 additions & 101 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,121 @@ const (
AnnStatus = "object-lease-controller.ullberg.io/lease-status"
)

// ParseParams holds runtime configuration parsed from flags and environment.
type ParseParams struct {
Group string
Version string
Kind string
OptInLabelKey string
OptInLabelValue string
MetricsBindAddress string
HealthProbeBindAddress string
PprofBindAddress string
LeaderElectionEnabled bool
LeaderElectionNamespace string
}

var (
setupLog = ctrl.Log.WithName("setup")
)

// Allow injection for testing
var statFn = os.Stat
var readFileFn = os.ReadFile

func main() {
ctrl.SetLogger(zap.New())

params := parseParameters()

enableLeaderElection, leaderElectionNamespace, errE := parseLeaderElectionConfig(params.LeaderElectionEnabled, params.LeaderElectionNamespace)
if errE != nil {
fmt.Printf("%v\n", errE)
os.Exit(1)
}

if params.Version == "" || params.Kind == "" {
fmt.Println("Usage: lease-controller -group=GROUP -version=VERSION -kind=KIND [--leader-elect] [--leader-elect-namespace=NAMESPACE]")
fmt.Println("Or set LEASE_GVK_GROUP, LEASE_GVK_VERSION, LEASE_GVK_KIND, LEASE_LEADER_ELECTION env vars")
os.Exit(1)
}

scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)

gvk := schema.GroupVersionKind{
Group: params.Group,
Version: params.Version,
Kind: params.Kind,
}

// Use a unique leader election ID per GVK in lower case
leaderElectionID := strings.ToLower(fmt.Sprintf("object-lease-controller-%s-%s-%s", params.Group, params.Version, params.Kind))

mgrOpts := buildManagerOptions(scheme, params.Group, params.Version, params.Kind, params.MetricsBindAddress, params.HealthProbeBindAddress, params.PprofBindAddress, enableLeaderElection, leaderElectionNamespace)

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), mgrOpts)
if err != nil {
setupLog.Error(err, "unable to start manager")
panic(err)
}

// Create a LeaseWatcher for the specified GVK
lw := newLeaseWatcher(mgr, gvk, leaderElectionID)

if tr, err := configureNamespaceReconciler(mgr, params.OptInLabelKey, params.OptInLabelValue, leaderElectionID); err != nil {
setupLog.Error(err, "unable to create controller", "GVK", gvk)
panic(err)
} else {
lw.Tracker = tr
}

// Register the LeaseWatcher with the manager
if err := lw.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "GVK", gvk)
panic(err)
}

// Add metrics server expvar handler
if params.MetricsBindAddress != "" {
setupLog.Info("Adding /debug/vars to metrics", "address", params.MetricsBindAddress)
if err := mgr.AddMetricsServerExtraHandler("/debug/vars", expvar.Handler()); err != nil {
setupLog.Error(err, "unable to set up metrics server extra handler")
os.Exit(1)
}
}

healthCheck := func(req *http.Request) error {
return healthCheck(req, mgr, gvk)
}

if err := mgr.AddHealthzCheck("gvk", healthCheck); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}

// Ready check: verify manager cache is synced
readyCheck := func(req *http.Request) error {
if !mgr.GetCache().WaitForCacheSync(req.Context()) {
return fmt.Errorf("cache not synced")
}
return nil
}
if err := mgr.AddReadyzCheck("readyz", readyCheck); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}

setupLog.Info("Starting manager", "group", params.Group, "version", params.Version, "kind", params.Kind, "leaderElectionID", leaderElectionID)
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
panic(err)
}
}

// parseParameters returns a ParseParams struct instead of a tuple to make it
// easier to extend and pass around in tests.
func parseParameters() ParseParams {
var group, version, kind string
var optInLabelKey, optInLabelValue string
flag.StringVar(&group, "group", "", "Kubernetes API group (e.g., \"apps\")")
Expand Down Expand Up @@ -84,56 +192,71 @@ func main() {
optInLabelValue = os.Getenv("LEASE_OPT_IN_LABEL_VALUE")
}

// Leader election may be enabled via env var when not set via flags
if !enableLeaderElection {
if ele := os.Getenv("LEASE_LEADER_ELECTION"); ele != "" {
if strings.EqualFold(ele, "true") || ele == "1" {
enableLeaderElection = true
}
}
}

// If no leader election namespace was provided via flags, allow env fallback.
if leaderElectionNamespace == "" {
leaderElectionNamespace = os.Getenv("LEASE_LEADER_ELECTION_NAMESPACE")
}

return ParseParams{
Group: group,
Version: version,
Kind: kind,
OptInLabelKey: optInLabelKey,
OptInLabelValue: optInLabelValue,
MetricsBindAddress: metricsAddr,
HealthProbeBindAddress: probeAddr,
PprofBindAddress: pprofAddr,
LeaderElectionEnabled: enableLeaderElection,
LeaderElectionNamespace: leaderElectionNamespace,
}
}

// Parse leader election configuration from flags and environment.
// Returns (enabled, namespace, error)
func parseLeaderElectionConfig(enableLeaderElection bool, leaderElectionNamespace string) (bool, string, error) {
if !enableLeaderElection {
if val := os.Getenv("LEASE_LEADER_ELECTION"); val != "" {
var err error
enableLeaderElection, err = strconv.ParseBool(val)
if err != nil {
fmt.Printf("Invalid LEASE_LEADER_ELECTION value: %v\n", err)
os.Exit(1)
return false, "", fmt.Errorf("invalid LEASE_LEADER_ELECTION value: %v", err)
}

// If leader election is enabled, check for namespace and fail if not set and not running in a cluster
if enableLeaderElection && leaderElectionNamespace == "" {
leaderElectionNamespace = os.Getenv("LEASE_LEADER_ELECTION_NAMESPACE")
}
if leaderElectionNamespace == "" {
// If running outside a cluster, we need a namespace for leader election
if _, err := os.Stat("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); os.IsNotExist(err) {
fmt.Println("Leader election enabled but LEASE_LEADER_ELECTION_NAMESPACE is not set. Please set it to a valid namespace.")
os.Exit(1)
} else {
// Default to the namespace file if running in a cluster
data, _ := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")

if enableLeaderElection && leaderElectionNamespace == "" {
if _, err := statFn("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); os.IsNotExist(err) {
return false, "", fmt.Errorf("leader election enabled but LEASE_LEADER_ELECTION_NAMESPACE is not set; set it to a valid namespace")
}
// we're in a cluster; default to serviceaccount namespace
if data, err := readFileFn("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
leaderElectionNamespace = strings.TrimSpace(string(data))
}
}
}
}

if version == "" || kind == "" {
fmt.Println("Usage: lease-controller -group=GROUP -version=VERSION -kind=KIND [--leader-elect] [--leader-elect-namespace=NAMESPACE]")
fmt.Println("Or set LEASE_GVK_GROUP, LEASE_GVK_VERSION, LEASE_GVK_KIND, LEASE_LEADER_ELECTION env vars")
os.Exit(1)
}

scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)

gvk := schema.GroupVersionKind{
Group: group,
Version: version,
Kind: kind,
if leaderElectionNamespace == "" {
leaderElectionNamespace = os.Getenv("LEASE_LEADER_ELECTION_NAMESPACE")
}
return enableLeaderElection, leaderElectionNamespace, nil
}

// Use a unique leader election ID per GVK in lower case
// Build manager options for a given GVK and flags; extracted for unit testing
func buildManagerOptions(scheme *runtime.Scheme, group, version, kind string, metricsAddr, probeAddr, pprofAddr string, enableLeaderElection bool, leaderElectionNamespace string) ctrl.Options {
leaderElectionID := strings.ToLower(fmt.Sprintf("object-lease-controller-%s-%s-%s", group, version, kind))

// Set up metrics server options
metricsServerOptions := metricsserver.Options{
BindAddress: metricsAddr,
}

metricsServerOptions := metricsserver.Options{BindAddress: metricsAddr}
mgrOpts := ctrl.Options{
Scheme: scheme,
LeaderElection: enableLeaderElection,
Expand All @@ -143,24 +266,20 @@ func main() {
Metrics: metricsServerOptions,
HealthProbeBindAddress: probeAddr,
Cache: cache.Options{
DefaultTransform: util.MinimalObjectTransform(
AnnTTL, AnnLeaseStart, AnnExpireAt, AnnStatus,
),
DefaultTransform: util.MinimalObjectTransform(AnnTTL, AnnLeaseStart, AnnExpireAt, AnnStatus),
},
}

if pprofAddr != "" {
mgrOpts.PprofBindAddress = pprofAddr
}
return mgrOpts
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), mgrOpts)
if err != nil {
setupLog.Error(err, "unable to start manager")
panic(err)
}

// Create a LeaseWatcher for the specified GVK
lw := &controllers.LeaseWatcher{
// Create a LeaseWatcher attached to the given manager. The LeaseWatcher is initialized
// with default annotations and metrics for the provided GVK. The function does not
// call SetupWithManager - this is left to the caller.
func newLeaseWatcher(mgr ctrl.Manager, gvk schema.GroupVersionKind, leaderElectionID string) *controllers.LeaseWatcher {
return &controllers.LeaseWatcher{
Client: mgr.GetClient(),
GVK: gvk,
Recorder: mgr.GetEventRecorderFor(leaderElectionID),
Expand All @@ -172,68 +291,27 @@ func main() {
},
Metrics: ometrics.NewLeaseMetrics(gvk),
}
}

if optInLabelKey != "" && optInLabelValue != "" {
tracker := util.NewNamespaceTracker()

nw := &controllers.NamespaceReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(leaderElectionID),
LabelKey: optInLabelKey,
LabelValue: optInLabelValue,
Tracker: tracker,
}

// Register NamespaceReconciler with the manager
if err := nw.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "GVK", gvk)
panic(err)
}

lw.Tracker = tracker
}

// Register the LeaseWatcher with the manager
if err := lw.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "GVK", gvk)
panic(err)
}

// Add metrics server expvar handler
if metricsAddr != "" {
setupLog.Info("Adding /debug/vars to metrics", "address", metricsAddr)
if err := mgr.AddMetricsServerExtraHandler("/debug/vars", expvar.Handler()); err != nil {
setupLog.Error(err, "unable to set up metrics server extra handler")
os.Exit(1)
}
}

healthCheck := func(req *http.Request) error {
return healthCheck(req, mgr, gvk)
}

if err := mgr.AddHealthzCheck("gvk", healthCheck); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
// If optInLabelKey and optInLabelValue are provided, create a NamespaceReconciler and
// register it with the manager. Returns the NamespaceTracker that was created if any,
// or nil if opt-in was not requested.
func configureNamespaceReconciler(mgr ctrl.Manager, optInLabelKey, optInLabelValue, leaderElectionID string) (*util.NamespaceTracker, error) {
if optInLabelKey == "" || optInLabelValue == "" {
return nil, nil
}

// Ready check: verify manager cache is synced
readyCheck := func(req *http.Request) error {
if !mgr.GetCache().WaitForCacheSync(req.Context()) {
return fmt.Errorf("cache not synced")
}
return nil
}
if err := mgr.AddReadyzCheck("readyz", readyCheck); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
tracker := util.NewNamespaceTracker()
nw := &controllers.NamespaceReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(leaderElectionID),
LabelKey: optInLabelKey,
LabelValue: optInLabelValue,
Tracker: tracker,
}

setupLog.Info("Starting manager", "group", group, "version", version, "kind", kind, "leaderElectionID", leaderElectionID)
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
panic(err)
if err := nw.SetupWithManager(mgr); err != nil {
return nil, err
}
return tracker, nil
}

// Health check: confirm GVK is discoverable and listable with minimal load
Expand Down
Loading