diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index 4ed34ec3e4c00..3d2822e5eb518 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -47,6 +47,7 @@ class ExecutorPodsPollingSnapshotSource( private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING) + private val namespace = conf.get(KUBERNETES_NAMESPACE) private var pollingFuture: Future[_] = _ @@ -76,6 +77,7 @@ class ExecutorPodsPollingSnapshotSource( logDebug(s"Resynchronizing full executor pod state from Kubernetes.") val pods = kubernetesClient .pods() + .inNamespace(namespace) .withLabel(SPARK_APP_ID_LABEL, applicationId) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala index e0016a2ae0503..71c187a9caf83 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala @@ -43,6 +43,9 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn @Mock private var podOperations: PODS = _ + @Mock + private var namespacedPodOperations: PODS_WITH_NAMESPACE = _ + @Mock private var appIdLabeledPods: LABELED_PODS = _ @@ -62,7 +65,9 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn MockitoAnnotations.openMocks(this).close() pollingExecutor = new DeterministicScheduler() when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) + when(podOperations.inNamespace(defaultConf.get(KUBERNETES_NAMESPACE))) + .thenReturn(namespacedPodOperations) + when(namespacedPodOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) .thenReturn(appIdLabeledPods) when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) .thenReturn(executorRoleLabeledPods)