From 8b205b827ba6906c47129f28525685854fd36e5a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 11 May 2026 00:01:37 -0400 Subject: [PATCH 1/5] Fix thread leak for LOOPBACK workers in external worker pool. --- .../runners/portability/prism_runner_test.py | 57 +++++++++++++++++++ .../runners/worker/worker_pool_main.py | 9 +++ 2 files changed, 66 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index a65f9a9960b4..e089f625d3a8 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -488,6 +488,63 @@ def test_singleton(self, enable_singleton): else: mock_prism_server.assert_called_once() + def test_loopback_worker_daemon_thread_accumulation(self): + """Verifies that in LOOPBACK mode, the external worker pool servicer properly + tracks active thread-based SdkHarness workers and cleanly shuts them down in + StopWorker via sentinel messages. This prevents background daemon threads from + accumulating across sequential pipeline executions and leaking resources. + """ + import queue + import threading + import time + from apache_beam.portability.api import beam_fn_api_pb2 + from apache_beam.runners.worker import worker_pool_main + + servicer = worker_pool_main.BeamFnExternalWorkerPoolServicer( + use_process=False, state_cache_size=0, data_buffer_time_limit_ms=0) + + active_workers = [] + mock_responses = queue.Queue() + + def mock_run(self_worker): + active_workers.append(self_worker) + mock_responses.get() + active_workers.remove(self_worker) + + with mock.patch('apache_beam.runners.worker.sdk_worker.SdkHarness') as mock_harness: + mock_harness.return_value._responses = mock_responses + mock_harness.return_value.run = lambda: mock_run(mock_harness) + + # Simulate starting Worker 1 for Pipeline 1 + req1 = beam_fn_api_pb2.StartWorkerRequest(worker_id="worker_1") + req1.control_endpoint.url = "localhost:12345" + servicer.StartWorker(req1, None) + + time.sleep(0.05) + self.assertEqual(len(active_workers), 1) + + # Simulate stopping Worker 1 at the end of Pipeline 1 + stop_req1 = beam_fn_api_pb2.StopWorkerRequest(worker_id="worker_1") + servicer.StopWorker(stop_req1, None) + + time.sleep(0.05) + # Verify the fix: StopWorker successfully tells the thread harness to shut down, + # completely resolving the thread leak! + self.assertEqual(len(active_workers), 0) + + # Simulate starting Worker 2 for Pipeline 2 + req2 = beam_fn_api_pb2.StartWorkerRequest(worker_id="worker_2") + req2.control_endpoint.url = "localhost:12345" + servicer.StartWorker(req2, None) + + time.sleep(0.05) + self.assertEqual(len(active_workers), 1) + + # Clean up the second worker + servicer.StopWorker(beam_fn_api_pb2.StopWorkerRequest(worker_id="worker_2"), None) + time.sleep(0.05) + self.assertEqual(len(active_workers), 0) + if __name__ == '__main__': # Run the tests. diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py b/sdks/python/apache_beam/runners/worker/worker_pool_main.py index 425a9fc57752..85e61c34bfca 100644 --- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py +++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py @@ -82,6 +82,7 @@ def __init__( self._state_cache_size = state_cache_size self._data_buffer_time_limit_ms = data_buffer_time_limit_ms self._worker_processes: dict[str, subprocess.Popen] = {} + self._worker_threads: dict[str, sdk_worker.SdkHarness] = {} @classmethod def start( @@ -166,6 +167,7 @@ def StartWorker( worker_id=start_worker_request.worker_id, state_cache_size=self._state_cache_size, data_buffer_time_limit_ms=self._data_buffer_time_limit_ms) + self._worker_threads[start_worker_request.worker_id] = worker worker_thread = threading.Thread( name='run_worker_%s' % start_worker_request.worker_id, target=worker.run) @@ -188,6 +190,13 @@ def StopWorker( _LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id) kill_process_gracefully(worker_process) + worker_thread_harness = self._worker_threads.pop( + stop_worker_request.worker_id, None) + if worker_thread_harness: + _LOGGER.info("Stopping thread worker %s" % stop_worker_request.worker_id) + from apache_beam.utils.sentinel import Sentinel + worker_thread_harness._responses.put(Sentinel.sentinel) + return beam_fn_api_pb2.StopWorkerResponse() From 744478b2bf7f12ca3f0d8ef760b66718e0fcb1f1 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 11 May 2026 11:35:43 -0400 Subject: [PATCH 2/5] Fix lints. --- .../apache_beam/runners/portability/prism_runner_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index e089f625d3a8..db1306a2b96d 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -511,7 +511,8 @@ def mock_run(self_worker): mock_responses.get() active_workers.remove(self_worker) - with mock.patch('apache_beam.runners.worker.sdk_worker.SdkHarness') as mock_harness: + with mock.patch( + 'apache_beam.runners.worker.sdk_worker.SdkHarness') as mock_harness: mock_harness.return_value._responses = mock_responses mock_harness.return_value.run = lambda: mock_run(mock_harness) @@ -541,7 +542,8 @@ def mock_run(self_worker): self.assertEqual(len(active_workers), 1) # Clean up the second worker - servicer.StopWorker(beam_fn_api_pb2.StopWorkerRequest(worker_id="worker_2"), None) + servicer.StopWorker( + beam_fn_api_pb2.StopWorkerRequest(worker_id="worker_2"), None) time.sleep(0.05) self.assertEqual(len(active_workers), 0) From 23dcafaa7478a7e7ea5326e453227b22ffe7fec5 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 11 May 2026 11:40:44 -0400 Subject: [PATCH 3/5] Add comments. --- sdks/python/apache_beam/runners/worker/worker_pool_main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py b/sdks/python/apache_beam/runners/worker/worker_pool_main.py index 85e61c34bfca..c7cc555a0dc1 100644 --- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py +++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py @@ -190,6 +190,8 @@ def StopWorker( _LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id) kill_process_gracefully(worker_process) + # applicable for thread mode to ensure thread cleanup by + # unblocking the harness request stream. worker_thread_harness = self._worker_threads.pop( stop_worker_request.worker_id, None) if worker_thread_harness: From 8fd5f88108eb5cb2460a7024f90098910467238d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 11 May 2026 12:03:56 -0400 Subject: [PATCH 4/5] Fix lints. --- .../runners/portability/prism_runner_test.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index db1306a2b96d..3f32b90c0629 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -19,7 +19,10 @@ import argparse import logging import os.path +import queue import shlex +import threading +import time import typing import unittest import zipfile @@ -37,8 +40,10 @@ from apache_beam.options.pipeline_options import PortableOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import TypeOptions +from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.runners.portability import portable_runner_test from apache_beam.runners.portability import prism_runner +from apache_beam.runners.worker import worker_pool_main from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms import trigger @@ -494,12 +499,6 @@ def test_loopback_worker_daemon_thread_accumulation(self): StopWorker via sentinel messages. This prevents background daemon threads from accumulating across sequential pipeline executions and leaking resources. """ - import queue - import threading - import time - from apache_beam.portability.api import beam_fn_api_pb2 - from apache_beam.runners.worker import worker_pool_main - servicer = worker_pool_main.BeamFnExternalWorkerPoolServicer( use_process=False, state_cache_size=0, data_buffer_time_limit_ms=0) From 30453fd8f9d919b7359fbf0db90de3cee1ba8a8d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 11 May 2026 14:17:27 -0400 Subject: [PATCH 5/5] Address review comments. --- .../runners/portability/prism_runner_test.py | 20 +++++++++++-------- .../runners/worker/worker_pool_main.py | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 3f32b90c0629..9c1620603fd3 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -510,6 +510,14 @@ def mock_run(self_worker): mock_responses.get() active_workers.remove(self_worker) + def wait_for_workers(expected_count, timeout=5.0): + start = time.time() + while time.time() - start < timeout: + if len(active_workers) == expected_count: + return + time.sleep(0.01) + self.assertEqual(len(active_workers), expected_count) + with mock.patch( 'apache_beam.runners.worker.sdk_worker.SdkHarness') as mock_harness: mock_harness.return_value._responses = mock_responses @@ -520,31 +528,27 @@ def mock_run(self_worker): req1.control_endpoint.url = "localhost:12345" servicer.StartWorker(req1, None) - time.sleep(0.05) - self.assertEqual(len(active_workers), 1) + wait_for_workers(1) # Simulate stopping Worker 1 at the end of Pipeline 1 stop_req1 = beam_fn_api_pb2.StopWorkerRequest(worker_id="worker_1") servicer.StopWorker(stop_req1, None) - time.sleep(0.05) # Verify the fix: StopWorker successfully tells the thread harness to shut down, # completely resolving the thread leak! - self.assertEqual(len(active_workers), 0) + wait_for_workers(0) # Simulate starting Worker 2 for Pipeline 2 req2 = beam_fn_api_pb2.StartWorkerRequest(worker_id="worker_2") req2.control_endpoint.url = "localhost:12345" servicer.StartWorker(req2, None) - time.sleep(0.05) - self.assertEqual(len(active_workers), 1) + wait_for_workers(1) # Clean up the second worker servicer.StopWorker( beam_fn_api_pb2.StopWorkerRequest(worker_id="worker_2"), None) - time.sleep(0.05) - self.assertEqual(len(active_workers), 0) + wait_for_workers(0) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py b/sdks/python/apache_beam/runners/worker/worker_pool_main.py index c7cc555a0dc1..efe927b729c1 100644 --- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py +++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py @@ -45,6 +45,7 @@ from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.runners.worker import sdk_worker from apache_beam.utils import thread_pool_executor +from apache_beam.utils.sentinel import Sentinel _LOGGER = logging.getLogger(__name__) @@ -196,7 +197,6 @@ def StopWorker( stop_worker_request.worker_id, None) if worker_thread_harness: _LOGGER.info("Stopping thread worker %s" % stop_worker_request.worker_id) - from apache_beam.utils.sentinel import Sentinel worker_thread_harness._responses.put(Sentinel.sentinel) return beam_fn_api_pb2.StopWorkerResponse()