Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ package jobcontroller
import (
"context"
"fmt"
"sort"
"time"

"github.com/koderover/zadig/v2/pkg/tool/clientmanager"
helmtool "github.com/koderover/zadig/v2/pkg/tool/helmclient"
"github.com/koderover/zadig/v2/pkg/tool/kube/getter"
"github.com/koderover/zadig/v2/pkg/tool/kube/updater"
"github.com/koderover/zadig/v2/pkg/util"
"github.com/pkg/errors"
"go.uber.org/zap"
versionedclient "istio.io/client-go/pkg/clientset/versioned"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
crClient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -168,7 +173,7 @@ func (c *RestartJobCtl) restartK8sService(ctx context.Context, env *commonmodels
return fmt.Errorf("failed to fetch imported manifests: %v", err)
}

replaceResources, relatedPodLabels, err := restartWorkloadResources(ctx, c.jobTaskSpec.ClusterID, resources, env)
replaceResources, relatedPodLabels, err := restartWorkloadResources(ctx, c.jobTaskSpec.ClusterID, resources, env, c.kubeClient, c.clientSet)
if err != nil {
return fmt.Errorf("failed to restart workload resources: %v", err)
}
Expand Down Expand Up @@ -225,7 +230,7 @@ func (c *RestartJobCtl) restartHelmService(ctx context.Context, env *commonmodel
}
}

replaceResources, relatedPodLabels, err := restartWorkloadResources(ctx, c.jobTaskSpec.ClusterID, resources, env)
replaceResources, relatedPodLabels, err := restartWorkloadResources(ctx, c.jobTaskSpec.ClusterID, resources, env, c.kubeClient, c.clientSet)
if err != nil {
return fmt.Errorf("failed to restart workload resources: %v", err)
}
Expand All @@ -236,64 +241,64 @@ func (c *RestartJobCtl) restartHelmService(ctx context.Context, env *commonmodel
return nil
}

