diff --git a/controllers/apps/component_controller.go b/controllers/apps/component_controller.go
index 672c433bf68..605b6a2eb85 100644
--- a/controllers/apps/component_controller.go
+++ b/controllers/apps/component_controller.go
@@ -97,6 +97,7 @@ type ComponentReconciler struct {
// +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps.kubeblocks.io,resources=componentresourceconstraints,verbs=get;list;watch
+// +kubebuilder:rbac:groups=apps.kubeblocks.io,resources=opsrequests,verbs=get;list;watch;create
// +kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=serviceaccounts/status,verbs=get
@@ -189,6 +190,8 @@ func (r *ComponentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
&componentPatroniDCSRepairTransformer{},
// repair PostgreSQL standby password drift after replication breaks
&componentPostgreSQLStandbyPasswordRepairTransformer{},
+ // report PostgreSQL replication health from controller-side checks
+ &componentPostgreSQLReplicationHealthTransformer{},
).Build()
// Execute stage
diff --git a/controllers/apps/transformer_component_postgresql_replication_health.go b/controllers/apps/transformer_component_postgresql_replication_health.go
new file mode 100644
index 00000000000..519b256175d
--- /dev/null
+++ b/controllers/apps/transformer_component_postgresql_replication_health.go
@@ -0,0 +1,1069 @@
+/*
+Copyright (C) 2022-2024 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package apps
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "hash/fnv"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/rest"
+ "k8s.io/utils/pointer"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
+ workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
+ "github.com/apecloud/kubeblocks/pkg/constant"
+ ctrlcomp "github.com/apecloud/kubeblocks/pkg/controller/component"
+ "github.com/apecloud/kubeblocks/pkg/controller/graph"
+ "github.com/apecloud/kubeblocks/pkg/controller/model"
+ intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
+)
+
+const (
+ postgreSQLReplicationReadyConditionType = "PostgreSQLReplicationReady"
+
+ postgreSQLReplicationReasonReady = "ReplicationReady"
+ postgreSQLReplicationReasonHealthCheckFailed = "ReplicationHealthCheckFailed"
+ postgreSQLReplicationReasonPrimaryNotFound = "PrimaryNotFound"
+ postgreSQLReplicationReasonReplicasNotFound = "ReplicasNotFound"
+ postgreSQLReplicationReasonReplicasNotApplicable = "ReplicasNotApplicable"
+ postgreSQLReplicationReasonSlotInactive = "SlotInactive"
+ postgreSQLReplicationReasonWALReceiverMissing = "WalReceiverMissing"
+ postgreSQLReplicationReasonWALReceiverNotStreaming = "WalReceiverNotStreaming"
+
+ postgreSQLReplicationHealthRequeueInterval = 5 * time.Minute
+ postgreSQLReplicationExecTimeout = 5 * time.Second
+ postgreSQLReplicationMaxJSONOutputBytes = 64 * 1024
+
+ postgreSQLReplicationAutoRebuildAnnotation = "apps.kubeblocks.io/postgresql-replication-auto-rebuild"
+ postgreSQLReplicationAutoRebuildWindowAnnotation = "apps.kubeblocks.io/postgresql-replication-auto-rebuild-window"
+ postgreSQLReplicationAutoRebuildCooldownAnnotation = "apps.kubeblocks.io/postgresql-replication-auto-rebuild-cooldown"
+
+ postgreSQLReplicationDefaultAutoRebuildWindow = 10 * time.Minute
+ postgreSQLReplicationDefaultAutoRebuildCooldown = 30 * time.Minute
+
+ postgreSQLReplicationAutoRebuildTimeoutSeconds = 30 * 60
+ postgreSQLReplicationAutoRebuildTTLSecondsAfterSucceed = 60 * 60
+ postgreSQLReplicationAutoRebuildTTLSecondsAfterUnsuccessful = 24 * 60 * 60
+
+ postgreSQLReplicationAutoRebuildPendingConditionTypePrefix = "PostgreSQLReplicationAutoRebuildPending-"
+ postgreSQLReplicationAutoRebuildConditionTypePrefix = "PostgreSQLReplicationAutoRebuild-"
+)
+
+// componentPostgreSQLReplicationHealthTransformer reports PostgreSQL
+// replication health from controller-side read-only SQL checks.
+type componentPostgreSQLReplicationHealthTransformer struct {
+ restConfig *rest.Config
+ execRunner podExecRunner
+}
+
+var _ graph.Transformer = &componentPostgreSQLReplicationHealthTransformer{}
+
+func (t *componentPostgreSQLReplicationHealthTransformer) Transform(
+ ctx graph.TransformContext,
+ dag *graph.DAG,
+) error {
+ transCtx, _ := ctx.(*componentTransformContext)
+ if model.IsObjectDeleting(transCtx.ComponentOrig) {
+ return nil
+ }
+ if !isPostgreSQLComponentForReplicationHealth(transCtx) {
+ return nil
+ }
+ if transCtx.Component.Status.Phase != appsv1alpha1.RunningClusterCompPhase {
+ return nil
+ }
+
+ runningITS, ok := transCtx.RunningWorkload.(*workloads.InstanceSet)
+ if !ok || runningITS == nil || runningITS.Status.AvailableReplicas == 0 {
+ return nil
+ }
+
+ now := time.Now()
+ result := t.checkReplicationHealth(transCtx, runningITS)
+ t.recordHealthResult(transCtx, result)
+ if err := t.maybeCreateAutoRebuildOpsRequest(transCtx, dag, runningITS, result, now); err != nil {
+ return intctrlutil.NewDelayedRequeueError(postgreSQLReplicationHealthRequeueInterval, err.Error())
+ }
+ return postgreSQLReplicationHealthRequeue(result)
+}
+
+func isPostgreSQLComponentForReplicationHealth(transCtx *componentTransformContext) bool {
+ return isPostgreSQLComponent(transCtx) ||
+ (transCtx != nil &&
+ transCtx.CompDef != nil &&
+ isPostgreSQLLifecycleActions(transCtx.CompDef.Spec.LifecycleActions))
+}
+
+func isPostgreSQLBuiltinHandler(handler appsv1alpha1.BuiltinActionHandlerType) bool {
+ switch strings.ToLower(strings.TrimSpace(string(handler))) {
+ case string(appsv1alpha1.PostgresqlBuiltinActionHandler),
+ string(appsv1alpha1.OfficialPostgresqlBuiltinActionHandler),
+ string(appsv1alpha1.ApeCloudPostgresqlBuiltinActionHandler):
+ return true
+ default:
+ return false
+ }
+}
+
+func isPostgreSQLLifecycleActions(actions *appsv1alpha1.ComponentLifecycleActions) bool {
+ if actions == nil {
+ return false
+ }
+ handlers := []*appsv1alpha1.LifecycleActionHandler{
+ actions.PostProvision,
+ actions.PreTerminate,
+ actions.MemberJoin,
+ actions.MemberLeave,
+ actions.Readonly,
+ actions.Readwrite,
+ actions.DataDump,
+ actions.DataLoad,
+ actions.Reconfigure,
+ actions.AccountProvision,
+ }
+ for _, handler := range handlers {
+ if handler != nil && handler.BuiltinHandler != nil && isPostgreSQLBuiltinHandler(*handler.BuiltinHandler) {
+ return true
+ }
+ }
+ return actions.RoleProbe != nil &&
+ actions.RoleProbe.BuiltinHandler != nil &&
+ isPostgreSQLBuiltinHandler(*actions.RoleProbe.BuiltinHandler)
+}
+
+func (t *componentPostgreSQLReplicationHealthTransformer) checkReplicationHealth(
+ transCtx *componentTransformContext,
+ runningITS *workloads.InstanceSet,
+) postgreSQLReplicationHealthResult {
+ pods, err := runningPostgreSQLPods(transCtx)
+ if err != nil {
+ return postgreSQLReplicationCheckFailed(fmt.Errorf("list component pods: %w", err))
+ }
+
+ leaderName := leaderMemberPodName(runningITS)
+ if leaderName == "" {
+ return postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonPrimaryNotFound,
+ Message: "PostgreSQL primary pod is not found from InstanceSet member status",
+ }
+ }
+ leaderPod := pods[leaderName]
+ if leaderPod == nil {
+ return postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonPrimaryNotFound,
+ Message: fmt.Sprintf("PostgreSQL primary pod %q is not running", leaderName),
+ }
+ }
+
+ replicaPods := replicaPodsForReplicationHealth(runningITS, pods)
+ if len(replicaPods) == 0 {
+ return postgreSQLReplicationHealthResult{
+ Ready: true,
+ Reason: postgreSQLReplicationReasonReplicasNotApplicable,
+ Message: "PostgreSQL replication health check is not applicable because no running replica pods are found",
+ }
+ }
+
+ runner := t.execRunner
+ if runner == nil {
+ runner = newKubePodExecRunner(t.restConfig)
+ }
+ leaderState, err := queryPostgreSQLReplicationLeader(transCtx.Context, runner, leaderPod)
+ if err != nil {
+ return postgreSQLReplicationCheckFailed(err)
+ }
+
+ replicaStates := make(map[string]postgreSQLReplicaState, len(replicaPods))
+ for _, pod := range replicaPods {
+ state, err := queryPostgreSQLReplicationReplica(transCtx.Context, runner, pod)
+ if err != nil {
+ return postgreSQLReplicationCheckFailed(err)
+ }
+ replicaStates[pod.Name] = state
+ }
+ return evaluatePostgreSQLReplicationHealth(leaderPod.Name, leaderState, replicaStates)
+}
+
+func (t *componentPostgreSQLReplicationHealthTransformer) recordHealthResult(
+ transCtx *componentTransformContext,
+ result postgreSQLReplicationHealthResult,
+) {
+ recordWarning := shouldRecordPostgreSQLReplicationWarning(transCtx.Component.Status.Conditions, result)
+ status := metav1.ConditionFalse
+ if result.Ready {
+ status = metav1.ConditionTrue
+ }
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: postgreSQLReplicationReadyConditionType,
+ Status: status,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.Now(),
+ Reason: result.Reason,
+ Message: result.Message,
+ })
+ if recordWarning && transCtx.EventRecorder != nil {
+ transCtx.EventRecorder.Event(
+ transCtx.Component,
+ corev1.EventTypeWarning,
+ result.Reason,
+ result.Message,
+ )
+ }
+ t.recordAutoRebuildPendingCondition(transCtx, result)
+}
+
+func (t *componentPostgreSQLReplicationHealthTransformer) recordAutoRebuildPendingCondition(
+ transCtx *componentTransformContext,
+ result postgreSQLReplicationHealthResult,
+) {
+ pendingType := postgreSQLReplicationAutoRebuildPendingConditionType(result.ReplicaPod)
+ for _, condition := range transCtx.Component.Status.Conditions {
+ if strings.HasPrefix(condition.Type, postgreSQLReplicationAutoRebuildPendingConditionTypePrefix) &&
+ condition.Type != pendingType {
+ meta.RemoveStatusCondition(&transCtx.Component.Status.Conditions, condition.Type)
+ }
+ }
+ if !postgreSQLReplicationAutoRebuildAllowed(transCtx.Component) || !isPostgreSQLReplicationRebuildable(result) {
+ if result.ReplicaPod != "" {
+ meta.RemoveStatusCondition(
+ &transCtx.Component.Status.Conditions,
+ pendingType,
+ )
+ }
+ return
+ }
+ if existing := meta.FindStatusCondition(transCtx.Component.Status.Conditions, pendingType); existing != nil &&
+ (existing.Status != metav1.ConditionTrue ||
+ existing.Reason != result.Reason) {
+ meta.RemoveStatusCondition(&transCtx.Component.Status.Conditions, pendingType)
+ }
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: pendingType,
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.Now(),
+ Reason: result.Reason,
+ Message: result.Message,
+ })
+}
+
+func postgreSQLReplicationCheckFailed(err error) postgreSQLReplicationHealthResult {
+ return postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonHealthCheckFailed,
+ Message: fmt.Sprintf("PostgreSQL replication health check failed: %s", err.Error()),
+ }
+}
+
+func shouldRecordPostgreSQLReplicationWarning(
+ conditions []metav1.Condition,
+ result postgreSQLReplicationHealthResult,
+) bool {
+ if result.Ready {
+ return false
+ }
+ previous := meta.FindStatusCondition(conditions, postgreSQLReplicationReadyConditionType)
+ return previous == nil || previous.Status != metav1.ConditionFalse || previous.Reason != result.Reason
+}
+
+func postgreSQLReplicationHealthRequeue(result postgreSQLReplicationHealthResult) error {
+ return intctrlutil.NewDelayedRequeueError(postgreSQLReplicationHealthRequeueInterval, result.Message)
+}
+
+func (t *componentPostgreSQLReplicationHealthTransformer) maybeCreateAutoRebuildOpsRequest(
+ transCtx *componentTransformContext,
+ dag *graph.DAG,
+ runningITS *workloads.InstanceSet,
+ result postgreSQLReplicationHealthResult,
+ now time.Time,
+) error {
+ if !postgreSQLReplicationAutoRebuildAllowed(transCtx.Component) ||
+ !isPostgreSQLReplicationRebuildable(result) {
+ return nil
+ }
+ if result.ReplicaPod == leaderMemberPodName(runningITS) {
+ return nil
+ }
+ if !postgreSQLReplicationFailureWindowSatisfied(
+ transCtx.Component.Status.Conditions,
+ result,
+ postgreSQLReplicationAutoRebuildWindow(transCtx.Component),
+ now,
+ ) {
+ return nil
+ }
+ if postgreSQLReplicationAutoRebuildInCooldown(
+ transCtx.Component.Status.Conditions,
+ result,
+ postgreSQLReplicationAutoRebuildCooldown(transCtx.Component),
+ now,
+ ) {
+ return nil
+ }
+ if dag == nil || dag.Root() == nil {
+ return fmt.Errorf("component dag is not initialized")
+ }
+ exists, err := t.hasRunningRebuildOpsRequest(transCtx)
+ if err != nil {
+ return fmt.Errorf("list running RebuildInstance OpsRequests: %w", err)
+ }
+ if exists || autoRebuildOpsRequestExistsInDAG(transCtx, dag) {
+ return nil
+ }
+
+ graphCli, ok := transCtx.Client.(model.GraphClient)
+ if !ok {
+ return fmt.Errorf("component client does not support graph writes")
+ }
+ opsRequest := buildPostgreSQLReplicationAutoRebuildOpsRequest(transCtx, result.ReplicaPod, now)
+ if err := intctrlutil.SetOwnerReference(transCtx.Cluster, opsRequest); err != nil {
+ return fmt.Errorf("set owner reference for PostgreSQL replication auto rebuild OpsRequest: %w", err)
+ }
+ graphCli.Create(dag, opsRequest, inDataContext4G())
+ recordPostgreSQLReplicationAutoRebuildCooldown(transCtx, result, opsRequest, now)
+ if transCtx.EventRecorder != nil {
+ transCtx.EventRecorder.Eventf(
+ transCtx.Component,
+ corev1.EventTypeNormal,
+ "PostgreSQLReplicationAutoRebuild",
+ "created RebuildInstance OpsRequest %q for PostgreSQL replica %q",
+ opsRequest.Name,
+ result.ReplicaPod,
+ )
+ }
+ return nil
+}
+
+func recordPostgreSQLReplicationAutoRebuildCooldown(
+ transCtx *componentTransformContext,
+ result postgreSQLReplicationHealthResult,
+ opsRequest *appsv1alpha1.OpsRequest,
+ now time.Time,
+) {
+ conditionType := postgreSQLReplicationAutoRebuildConditionType(result.ReplicaPod)
+ meta.RemoveStatusCondition(&transCtx.Component.Status.Conditions, conditionType)
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: conditionType,
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.NewTime(now),
+ Reason: result.Reason,
+ Message: fmt.Sprintf(
+ "created RebuildInstance OpsRequest %q for PostgreSQL replica %q",
+ opsRequest.Name,
+ result.ReplicaPod,
+ ),
+ })
+}
+
+func postgreSQLReplicationAutoRebuildAllowed(component *appsv1alpha1.Component) bool {
+ if component == nil || component.Annotations == nil {
+ return true
+ }
+ value := strings.TrimSpace(component.Annotations[postgreSQLReplicationAutoRebuildAnnotation])
+ if value == "" {
+ return true
+ }
+ enabled, err := strconv.ParseBool(value)
+ return err == nil && enabled
+}
+
+func isPostgreSQLReplicationRebuildable(result postgreSQLReplicationHealthResult) bool {
+ if result.Ready || result.ReplicaPod == "" {
+ return false
+ }
+ switch result.Reason {
+ case postgreSQLReplicationReasonSlotInactive,
+ postgreSQLReplicationReasonWALReceiverMissing,
+ postgreSQLReplicationReasonWALReceiverNotStreaming:
+ return true
+ default:
+ return false
+ }
+}
+
+func postgreSQLReplicationFailureWindowSatisfied(
+ conditions []metav1.Condition,
+ result postgreSQLReplicationHealthResult,
+ window time.Duration,
+ now time.Time,
+) bool {
+ condition := meta.FindStatusCondition(
+ conditions,
+ postgreSQLReplicationAutoRebuildPendingConditionType(result.ReplicaPod),
+ )
+ if condition == nil ||
+ condition.Status != metav1.ConditionTrue ||
+ condition.Reason != result.Reason {
+ return false
+ }
+ return !condition.LastTransitionTime.IsZero() &&
+ !condition.LastTransitionTime.Time.After(now) &&
+ now.Sub(condition.LastTransitionTime.Time) >= window
+}
+
+func postgreSQLReplicationAutoRebuildInCooldown(
+ conditions []metav1.Condition,
+ result postgreSQLReplicationHealthResult,
+ cooldown time.Duration,
+ now time.Time,
+) bool {
+ condition := meta.FindStatusCondition(conditions, postgreSQLReplicationAutoRebuildConditionType(result.ReplicaPod))
+ if condition == nil || condition.Status != metav1.ConditionTrue || condition.LastTransitionTime.IsZero() {
+ return false
+ }
+ if condition.LastTransitionTime.Time.After(now) {
+ return true
+ }
+ return now.Sub(condition.LastTransitionTime.Time) < cooldown
+}
+
+func postgreSQLReplicationAutoRebuildWindow(component *appsv1alpha1.Component) time.Duration {
+ return postgreSQLReplicationDurationAnnotation(
+ component,
+ postgreSQLReplicationAutoRebuildWindowAnnotation,
+ postgreSQLReplicationDefaultAutoRebuildWindow,
+ )
+}
+
+func postgreSQLReplicationAutoRebuildCooldown(component *appsv1alpha1.Component) time.Duration {
+ return postgreSQLReplicationDurationAnnotation(
+ component,
+ postgreSQLReplicationAutoRebuildCooldownAnnotation,
+ postgreSQLReplicationDefaultAutoRebuildCooldown,
+ )
+}
+
+func postgreSQLReplicationDurationAnnotation(
+ component *appsv1alpha1.Component,
+ key string,
+ defaultValue time.Duration,
+) time.Duration {
+ if component == nil || component.Annotations == nil {
+ return defaultValue
+ }
+ value := strings.TrimSpace(component.Annotations[key])
+ if value == "" {
+ return defaultValue
+ }
+ duration, err := time.ParseDuration(value)
+ if err != nil || duration < 0 {
+ return defaultValue
+ }
+ return duration
+}
+
+func (t *componentPostgreSQLReplicationHealthTransformer) hasRunningRebuildOpsRequest(
+ transCtx *componentTransformContext,
+) (bool, error) {
+ opsRequestList := &appsv1alpha1.OpsRequestList{}
+ if err := transCtx.Client.List(
+ transCtx.Context,
+ opsRequestList,
+ client.InNamespace(transCtx.Cluster.Namespace),
+ client.MatchingLabels{
+ constant.AppInstanceLabelKey: transCtx.Cluster.Name,
+ constant.OpsRequestTypeLabelKey: string(appsv1alpha1.RebuildInstanceType),
+ },
+ inDataContext4C(),
+ ); err != nil {
+ return false, err
+ }
+ for _, opsRequest := range opsRequestList.Items {
+ if isRunningRebuildOpsRequestForCluster(&opsRequest, transCtx.Cluster.Name) {
+ return true, nil
+ }
+ }
+
+ opsRequestList = &appsv1alpha1.OpsRequestList{}
+ if err := transCtx.Client.List(
+ transCtx.Context,
+ opsRequestList,
+ client.InNamespace(transCtx.Cluster.Namespace),
+ inDataContext4C(),
+ ); err != nil {
+ return false, err
+ }
+ for _, opsRequest := range opsRequestList.Items {
+ if isRunningRebuildOpsRequestForCluster(&opsRequest, transCtx.Cluster.Name) {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+func isRunningRebuildOpsRequestForCluster(opsRequest *appsv1alpha1.OpsRequest, clusterName string) bool {
+ if opsRequest == nil ||
+ opsRequest.Spec.Type != appsv1alpha1.RebuildInstanceType ||
+ opsRequest.Spec.GetClusterName() != clusterName {
+ return false
+ }
+ return !opsRequest.IsComplete()
+}
+
+func autoRebuildOpsRequestExistsInDAG(transCtx *componentTransformContext, dag *graph.DAG) bool {
+ if dag == nil {
+ return false
+ }
+ graphCli, ok := transCtx.Client.(model.GraphClient)
+ if !ok {
+ return false
+ }
+ for _, object := range graphCli.FindAll(dag, &appsv1alpha1.OpsRequest{}) {
+ opsRequest, ok := object.(*appsv1alpha1.OpsRequest)
+ if !ok || opsRequest.Namespace != transCtx.Cluster.Namespace {
+ continue
+ }
+ if opsRequest.Spec.GetClusterName() == transCtx.Cluster.Name &&
+ opsRequest.Spec.Type == appsv1alpha1.RebuildInstanceType {
+ return true
+ }
+ }
+ return false
+}
+
+func buildPostgreSQLReplicationAutoRebuildOpsRequest(
+ transCtx *componentTransformContext,
+ podName string,
+ now time.Time,
+) *appsv1alpha1.OpsRequest {
+ name := fmt.Sprintf(
+ "pg-rebuild-%s-%s-%d",
+ shortPostgreSQLReplicationHash(transCtx.Cluster.Name),
+ shortPostgreSQLReplicationHash(podName),
+ now.UnixNano(),
+ )
+ return &appsv1alpha1.OpsRequest{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: transCtx.Cluster.Namespace,
+ Labels: map[string]string{
+ constant.AppInstanceLabelKey: transCtx.Cluster.Name,
+ constant.OpsRequestTypeLabelKey: string(appsv1alpha1.RebuildInstanceType),
+ },
+ },
+ Spec: appsv1alpha1.OpsRequestSpec{
+ ClusterName: transCtx.Cluster.Name,
+ Type: appsv1alpha1.RebuildInstanceType,
+ Force: true,
+ TimeoutSeconds: pointer.Int32(postgreSQLReplicationAutoRebuildTimeoutSeconds),
+ TTLSecondsAfterSucceed: postgreSQLReplicationAutoRebuildTTLSecondsAfterSucceed,
+ TTLSecondsAfterUnsuccessfulCompletion: postgreSQLReplicationAutoRebuildTTLSecondsAfterUnsuccessful,
+ // Force allows rebuilding an abnormal replica; EnqueueOnForce keeps
+ // the auto-repair inside the normal OpsRequest queue.
+ EnqueueOnForce: true,
+ SpecificOpsRequest: appsv1alpha1.SpecificOpsRequest{
+ RebuildFrom: []appsv1alpha1.RebuildInstance{{
+ ComponentOps: appsv1alpha1.ComponentOps{
+ ComponentName: transCtx.SynthesizeComponent.Name,
+ },
+ Instances: []appsv1alpha1.Instance{{
+ Name: podName,
+ }},
+ InPlace: true,
+ }},
+ },
+ },
+ }
+}
+
+func shortPostgreSQLReplicationHash(value string) string {
+ hash := fnv.New32a()
+ _, _ = hash.Write([]byte(value))
+ return fmt.Sprintf("%08x", hash.Sum32())
+}
+
+func postgreSQLReplicationAutoRebuildConditionType(podName string) string {
+ return postgreSQLReplicationAutoRebuildConditionTypePrefix + shortPostgreSQLReplicationHash(podName)
+}
+
+func postgreSQLReplicationAutoRebuildPendingConditionType(podName string) string {
+ return postgreSQLReplicationAutoRebuildPendingConditionTypePrefix + shortPostgreSQLReplicationHash(podName)
+}
+
+var postgreSQLReplicationPSQLCommand = []string{
+ "psql",
+ "-U",
+ "postgres",
+ "-v",
+ "ON_ERROR_STOP=1",
+ "-X",
+ "-A",
+ "-t",
+ "-q",
+ "-f",
+ "-",
+}
+
+const postgreSQLReplicationLeaderSQL = `
+SELECT json_build_object(
+ 'replications', COALESCE((
+ SELECT json_agg(json_build_object(
+ 'application_name', application_name,
+ 'state', state,
+ 'sent_lsn', sent_lsn::text,
+ 'write_lsn', write_lsn::text,
+ 'flush_lsn', flush_lsn::text,
+ 'replay_lsn', replay_lsn::text,
+ 'sync_state', sync_state
+ ))
+ FROM pg_stat_replication
+ ), '[]'::json),
+ 'slots', COALESCE((
+ SELECT json_agg(json_build_object(
+ 'slot_name', slot_name,
+ 'active', active,
+ 'restart_lsn', restart_lsn::text,
+ 'retained_wal_bytes',
+ CASE
+ WHEN restart_lsn IS NULL THEN NULL
+ ELSE pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)::text
+ END
+ ))
+ FROM pg_replication_slots
+ WHERE slot_type = 'physical'
+ ), '[]'::json)
+);
+`
+
+const postgreSQLReplicationReplicaSQL = `
+SELECT json_build_object(
+ 'in_recovery', pg_is_in_recovery(),
+ 'primary_conninfo', current_setting('primary_conninfo', true),
+ 'receive_lsn', pg_last_wal_receive_lsn()::text,
+ 'replay_lsn', pg_last_wal_replay_lsn()::text,
+ 'wal_receiver', (
+ SELECT json_build_object(
+ 'status', status,
+ 'sender_host', sender_host,
+ 'slot_name', slot_name,
+ 'latest_end_lsn', latest_end_lsn::text,
+ 'received_tli', received_tli
+ )
+ FROM pg_stat_wal_receiver
+ LIMIT 1
+ )
+);
+`
+
+func runningPostgreSQLPods(transCtx *componentTransformContext) (map[string]*corev1.Pod, error) {
+ labels := constant.GetComponentWellKnownLabels(transCtx.Cluster.Name, transCtx.SynthesizeComponent.Name)
+ pods, err := ctrlcomp.ListPodOwnedByComponent(
+ transCtx.Context,
+ transCtx.Client,
+ transCtx.SynthesizeComponent.Namespace,
+ labels,
+ inDataContext4C(),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ runningPods := make(map[string]*corev1.Pod, len(pods))
+ for _, pod := range pods {
+ if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodRunning || pod.Status.PodIP == "" {
+ continue
+ }
+ runningPods[pod.Name] = pod
+ }
+ return runningPods, nil
+}
+
+func replicaPodsForReplicationHealth(
+ runningITS *workloads.InstanceSet,
+ runningPods map[string]*corev1.Pod,
+) []*corev1.Pod {
+ leaderName := leaderMemberPodName(runningITS)
+ replicas := make([]*corev1.Pod, 0)
+ for _, member := range runningITS.Status.MembersStatus {
+ if member.PodName == "" || member.PodName == leaderName {
+ continue
+ }
+ if pod := runningPods[member.PodName]; pod != nil {
+ replicas = append(replicas, pod)
+ }
+ }
+ sort.Slice(replicas, func(i, j int) bool {
+ return replicas[i].Name < replicas[j].Name
+ })
+ return replicas
+}
+
+func queryPostgreSQLReplicationLeader(
+ ctx context.Context,
+ runner podExecRunner,
+ pod *corev1.Pod,
+) (postgreSQLLeaderState, error) {
+ var state postgreSQLLeaderState
+ stdout, err := execPostgreSQLReplicationSQL(ctx, runner, pod, postgreSQLReplicationLeaderSQL)
+ if err != nil {
+ return state, err
+ }
+ if err := decodePostgreSQLReplicationJSON(stdout, &state); err != nil {
+ return state, fmt.Errorf("decode PostgreSQL primary replication state from pod %q: %w", pod.Name, err)
+ }
+ return state, nil
+}
+
+func queryPostgreSQLReplicationReplica(
+ ctx context.Context,
+ runner podExecRunner,
+ pod *corev1.Pod,
+) (postgreSQLReplicaState, error) {
+ var state postgreSQLReplicaState
+ stdout, err := execPostgreSQLReplicationSQL(ctx, runner, pod, postgreSQLReplicationReplicaSQL)
+ if err != nil {
+ return state, err
+ }
+ if err := decodePostgreSQLReplicationJSON(stdout, &state); err != nil {
+ return state, fmt.Errorf("decode PostgreSQL replica replication state from pod %q: %w", pod.Name, err)
+ }
+ return state, nil
+}
+
+func execPostgreSQLReplicationSQL(
+ ctx context.Context,
+ runner podExecRunner,
+ pod *corev1.Pod,
+ sql string,
+) (string, error) {
+ execCtx, cancel := context.WithTimeout(ctx, postgreSQLReplicationExecTimeout)
+ defer cancel()
+
+ stdout, _, err := runner.Exec(execCtx, pod, postgreSQLReplicationPSQLCommand, sql)
+ if err != nil {
+ return "", fmt.Errorf("exec PostgreSQL replication health SQL on pod %q: %w", pod.Name, err)
+ }
+ return strings.TrimSpace(stdout), nil
+}
+
+func decodePostgreSQLReplicationJSON(output string, target any) error {
+ if len(output) > postgreSQLReplicationMaxJSONOutputBytes {
+ return fmt.Errorf("output exceeds %d bytes", postgreSQLReplicationMaxJSONOutputBytes)
+ }
+ output = strings.TrimSpace(output)
+ if output == "" {
+ return fmt.Errorf("empty output")
+ }
+ if err := json.Unmarshal([]byte(output), target); err != nil {
+ return err
+ }
+ return nil
+}
+
+type postgreSQLLeaderState struct {
+ Replications []postgreSQLReplicationConnection `json:"replications"`
+ Slots []postgreSQLReplicationSlot `json:"slots"`
+}
+
+type postgreSQLReplicationConnection struct {
+ ApplicationName string `json:"application_name"`
+ State string `json:"state"`
+ SentLSN string `json:"sent_lsn"`
+ WriteLSN string `json:"write_lsn"`
+ FlushLSN string `json:"flush_lsn"`
+ ReplayLSN string `json:"replay_lsn"`
+ SyncState string `json:"sync_state"`
+}
+
+type postgreSQLReplicationSlot struct {
+ SlotName string `json:"slot_name"`
+ Active bool `json:"active"`
+ RestartLSN string `json:"restart_lsn"`
+ RetainedWALBytesText string `json:"retained_wal_bytes"`
+}
+
+type postgreSQLReplicaState struct {
+ InRecovery bool `json:"in_recovery"`
+ PrimaryConninfo string `json:"primary_conninfo"`
+ ReceiveLSN string `json:"receive_lsn"`
+ ReplayLSN string `json:"replay_lsn"`
+ WALReceiver *postgreSQLWALReceiverStatus `json:"wal_receiver"`
+}
+
+type postgreSQLWALReceiverStatus struct {
+ Status string `json:"status"`
+ SenderHost string `json:"sender_host"`
+ SlotName string `json:"slot_name"`
+ LatestEndLSN string `json:"latest_end_lsn"`
+ ReceivedTLI int64 `json:"received_tli"`
+}
+
+type postgreSQLReplicationHealthResult struct {
+ Ready bool
+ Reason string
+ Message string
+ ReplicaPod string
+}
+
+func evaluatePostgreSQLReplicationHealth(
+ leaderPodName string,
+ leaderState postgreSQLLeaderState,
+ replicaStates map[string]postgreSQLReplicaState,
+) postgreSQLReplicationHealthResult {
+ if len(replicaStates) == 0 {
+ return postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonReplicasNotFound,
+ Message: "PostgreSQL replica states are not found",
+ }
+ }
+
+ replicaNames := make([]string, 0, len(replicaStates))
+ for podName := range replicaStates {
+ replicaNames = append(replicaNames, podName)
+ }
+ sort.Strings(replicaNames)
+
+ for _, podName := range replicaNames {
+ replicaState := replicaStates[podName]
+ keys := normalizedPostgreSQLReplicaKeys(podName, replicaState)
+ slot := lookupPostgreSQLSlot(leaderState.Slots, keys)
+ if slot != nil && !slot.Active {
+ retainedWAL, _ := parsePostgreSQLRetainedWALBytes(slot.RetainedWALBytesText)
+ message := fmt.Sprintf(
+ "PostgreSQL replica %q replication slot %q is inactive on primary %q",
+ podName,
+ slot.SlotName,
+ leaderPodName,
+ )
+ if retainedWAL > 0 {
+ message = fmt.Sprintf("%s, retained WAL bytes=%d", message, retainedWAL)
+ }
+ return postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonSlotInactive,
+ Message: message,
+ ReplicaPod: podName,
+ }
+ }
+ if !replicaState.InRecovery {
+ return postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonWALReceiverMissing,
+ Message: fmt.Sprintf("PostgreSQL replica %q is not in recovery", podName),
+ ReplicaPod: podName,
+ }
+ }
+ if replicaState.WALReceiver == nil {
+ return postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonWALReceiverMissing,
+ Message: fmt.Sprintf("PostgreSQL replica %q has no WAL receiver", podName),
+ ReplicaPod: podName,
+ }
+ }
+ if !strings.EqualFold(strings.TrimSpace(replicaState.WALReceiver.Status), "streaming") {
+ return postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonWALReceiverNotStreaming,
+ ReplicaPod: podName,
+ Message: fmt.Sprintf(
+ "PostgreSQL replica %q WAL receiver is %q",
+ podName,
+ replicaState.WALReceiver.Status,
+ ),
+ }
+ }
+ replication := lookupPostgreSQLReplication(leaderState.Replications, keys)
+ if replication == nil {
+ return postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonWALReceiverMissing,
+ ReplicaPod: podName,
+ Message: fmt.Sprintf(
+ "PostgreSQL primary %q has no replication connection for replica %q",
+ leaderPodName,
+ podName,
+ ),
+ }
+ }
+ if !strings.EqualFold(strings.TrimSpace(replication.State), "streaming") {
+ return postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonWALReceiverNotStreaming,
+ ReplicaPod: podName,
+ Message: fmt.Sprintf(
+ "PostgreSQL replica %q primary connection is %q on primary %q",
+ podName,
+ replication.State,
+ leaderPodName,
+ ),
+ }
+ }
+ }
+
+ return postgreSQLReplicationHealthResult{
+ Ready: true,
+ Reason: postgreSQLReplicationReasonReady,
+ Message: "PostgreSQL replication is streaming for all running replicas",
+ }
+}
+
+func normalizedPostgreSQLReplicaKeys(podName string, replicaState postgreSQLReplicaState) []string {
+ keys := make([]string, 0, 3)
+ appendKey := func(key string) {
+ key = normalizePostgreSQLReplicationName(key)
+ if key == "" {
+ return
+ }
+ for _, existing := range keys {
+ if existing == key {
+ return
+ }
+ }
+ keys = append(keys, key)
+ }
+
+ appendKey(applicationNameFromPrimaryConninfo(replicaState.PrimaryConninfo))
+ if replicaState.WALReceiver != nil {
+ appendKey(replicaState.WALReceiver.SlotName)
+ }
+ appendKey(podName)
+ return keys
+}
+
+func lookupPostgreSQLReplication(
+ replications []postgreSQLReplicationConnection,
+ keys []string,
+) *postgreSQLReplicationConnection {
+ for _, key := range keys {
+ for i := range replications {
+ if key == normalizePostgreSQLReplicationName(replications[i].ApplicationName) {
+ return &replications[i]
+ }
+ }
+ }
+ return nil
+}
+
+func lookupPostgreSQLSlot(slots []postgreSQLReplicationSlot, keys []string) *postgreSQLReplicationSlot {
+ for _, key := range keys {
+ for i := range slots {
+ if key == normalizePostgreSQLReplicationName(slots[i].SlotName) {
+ return &slots[i]
+ }
+ }
+ }
+ return nil
+}
+
+func normalizePostgreSQLReplicationName(name string) string {
+ name = strings.ToLower(strings.TrimSpace(name))
+ name = strings.ReplaceAll(name, "-", "_")
+ return name
+}
+
+func applicationNameFromPrimaryConninfo(conninfo string) string {
+ fields := parsePostgreSQLConninfo(conninfo)
+ return fields["application_name"]
+}
+
+func parsePostgreSQLConninfo(conninfo string) map[string]string {
+ fields := make(map[string]string)
+ i := 0
+ for i < len(conninfo) {
+ for i < len(conninfo) && conninfo[i] == ' ' {
+ i++
+ }
+ if i >= len(conninfo) {
+ break
+ }
+
+ keyStart := i
+ for i < len(conninfo) && conninfo[i] != '=' && conninfo[i] != ' ' {
+ i++
+ }
+ key := strings.ToLower(strings.TrimSpace(conninfo[keyStart:i]))
+ for i < len(conninfo) && conninfo[i] == ' ' {
+ i++
+ }
+ if i >= len(conninfo) || conninfo[i] != '=' {
+ for i < len(conninfo) && conninfo[i] != ' ' {
+ i++
+ }
+ continue
+ }
+ i++
+ for i < len(conninfo) && conninfo[i] == ' ' {
+ i++
+ }
+
+ var value strings.Builder
+ if i < len(conninfo) && conninfo[i] == '\'' {
+ i++
+ for i < len(conninfo) {
+ switch conninfo[i] {
+ case '\\':
+ i++
+ if i < len(conninfo) {
+ value.WriteByte(conninfo[i])
+ i++
+ }
+ case '\'':
+ i++
+ goto parsedValue
+ default:
+ value.WriteByte(conninfo[i])
+ i++
+ }
+ }
+ } else {
+ for i < len(conninfo) && conninfo[i] != ' ' {
+ value.WriteByte(conninfo[i])
+ i++
+ }
+ }
+
+ parsedValue:
+ if key != "" {
+ fields[key] = value.String()
+ }
+ }
+ return fields
+}
+
+func parsePostgreSQLRetainedWALBytes(value string) (int64, bool) {
+ value = strings.TrimSpace(value)
+ if value == "" {
+ return 0, false
+ }
+ bytes, err := strconv.ParseInt(value, 10, 64)
+ if err == nil {
+ return bytes, true
+ }
+ asFloat, err := strconv.ParseFloat(value, 64)
+ if err != nil {
+ return 0, false
+ }
+ return int64(asFloat), true
+}
diff --git a/controllers/apps/transformer_component_postgresql_replication_health_test.go b/controllers/apps/transformer_component_postgresql_replication_health_test.go
new file mode 100644
index 00000000000..2ad6f6a53c7
--- /dev/null
+++ b/controllers/apps/transformer_component_postgresql_replication_health_test.go
@@ -0,0 +1,900 @@
+/*
+Copyright (C) 2022-2024 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package apps
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/tools/record"
+ "sigs.k8s.io/controller-runtime/pkg/client/fake"
+
+ appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
+ workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
+ "github.com/apecloud/kubeblocks/pkg/constant"
+ "github.com/apecloud/kubeblocks/pkg/controller/graph"
+ "github.com/apecloud/kubeblocks/pkg/controller/model"
+ intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
+)
+
+func TestEvaluatePostgreSQLReplicationHealth(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ leaderState postgreSQLLeaderState
+ replicaStates map[string]postgreSQLReplicaState
+ wantReady bool
+ wantReason string
+ }{
+ {
+ name: "ready",
+ leaderState: postgreSQLLeaderState{
+ Replications: []postgreSQLReplicationConnection{{
+ ApplicationName: "test_postgresql_0",
+ State: "streaming",
+ }},
+ Slots: []postgreSQLReplicationSlot{{
+ SlotName: "test_postgresql_0",
+ Active: true,
+ }},
+ },
+ replicaStates: map[string]postgreSQLReplicaState{
+ "test-postgresql-0": {
+ InRecovery: true,
+ PrimaryConninfo: "application_name=test_postgresql_0",
+ WALReceiver: &postgreSQLWALReceiverStatus{Status: "streaming"},
+ },
+ },
+ wantReady: true,
+ wantReason: postgreSQLReplicationReasonReady,
+ },
+ {
+ name: "inactive slot",
+ leaderState: postgreSQLLeaderState{
+ Replications: []postgreSQLReplicationConnection{{
+ ApplicationName: "test_postgresql_0",
+ State: "streaming",
+ }},
+ Slots: []postgreSQLReplicationSlot{{
+ SlotName: "test_postgresql_0",
+ Active: false,
+ RetainedWALBytesText: "4096",
+ }},
+ },
+ replicaStates: map[string]postgreSQLReplicaState{
+ "test-postgresql-0": {
+ InRecovery: true,
+ PrimaryConninfo: "application_name=test_postgresql_0",
+ WALReceiver: &postgreSQLWALReceiverStatus{Status: "streaming"},
+ },
+ },
+ wantReason: postgreSQLReplicationReasonSlotInactive,
+ },
+ {
+ name: "missing wal receiver",
+ leaderState: postgreSQLLeaderState{
+ Slots: []postgreSQLReplicationSlot{{
+ SlotName: "test_postgresql_0",
+ Active: true,
+ }},
+ },
+ replicaStates: map[string]postgreSQLReplicaState{
+ "test-postgresql-0": {
+ InRecovery: true,
+ PrimaryConninfo: "application_name=test_postgresql_0",
+ },
+ },
+ wantReason: postgreSQLReplicationReasonWALReceiverMissing,
+ },
+ {
+ name: "wal receiver not streaming",
+ leaderState: postgreSQLLeaderState{
+ Slots: []postgreSQLReplicationSlot{{
+ SlotName: "test_postgresql_0",
+ Active: true,
+ }},
+ },
+ replicaStates: map[string]postgreSQLReplicaState{
+ "test-postgresql-0": {
+ InRecovery: true,
+ PrimaryConninfo: "application_name=test_postgresql_0",
+ WALReceiver: &postgreSQLWALReceiverStatus{Status: "stopped"},
+ },
+ },
+ wantReason: postgreSQLReplicationReasonWALReceiverNotStreaming,
+ },
+ {
+ name: "primary connection missing",
+ leaderState: postgreSQLLeaderState{
+ Slots: []postgreSQLReplicationSlot{{
+ SlotName: "test_postgresql_0",
+ Active: true,
+ }},
+ },
+ replicaStates: map[string]postgreSQLReplicaState{
+ "test-postgresql-0": {
+ InRecovery: true,
+ PrimaryConninfo: "application_name=test_postgresql_0",
+ WALReceiver: &postgreSQLWALReceiverStatus{Status: "streaming"},
+ },
+ },
+ wantReason: postgreSQLReplicationReasonWALReceiverMissing,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := evaluatePostgreSQLReplicationHealth("test-postgresql-1", tt.leaderState, tt.replicaStates)
+
+ require.Equal(t, tt.wantReady, got.Ready)
+ require.Equal(t, tt.wantReason, got.Reason)
+ require.NotEmpty(t, got.Message)
+ })
+ }
+}
+
+func TestApplicationNameFromPrimaryConninfo(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ conninfo string
+ want string
+ }{
+ {
+ conninfo: "host=leader port=5432 application_name=test_postgresql_0 user=standby",
+ want: "test_postgresql_0",
+ },
+ {
+ conninfo: "host='leader pod' application_name='test-postgresql-0' password='do not log'",
+ want: "test-postgresql-0",
+ },
+ {
+ conninfo: `application_name='test\'s replica' host=leader`,
+ want: "test's replica",
+ },
+ }
+
+ for _, tt := range tests {
+ require.Equal(t, tt.want, applicationNameFromPrimaryConninfo(tt.conninfo))
+ }
+}
+
+func TestNormalizedPostgreSQLReplicaKeysPreferConfiguredNames(t *testing.T) {
+ t.Parallel()
+
+ keys := normalizedPostgreSQLReplicaKeys("test-postgresql-0", postgreSQLReplicaState{
+ PrimaryConninfo: "application_name=configured_replica",
+ WALReceiver: &postgreSQLWALReceiverStatus{SlotName: "slot_replica"},
+ })
+
+ require.Equal(t, []string{"configured_replica", "slot_replica", "test_postgresql_0"}, keys)
+}
+
+func TestLookupPostgreSQLReplicationPrefersReplicaKeyOrder(t *testing.T) {
+ t.Parallel()
+
+ keys := []string{"configured_replica", "slot_replica", "test_postgresql_0"}
+ replication := lookupPostgreSQLReplication([]postgreSQLReplicationConnection{
+ {ApplicationName: "test_postgresql_0", State: "startup"},
+ {ApplicationName: "configured_replica", State: "streaming"},
+ }, keys)
+ slot := lookupPostgreSQLSlot([]postgreSQLReplicationSlot{
+ {SlotName: "test_postgresql_0", Active: false},
+ {SlotName: "configured_replica", Active: true},
+ }, keys)
+
+ require.NotNil(t, replication)
+ require.Equal(t, "configured_replica", replication.ApplicationName)
+ require.NotNil(t, slot)
+ require.Equal(t, "configured_replica", slot.SlotName)
+}
+
+func TestDecodePostgreSQLReplicationJSONRejectsEmptyAndLargeOutput(t *testing.T) {
+ t.Parallel()
+
+ var state postgreSQLLeaderState
+ require.Error(t, decodePostgreSQLReplicationJSON("", &state))
+ require.Error(t, decodePostgreSQLReplicationJSON(strings.Repeat("x", postgreSQLReplicationMaxJSONOutputBytes+1), &state))
+ require.NoError(t, decodePostgreSQLReplicationJSON(`{"replications":[],"slots":[]}`, &state))
+}
+
+func TestComponentPostgreSQLReplicationHealthTransformerReady(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ runner := newFakePostgreSQLReplicationExecRunner(t, map[string]any{
+ leaderPodName: postgreSQLLeaderState{
+ Replications: []postgreSQLReplicationConnection{{
+ ApplicationName: "test_postgresql_0",
+ State: "streaming",
+ }},
+ Slots: []postgreSQLReplicationSlot{{
+ SlotName: "test_postgresql_0",
+ Active: true,
+ }},
+ },
+ replicaPod: postgreSQLReplicaState{
+ InRecovery: true,
+ PrimaryConninfo: "application_name=test_postgresql_0",
+ WALReceiver: &postgreSQLWALReceiverStatus{Status: "streaming", SlotName: "test_postgresql_0"},
+ },
+ })
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{execRunner: runner}).Transform(transCtx, graph.NewDAG())
+
+ require.True(t, intctrlutil.IsDelayedRequeueError(err))
+ require.Equal(t, postgreSQLReplicationHealthRequeueInterval, requeueAfter(t, err))
+ cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, postgreSQLReplicationReadyConditionType)
+ require.NotNil(t, cond)
+ require.Equal(t, metav1.ConditionTrue, cond.Status)
+ require.Equal(t, postgreSQLReplicationReasonReady, cond.Reason)
+ require.Equal(t, []string{leaderPodName, replicaPod}, runner.pods)
+ require.Equal(t, postgreSQLReplicationLeaderSQL, runner.stdin[leaderPodName])
+ require.Equal(t, postgreSQLReplicationReplicaSQL, runner.stdin[replicaPod])
+}
+
+func TestComponentPostgreSQLReplicationHealthTransformerSlotInactive(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ recorder := record.NewFakeRecorder(1)
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ transCtx.EventRecorder = recorder
+ runner := newFakePostgreSQLReplicationExecRunner(t, map[string]any{
+ leaderPodName: postgreSQLLeaderState{
+ Replications: []postgreSQLReplicationConnection{{
+ ApplicationName: "test_postgresql_0",
+ State: "streaming",
+ }},
+ Slots: []postgreSQLReplicationSlot{{
+ SlotName: "test_postgresql_0",
+ Active: false,
+ RetainedWALBytesText: "8192",
+ }},
+ },
+ replicaPod: postgreSQLReplicaState{
+ InRecovery: true,
+ PrimaryConninfo: "application_name=test_postgresql_0",
+ WALReceiver: &postgreSQLWALReceiverStatus{Status: "streaming", SlotName: "test_postgresql_0"},
+ },
+ })
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{execRunner: runner}).Transform(transCtx, graph.NewDAG())
+
+ require.True(t, intctrlutil.IsDelayedRequeueError(err))
+ require.Equal(t, postgreSQLReplicationHealthRequeueInterval, requeueAfter(t, err))
+ cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, postgreSQLReplicationReadyConditionType)
+ require.NotNil(t, cond)
+ require.Equal(t, metav1.ConditionFalse, cond.Status)
+ require.Equal(t, postgreSQLReplicationReasonSlotInactive, cond.Reason)
+ require.Contains(t, cond.Message, "retained WAL bytes=8192")
+ event := <-recorder.Events
+ require.Contains(t, event, postgreSQLReplicationReasonSlotInactive)
+}
+
+func TestComponentPostgreSQLReplicationHealthTransformerCheckFailed(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ runner := &fakePostgreSQLReplicationExecRunner{
+ t: t,
+ results: map[string]string{},
+ err: errors.New("psql failed"),
+ }
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{execRunner: runner}).Transform(transCtx, graph.NewDAG())
+
+ require.True(t, intctrlutil.IsDelayedRequeueError(err))
+ require.Equal(t, postgreSQLReplicationHealthRequeueInterval, requeueAfter(t, err))
+ cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, postgreSQLReplicationReadyConditionType)
+ require.NotNil(t, cond)
+ require.Equal(t, metav1.ConditionFalse, cond.Status)
+ require.Equal(t, postgreSQLReplicationReasonHealthCheckFailed, cond.Reason)
+ require.Contains(t, cond.Message, "psql failed")
+ require.NotContains(t, cond.Message, "password=secret")
+}
+
+func TestComponentPostgreSQLReplicationHealthTransformerSkipsSinglePrimary(t *testing.T) {
+ t.Parallel()
+
+ const leaderPodName = "test-postgresql-0"
+
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName)
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{}).Transform(transCtx, graph.NewDAG())
+
+ require.True(t, intctrlutil.IsDelayedRequeueError(err))
+ require.Equal(t, postgreSQLReplicationHealthRequeueInterval, requeueAfter(t, err))
+ cond := meta.FindStatusCondition(transCtx.Component.Status.Conditions, postgreSQLReplicationReadyConditionType)
+ require.NotNil(t, cond)
+ require.Equal(t, metav1.ConditionTrue, cond.Status)
+ require.Equal(t, postgreSQLReplicationReasonReplicasNotApplicable, cond.Reason)
+}
+
+func TestComponentPostgreSQLReplicationAutoRebuildWaitsForWindow(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ transCtx.Component.Annotations = map[string]string{
+ postgreSQLReplicationAutoRebuildWindowAnnotation: "10m",
+ }
+ runner := newFakePostgreSQLReplicationExecRunner(t, map[string]any{
+ leaderPodName: postgreSQLLeaderState{
+ Slots: []postgreSQLReplicationSlot{{
+ SlotName: "test_postgresql_0",
+ Active: false,
+ }},
+ },
+ replicaPod: postgreSQLReplicaState{
+ InRecovery: true,
+ PrimaryConninfo: "application_name=test_postgresql_0",
+ WALReceiver: &postgreSQLWALReceiverStatus{Status: "streaming", SlotName: "test_postgresql_0"},
+ },
+ })
+ dag := initializedPostgreSQLReplicationHealthDAG(t, transCtx)
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{execRunner: runner}).Transform(transCtx, dag)
+
+ require.True(t, intctrlutil.IsDelayedRequeueError(err))
+ require.Equal(t, postgreSQLReplicationHealthRequeueInterval, requeueAfter(t, err))
+ pending := meta.FindStatusCondition(
+ transCtx.Component.Status.Conditions,
+ postgreSQLReplicationAutoRebuildPendingConditionType(replicaPod),
+ )
+ require.NotNil(t, pending)
+ require.Equal(t, metav1.ConditionTrue, pending.Status)
+ require.Equal(t, postgreSQLReplicationReasonSlotInactive, pending.Reason)
+ require.Empty(t, graphOpsRequests(transCtx, dag))
+}
+
+func TestComponentPostgreSQLReplicationAutoRebuildTransformCreatesOpsRequestAfterWindow(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ transCtx.Component.Annotations = map[string]string{
+ postgreSQLReplicationAutoRebuildWindowAnnotation: "1m",
+ }
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: postgreSQLReplicationAutoRebuildPendingConditionType(replicaPod),
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.NewTime(time.Now().Add(-2 * time.Minute)),
+ Reason: postgreSQLReplicationReasonSlotInactive,
+ Message: "slot inactive",
+ })
+ runner := newFakePostgreSQLReplicationExecRunner(t, map[string]any{
+ leaderPodName: postgreSQLLeaderState{
+ Slots: []postgreSQLReplicationSlot{{
+ SlotName: "test_postgresql_0",
+ Active: false,
+ }},
+ },
+ replicaPod: postgreSQLReplicaState{
+ InRecovery: true,
+ PrimaryConninfo: "application_name=test_postgresql_0",
+ WALReceiver: &postgreSQLWALReceiverStatus{Status: "streaming", SlotName: "test_postgresql_0"},
+ },
+ })
+ dag := initializedPostgreSQLReplicationHealthDAG(t, transCtx)
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{execRunner: runner}).Transform(transCtx, dag)
+
+ require.True(t, intctrlutil.IsDelayedRequeueError(err))
+ require.Equal(t, postgreSQLReplicationHealthRequeueInterval, requeueAfter(t, err))
+ opsRequests := graphOpsRequests(transCtx, dag)
+ require.Len(t, opsRequests, 1)
+ require.Equal(t, appsv1alpha1.RebuildInstanceType, opsRequests[0].Spec.Type)
+ require.Equal(t, []appsv1alpha1.Instance{{Name: replicaPod}}, opsRequests[0].Spec.RebuildFrom[0].Instances)
+}
+
+func TestComponentPostgreSQLReplicationAutoRebuildCreatesOpsRequestAfterWindow(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ now := time.Unix(1700000000, 123)
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ transCtx.Component.Annotations = map[string]string{
+ postgreSQLReplicationAutoRebuildWindowAnnotation: "1m",
+ }
+ result := postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonSlotInactive,
+ Message: "slot inactive",
+ ReplicaPod: replicaPod,
+ }
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: postgreSQLReplicationAutoRebuildPendingConditionType(replicaPod),
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.NewTime(now.Add(-2 * time.Minute)),
+ Reason: result.Reason,
+ Message: result.Message,
+ })
+ dag := initializedPostgreSQLReplicationHealthDAG(t, transCtx)
+ runningITS := transCtx.RunningWorkload.(*workloads.InstanceSet)
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{}).
+ maybeCreateAutoRebuildOpsRequest(transCtx, dag, runningITS, result, now)
+
+ require.NoError(t, err)
+ opsRequests := graphOpsRequests(transCtx, dag)
+ require.Len(t, opsRequests, 1)
+ opsRequest := opsRequests[0]
+ require.Equal(t, appsv1alpha1.RebuildInstanceType, opsRequest.Spec.Type)
+ require.Equal(t, transCtx.Cluster.Name, opsRequest.Spec.ClusterName)
+ require.True(t, opsRequest.Spec.Force)
+ require.True(t, opsRequest.Spec.EnqueueOnForce)
+ require.NotNil(t, opsRequest.Spec.TimeoutSeconds)
+ require.EqualValues(t, postgreSQLReplicationAutoRebuildTimeoutSeconds, *opsRequest.Spec.TimeoutSeconds)
+ require.EqualValues(t, postgreSQLReplicationAutoRebuildTTLSecondsAfterSucceed, opsRequest.Spec.TTLSecondsAfterSucceed)
+ require.EqualValues(
+ t,
+ postgreSQLReplicationAutoRebuildTTLSecondsAfterUnsuccessful,
+ opsRequest.Spec.TTLSecondsAfterUnsuccessfulCompletion,
+ )
+ require.Equal(t, transCtx.Cluster.Name, opsRequest.Labels[constant.AppInstanceLabelKey])
+ require.Equal(t, string(appsv1alpha1.RebuildInstanceType), opsRequest.Labels[constant.OpsRequestTypeLabelKey])
+ require.Len(t, opsRequest.OwnerReferences, 1)
+ require.Len(t, opsRequest.Spec.RebuildFrom, 1)
+ require.Equal(t, transCtx.SynthesizeComponent.Name, opsRequest.Spec.RebuildFrom[0].ComponentName)
+ require.True(t, opsRequest.Spec.RebuildFrom[0].InPlace)
+ require.Equal(t, []appsv1alpha1.Instance{{Name: replicaPod}}, opsRequest.Spec.RebuildFrom[0].Instances)
+
+ cooldown := meta.FindStatusCondition(
+ transCtx.Component.Status.Conditions,
+ postgreSQLReplicationAutoRebuildConditionType(replicaPod),
+ )
+ require.NotNil(t, cooldown)
+ require.Equal(t, metav1.NewTime(now), cooldown.LastTransitionTime)
+ require.Contains(t, cooldown.Message, opsRequest.Name)
+}
+
+func TestComponentPostgreSQLReplicationAutoRebuildDisabledByAnnotation(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ now := time.Unix(1700000000, 123)
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ transCtx.Component.Annotations = map[string]string{
+ postgreSQLReplicationAutoRebuildAnnotation: "false",
+ postgreSQLReplicationAutoRebuildWindowAnnotation: "1m",
+ }
+ result := postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonSlotInactive,
+ Message: "slot inactive",
+ ReplicaPod: replicaPod,
+ }
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: postgreSQLReplicationAutoRebuildPendingConditionType(replicaPod),
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.NewTime(now.Add(-2 * time.Minute)),
+ Reason: result.Reason,
+ Message: result.Message,
+ })
+ dag := initializedPostgreSQLReplicationHealthDAG(t, transCtx)
+ runningITS := transCtx.RunningWorkload.(*workloads.InstanceSet)
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{}).
+ maybeCreateAutoRebuildOpsRequest(transCtx, dag, runningITS, result, now)
+
+ require.NoError(t, err)
+ require.Empty(t, graphOpsRequests(transCtx, dag))
+}
+
+func TestComponentPostgreSQLReplicationAutoRebuildDisabledByInvalidAnnotation(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ now := time.Unix(1700000000, 123)
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ transCtx.Component.Annotations = map[string]string{
+ postgreSQLReplicationAutoRebuildAnnotation: "flase",
+ postgreSQLReplicationAutoRebuildWindowAnnotation: "1m",
+ }
+ result := postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonSlotInactive,
+ Message: "slot inactive",
+ ReplicaPod: replicaPod,
+ }
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: postgreSQLReplicationAutoRebuildPendingConditionType(replicaPod),
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.NewTime(now.Add(-2 * time.Minute)),
+ Reason: result.Reason,
+ Message: result.Message,
+ })
+ dag := initializedPostgreSQLReplicationHealthDAG(t, transCtx)
+ runningITS := transCtx.RunningWorkload.(*workloads.InstanceSet)
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{}).
+ maybeCreateAutoRebuildOpsRequest(transCtx, dag, runningITS, result, now)
+
+ require.NoError(t, err)
+ require.Empty(t, graphOpsRequests(transCtx, dag))
+}
+
+func TestComponentPostgreSQLReplicationAutoRebuildSkipsExistingCancellingOpsRequest(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ now := time.Unix(1700000000, 0)
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ transCtx.Component.Annotations = map[string]string{
+ postgreSQLReplicationAutoRebuildWindowAnnotation: "1m",
+ }
+ result := postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonSlotInactive,
+ Message: "slot inactive",
+ ReplicaPod: replicaPod,
+ }
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: postgreSQLReplicationAutoRebuildPendingConditionType(replicaPod),
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.NewTime(now.Add(-2 * time.Minute)),
+ Reason: result.Reason,
+ Message: result.Message,
+ })
+ existingOpsRequest := &appsv1alpha1.OpsRequest{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "existing-rebuild",
+ Namespace: transCtx.Cluster.Namespace,
+ Labels: map[string]string{
+ constant.AppInstanceLabelKey: transCtx.Cluster.Name,
+ constant.OpsRequestTypeLabelKey: string(appsv1alpha1.RebuildInstanceType),
+ },
+ },
+ Spec: appsv1alpha1.OpsRequestSpec{
+ ClusterName: transCtx.Cluster.Name,
+ Type: appsv1alpha1.RebuildInstanceType,
+ },
+ Status: appsv1alpha1.OpsRequestStatus{
+ Phase: appsv1alpha1.OpsCancellingPhase,
+ },
+ }
+ dag := initializedPostgreSQLReplicationHealthDAG(t, transCtx)
+ transCtx.Client = model.NewGraphClient(fake.NewClientBuilder().
+ WithScheme(rscheme).
+ WithObjects(existingOpsRequest).
+ Build())
+ runningITS := transCtx.RunningWorkload.(*workloads.InstanceSet)
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{}).
+ maybeCreateAutoRebuildOpsRequest(transCtx, dag, runningITS, result, now)
+
+ require.NoError(t, err)
+ require.Empty(t, graphOpsRequests(transCtx, dag))
+}
+
+func TestComponentPostgreSQLReplicationAutoRebuildSkipsUnlabeledExistingOpsRequest(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ now := time.Unix(1700000000, 0)
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ transCtx.Component.Annotations = map[string]string{
+ postgreSQLReplicationAutoRebuildWindowAnnotation: "1m",
+ }
+ result := postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonSlotInactive,
+ Message: "slot inactive",
+ ReplicaPod: replicaPod,
+ }
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: postgreSQLReplicationAutoRebuildPendingConditionType(replicaPod),
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.NewTime(now.Add(-2 * time.Minute)),
+ Reason: result.Reason,
+ Message: result.Message,
+ })
+ existingOpsRequest := &appsv1alpha1.OpsRequest{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "existing-unlabeled-rebuild",
+ Namespace: transCtx.Cluster.Namespace,
+ },
+ Spec: appsv1alpha1.OpsRequestSpec{
+ ClusterName: transCtx.Cluster.Name,
+ Type: appsv1alpha1.RebuildInstanceType,
+ },
+ Status: appsv1alpha1.OpsRequestStatus{
+ Phase: appsv1alpha1.OpsPendingPhase,
+ },
+ }
+ dag := initializedPostgreSQLReplicationHealthDAG(t, transCtx)
+ transCtx.Client = model.NewGraphClient(fake.NewClientBuilder().
+ WithScheme(rscheme).
+ WithObjects(existingOpsRequest).
+ Build())
+ runningITS := transCtx.RunningWorkload.(*workloads.InstanceSet)
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{}).
+ maybeCreateAutoRebuildOpsRequest(transCtx, dag, runningITS, result, now)
+
+ require.NoError(t, err)
+ require.Empty(t, graphOpsRequests(transCtx, dag))
+}
+
+func TestComponentPostgreSQLReplicationAutoRebuildSkipsDuringCooldown(t *testing.T) {
+ t.Parallel()
+
+ const (
+ leaderPodName = "test-postgresql-1"
+ replicaPod = "test-postgresql-0"
+ )
+
+ now := time.Unix(1700000000, 0)
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, leaderPodName, replicaPod)
+ transCtx.Component.Annotations = map[string]string{
+ postgreSQLReplicationAutoRebuildWindowAnnotation: "1m",
+ postgreSQLReplicationAutoRebuildCooldownAnnotation: "30m",
+ }
+ result := postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonWALReceiverMissing,
+ Message: "wal receiver missing",
+ ReplicaPod: replicaPod,
+ }
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: postgreSQLReplicationAutoRebuildPendingConditionType(replicaPod),
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.NewTime(now.Add(-2 * time.Minute)),
+ Reason: result.Reason,
+ Message: result.Message,
+ })
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: postgreSQLReplicationAutoRebuildConditionType(replicaPod),
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: metav1.NewTime(now.Add(-10 * time.Minute)),
+ Reason: result.Reason,
+ Message: "recent rebuild",
+ })
+ dag := initializedPostgreSQLReplicationHealthDAG(t, transCtx)
+ runningITS := transCtx.RunningWorkload.(*workloads.InstanceSet)
+
+ err := (&componentPostgreSQLReplicationHealthTransformer{}).
+ maybeCreateAutoRebuildOpsRequest(transCtx, dag, runningITS, result, now)
+
+ require.NoError(t, err)
+ require.Empty(t, graphOpsRequests(transCtx, dag))
+}
+
+func TestPostgreSQLReplicationAutoRebuildPendingConditionResetsWhenReasonChanges(t *testing.T) {
+ t.Parallel()
+
+ const replicaPod = "test-postgresql-0"
+
+ oldTransitionTime := metav1.NewTime(time.Now().Add(-time.Minute))
+ transCtx := newPostgreSQLReplicationHealthTestContext(t, "test-postgresql-1", replicaPod)
+ meta.SetStatusCondition(&transCtx.Component.Status.Conditions, metav1.Condition{
+ Type: postgreSQLReplicationAutoRebuildPendingConditionType(replicaPod),
+ Status: metav1.ConditionTrue,
+ ObservedGeneration: transCtx.Component.Generation,
+ LastTransitionTime: oldTransitionTime,
+ Reason: postgreSQLReplicationReasonSlotInactive,
+ Message: "slot inactive",
+ })
+ result := postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonWALReceiverMissing,
+ Message: "wal receiver missing",
+ ReplicaPod: replicaPod,
+ }
+
+ (&componentPostgreSQLReplicationHealthTransformer{}).recordAutoRebuildPendingCondition(transCtx, result)
+
+ pending := meta.FindStatusCondition(
+ transCtx.Component.Status.Conditions,
+ postgreSQLReplicationAutoRebuildPendingConditionType(replicaPod),
+ )
+ require.NotNil(t, pending)
+ require.Equal(t, postgreSQLReplicationReasonWALReceiverMissing, pending.Reason)
+ require.True(t, pending.LastTransitionTime.After(oldTransitionTime.Time))
+}
+
+func TestShouldRecordPostgreSQLReplicationWarningOnlyWhenReasonChanges(t *testing.T) {
+ t.Parallel()
+
+ result := postgreSQLReplicationHealthResult{
+ Ready: false,
+ Reason: postgreSQLReplicationReasonSlotInactive,
+ Message: "slot inactive",
+ }
+ require.True(t, shouldRecordPostgreSQLReplicationWarning(nil, result))
+ require.False(t, shouldRecordPostgreSQLReplicationWarning([]metav1.Condition{{
+ Type: postgreSQLReplicationReadyConditionType,
+ Status: metav1.ConditionFalse,
+ Reason: postgreSQLReplicationReasonSlotInactive,
+ }}, result))
+ require.True(t, shouldRecordPostgreSQLReplicationWarning([]metav1.Condition{{
+ Type: postgreSQLReplicationReadyConditionType,
+ Status: metav1.ConditionFalse,
+ Reason: postgreSQLReplicationReasonWALReceiverMissing,
+ }}, result))
+ require.False(t, shouldRecordPostgreSQLReplicationWarning(nil, postgreSQLReplicationHealthResult{
+ Ready: true,
+ Reason: postgreSQLReplicationReasonReady,
+ }))
+}
+
+func TestPostgreSQLComponentRecognizesBuiltinHandler(t *testing.T) {
+ t.Parallel()
+
+ handler := appsv1alpha1.OfficialPostgresqlBuiltinActionHandler
+ transCtx := &componentTransformContext{
+ CompDef: &appsv1alpha1.ComponentDefinition{
+ Spec: appsv1alpha1.ComponentDefinitionSpec{
+ LifecycleActions: &appsv1alpha1.ComponentLifecycleActions{
+ RoleProbe: &appsv1alpha1.RoleProbe{
+ LifecycleActionHandler: appsv1alpha1.LifecycleActionHandler{
+ BuiltinHandler: &handler,
+ },
+ },
+ },
+ },
+ },
+ }
+
+ require.False(t, isPostgreSQLComponent(transCtx))
+ require.True(t, isPostgreSQLComponentForReplicationHealth(transCtx))
+}
+
+func newPostgreSQLReplicationHealthTestContext(
+ t *testing.T,
+ leaderPodName string,
+ replicaPodNames ...string,
+) *componentTransformContext {
+ t.Helper()
+ return newPostgreSQLStandbyPasswordRepairTestContext(t, leaderPodName, replicaPodNames...)
+}
+
+func newFakePostgreSQLReplicationExecRunner(
+ t *testing.T,
+ states map[string]any,
+) *fakePostgreSQLReplicationExecRunner {
+ t.Helper()
+
+ results := make(map[string]string, len(states))
+ for podName, state := range states {
+ data, err := json.Marshal(state)
+ require.NoError(t, err)
+ results[podName] = string(data)
+ }
+ return &fakePostgreSQLReplicationExecRunner{t: t, results: results}
+}
+
+func initializedPostgreSQLReplicationHealthDAG(
+ t *testing.T,
+ transCtx *componentTransformContext,
+) *graph.DAG {
+ t.Helper()
+
+ dag := graph.NewDAG()
+ require.NoError(t, (&componentInitTransformer{}).Transform(transCtx, dag))
+ return dag
+}
+
+func graphOpsRequests(transCtx *componentTransformContext, dag *graph.DAG) []*appsv1alpha1.OpsRequest {
+ graphClient := transCtx.Client.(model.GraphClient)
+ objects := graphClient.FindAll(dag, &appsv1alpha1.OpsRequest{})
+ opsRequests := make([]*appsv1alpha1.OpsRequest, 0, len(objects))
+ for _, object := range objects {
+ opsRequests = append(opsRequests, object.(*appsv1alpha1.OpsRequest))
+ }
+ return opsRequests
+}
+
+func requeueAfter(t *testing.T, err error) time.Duration {
+ t.Helper()
+
+ requeueErr, ok := err.(intctrlutil.RequeueError)
+ require.True(t, ok)
+ return requeueErr.RequeueAfter()
+}
+
+type fakePostgreSQLReplicationExecRunner struct {
+ t *testing.T
+ results map[string]string
+ err error
+ pods []string
+ stdin map[string]string
+}
+
+func (r *fakePostgreSQLReplicationExecRunner) Exec(
+ ctx context.Context,
+ pod *corev1.Pod,
+ command []string,
+ stdin string,
+) (string, string, error) {
+ require.NoError(r.t, ctx.Err())
+ require.Equal(r.t, postgreSQLReplicationPSQLCommand, command)
+ if r.stdin == nil {
+ r.stdin = make(map[string]string)
+ }
+ r.pods = append(r.pods, pod.Name)
+ r.stdin[pod.Name] = stdin
+ if r.err != nil {
+ return "", "password=secret should not leak", r.err
+ }
+ return r.results[pod.Name], "", nil
+}
+
+var _ podExecRunner = &fakePostgreSQLReplicationExecRunner{}