From e1f88d97787f3c27954de8d1776df8e9de335f5e Mon Sep 17 00:00:00 2001 From: TongWei1105 Date: Fri, 8 May 2026 16:29:07 +0800 Subject: [PATCH] [SPARK-56793][K8S] Avoid cluster-wide LIST in executor pods polling ### What changes were proposed in this pull request? Scope the executor pod LIST issued by `ExecutorPodsPollingSnapshotSource` to the configured Kubernetes namespace by inserting `.inNamespace(namespace)` between `.pods()` and the label filters. ### Why are the changes needed? Without `.inNamespace(...)` the fabric8 client issues a cluster-wide LIST against the K8s API server. Other paths in the K8s scheduler module (e.g. `KubernetesClusterSchedulerBackend.doKillExecutors`, `ExecutorPodsLifecycleManager`) already scope their pod operations to the configured namespace; the polling source was inconsistent. A cluster-wide LIST: - fails under the typical least-privilege deployment where the driver ServiceAccount is bound to a namespaced Role rather than a ClusterRole; - causes unnecessary load and broadens the visibility surface even when ClusterRole permissions are granted. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Updated `ExecutorPodsPollingSnapshotSourceSuite` to mock the new `.inNamespace(...)` link in the chain. Ran the full `resource-managers/kubernetes/core` test module: 344 tests across 42 suites, all passing. ### Was this patch authored or co-authored using generative AI tooling? No. --- .../cluster/k8s/ExecutorPodsPollingSnapshotSource.scala | 2 ++ .../k8s/ExecutorPodsPollingSnapshotSourceSuite.scala | 7 ++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index 4ed34ec3e4c00..3d2822e5eb518 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -47,6 +47,7 @@ class ExecutorPodsPollingSnapshotSource( private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING) + private val namespace = conf.get(KUBERNETES_NAMESPACE) private var pollingFuture: Future[_] = _ @@ -76,6 +77,7 @@ class ExecutorPodsPollingSnapshotSource( logDebug(s"Resynchronizing full executor pod state from Kubernetes.") val pods = kubernetesClient .pods() + .inNamespace(namespace) .withLabel(SPARK_APP_ID_LABEL, applicationId) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala index e0016a2ae0503..71c187a9caf83 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala @@ -43,6 +43,9 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn @Mock private var podOperations: PODS = _ + @Mock + private var namespacedPodOperations: PODS_WITH_NAMESPACE = _ + @Mock private var appIdLabeledPods: LABELED_PODS = _ @@ -62,7 +65,9 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn MockitoAnnotations.openMocks(this).close() pollingExecutor = new DeterministicScheduler() when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) + when(podOperations.inNamespace(defaultConf.get(KUBERNETES_NAMESPACE))) + .thenReturn(namespacedPodOperations) + when(namespacedPodOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) .thenReturn(appIdLabeledPods) when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) .thenReturn(executorRoleLabeledPods)