func restartWorkloadResources(ctx context.Context, clusterID string, resources []*kube.WorkloadResource, env *commonmodels.Product) (replaceResources []commonmodels.Resource, relatedPodLabels []map[string]string, err error) {
// deployments, statefulSets, _, _, _, err := kube.FetchSelectedWorkloads(env.Namespace, resources, kubeClient, clientSet)
// if err != nil {
// return nil, nil, err
// }

// for _, deployment := range deployments {
// err = updater.RestartDeploymentV2(ctx, clusterID, deployment.Namespace, deployment.Name)
// if err != nil {
// return nil, nil, fmt.Errorf("failed to restart deployment %s/%s: %v", deployment.Namespace, deployment.Name, err)
// }

// selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
// if err != nil {
// return nil, nil, fmt.Errorf("failed to get selector for deployment %s/%s: %v", deployment.Namespace, deployment.Name, err)
// }

// // ensure latest replicaset to be created
// replicaSets, err := getter.ListReplicaSets(deployment.Namespace, selector, kubeClient)
// if err != nil {
// return nil, nil, fmt.Errorf("failed to list replica sets for deployment %s/%s: %v", deployment.Namespace, deployment.Name, err)
// }

// // Only include those whose ControllerRef matches the Deployment.
// owned := make([]*appsv1.ReplicaSet, 0, len(replicaSets))
// for _, rs := range replicaSets {
// if metav1.IsControlledBy(rs, deployment) {
// owned = append(owned, rs)
// }
// }
// if len(owned) <= 0 {
// return nil, nil, fmt.Errorf("no replicaset found for deployment: %s", deployment.Name)
// }
// sort.Slice(owned, func(i, j int) bool {
// return owned[i].CreationTimestamp.After(owned[j].CreationTimestamp.Time)
// })

// replaceResources = append(replaceResources, commonmodels.Resource{
// Kind: setting.Deployment,
// Name: deployment.Name,
// PodOwnerUID: string(owned[0].ObjectMeta.UID),
// })
// relatedPodLabels = append(relatedPodLabels, deployment.Spec.Template.Labels)
// }

// for _, sts := range statefulSets {
// err = updater.RestartStatefulSetV2(ctx, clusterID, sts.Namespace, sts.Name)
// if err != nil {
// return nil, nil, fmt.Errorf("failed to restart statefulset %s/%s: %v", sts.Namespace, sts.Name, err)
// }

// replaceResources = append(replaceResources, commonmodels.Resource{
// Kind: setting.StatefulSet,
// Name: sts.Name,
// PodOwnerUID: string(sts.ObjectMeta.UID),
// })
// relatedPodLabels = append(relatedPodLabels, sts.Spec.Template.Labels)
// }
func restartWorkloadResources(ctx context.Context, clusterID string, resources []*kube.WorkloadResource, env *commonmodels.Product, kubeClient crClient.Client, clientSet *kubernetes.Clientset) (replaceResources []commonmodels.Resource, relatedPodLabels []map[string]string, err error) {
deployments, statefulSets, _, _, _, err := kube.FetchSelectedWorkloads(env.Namespace, resources, kubeClient, clientSet)
if err != nil {
return nil, nil, err
}

for _, deployment := range deployments {
err = updater.RestartDeploymentV2(ctx, clusterID, deployment.Namespace, deployment.Name)
if err != nil {
return nil, nil, fmt.Errorf("failed to restart deployment %s/%s: %v", deployment.Namespace, deployment.Name, err)
}

selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return nil, nil, fmt.Errorf("failed to get selector for deployment %s/%s: %v", deployment.Namespace, deployment.Name, err)
}

// ensure latest replicaset to be created
replicaSets, err := getter.ListReplicaSets(deployment.Namespace, selector, kubeClient)
if err != nil {
return nil, nil, fmt.Errorf("failed to list replica sets for deployment %s/%s: %v", deployment.Namespace, deployment.Name, err)
}

// Only include those whose ControllerRef matches the Deployment.
owned := make([]*appsv1.ReplicaSet, 0, len(replicaSets))
for _, rs := range replicaSets {
if metav1.IsControlledBy(rs, deployment) {
owned = append(owned, rs)
}
}
if len(owned) <= 0 {
return nil, nil, fmt.Errorf("no replicaset found for deployment: %s", deployment.Name)
}
sort.Slice(owned, func(i, j int) bool {
return owned[i].CreationTimestamp.After(owned[j].CreationTimestamp.Time)
})

replaceResources = append(replaceResources, commonmodels.Resource{
Kind: setting.Deployment,
Name: deployment.Name,
PodOwnerUID: string(owned[0].ObjectMeta.UID),
})
relatedPodLabels = append(relatedPodLabels, deployment.Spec.Template.Labels)
}

for _, sts := range statefulSets {
err = updater.RestartStatefulSetV2(ctx, clusterID, sts.Namespace, sts.Name)
if err != nil {
return nil, nil, fmt.Errorf("failed to restart statefulset %s/%s: %v", sts.Namespace, sts.Name, err)
}

replaceResources = append(replaceResources, commonmodels.Resource{
Kind: setting.StatefulSet,
Name: sts.Name,
PodOwnerUID: string(sts.ObjectMeta.UID),
})
relatedPodLabels = append(relatedPodLabels, sts.Spec.Template.Labels)
}

return replaceResources, relatedPodLabels, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2942,7 +2942,7 @@ func restartRelatedWorkloads(env *commonmodels.Product, service *commonmodels.Pr
err = updater.RestartDeploymentV2(context.Background(), clusterID, env.Namespace, u.GetName())
return errors.Wrapf(err, "failed to restart deployment %s", u.GetName())
case setting.StatefulSet:
// err = updater.RestartStatefulSet(env.Namespace, u.GetName(), kubeClient)
err = updater.RestartStatefulSetV2(context.Background(), clusterID, env.Namespace, u.GetName())
return errors.Wrapf(err, "failed to restart statefulset %s", u.GetName())
}
}
Expand Down
29 changes: 12 additions & 17 deletions pkg/tool/kube/updater/deployment_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -49,19 +48,15 @@ func RestartDeploymentV2(ctx context.Context, clusterID, namespace, name string)
return fmt.Errorf("failed to get deployment %s/%s: %w", namespace, name, err)
}

