Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/v1beta1/slicegateway_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type TunnelStatus struct {
TxRate uint64 `json:"TxRate,omitempty"`
RxRate uint64 `json:"RxRate,omitempty"`
PacketLoss uint64 `json:"PacketLoss,omitempty"`
// Status is the status of the tunnel. 0: DOWN, 1: UP
// Status is the status of the tunnel. 0: UP, 1: DOWN (protobuf TunnelStatusType)
Status int32 `json:"Status,omitempty"`
// TunnelState is the state of the tunnel in string format: UP, DOWN, UNKNOWN
TunnelState string `json:"TunnelState,omitempty"`
Expand Down
4 changes: 2 additions & 2 deletions config/crd/bases/networking.kubeslice.io_slicegateways.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ spec:
format: int64
type: integer
Status:
description: 'Status is the status of the tunnel. 0: DOWN,
1: UP'
description: 'Status is the status of the tunnel. 0: UP,
1: DOWN (protobuf TunnelStatusType)'
format: int32
type: integer
TunnelState:
Expand Down
66 changes: 55 additions & 11 deletions controllers/slicegateway/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,12 @@ func isGWPodStatusChanged(slicegateway *kubeslicev1beta1.SliceGateway, gwPod *ku
gwPodStatus := slicegateway.Status.GatewayPodStatus
for _, gw := range gwPodStatus {
if gw.PodName == gwPod.PodName {
// Check if tunnel status has changed by comparing all relevant fields
// Return true only if NOTHING has changed
// Check if tunnel status has changed by comparing fields that matter for
// connection context and routing. Exclude volatile metrics (Latency, TxRate,
// RxRate, PacketLoss) so that normal fluctuation does not cause a status update
// and requeue on every reconcile; otherwise we never reach
// SendConnectionContextToSliceRouter and the vl3 router never gets routes.
tunnelUnchanged := gw.TunnelStatus.Status == gwPod.TunnelStatus.Status &&
gw.TunnelStatus.Latency == gwPod.TunnelStatus.Latency &&
gw.TunnelStatus.RxRate == gwPod.TunnelStatus.RxRate &&
gw.TunnelStatus.TxRate == gwPod.TunnelStatus.TxRate &&
gw.TunnelStatus.PacketLoss == gwPod.TunnelStatus.PacketLoss &&
gw.TunnelStatus.RemoteIP == gwPod.TunnelStatus.RemoteIP &&
gw.TunnelStatus.LocalIP == gwPod.TunnelStatus.LocalIP &&
gw.TunnelStatus.IntfName == gwPod.TunnelStatus.IntfName &&
Expand Down Expand Up @@ -193,16 +192,61 @@ func isPodPresentInPodList(podList *corev1.PodList, podName string) bool {
return false
}

// Helper function to find existing GwPodInfo by name in the status
// findGwPodInfo finds existing GwPodInfo by pod name in the gateway status.
func findGwPodInfo(gwPodStatus []*kubeslicev1beta1.GwPodInfo, podName string) *kubeslicev1beta1.GwPodInfo {
for _, gwPod := range gwPodStatus {
if gwPod.PodName == podName {
if gwPod != nil && gwPod.PodName == podName {
return gwPod
}
}
return nil
}

// ValidateGatewayPodReadiness reports whether the gateway pod named podName is
// ready for an FSM trigger, based on the status recorded in sliceGw.
//
// The checks run in order and the first failure is returned:
//   - sliceGw is non-nil
//   - the pod has an entry in the gateway pod status
//   - the tunnel status is GW_TUNNEL_STATE_UP
//   - the tunnel interface name is set
//   - both the local and remote tunnel IPs are set
//   - the peer pod name is set
//
// A nil return means every check passed. Each failure carries a distinct
// message so that callers can match on the text to decide whether the
// condition is transient and worth retrying.
func ValidateGatewayPodReadiness(sliceGw *kubeslicev1beta1.SliceGateway, podName string) error {
	if sliceGw == nil {
		return fmt.Errorf("sliceGw is nil")
	}

	info := findGwPodInfo(sliceGw.Status.GatewayPodStatus, podName)
	if info == nil {
		return fmt.Errorf("pod %s not found in gateway status", podName)
	}

	tunnel := info.TunnelStatus
	switch {
	case tunnel.Status != int32(gwsidecarpb.TunnelStatusType_GW_TUNNEL_STATE_UP):
		// The tunnel must be reported UP before anything else is inspected.
		return fmt.Errorf("tunnel not up for pod %s, current status: %d", podName, tunnel.Status)
	case tunnel.IntfName == "":
		// An empty interface name means the TUN interface is not recorded yet.
		return fmt.Errorf("tunnel interface name not set for pod %s", podName)
	case tunnel.LocalIP == "" || tunnel.RemoteIP == "":
		// Both tunnel endpoints have to be known.
		return fmt.Errorf("tunnel IPs not configured for pod %s (local: %s, remote: %s)",
			podName, tunnel.LocalIP, tunnel.RemoteIP)
	case info.PeerPodName == "":
		// The peer pod name has not been recorded in the status yet.
		return fmt.Errorf("peer pod name not available for pod %s", podName)
	}

	return nil
}

func getPodPairToRebalance(podsOnNode []corev1.Pod, sliceGw *kubeslicev1beta1.SliceGateway) (string, string) {
for _, pod := range podsOnNode {
podInfo := findGwPodInfo(sliceGw.Status.GatewayPodStatus, pod.Name)
Expand All @@ -223,13 +267,13 @@ func getPodPairToRebalance(podsOnNode []corev1.Pod, sliceGw *kubeslicev1beta1.Sl
func GetPeerGwPodName(gwPodName string, sliceGw *kubeslicev1beta1.SliceGateway) (string, error) {
podInfo := findGwPodInfo(sliceGw.Status.GatewayPodStatus, gwPodName)
if podInfo == nil {
return "", errors.New("gw pod not found")
return "", fmt.Errorf("gw pod %s not found in status", gwPodName)
}
if podInfo.TunnelStatus.Status != int32(gwsidecarpb.TunnelStatusType_GW_TUNNEL_STATE_UP) {
return "", errors.New("gw tunnel is down")
return "", fmt.Errorf("gw tunnel is down for pod %s, current status: %d", gwPodName, podInfo.TunnelStatus.Status)
}
if podInfo.PeerPodName == "" {
return "", errors.New("gw peer pod info unavailable")
return "", fmt.Errorf("gw peer pod info unavailable for pod %s", gwPodName)
}

return podInfo.PeerPodName, nil
Expand Down
165 changes: 165 additions & 0 deletions controllers/slicegateway/utils_validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package slicegateway

import (
	"strings"
	"testing"

	gwsidecarpb "github.com/kubeslice/gateway-sidecar/pkg/sidecar/sidecarpb"
	kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1"
)

func TestValidateGatewayPodReadiness(t *testing.T) {
tests := []struct {
name string
sliceGw *kubeslicev1beta1.SliceGateway
podName string
wantErr bool
errText string
}{
{
name: "nil slice gateway",
sliceGw: nil,
podName: "test-pod",
wantErr: true,
errText: "sliceGw is nil",
},
{
name: "pod not found in status",
sliceGw: &kubeslicev1beta1.SliceGateway{
Status: kubeslicev1beta1.SliceGatewayStatus{
GatewayPodStatus: []*kubeslicev1beta1.GwPodInfo{},
},
},
podName: "test-pod",
wantErr: true,
errText: "not found in gateway status",
},
{
name: "tunnel not up",
sliceGw: &kubeslicev1beta1.SliceGateway{
Status: kubeslicev1beta1.SliceGatewayStatus{
GatewayPodStatus: []*kubeslicev1beta1.GwPodInfo{
{
PodName: "test-pod",
TunnelStatus: kubeslicev1beta1.TunnelStatus{
Status: int32(gwsidecarpb.TunnelStatusType_GW_TUNNEL_STATE_DOWN),
},
},
},
},
},
podName: "test-pod",
wantErr: true,
errText: "tunnel not up",
},
{
name: "tunnel interface not set",
sliceGw: &kubeslicev1beta1.SliceGateway{
Status: kubeslicev1beta1.SliceGatewayStatus{
GatewayPodStatus: []*kubeslicev1beta1.GwPodInfo{
{
PodName: "test-pod",
TunnelStatus: kubeslicev1beta1.TunnelStatus{
Status: int32(gwsidecarpb.TunnelStatusType_GW_TUNNEL_STATE_UP),
IntfName: "",
},
},
},
},
},
podName: "test-pod",
wantErr: true,
errText: "tunnel interface name not set",
},
{
name: "tunnel IPs not configured",
sliceGw: &kubeslicev1beta1.SliceGateway{
Status: kubeslicev1beta1.SliceGatewayStatus{
GatewayPodStatus: []*kubeslicev1beta1.GwPodInfo{
{
PodName: "test-pod",
TunnelStatus: kubeslicev1beta1.TunnelStatus{
Status: int32(gwsidecarpb.TunnelStatusType_GW_TUNNEL_STATE_UP),
IntfName: "tun0",
LocalIP: "",
RemoteIP: "10.0.0.2",
},
},
},
},
},
podName: "test-pod",
wantErr: true,
errText: "tunnel IPs not configured",
},
{
name: "peer pod name missing",
sliceGw: &kubeslicev1beta1.SliceGateway{
Status: kubeslicev1beta1.SliceGatewayStatus{
GatewayPodStatus: []*kubeslicev1beta1.GwPodInfo{
{
PodName: "test-pod",
TunnelStatus: kubeslicev1beta1.TunnelStatus{
Status: int32(gwsidecarpb.TunnelStatusType_GW_TUNNEL_STATE_UP),
IntfName: "tun0",
LocalIP: "10.0.0.1",
RemoteIP: "10.0.0.2",
},
PeerPodName: "",
},
},
},
},
podName: "test-pod",
wantErr: true,
errText: "peer pod name not available",
},
{
name: "all checks pass",
sliceGw: &kubeslicev1beta1.SliceGateway{
Status: kubeslicev1beta1.SliceGatewayStatus{
GatewayPodStatus: []*kubeslicev1beta1.GwPodInfo{
{
PodName: "test-pod",
TunnelStatus: kubeslicev1beta1.TunnelStatus{
Status: int32(gwsidecarpb.TunnelStatusType_GW_TUNNEL_STATE_UP),
IntfName: "tun0",
LocalIP: "10.0.0.1",
RemoteIP: "10.0.0.2",
},
PeerPodName: "peer-pod",
},
},
},
},
podName: "test-pod",
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := ValidateGatewayPodReadiness(tt.sliceGw, tt.podName)

if (err != nil) != tt.wantErr {
t.Errorf("ValidateGatewayPodReadiness() error = %v, wantErr %v", err, tt.wantErr)
return
}

if tt.wantErr && err != nil {
if tt.errText != "" && !stringContains(err.Error(), tt.errText) {
t.Errorf("ValidateGatewayPodReadiness() error = %v, expected to contain %q", err, tt.errText)
}
}
})
}
}

// stringContains reports whether substr occurs anywhere within s. An empty
// substr is contained in every string, including the empty string. It is a
// thin wrapper over strings.Contains, kept so existing call sites in the tests
// continue to compile.
func stringContains(s, substr string) bool {
	return strings.Contains(s, substr)
}
83 changes: 78 additions & 5 deletions pkg/hub/controllers/vpnkeyrotation/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,38 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (ctrl
log.Error(err, "err listing gw pods")
return ctrl.Result{}, err
}
// trigger FSM for all the gateway pods before updating the status and timestamp

// STEP 1: Validate all gateway pods are ready before triggering FSM
log.Info("Validating gateway pod readiness before FSM trigger",
"gateway", selectedGw,
"podCount", len(podsUnderGw.Items))

for _, pod := range podsUnderGw.Items {
if err := slicegateway.ValidateGatewayPodReadiness(sliceGw, pod.Name); err != nil {
errMsg := err.Error()
// Check for transient errors that should trigger a requeue
if strings.Contains(errMsg, "not found in gateway status") {
log.Info("Gateway pod status not yet populated, requeueing", "pod", pod.Name)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
if strings.Contains(errMsg, "tunnel not up") || strings.Contains(errMsg, "tunnel interface name not set") {
log.Info("Waiting for tunnel to come up, requeueing", "pod", pod.Name)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
if strings.Contains(errMsg, "tunnel IPs not configured") || strings.Contains(errMsg, "peer pod name not available") {
log.Info("Waiting for tunnel configuration, requeueing", "pod", pod.Name)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
log.Error(err, "Failed to validate gateway pod readiness", "pod", pod.Name)
return ctrl.Result{}, err
}
}

log.Info("All gateway pods validated and ready for FSM trigger",
"gateway", selectedGw,
"podCount", len(podsUnderGw.Items))

// STEP 2: trigger FSM for all the gateway pods before updating the status and timestamp
for _, v := range podsUnderGw.Items {
// trigger FSM to recycle both gateway pod pairs
slice, err := controllers.GetSlice(ctx, r.WorkerClient, sliceName)
Expand All @@ -167,18 +198,60 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (ctrl

peerPodName, err := slicegateway.GetPeerGwPodName(v.Name, sliceGw)
if err != nil {
// Wrap transient errors as retryable
errMsg := err.Error()
if strings.Contains(errMsg, "not found in status") {
log.Info("Gateway pod status not yet populated, requeueing", "pod", v.Name)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
if strings.Contains(errMsg, "tunnel is down") {
log.Info("Waiting for tunnel to come up, requeueing", "pod", v.Name)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
if strings.Contains(errMsg, "peer pod info unavailable") {
log.Info("Peer pod information not yet synchronized, requeueing", "pod", v.Name)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
log.Error(err, "Failed to get peer pod name for gw pod", "pod", v.Name)
return ctrl.Result{}, err
}

serverID := slicegateway.GetDepNameFromPodName(sliceGw.Status.Config.SliceGatewayID, v.Name)
clientID := slicegateway.GetDepNameFromPodName(sliceGw.Status.Config.SliceGatewayRemoteGatewayID, peerPodName)
// using pod name as a unique identifier
err = r.WorkerRecyclerClient.TriggerFSM(sliceGw, slice, serverID, clientID, controllerName)

// STEP 2b: Trigger FSM with retry on transient errors
err = retry.OnError(
retry.DefaultRetry, // Uses exponential backoff
func(err error) bool {
// Retry on specific transient errors
if err == nil {
return false
}
errStr := err.Error()
return apierrors.IsTimeout(err) ||
apierrors.IsServerTimeout(err) ||
apierrors.IsServiceUnavailable(err) ||
strings.Contains(errStr, "connection refused") ||
strings.Contains(errStr, "i/o timeout") ||
strings.Contains(errStr, "deadline exceeded")
},
func() error {
return r.WorkerRecyclerClient.TriggerFSM(sliceGw, slice, serverID, clientID, controllerName)
},
)
if err != nil {
log.Error(err, "Err(): triggering FSM")
return ctrl.Result{}, err
log.Error(err, "Err(): triggering FSM after retries",
"pod", v.Name,
"serverID", serverID,
"clientID", clientID)
// Return with requeue instead of failing permanently
return ctrl.Result{RequeueAfter: 30 * time.Second}, err
}

log.Info("Successfully triggered FSM for gateway pod",
"pod", v.Name,
"serverID", serverID,
"clientID", clientID)
}
utils.RecordEvent(ctx, r.EventRecorder, vpnKeyRotation, nil, ossEvents.EventTriggeredFSMToRecycleGateways, controllerName)
// update the last Updated Timestamp so even if it requeues the rotation Time Diff will not be greater than 0
Expand Down
Loading
Loading