Skip to content

Commit 121be03

Browse files
[release-2.5] fix: Add label selector to ConfigMap cache to prevent OOM via informer flooding (#2881)
- Add label selector to ConfigMap cache filter in newCacheOptions() - Add label to buildPrometheusConfigMap() for cache visibility - Handle upgrade path for pre-existing ConfigMaps without the label using merge patch to avoid resourceVersion rejection and preserve existing labels - Propagate label on ConfigMap updates, not just creation - Add filteredCacheClient test wrapper to simulate label-filtered informer cache - Add test coverage for label assertion and upgrade scenario exercising the full Get->NotFound->Create->AlreadyExists->Patch code path Signed-off-by: roburishabh <roburishabh@outlook.com> Co-authored-by: roburishabh <roburishabh@outlook.com>
1 parent 9a83d4f commit 121be03

3 files changed

Lines changed: 130 additions & 5 deletions

File tree

cmd/operator/controller/start.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,11 @@ func newCacheOptions() cache.Options {
415415
common.LabelLaunchedBySparkOperator: "true",
416416
}),
417417
},
418-
&corev1.ConfigMap{}: {},
418+
&corev1.ConfigMap{}: {
419+
Label: labels.SelectorFromSet(labels.Set{
420+
common.LabelCreatedBySparkOperator: "true",
421+
}),
422+
},
419423
&corev1.PersistentVolumeClaim{}: {},
420424
&corev1.Service{}: {},
421425
&v1beta2.SparkApplication{}: {},

internal/controller/sparkapplication/monitoring_config.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
"github.com/kubeflow/spark-operator/v2/pkg/util"
3434
)
3535

