Skip to content

Commit 44f1139

Browse files
kumahq[bot] and lukidzi authored
fix(kds): reconnect mux client when GlobalToZone stream is closed by … (backport of #16326) (#16434)
Automatic cherry-pick of #16326 for branch release-2.12 Generated by [action](https://github.com/kumahq/kuma/actions/runs/25050771696) cherry-picked commit 666d45d ⚠️ ⚠️ ⚠️ Conflicts happened when cherry-picking! ⚠️ ⚠️ ⚠️ ``` On branch release-2.12 Your branch is up to date with 'origin/release-2.12'. You are currently cherry-picking commit 666d45d. (fix conflicts and run "git cherry-pick --continue") (use "git cherry-pick --skip" to skip this patch) (use "git cherry-pick --abort" to cancel the cherry-pick operation) Changes to be committed: new file: pkg/kds/mux/client_test.go modified: pkg/kds/v2/client/kds_client.go modified: pkg/kds/v2/client/stream.go modified: pkg/test/grpc/clientstream.go Unmerged paths: (use "git add <file>..." to mark resolution) both modified: pkg/kds/mux/client.go both modified: pkg/kds/mux/zone_sync.go both modified: pkg/kds/v2/client/zone_sync_test.go both modified: pkg/kds/zone/components_test.go both modified: pkg/test/kds/setup/client.go ``` --------- Signed-off-by: Lukasz Dziedziak <lukidzi@gmail.com> Co-authored-by: Lukasz Dziedziak <lukidzi@gmail.com>
1 parent 704d7a8 commit 44f1139

9 files changed

Lines changed: 303 additions & 45 deletions

File tree

pkg/kds/global/components.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func Setup(rt runtime.Runtime) error {
9393
}
9494
log := kdsDeltaGlobalLog.WithValues("peer-id", zoneID)
9595
log = kuma_log.AddFieldsFromCtx(log, stream.Context(), rt.Extensions())
96-
kdsStream := kds_client_v2.NewDeltaKDSStream(stream, zoneID, rt, "", len(rt.KDSContext().TypesSentByZone))
96+
kdsStream := kds_client_v2.NewDeltaKDSStream(kds_client_v2.NewServerStreamAdapter(stream), zoneID, rt, "", len(rt.KDSContext().TypesSentByZone))
9797
sink := kds_client_v2.NewKDSSyncClient(
9898
log,
9999
rt.KDSContext().TypesSentByZone,

pkg/kds/mux/client.go

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,15 @@ func (c *client) startGlobalToZoneSync(ctx context.Context, log logr.Logger, con
131131
log.Info("initializing Kuma Discovery Service (KDS) stream for global to zone sync of resources with delta xDS")
132132
stream, err := kdsClient.GlobalToZoneSync(ctx)
133133
if err != nil {
134-
errorCh <- err
134+
trySend(ctx, errorCh, err)
135135
return
136136
}
137137
processingErrorsCh := make(chan error)
138138
c.globalToZoneCb.OnGlobalToZoneSyncStarted(stream, processingErrorsCh)
139-
c.handleProcessingErrors(stream, log, processingErrorsCh, errorCh)
139+
// Pass nil for stream: the callback creates a NewDeltaKDSStream wrapper
140+
// whose sendLoop calls CloseSend on the raw stream. Passing the raw
141+
// stream here would race with sendLoop's CloseSend.
142+
c.handleProcessingErrors(ctx, nil, log, processingErrorsCh, errorCh)
140143
}
141144

142145
func (c *client) startZoneToGlobalSync(ctx context.Context, log logr.Logger, conn *grpc.ClientConn, errorCh chan error) {
@@ -145,12 +148,27 @@ func (c *client) startZoneToGlobalSync(ctx context.Context, log logr.Logger, con
145148
log.Info("initializing Kuma Discovery Service (KDS) stream for zone to global sync of resources with delta xDS")
146149
stream, err := kdsClient.ZoneToGlobalSync(ctx)
147150
if err != nil {
148-
errorCh <- err
151+
trySend(ctx, errorCh, err)
149152
return
150153
}
151154
processingErrorsCh := make(chan error)
152155
c.zoneToGlobalCb.OnZoneToGlobalSyncStarted(stream, processingErrorsCh)
153-
c.handleProcessingErrors(stream, log, processingErrorsCh, errorCh)
156+
// Pass nil: same reason as GlobalToZone above.
157+
c.handleProcessingErrors(ctx, nil, log, processingErrorsCh, errorCh)
158+
}
159+
160+
// trySend attempts to send err to errorCh. If the context is already
161+
// done (another stream triggered a restart, or the app is shutting
162+
// down), the error is dropped. If errorCh already has an error from
163+
// another stream, the error is also dropped.
164+
func trySend(ctx context.Context, errorCh chan error, err error) {
165+
if ctx.Err() != nil {
166+
return
167+
}
168+
select {
169+
case errorCh <- err:
170+
default:
171+
}
154172
}
155173

156174
func (c *client) startXDSConfigs(
@@ -164,13 +182,13 @@ func (c *client) startXDSConfigs(
164182
log.Info("initializing rpc stream for executing config dump on data plane proxies")
165183
stream, err := client.StreamXDSConfigs(ctx)
166184
if err != nil {
167-
errorCh <- err
185+
trySend(ctx, errorCh, err)
168186
return
169187
}
170188

171189
processingErrorsCh := make(chan error)
172190
go c.envoyAdminProcessor.StartProcessingXDSConfigs(stream, processingErrorsCh)
173-
c.handleProcessingErrors(stream, log, processingErrorsCh, errorCh)
191+
c.handleProcessingErrors(ctx, stream, log, processingErrorsCh, errorCh)
174192
}
175193

176194
func (c *client) startStats(
@@ -184,13 +202,13 @@ func (c *client) startStats(
184202
log.Info("initializing rpc stream for executing stats on data plane proxies")
185203
stream, err := client.StreamStats(ctx)
186204
if err != nil {
187-
errorCh <- err
205+
trySend(ctx, errorCh, err)
188206
return
189207
}
190208

191209
processingErrorsCh := make(chan error)
192210
go c.envoyAdminProcessor.StartProcessingStats(stream, processingErrorsCh)
193-
c.handleProcessingErrors(stream, log, processingErrorsCh, errorCh)
211+
c.handleProcessingErrors(ctx, stream, log, processingErrorsCh, errorCh)
194212
}
195213

196214
func (c *client) startClusters(
@@ -204,13 +222,13 @@ func (c *client) startClusters(
204222
log.Info("initializing rpc stream for executing clusters on data plane proxies")
205223
stream, err := client.StreamClusters(ctx)
206224
if err != nil {
207-
errorCh <- err
225+
trySend(ctx, errorCh, err)
208226
return
209227
}
210228

211229
processingErrorsCh := make(chan error)
212230
go c.envoyAdminProcessor.StartProcessingClusters(stream, processingErrorsCh)
213-
c.handleProcessingErrors(stream, log, processingErrorsCh, errorCh)
231+
c.handleProcessingErrors(ctx, stream, log, processingErrorsCh, errorCh)
214232
}
215233

216234
func (c *client) startHealthCheck(
@@ -235,7 +253,7 @@ func (c *client) startHealthCheck(
235253
return
236254
}
237255
log.Error(err, "health check failed")
238-
errorCh <- errors.Wrap(err, "zone health check request failed")
256+
trySend(ctx, errorCh, errors.Wrap(err, "zone health check request failed"))
239257
} else if interval := resp.Interval.AsDuration(); interval > 0 {
240258
if prevInterval != interval {
241259
prevInterval = interval
@@ -255,6 +273,7 @@ func (c *client) startHealthCheck(
255273
}
256274

257275
func (c *client) handleProcessingErrors(
276+
ctx context.Context,
258277
stream grpc.ClientStream,
259278
log logr.Logger,
260279
processingErrorsCh chan error,
@@ -266,18 +285,22 @@ func (c *client) handleProcessingErrors(
266285
// backwards compatibility. Do not rethrow error, so KDS multiplex can still operate.
267286
return
268287
}
269-
if errors.Is(err, context.Canceled) {
288+
if ctx.Err() != nil && (errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled) {
289+
// Suppress cancellation only when the zone CP itself is shutting
290+
// down (ctx is done). Server-initiated codes.Canceled (when ctx
291+
// is still alive) must trigger a reconnect.
270292
log.Info("rpc stream shutting down")
271-
// Let's not propagate this error further as we've already cancelled the context
272293
err = nil
273294
} else {
274295
log.Error(err, "rpc stream failed prematurely, will restart in background")
275296
}
276-
if err := stream.CloseSend(); err != nil {
277-
log.Error(err, "CloseSend returned an error")
297+
if stream != nil {
298+
if err := stream.CloseSend(); err != nil {
299+
log.Error(err, "CloseSend returned an error")
300+
}
278301
}
279302
if err != nil {
280-
errorCh <- err
303+
trySend(ctx, errorCh, err)
281304
}
282305
}
283306

pkg/kds/mux/client_test.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package mux_test
2+
3+
import (
4+
"context"
5+
"net"
6+
"sync"
7+
"time"
8+
9+
. "github.com/onsi/ginkgo/v2"
10+
. "github.com/onsi/gomega"
11+
"github.com/pkg/errors"
12+
"google.golang.org/grpc"
13+
"google.golang.org/grpc/codes"
14+
"google.golang.org/grpc/status"
15+
"google.golang.org/protobuf/types/known/durationpb"
16+
17+
mesh_proto "github.com/kumahq/kuma/v2/api/mesh/v1alpha1"
18+
kuma_cp "github.com/kumahq/kuma/v2/pkg/config/app/kuma-cp"
19+
"github.com/kumahq/kuma/v2/pkg/core"
20+
"github.com/kumahq/kuma/v2/pkg/core/runtime/component"
21+
"github.com/kumahq/kuma/v2/pkg/kds/mux"
22+
"github.com/kumahq/kuma/v2/pkg/kds/service"
23+
kds_client_v2 "github.com/kumahq/kuma/v2/pkg/kds/v2/client"
24+
core_metrics "github.com/kumahq/kuma/v2/pkg/metrics"
25+
kds_setup "github.com/kumahq/kuma/v2/pkg/test/kds/setup"
26+
)
27+
28+
// reconnectTrackingServer simulates a Global CP that closes the
29+
// GlobalToZoneSync stream on the first connection and then tracks
30+
// whether the zone mux client re-establishes it.
31+
type reconnectTrackingServer struct {
32+
mesh_proto.UnimplementedKDSSyncServiceServer
33+
mesh_proto.UnimplementedGlobalKDSServiceServer
34+
mu sync.Mutex
35+
globalToZoneConns int
36+
reconnectedOnce sync.Once
37+
reconnectedCh chan struct{} // closed on second GlobalToZoneSync call
38+
firstConnErrCode codes.Code // if non-zero, return this status code on first connection instead of nil
39+
}
40+
41+
func (s *reconnectTrackingServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error {
42+
s.mu.Lock()
43+
s.globalToZoneConns++
44+
count := s.globalToZoneConns
45+
s.mu.Unlock()
46+
47+
if count == 1 {
48+
select {
49+
case <-time.After(300 * time.Millisecond):
50+
case <-stream.Context().Done():
51+
return nil
52+
}
53+
if s.firstConnErrCode != codes.OK {
54+
return status.Error(s.firstConnErrCode, "stream terminated by global")
55+
}
56+
return nil
57+
}
58+
59+
s.reconnectedOnce.Do(func() { close(s.reconnectedCh) })
60+
<-stream.Context().Done()
61+
return nil
62+
}
63+
64+
func (s *reconnectTrackingServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error {
65+
<-stream.Context().Done()
66+
return nil
67+
}
68+
69+
func (s *reconnectTrackingServer) HealthCheck(_ context.Context, _ *mesh_proto.ZoneHealthCheckRequest) (*mesh_proto.ZoneHealthCheckResponse, error) {
70+
return &mesh_proto.ZoneHealthCheckResponse{
71+
Interval: durationpb.New(time.Minute),
72+
}, nil
73+
}
74+
75+
func (s *reconnectTrackingServer) StreamXDSConfigs(stream mesh_proto.GlobalKDSService_StreamXDSConfigsServer) error {
76+
<-stream.Context().Done()
77+
return nil
78+
}
79+
80+
func (s *reconnectTrackingServer) StreamStats(stream mesh_proto.GlobalKDSService_StreamStatsServer) error {
81+
<-stream.Context().Done()
82+
return nil
83+
}
84+
85+
func (s *reconnectTrackingServer) StreamClusters(stream mesh_proto.GlobalKDSService_StreamClustersServer) error {
86+
<-stream.Context().Done()
87+
return nil
88+
}
89+
90+
var _ = Describe("Client", func() {
91+
type testCase struct {
92+
description string
93+
errCode codes.Code
94+
}
95+
96+
DescribeTable("reconnects when globalToZone stream is terminated by server",
97+
func(tc testCase) {
98+
svc := &reconnectTrackingServer{
99+
reconnectedCh: make(chan struct{}),
100+
firstConnErrCode: tc.errCode,
101+
}
102+
lis, err := net.Listen("tcp", "127.0.0.1:0")
103+
Expect(err).ToNot(HaveOccurred())
104+
defer lis.Close()
105+
106+
grpcSrv := grpc.NewServer()
107+
mesh_proto.RegisterKDSSyncServiceServer(grpcSrv, svc)
108+
mesh_proto.RegisterGlobalKDSServiceServer(grpcSrv, svc)
109+
go func() { _ = grpcSrv.Serve(lis) }()
110+
defer grpcSrv.Stop()
111+
112+
cfg := kuma_cp.DefaultConfig()
113+
cfg.Multizone.Zone.Name = "zone-1"
114+
rt := kds_setup.NewTestRuntime(context.Background(), cfg, nil)
115+
116+
metrics, err := core_metrics.NewMetrics("")
117+
Expect(err).ToNot(HaveOccurred())
118+
119+
// GlobalToZone callback: run a KDS sync client that will
120+
// encounter io.EOF when the server closes the stream.
121+
// The error must propagate to errChan to trigger reconnect.
122+
globalToZoneCb := mux.OnGlobalToZoneSyncStartedFunc(func(stream mesh_proto.KDSSyncService_GlobalToZoneSyncClient, errChan chan error) {
123+
kdsStream := kds_client_v2.NewDeltaKDSStream(stream, "zone-1", rt, "", len(rt.KDSContext().TypesSentByGlobal))
124+
syncClient := kds_client_v2.NewKDSSyncClient(
125+
core.Log.WithName("test-g2z"),
126+
rt.KDSContext().TypesSentByGlobal,
127+
kdsStream,
128+
nil,
129+
0,
130+
)
131+
go func() {
132+
err := syncClient.Receive()
133+
if err != nil && !errors.Is(err, context.Canceled) {
134+
select {
135+
case errChan <- errors.Wrap(err, "GlobalToZoneSyncClient finished with an error"):
136+
default:
137+
}
138+
}
139+
}()
140+
})
141+
142+
// ZoneToGlobal callback: no-op, just block.
143+
zoneToGlobalCb := mux.OnZoneToGlobalSyncStartedFunc(func(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient, errChan chan error) {
144+
go func() {
145+
<-stream.Context().Done()
146+
}()
147+
})
148+
149+
muxClient := mux.NewClient(
150+
context.Background(),
151+
"grpc://"+lis.Addr().String(),
152+
"zone-1",
153+
globalToZoneCb,
154+
zoneToGlobalCb,
155+
*cfg.Multizone.Zone.KDS,
156+
cfg.Experimental,
157+
metrics,
158+
service.NewEnvoyAdminProcessor(rt.ReadOnlyResourceManager(), rt.EnvoyAdminClient()),
159+
)
160+
161+
resilient := component.NewResilientComponent(
162+
core.Log.WithName("test-resilient"),
163+
muxClient,
164+
1*time.Millisecond,
165+
10*time.Millisecond,
166+
)
167+
168+
stop := make(chan struct{})
169+
go func() { _ = resilient.Start(stop) }()
170+
defer close(stop)
171+
172+
Eventually(svc.reconnectedCh, "10s", "100ms").Should(BeClosed())
173+
},
174+
Entry("server returns nil (io.EOF)", testCase{
175+
description: "LB or global CP closes stream cleanly",
176+
errCode: codes.OK,
177+
}),
178+
Entry("server returns Canceled", testCase{
179+
description: "global CP explicitly cancels the stream",
180+
errCode: codes.Canceled,
181+
}),
182+
)
183+
})

pkg/kds/v2/client/kds_client.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package client
22

33
import (
44
std_errors "errors"
5-
"io"
65
"time"
76

87
"github.com/go-logr/logr"
@@ -44,6 +43,7 @@ type DeltaKDSStream interface {
4443
Receive() (UpstreamResponse, error)
4544
ACK(resourceType core_model.ResourceType) error
4645
NACK(resourceType core_model.ResourceType, err error) error
46+
CloseSend() error
4747
}
4848

4949
type KDSSyncClient interface {
@@ -85,9 +85,6 @@ func (s *kdsSyncClient) Receive() error {
8585
for {
8686
received, err := s.kdsStream.Receive()
8787
if err != nil {
88-
if err == io.EOF {
89-
return nil
90-
}
9188
return errors.Wrap(err, "failed to receive a discovery response")
9289
}
9390
s.log.V(1).Info("DeltaDiscoveryResponse received", "response", received)
@@ -97,18 +94,12 @@ func (s *kdsSyncClient) Receive() error {
9794
if validationErrors != nil {
9895
s.log.Info("received resource is invalid, sending NACK", "err", validationErrors)
9996
if err := s.kdsStream.NACK(received.Type, validationErrors); err != nil {
100-
if err == io.EOF {
101-
return nil
102-
}
10397
return errors.Wrap(err, "failed to NACK a discovery response")
10498
}
10599
continue
106100
}
107101
s.log.Info("no callback set, sending ACK", "type", string(received.Type))
108102
if err := s.kdsStream.ACK(received.Type); err != nil {
109-
if err == io.EOF {
110-
return nil
111-
}
112103
return errors.Wrap(err, "failed to ACK a discovery response")
113104
}
114105
continue
@@ -121,9 +112,6 @@ func (s *kdsSyncClient) Receive() error {
121112
combinedErrors := std_errors.Join(nackError, validationErrors)
122113
s.log.Info("received resource is invalid, sending NACK", "err", combinedErrors)
123114
if err := s.kdsStream.NACK(received.Type, combinedErrors); err != nil {
124-
if err == io.EOF {
125-
return nil
126-
}
127115
return errors.Wrap(err, "failed to NACK a discovery response")
128116
}
129117
continue
@@ -135,9 +123,6 @@ func (s *kdsSyncClient) Receive() error {
135123
}
136124
s.log.V(1).Info("sending ACK", "type", received.Type)
137125
if err := s.kdsStream.ACK(received.Type); err != nil {
138-
if err == io.EOF {
139-
return nil
140-
}
141126
return errors.Wrap(err, "failed to ACK a discovery response")
142127
}
143128
}

0 commit comments

Comments
 (0)