Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/controller/chi/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,15 @@ func (c *Controller) handleObject(obj interface{}) {
}

func ShouldEnqueue(cr *api.ClickHouseInstallation) bool {
if cr == nil {
return false
}

if cr.Spec.Suspend.Value() {
log.V(2).M(cr).Info("skip enqueue, CHI is suspended")
return false
}

ns := cr.GetNamespace()
if !chop.Config().IsNamespaceWatched(ns) {
log.V(2).M(cr).Info("skip enqueue, namespace '%s' is not watched or is in deny list", ns)
Expand Down
9 changes: 6 additions & 3 deletions pkg/controller/chi/worker-deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,10 @@ func (w *worker) hasHostVolumesToRetain(ctx context.Context, host *api.Host) (ha
// Check whether among all PVCs host has reclaim policy "retain" specified
storage.NewStoragePVC(w.c.kube.Storage()).WalkDiscoveredPVCs(ctx, host, func(pvc *core.PersistentVolumeClaim) {
if chiLabeler.New(nil).GetReclaimPolicy(pvc.GetObjectMeta()) == api.PVCReclaimPolicyRetain {
w.a.V(1).F().Info("PVC: %s/%s blocks drop replica. Reclaim policy: %s", api.PVCReclaimPolicyRetain.String())
w.a.V(1).F().Info(
"PVC: %s/%s blocks drop replica. Reclaim policy: %s",
api.PVCReclaimPolicyRetain.String(),
)
has = true
}
})
Expand Down Expand Up @@ -451,7 +454,7 @@ func (a dropReplicaOptionsArr) First() *dropReplicaOptions {
// dropZKReplica drops replica's info from Zookeeper
func (w *worker) dropZKReplica(ctx context.Context, hostToDrop *api.Host, opts *dropReplicaOptions) error {
if hostToDrop == nil {
w.a.V(1).F().Error("FAILED to drop replica. Need to have host to drop. hostToDrop: %s", hostToDrop.GetName())
w.a.V(1).F().Error("FAILED to drop replica. Need to have host to drop. hostToDrop is nil")
return nil
}

Expand All @@ -462,7 +465,7 @@ func (w *worker) dropZKReplica(ctx context.Context, hostToDrop *api.Host, opts *
}

if hostToRunOn == nil {
w.a.V(1).F().Error("FAILED to drop replica. hostToRunOn: %s, hostToDrop: %s", hostToRunOn.GetName(), hostToDrop.GetName())
w.a.V(1).F().Error("FAILED to drop replica. hostToRunOn is nil, hostToDrop: %s", hostToDrop.GetName())
return nil
}

Expand Down
79 changes: 71 additions & 8 deletions pkg/controller/chi/worker-reconciler-chi.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *api.ClickHouseInstal
switch {
case w.isAfterFinalizerInstalled(old, new):
w.a.M(new).F().Info("isAfterFinalizerInstalled - continue reconcile-1")
case w.isGenerationTheSame(old, new):
log.V(2).M(new).F().Info("isGenerationTheSame() - nothing to do here, exit")
return nil
}

w.a.M(new).S().P()
Expand All @@ -62,6 +59,8 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *api.ClickHouseInstal
startTime := time.Now()

new = w.buildCR(ctx, new)
generationTheSame := w.isGenerationTheSame(old, new)
hasUnhealthyHosts := w.hasUnhealthyHosts(ctx, new)

switch {
case new.Spec.Suspend.Value():
Expand All @@ -87,6 +86,12 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *api.ClickHouseInstal
metrics.CRReconcilesCompleted(ctx, new)
}
return nil
case generationTheSame && !hasUnhealthyHosts:
w.a.M(new).F().Info("isGenerationTheSame() and all hosts are healthy - nothing to do, exit")
metrics.CRReconcilesCompleted(ctx, new)
return nil
case generationTheSame && hasUnhealthyHosts:
w.a.M(new).F().Warning("Generation is unchanged, but unhealthy hosts detected - forcing steady-state recovery reconcile")
case new.HasReconcileWork():
w.a.M(new).F().Info("CR has reconcile work - continue reconcile")
case w.isAfterFinalizerInstalled(new.GetAncestorT(), new):
Expand Down Expand Up @@ -401,8 +406,12 @@ func (w *worker) reconcileHostStatefulSet(ctx context.Context, host *api.Host, o

// Start with force-restart host
if w.shouldForceRestartHost(ctx, host) {
w.a.V(1).M(host).F().Info("Reconcile host STS force restart: %s", host.GetName())
_ = w.hostForceRestart(ctx, host, opts)
if w.isHostHealthyForReconcile(ctx, host) && !w.isShardSafeToDisruptHost(ctx, host) {
w.a.V(1).M(host).F().Warning("Skip force restart for host due to shard safety guard (no healthy peer): %s", host.GetName())
} else {
w.a.V(1).M(host).F().Info("Reconcile host STS force restart: %s", host.GetName())
_ = w.hostForceRestart(ctx, host, opts)
}
}

w.stsReconciler.PrepareHostStatefulSetWithStatus(ctx, host, host.IsStopped())
Expand Down Expand Up @@ -670,12 +679,66 @@ func (w *worker) reconcileClusterShardsAndHosts(ctx context.Context, cluster *ap
}

func (w *worker) reconcileShardWithHosts(ctx context.Context, shard api.IShard) error {
if err := w.reconcileShard(ctx, shard); err != nil {
if err := w.reconcileShardWithHook(ctx, shard); err != nil {
return err
}
return shard.WalkHostsAbortOnError(func(host *api.Host) error {
return w.reconcileHost(ctx, host)

recoveryHosts := make([]*api.Host, 0)
rolloutHosts := make([]*api.Host, 0)
shard.WalkHosts(func(host *api.Host) error {
if w.isHostHealthyForReconcile(ctx, host) {
rolloutHosts = append(rolloutHosts, host)
} else {
recoveryHosts = append(recoveryHosts, host)
}
return nil
})

recoveredHosts := 0
reconciledHosts := 0
deferredHosts := 0

for _, host := range recoveryHosts {
w.a.V(1).M(host).F().Info("Recovery-first pass: reconciling unhealthy host before rollout")
if err := w.reconcileHostWithHook(ctx, host); err != nil {
return err
}
recoveredHosts++
}

for _, host := range rolloutHosts {
if w.hostMayRequireDisruption(ctx, host) && !w.isShardSafeToDisruptHost(ctx, host) {
w.a.V(1).M(host).F().Warning("Deferring host reconcile due to shard safety guard (no healthy peer). Host: %s", host.GetName())
deferredHosts++
continue
}

if err := w.reconcileHostWithHook(ctx, host); err != nil {
return err
}
reconciledHosts++
}

if deferredHosts > 0 && recoveredHosts == 0 && reconciledHosts == 0 {
w.a.V(1).M(shard).F().Warning("No progress in shard reconcile: %d host(s) deferred by shard safety guard", deferredHosts)
return common.ErrCRUDAbort
}

return nil
}

// reconcileShardWithHook routes shard reconciliation through the test hook
// (reconcileShardFn) when one is installed; otherwise it falls back to the
// regular reconcileShard implementation.
func (w *worker) reconcileShardWithHook(ctx context.Context, shard api.IShard) error {
	reconcile := w.reconcileShard
	if w.reconcileShardFn != nil {
		reconcile = w.reconcileShardFn
	}
	return reconcile(ctx, shard)
}

// reconcileHostWithHook routes host reconciliation through the test hook
// (reconcileHostFn) when one is installed; otherwise it falls back to the
// regular reconcileHost implementation.
func (w *worker) reconcileHostWithHook(ctx context.Context, host *api.Host) error {
	reconcile := w.reconcileHost
	if w.reconcileHostFn != nil {
		reconcile = w.reconcileHostFn
	}
	return reconcile(ctx, host)
}

// reconcileShard reconciles specified shard, excluding nested replicas
Expand Down
146 changes: 146 additions & 0 deletions pkg/controller/chi/worker-reconciler-shard_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package chi

import (
"context"
"reflect"
"testing"

api "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1"
"github.com/altinity/clickhouse-operator/pkg/apis/common/types"
)

// makeTestShardFixture builds a minimal CHI -> cluster -> shard -> hosts
// object graph for shard-level reconcile tests. One host is created per entry
// in hostNames; each host is wired into the shard, addressed within the
// cluster, and marked ObjectStatusModified so it is treated as pending work.
// Returns the CHI, the shard and the hosts (in hostNames order).
func makeTestShardFixture(hostNames ...string) (*api.ClickHouseInstallation, *api.ChiShard, []*api.Host) {
	shard := &api.ChiShard{
		Name:  "shard-0",
		Hosts: make([]*api.Host, 0, len(hostNames)),
	}
	cluster := &api.Cluster{
		Name:   "cluster-0",
		Layout: &api.ChiClusterLayout{Shards: []*api.ChiShard{shard}},
	}
	chi := &api.ClickHouseInstallation{
		Spec: api.ChiSpec{
			Configuration: &api.Configuration{Clusters: []*api.Cluster{cluster}},
		},
	}
	// Back-references so code under test can navigate from shard/cluster up to the CR.
	cluster.Runtime.CHI = chi
	shard.Runtime.CHI = chi

	hosts := make([]*api.Host, 0, len(hostNames))
	for i, hostName := range hostNames {
		host := &api.Host{Name: hostName}
		host.SetCR(chi)
		// Address places the host inside cluster/shard; ReplicaIndex follows hostNames order.
		host.Runtime.Address.ClusterName = cluster.Name
		host.Runtime.Address.ShardName = shard.Name
		host.Runtime.Address.HostName = hostName
		host.Runtime.Address.ReplicaIndex = i
		// Mark as modified so reconcile logic considers the host for rollout.
		host.GetReconcileAttributes().SetStatus(types.ObjectStatusModified)
		shard.Hosts = append(shard.Hosts, host)
		hosts = append(hosts, host)
	}

	return chi, shard, hosts
}

// Test_isShardSafeToDisruptHost verifies the shard safety guard: disrupting a
// host is unsafe while its peer is unhealthy and becomes safe once the peer
// recovers.
func Test_isShardSafeToDisruptHost(t *testing.T) {
	_, _, hosts := makeTestShardFixture("host-a", "host-b")
	hostA, hostB := hosts[0], hosts[1]

	// Peer host-b starts unhealthy.
	health := map[string]bool{
		hostA.GetName(): true,
		hostB.GetName(): false,
	}

	w := &worker{
		hostHealthyFn: func(ctx context.Context, host *api.Host) bool {
			return health[host.GetName()]
		},
	}

	ctx := context.Background()

	if w.isShardSafeToDisruptHost(ctx, hostA) {
		t.Fatalf("expected shard to be unsafe to disrupt %s when peer is unhealthy", hostA.GetName())
	}

	// After the peer recovers, disruption must be allowed.
	health[hostB.GetName()] = true
	if !w.isShardSafeToDisruptHost(ctx, hostA) {
		t.Fatalf("expected shard to be safe to disrupt %s after peer recovery", hostA.GetName())
	}
}

// Test_reconcileShardWithHosts_RecoveryFirstOrdering verifies that the
// unhealthy host is reconciled (recovered) before the healthy host is touched.
func Test_reconcileShardWithHosts_RecoveryFirstOrdering(t *testing.T) {
	_, shard, hosts := makeTestShardFixture("host-a", "host-b")
	hostA, hostB := hosts[0], hosts[1]

	// host-b starts unhealthy; the recovery-first pass must pick it up first.
	health := map[string]bool{
		hostA.GetName(): true,
		hostB.GetName(): false,
	}
	var order []string

	w := &worker{
		hostHealthyFn: func(ctx context.Context, host *api.Host) bool {
			return health[host.GetName()]
		},
		reconcileShardFn: func(ctx context.Context, shard api.IShard) error {
			return nil
		},
		reconcileHostFn: func(ctx context.Context, host *api.Host) error {
			name := host.GetName()
			order = append(order, name)
			if name == hostB.GetName() {
				// Simulate steady-state recovery before rollout continues.
				health[name] = true
			}
			return nil
		},
	}

	if err := w.reconcileShardWithHosts(context.Background(), shard); err != nil {
		t.Fatalf("reconcileShardWithHosts() unexpected error: %v", err)
	}

	wantOrder := []string{hostB.GetName(), hostA.GetName()}
	if !reflect.DeepEqual(order, wantOrder) {
		t.Fatalf("unexpected reconcile order, got=%v want=%v", order, wantOrder)
	}
}

// Regression test: after an operator restart in the middle of a rollout, the
// still-missing replica must be recovered before the healthy replica is
// disrupted again.
func Test_reconcileShardWithHosts_InterruptedRolloutAfterRestart_RecoversMissingReplicaFirst(t *testing.T) {
	_, shard, hosts := makeTestShardFixture("chi-foo-1-0-0-0", "chi-foo-1-0-1-0")
	host00, host01 := hosts[0], hosts[1]

	// Simulate restart in the middle of rollout: host01 is still down, host00 is up.
	health := map[string]bool{
		host00.GetName(): true,
		host01.GetName(): false,
	}
	order := []string{}

	w := &worker{
		hostHealthyFn: func(ctx context.Context, host *api.Host) bool {
			return health[host.GetName()]
		},
		reconcileShardFn: func(ctx context.Context, shard api.IShard) error {
			return nil
		},
		reconcileHostFn: func(ctx context.Context, host *api.Host) error {
			name := host.GetName()
			order = append(order, name)
			// Reconciling the missing replica brings it back to health.
			if name == host01.GetName() {
				health[name] = true
			}
			return nil
		},
	}

	if err := w.reconcileShardWithHosts(context.Background(), shard); err != nil {
		t.Fatalf("reconcileShardWithHosts() unexpected error: %v", err)
	}

	// Regression assertion: missing replica is recovered before any further disruption on the healthy replica.
	wantOrder := []string{host01.GetName(), host00.GetName()}
	if !reflect.DeepEqual(order, wantOrder) {
		t.Fatalf("unexpected reconcile order after interrupted rollout, got=%v want=%v", order, wantOrder)
	}
}
Loading
Loading