selector, err := metav1.LabelSelectorAsSelector(deploy.Spec.Selector)
if err != nil {
return fmt.Errorf("failed to parse deployment selector: %w", err)
}

deleteOpts := []client.DeleteAllOfOption{
client.InNamespace(namespace),
client.MatchingLabelsSelector{Selector: selector},
now := time.Now().Format(time.RFC3339Nano)
patch := client.MergeFrom(deploy.DeepCopy())
if deploy.Spec.Template.Annotations == nil {
deploy.Spec.Template.Annotations = map[string]string{}
}
deploy.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = now

pod := &corev1.Pod{}
if err := c.DeleteAllOf(ctx, pod, deleteOpts...); err != nil {
return fmt.Errorf("failed to delete pods for deployment %s/%s: %w", namespace, name, err)
if err := c.Patch(ctx, deploy, patch); err != nil {
return fmt.Errorf("failed to restart %s/deploy/%s: %w", namespace, name, err)
}

return nil
Expand All @@ -79,7 +74,7 @@ func DeleteDeploymentV2(ctx context.Context, clusterID, namespace string, opts .
if config.name != "" && config.selector != "" {
return fmt.Errorf("cannot specify both name and selector simultaneously")
}

c, err := clientmanager.NewKubeClientManager().GetControllerRuntimeClient(clusterID)
if err != nil {
return fmt.Errorf("failed to get kube client: %w", err)
Expand All @@ -92,12 +87,12 @@ func DeleteDeploymentV2(ctx context.Context, clusterID, namespace string, opts .
Name: config.name,
},
}

propagationPolicy := metav1.DeletePropagationBackground
deleteOpts := &client.DeleteOptions{
PropagationPolicy: &propagationPolicy,
}

err = c.Delete(ctx, deploy, deleteOpts)
return util.IgnoreNotFoundError(err)
}
Expand Down Expand Up @@ -200,7 +195,7 @@ func DeleteDeploymentAndWaitV2(ctx context.Context, clusterID, namespace string,
return nil
}

// CreateOrPatchDeploymentV2 is used when the YAML is fully controlled by this system, it implements a 3-way merge patch for the deployment.
// CreateOrPatchDeploymentV2 is used when the YAML is fully controlled by this system, it implements a 3-way merge patch for the deployment.
// If we are simply editing the deployment, use UpdateDeploymentV2 instead.
func CreateOrPatchDeploymentV2(ctx context.Context, clusterID, namespace, originalYAML, targetYAML string, resourceOverride bool) error {
c, err := clientmanager.NewKubeClientManager().GetKubernetesClientSet(clusterID)
Expand Down Expand Up @@ -438,4 +433,4 @@ func CreateDeploymentV2(ctx context.Context, clusterID, namespace string, deploy
}

return nil
}
}
19 changes: 7 additions & 12 deletions pkg/tool/kube/updater/statefulset_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -49,19 +48,15 @@ func RestartStatefulSetV2(ctx context.Context, clusterID, namespace, name string
return fmt.Errorf("failed to get statefulset %s/%s: %w", namespace, name, err)
}

selector, err := metav1.LabelSelectorAsSelector(sts.Spec.Selector)
if err != nil {
return fmt.Errorf("failed to parse statefulset selector: %w", err)
}

deleteOpts := []client.DeleteAllOfOption{
client.InNamespace(namespace),
client.MatchingLabelsSelector{Selector: selector},
now := time.Now().Format(time.RFC3339Nano)
patch := client.MergeFrom(sts.DeepCopy())
if sts.Spec.Template.Annotations == nil {
sts.Spec.Template.Annotations = map[string]string{}
}
sts.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = now

pod := &corev1.Pod{}
if err := c.DeleteAllOf(ctx, pod, deleteOpts...); err != nil {
return fmt.Errorf("failed to delete pods for statefulset %s/%s: %w", namespace, name, err)
if err := c.Patch(ctx, sts, patch); err != nil {
return fmt.Errorf("failed to restart %s/statefulset/%s: %w", namespace, name, err)
}

return nil
Expand Down
Loading