
Fix thread leak for LOOPBACK workers in external worker pool#38432

Merged
shunping merged 5 commits into apache:master from shunping:fix-grpc-deadline-exceed on May 11, 2026

Conversation

@shunping
Collaborator

@shunping shunping commented May 11, 2026

We have been seeing intermittent "DEADLINE_EXCEEDED" errors in our tests since we switched the default Python runner to Prism.

Below is an example traceback:

target/.tox-py314-cloud/py314-cloud/lib/python3.14/site-packages/apache_beam/pipeline.py:654: in __exit__
    self.result.wait_until_finish()
target/.tox-py314-cloud/py314-cloud/lib/python3.14/site-packages/apache_beam/runners/portability/portable_runner.py:572: in wait_until_finish
    raise self._runtime_exception
target/.tox-py314-cloud/py314-cloud/lib/python3.14/site-packages/apache_beam/runners/portability/portable_runner.py:581: in _observe_state
    for state_response in self._state_stream:
                          ^^^^^^^^^^^^^^^^^^
target/.tox-py314-cloud/py314-cloud/lib/python3.14/site-packages/grpc/_channel.py:538: in __next__
    return self._next()
           ^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            if cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py314-cloud/py314-cloud/lib/python3.14/site-packages/grpc/_channel.py:956: _MultiThreadedRendezvous

The traceback above shows that the Python client tried to retrieve pipeline state from the Prism server but timed out. We suspect the timeout is caused by resource starvation from leaked threads and gRPC channels belonging to SdkHarness workers.

  • When executing portable pipelines in LOOPBACK mode (the default environment for PrismRunner), the external worker pool launches SdkHarness workers inside the Python process as background daemon threads (not separate processes).
    Previously, StopWorker explicitly skipped terminating thread-based workers on the assumption that they would exit on their own. However, because PrismRunner acts as a process-level shared singleton (Use singleton prism server by default. #36228), its gRPC server remains active across tests and does not immediately close these control/data streams. As a result, background daemon threads and open gRPC channels accumulate across test runs.
  • When running large test suites via pytest, accumulating dozens or hundreds of these threads creates severe GIL contention, memory pressure, and socket exhaustion, eventually stalling pipeline execution and causing DEADLINE_EXCEEDED timeouts. A rough illustration of this accumulation is shown right after this list.
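
As a rough illustration of the accumulation described above (this is not the regression test added in prism_runner_test.py; the trivial pipeline and thread-counting approach are assumptions made for demonstration only):

import threading

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run_trivial_pipeline():
  # Each LOOPBACK run starts an in-process SdkHarness worker as a daemon thread.
  with beam.Pipeline(options=PipelineOptions()) as p:
    _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)


before = threading.active_count()
for _ in range(5):
  run_trivial_pipeline()
after = threading.active_count()

# Before this fix, `after` could keep growing across runs because the daemon
# worker threads were never signaled to exit; after the fix, the count should
# settle back to roughly its starting value.
print(f"threads before: {before}, after: {after}")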

In this PR, we add code to (1) track active SdkHarness worker threads and (2) push Sentinel.sentinel directly into each worker's _responses queue in StopWorker() to unblock the worker's request iterator and shut down its associated thread pools and gRPC channel resources. A minimal sketch of the idea is shown below.
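
The sketch below is illustrative only: the class and attribute names (ExternalWorkerPool, _worker_threads, _FakeWorker) are assumptions made for the example and are not the literal code in worker_pool_main.py; only the overall idea (track the worker thread, enqueue Sentinel.sentinel, then join) mirrors the description above.

import queue
import threading


class Sentinel:
  # Marker object the worker's request iterator recognizes as "stop".
  sentinel = object()


class _FakeWorker:
  # Stand-in for an SdkHarness-like worker that blocks on a response queue.
  def __init__(self):
    self._responses = queue.Queue()

  def run(self):
    while True:
      item = self._responses.get()
      if item is Sentinel.sentinel:
        # The real harness would tear down thread pools and gRPC channels here.
        break


class ExternalWorkerPool:
  def __init__(self):
    # (1) Track thread-based workers by worker id so StopWorker can find them.
    self._workers = {}
    self._worker_threads = {}

  def start_worker(self, worker_id):
    worker = _FakeWorker()
    thread = threading.Thread(target=worker.run, name=worker_id, daemon=True)
    self._workers[worker_id] = worker
    self._worker_threads[worker_id] = thread
    thread.start()

  def stop_worker(self, worker_id):
    worker = self._workers.pop(worker_id, None)
    thread = self._worker_threads.pop(worker_id, None)
    if worker is not None:
      # (2) Unblock the worker's request iterator so it can exit cleanly
      # instead of idling forever as a leaked daemon thread.
      worker._responses.put(Sentinel.sentinel)
    if thread is not None:
      thread.join(timeout=30)


pool = ExternalWorkerPool()
pool.start_worker("worker-1")
pool.stop_worker("worker-1")  # the worker thread exits instead of lingering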

@codecov

codecov Bot commented May 11, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 55.75%. Comparing base (a8e7ffa) to head (8fd5f88).
⚠️ Report is 7 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #38432      +/-   ##
============================================
- Coverage     55.76%   55.75%   -0.01%     
  Complexity     2095     2095              
============================================
  Files          1099     1099              
  Lines        172277   172288      +11     
  Branches       1350     1350              
============================================
- Hits          96065    96064       -1     
- Misses        73817    73829      +12     
  Partials       2395     2395              
Flag Coverage Δ
python 79.83% <100.00%> (-0.02%) ⬇️



@shunping shunping force-pushed the fix-grpc-deadline-exceed branch from befc93c to 23dcafa on May 11, 2026 15:40
@shunping shunping changed the title from "Attempt to fix deadline exceeded in Python test workflows" to "Fix thread leak for LOOPBACK workers in external worker pool" on May 11, 2026
@shunping
Collaborator Author

r: @Abacn

@shunping
Collaborator Author

cc'ed @tvalentyn since it is related to the DEADLINE EXCEEDED problem.

@shunping shunping marked this pull request as ready for review May 11, 2026 17:22
@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a resource leak issue occurring in LOOPBACK mode, where SdkHarness workers running as background daemon threads were not being properly terminated. This accumulation of threads and gRPC channels across test runs led to resource starvation and intermittent DEADLINE_EXCEEDED errors. The changes introduce a mechanism to track these threads and explicitly signal them to shut down, ensuring stable performance during large test suites.

Highlights

  • Thread Leak Resolution: Implemented tracking for active SdkHarness worker threads in the external worker pool to prevent resource accumulation.
  • Graceful Shutdown: Updated StopWorker to inject a sentinel message into the worker's response queue, ensuring thread-based workers terminate correctly and release gRPC resources.
  • Regression Testing: Added a new test case in prism_runner_test.py to verify that worker threads are properly cleaned up after execution.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request implements a mechanism to track and cleanly shut down thread-based SdkHarness workers in the BeamFnExternalWorkerPoolServicer, preventing resource leaks in LOOPBACK mode. Reviewers identified a potential thread leak if StartWorker is called with an existing ID and suggested a logic fix. Additionally, feedback was provided to improve test stability by replacing sleep calls with synchronization primitives and to move an inline import to the top level for better code style.
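
For the sleep-versus-synchronization point, here is a generic illustration of the suggested pattern (this is not the code in prism_runner_test.py; the names are made up for the example):

import threading

worker_exited = threading.Event()


def worker():
  # ... do the work under test ...
  worker_exited.set()  # signal completion explicitly


t = threading.Thread(target=worker, daemon=True)
t.start()

# Flaky pattern: time.sleep(1) followed by an assertion on thread state.
# Stable pattern: block until the worker signals, with a generous timeout.
assert worker_exited.wait(timeout=30), "worker did not signal exit in time"
t.join(timeout=30)
assert not t.is_alive()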

Comment thread sdks/python/apache_beam/runners/worker/worker_pool_main.py
Comment thread sdks/python/apache_beam/runners/portability/prism_runner_test.py Outdated
Comment thread sdks/python/apache_beam/runners/worker/worker_pool_main.py Outdated
@Abacn
Contributor

Abacn commented May 11, 2026

This LGTM, though an additional reviewer for the gRPC-related logic is preferred

@shunping
Collaborator Author

The last two presubmit runs both succeeded without any reruns.

Contributor

@Abacn Abacn left a comment


LGTM

Since the main code change only touches stop_worker, it won't affect pipeline functionality.

@shunping shunping merged commit a72b781 into apache:master May 11, 2026
98 checks passed
aIbrahiim pushed a commit to aIbrahiim/beam that referenced this pull request May 12, 2026
…38432)

* Fix thread leak for LOOPBACK workers in external worker pool.

* Fix lints.

* Add comments.

* Fix lints.

* Address review comments.
