From 4b9ca6c8c0c6e6d32031e7884bc3be709c268f87 Mon Sep 17 00:00:00 2001 From: Patrick Zhao Date: Tue, 9 Jun 2026 17:17:47 +0800 Subject: [PATCH 1/2] fix restart delete all pods issue Signed-off-by: Patrick Zhao --- .../core/environment/service/environment.go | 2 +- pkg/tool/kube/updater/deployment_v2.go | 29 ++++++++----------- pkg/tool/kube/updater/statefulset_v2.go | 19 +++++------- 3 files changed, 20 insertions(+), 30 deletions(-) diff --git a/pkg/microservice/aslan/core/environment/service/environment.go b/pkg/microservice/aslan/core/environment/service/environment.go index 2b9fab9efb..874d73f602 100644 --- a/pkg/microservice/aslan/core/environment/service/environment.go +++ b/pkg/microservice/aslan/core/environment/service/environment.go @@ -2942,7 +2942,7 @@ func restartRelatedWorkloads(env *commonmodels.Product, service *commonmodels.Pr err = updater.RestartDeploymentV2(context.Background(), clusterID, env.Namespace, u.GetName()) return errors.Wrapf(err, "failed to restart deployment %s", u.GetName()) case setting.StatefulSet: - // err = updater.RestartStatefulSet(env.Namespace, u.GetName(), kubeClient) + err = updater.RestartStatefulSetV2(context.Background(), clusterID, env.Namespace, u.GetName()) return errors.Wrapf(err, "failed to restart statefulset %s", u.GetName()) } } diff --git a/pkg/tool/kube/updater/deployment_v2.go b/pkg/tool/kube/updater/deployment_v2.go index 7b06ca408d..5d2d3d53a1 100644 --- a/pkg/tool/kube/updater/deployment_v2.go +++ b/pkg/tool/kube/updater/deployment_v2.go @@ -22,7 +22,6 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -49,19 +48,15 @@ func RestartDeploymentV2(ctx context.Context, clusterID, namespace, name string) return fmt.Errorf("failed to get deployment %s/%s: %w", namespace, name, err) } - selector, err := metav1.LabelSelectorAsSelector(deploy.Spec.Selector) - if err != nil { - return fmt.Errorf("failed to parse deployment selector: %w", err) - } - - deleteOpts := []client.DeleteAllOfOption{ - client.InNamespace(namespace), - client.MatchingLabelsSelector{Selector: selector}, + now := time.Now().Format(time.RFC3339Nano) + patch := client.MergeFrom(deploy.DeepCopy()) + if deploy.Spec.Template.Annotations == nil { + deploy.Spec.Template.Annotations = map[string]string{} } + deploy.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = now - pod := &corev1.Pod{} - if err := c.DeleteAllOf(ctx, pod, deleteOpts...); err != nil { - return fmt.Errorf("failed to delete pods for deployment %s/%s: %w", namespace, name, err) + if err := c.Patch(ctx, deploy, patch); err != nil { + return fmt.Errorf("failed to restart %s/deploy/%s: %w", namespace, name, err) } return nil @@ -79,7 +74,7 @@ func DeleteDeploymentV2(ctx context.Context, clusterID, namespace string, opts . if config.name != "" && config.selector != "" { return fmt.Errorf("cannot specify both name and selector simultaneously") } - + c, err := clientmanager.NewKubeClientManager().GetControllerRuntimeClient(clusterID) if err != nil { return fmt.Errorf("failed to get kube client: %w", err) @@ -92,12 +87,12 @@ func DeleteDeploymentV2(ctx context.Context, clusterID, namespace string, opts . Name: config.name, }, } - + propagationPolicy := metav1.DeletePropagationBackground deleteOpts := &client.DeleteOptions{ PropagationPolicy: &propagationPolicy, } - + err = c.Delete(ctx, deploy, deleteOpts) return util.IgnoreNotFoundError(err) } @@ -200,7 +195,7 @@ func DeleteDeploymentAndWaitV2(ctx context.Context, clusterID, namespace string, return nil } -// CreateOrPatchDeploymentV2 is used when the YAML is fully controlled by this system, it implements a 3-way merge patch for the deployment. +// CreateOrPatchDeploymentV2 is used when the YAML is fully controlled by this system, it implements a 3-way merge patch for the deployment. // If we are simply editing the deployment, use UpdateDeploymentV2 instead. func CreateOrPatchDeploymentV2(ctx context.Context, clusterID, namespace, originalYAML, targetYAML string, resourceOverride bool) error { c, err := clientmanager.NewKubeClientManager().GetKubernetesClientSet(clusterID) @@ -438,4 +433,4 @@ func CreateDeploymentV2(ctx context.Context, clusterID, namespace string, deploy } return nil -} \ No newline at end of file +} diff --git a/pkg/tool/kube/updater/statefulset_v2.go b/pkg/tool/kube/updater/statefulset_v2.go index 4df3d37a78..2ea5f4bd2a 100644 --- a/pkg/tool/kube/updater/statefulset_v2.go +++ b/pkg/tool/kube/updater/statefulset_v2.go @@ -22,7 +22,6 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -49,19 +48,15 @@ func RestartStatefulSetV2(ctx context.Context, clusterID, namespace, name string return fmt.Errorf("failed to get statefulset %s/%s: %w", namespace, name, err) } - selector, err := metav1.LabelSelectorAsSelector(sts.Spec.Selector) - if err != nil { - return fmt.Errorf("failed to parse statefulset selector: %w", err) - } - - deleteOpts := []client.DeleteAllOfOption{ - client.InNamespace(namespace), - client.MatchingLabelsSelector{Selector: selector}, + now := time.Now().Format(time.RFC3339Nano) + patch := client.MergeFrom(sts.DeepCopy()) + if sts.Spec.Template.Annotations == nil { + sts.Spec.Template.Annotations = map[string]string{} } + sts.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = now - pod := &corev1.Pod{} - if err := c.DeleteAllOf(ctx, pod, deleteOpts...); err != nil { - return fmt.Errorf("failed to delete pods for statefulset %s/%s: %w", namespace, name, err) + if err := c.Patch(ctx, sts, patch); err != nil { + return fmt.Errorf("failed to restart %s/statefulset/%s: %w", namespace, name, err) } return nil From 827e67a8b0c81720cf9631c845c70a20a6be7478 Mon Sep 17 00:00:00 2001 From: Patrick Zhao Date: Wed, 10 Jun 2026 10:22:48 +0800 Subject: [PATCH 2/2] fix restart job Signed-off-by: Patrick Zhao --- .../jobcontroller/job_restart.go | 125 +++++++++--------- 1 file changed, 65 insertions(+), 60 deletions(-) diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_restart.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_restart.go index 1d04f385fc..1461aa01c8 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_restart.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_restart.go @@ -19,14 +19,19 @@ package jobcontroller import ( "context" "fmt" + "sort" "time" "github.com/koderover/zadig/v2/pkg/tool/clientmanager" helmtool "github.com/koderover/zadig/v2/pkg/tool/helmclient" + "github.com/koderover/zadig/v2/pkg/tool/kube/getter" + "github.com/koderover/zadig/v2/pkg/tool/kube/updater" "github.com/koderover/zadig/v2/pkg/util" "github.com/pkg/errors" "go.uber.org/zap" versionedclient "istio.io/client-go/pkg/clientset/versioned" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" crClient "sigs.k8s.io/controller-runtime/pkg/client" @@ -168,7 +173,7 @@ func (c *RestartJobCtl) restartK8sService(ctx context.Context, env *commonmodels return fmt.Errorf("failed to fetch imported manifests: %v", err) } - replaceResources, relatedPodLabels, err := restartWorkloadResources(ctx, c.jobTaskSpec.ClusterID, resources, env) + replaceResources, relatedPodLabels, err := restartWorkloadResources(ctx, c.jobTaskSpec.ClusterID, resources, env, c.kubeClient, c.clientSet) if err != nil { return fmt.Errorf("failed to restart workload resources: %v", err) } @@ -225,7 +230,7 @@ func (c *RestartJobCtl) restartHelmService(ctx context.Context, env *commonmodel } } - replaceResources, relatedPodLabels, err := restartWorkloadResources(ctx, c.jobTaskSpec.ClusterID, resources, env) + replaceResources, relatedPodLabels, err := restartWorkloadResources(ctx, c.jobTaskSpec.ClusterID, resources, env, c.kubeClient, c.clientSet) if err != nil { return fmt.Errorf("failed to restart workload resources: %v", err) } @@ -236,64 +241,64 @@ func (c *RestartJobCtl) restartHelmService(ctx context.Context, env *commonmodel return nil } -func restartWorkloadResources(ctx context.Context, clusterID string, resources []*kube.WorkloadResource, env *commonmodels.Product) (replaceResources []commonmodels.Resource, relatedPodLabels []map[string]string, err error) { - // deployments, statefulSets, _, _, _, err := kube.FetchSelectedWorkloads(env.Namespace, resources, kubeClient, clientSet) - // if err != nil { - // return nil, nil, err - // } - - // for _, deployment := range deployments { - // err = updater.RestartDeploymentV2(ctx, clusterID, deployment.Namespace, deployment.Name) - // if err != nil { - // return nil, nil, fmt.Errorf("failed to restart deployment %s/%s: %v", deployment.Namespace, deployment.Name, err) - // } - - // selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) - // if err != nil { - // return nil, nil, fmt.Errorf("failed to get selector for deployment %s/%s: %v", deployment.Namespace, deployment.Name, err) - // } - - // // ensure latest replicaset to be created - // replicaSets, err := getter.ListReplicaSets(deployment.Namespace, selector, kubeClient) - // if err != nil { - // return nil, nil, fmt.Errorf("failed to list replica sets for deployment %s/%s: %v", deployment.Namespace, deployment.Name, err) - // } - - // // Only include those whose ControllerRef matches the Deployment. - // owned := make([]*appsv1.ReplicaSet, 0, len(replicaSets)) - // for _, rs := range replicaSets { - // if metav1.IsControlledBy(rs, deployment) { - // owned = append(owned, rs) - // } - // } - // if len(owned) <= 0 { - // return nil, nil, fmt.Errorf("no replicaset found for deployment: %s", deployment.Name) - // } - // sort.Slice(owned, func(i, j int) bool { - // return owned[i].CreationTimestamp.After(owned[j].CreationTimestamp.Time) - // }) - - // replaceResources = append(replaceResources, commonmodels.Resource{ - // Kind: setting.Deployment, - // Name: deployment.Name, - // PodOwnerUID: string(owned[0].ObjectMeta.UID), - // }) - // relatedPodLabels = append(relatedPodLabels, deployment.Spec.Template.Labels) - // } - - // for _, sts := range statefulSets { - // err = updater.RestartStatefulSetV2(ctx, clusterID, sts.Namespace, sts.Name) - // if err != nil { - // return nil, nil, fmt.Errorf("failed to restart statefulset %s/%s: %v", sts.Namespace, sts.Name, err) - // } - - // replaceResources = append(replaceResources, commonmodels.Resource{ - // Kind: setting.StatefulSet, - // Name: sts.Name, - // PodOwnerUID: string(sts.ObjectMeta.UID), - // }) - // relatedPodLabels = append(relatedPodLabels, sts.Spec.Template.Labels) - // } +func restartWorkloadResources(ctx context.Context, clusterID string, resources []*kube.WorkloadResource, env *commonmodels.Product, kubeClient crClient.Client, clientSet *kubernetes.Clientset) (replaceResources []commonmodels.Resource, relatedPodLabels []map[string]string, err error) { + deployments, statefulSets, _, _, _, err := kube.FetchSelectedWorkloads(env.Namespace, resources, kubeClient, clientSet) + if err != nil { + return nil, nil, err + } + + for _, deployment := range deployments { + err = updater.RestartDeploymentV2(ctx, clusterID, deployment.Namespace, deployment.Name) + if err != nil { + return nil, nil, fmt.Errorf("failed to restart deployment %s/%s: %v", deployment.Namespace, deployment.Name, err) + } + + selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) + if err != nil { + return nil, nil, fmt.Errorf("failed to get selector for deployment %s/%s: %v", deployment.Namespace, deployment.Name, err) + } + + // ensure latest replicaset to be created + replicaSets, err := getter.ListReplicaSets(deployment.Namespace, selector, kubeClient) + if err != nil { + return nil, nil, fmt.Errorf("failed to list replica sets for deployment %s/%s: %v", deployment.Namespace, deployment.Name, err) + } + + // Only include those whose ControllerRef matches the Deployment. + owned := make([]*appsv1.ReplicaSet, 0, len(replicaSets)) + for _, rs := range replicaSets { + if metav1.IsControlledBy(rs, deployment) { + owned = append(owned, rs) + } + } + if len(owned) <= 0 { + return nil, nil, fmt.Errorf("no replicaset found for deployment: %s", deployment.Name) + } + sort.Slice(owned, func(i, j int) bool { + return owned[i].CreationTimestamp.After(owned[j].CreationTimestamp.Time) + }) + + replaceResources = append(replaceResources, commonmodels.Resource{ + Kind: setting.Deployment, + Name: deployment.Name, + PodOwnerUID: string(owned[0].ObjectMeta.UID), + }) + relatedPodLabels = append(relatedPodLabels, deployment.Spec.Template.Labels) + } + + for _, sts := range statefulSets { + err = updater.RestartStatefulSetV2(ctx, clusterID, sts.Namespace, sts.Name) + if err != nil { + return nil, nil, fmt.Errorf("failed to restart statefulset %s/%s: %v", sts.Namespace, sts.Name, err) + } + + replaceResources = append(replaceResources, commonmodels.Resource{ + Kind: setting.StatefulSet, + Name: sts.Name, + PodOwnerUID: string(sts.ObjectMeta.UID), + }) + relatedPodLabels = append(relatedPodLabels, sts.Spec.Template.Labels) + } return replaceResources, relatedPodLabels, nil }