36-
func configPrometheusMonitoring(ctx context.Context, app *v1beta2.SparkApplication, client client.Client) error {
36+
func configPrometheusMonitoring(ctx context.Context, app *v1beta2.SparkApplication, c client.Client) error {
3737
logger := log.FromContext(ctx)
3838
port := common.DefaultPrometheusJavaAgentPort
3939
if app.Spec.Monitoring.Prometheus != nil && app.Spec.Monitoring.Prometheus.Port != nil {
@@ -47,14 +47,41 @@ func configPrometheusMonitoring(ctx context.Context, app *v1beta2.SparkApplicati
4747
key := types.NamespacedName{Namespace: configMap.Namespace, Name: configMap.Name}
4848
if retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
4949
cm := &corev1.ConfigMap{}
50-
if err := client.Get(ctx, key, cm); err != nil {
50+
if err := c.Get(ctx, key, cm); err != nil {
5151
if errors.IsNotFound(err) {
52-
return client.Create(ctx, configMap)
52+
if createErr := c.Create(ctx, configMap); createErr != nil {
53+
// Handle upgrade scenario: ConfigMap exists in the cluster but is not
54+
// visible in the filtered cache because it was created before the
55+
// LabelCreatedBySparkOperator label selector was added to the informer.
56+
// Use Patch (merge patch) instead of Update because we cannot Get the
57+
// object from the filtered cache to obtain its resourceVersion.
58+
if errors.IsAlreadyExists(createErr) {
59+
base := &corev1.ConfigMap{
60+
ObjectMeta: metav1.ObjectMeta{
61+
Name: key.Name,
62+
Namespace: key.Namespace,
63+
},
64+
}
65+
desired := base.DeepCopy()
66+
desired.Labels = map[string]string{
67+
common.LabelCreatedBySparkOperator: "true",
68+
}
69+
desired.Data = configMap.Data
70+
desired.OwnerReferences = configMap.OwnerReferences
71+
return c.Patch(ctx, desired, client.MergeFrom(base))
72+
}
73+
return createErr
74+
}
75+
return nil
5376
}
5477
return err
5578
}
5679
cm.Data = configMap.Data
57-
return client.Update(ctx, cm)
80+
if cm.Labels == nil {
81+
cm.Labels = map[string]string{}
82+
}
83+
cm.Labels[common.LabelCreatedBySparkOperator] = "true"
84+
return c.Update(ctx, cm)
5885
}); retryErr != nil {
5986
return retryErr
6087
}
@@ -146,6 +173,9 @@ func buildPrometheusConfigMap(app *v1beta2.SparkApplication, prometheusConfigMap
146173
Name: prometheusConfigMapName,
147174
Namespace: app.Namespace,
148175
OwnerReferences: []metav1.OwnerReference{util.GetOwnerReference(app)},
176+
Labels: map[string]string{
177+
common.LabelCreatedBySparkOperator: "true",
178+
},
149179
},
150180
Data: configMapData,
151181
}

internal/controller/sparkapplication/monitoring_config_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import (
2323

2424
"github.com/stretchr/testify/assert"
2525
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/api/errors"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/runtime/schema"
2729
"k8s.io/utils/ptr"
2830
"sigs.k8s.io/controller-runtime/pkg/client"
2931
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -33,6 +35,28 @@ import (
3335
"github.com/kubeflow/spark-operator/v2/pkg/util"
3436
)
3537

38+
// filteredCacheClient wraps a client.Client and overrides Get to simulate a
39+
// label-filtered informer cache. ConfigMaps that lack the required label are
40+
// invisible to Get (returns NotFound), just like a real ByObject label selector
41+
// would behave at runtime.
42+
type filteredCacheClient struct {
43+
client.Client
44+
requiredLabel string
45+
requiredValue string
46+
}
47+
48+
func (f *filteredCacheClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
49+
if err := f.Client.Get(ctx, key, obj, opts...); err != nil {
50+
return err
51+
}
52+
if cm, ok := obj.(*corev1.ConfigMap); ok {
53+
if cm.Labels[f.requiredLabel] != f.requiredValue {
54+
return errors.NewNotFound(schema.GroupResource{Group: "", Resource: "configmaps"}, key.Name)
55+
}
56+
}
57+
return nil
58+
}
59+
3660
func TestConfigPrometheusMonitoring(t *testing.T) {
3761
type testcase struct {
3862
app *v1beta2.SparkApplication
@@ -57,6 +81,9 @@ func TestConfigPrometheusMonitoring(t *testing.T) {
5781
err = fakeClient.Get(context.TODO(), client.ObjectKeyFromObject(configMap), configMap)
5882
assert.NoError(t, err, "failed to get ConfigMap %s", configMapName)
5983

84+
assert.Equal(t, "true", configMap.Labels[common.LabelCreatedBySparkOperator],
85+
"ConfigMap %s should have LabelCreatedBySparkOperator label", configMapName)
86+
6087
if test.app.Spec.Monitoring.Prometheus != nil && test.app.Spec.Monitoring.Prometheus.ConfigFile == nil &&
6188
test.app.Spec.Monitoring.MetricsPropertiesFile == nil {
6289
assert.Len(t, configMap.Data, 2, "expected 2 data items")
@@ -342,3 +369,67 @@ func TestConfigPrometheusMonitoring(t *testing.T) {
342369
testFn(test, t)
343370
}
344371
}
372+
373+
func TestConfigPrometheusMonitoring_UpgradeExistingConfigMapWithoutLabel(t *testing.T) {
374+
app := &v1beta2.SparkApplication{
375+
ObjectMeta: metav1.ObjectMeta{
376+
Name: "upgrade-app",
377+
Namespace: "default",
378+
UID: "test-uid",
379+
},
380+
Spec: v1beta2.SparkApplicationSpec{
381+
Monitoring: &v1beta2.MonitoringSpec{
382+
ExposeDriverMetrics: true,
383+
ExposeExecutorMetrics: true,
384+
Prometheus: &v1beta2.PrometheusSpec{
385+
JmxExporterJar: "/prometheus/exporter.jar",
386+
},
387+
},
388+
},
389+
}
390+
391+
configMapName := util.GetPrometheusConfigMapName(app)
392+
393+
// Pre-create a ConfigMap WITHOUT the LabelCreatedBySparkOperator label,
394+
// simulating a ConfigMap created by a previous version of the operator.
395+
existingCM := &corev1.ConfigMap{
396+
ObjectMeta: metav1.ObjectMeta{
397+
Name: configMapName,
398+
Namespace: app.Namespace,
399+
Labels: map[string]string{
400+
"some-other-label": "should-be-preserved",
401+
},
402+
},
403+
Data: map[string]string{
404+
"old-key": "old-value",
405+
},
406+
}
407+
408+
underlying := fake.NewFakeClient(existingCM)
409+
410+
// Wrap the fake client to simulate a label-filtered informer cache:
411+
// Get returns NotFound for ConfigMaps that lack LabelCreatedBySparkOperator,
412+
// forcing the code through the Create -> AlreadyExists -> Patch path.
413+
filtered := &filteredCacheClient{
414+
Client: underlying,
415+
requiredLabel: common.LabelCreatedBySparkOperator,
416+
requiredValue: "true",
417+
}
418+
419+
err := configPrometheusMonitoring(context.TODO(), app, filtered)
420+
assert.NoError(t, err, "configPrometheusMonitoring should handle pre-existing ConfigMap without label")
421+
422+
// Read back from the underlying (unfiltered) client to verify the patch was applied.
423+
updatedCM := &corev1.ConfigMap{}
424+
err = underlying.Get(context.TODO(), client.ObjectKeyFromObject(existingCM), updatedCM)
425+
assert.NoError(t, err, "failed to get updated ConfigMap")
426+
427+
assert.Equal(t, "true", updatedCM.Labels[common.LabelCreatedBySparkOperator],
428+
"Pre-existing ConfigMap should have LabelCreatedBySparkOperator added after upgrade")
429+
430+
assert.Equal(t, "should-be-preserved", updatedCM.Labels["some-other-label"],
431+
"Merge patch should preserve pre-existing labels")
432+
433+
assert.Contains(t, updatedCM.Data, common.MetricsPropertiesKey,
434+
"ConfigMap data should be updated with metrics properties")
435+
}

0 commit comments

Comments
 (0)