[SPARK-56586][CONNECT][TESTS] Retry flaky python foreachBatch termination test #55786
Draft
LuciferYang wants to merge 14 commits into apache:master from
Conversation
… test with retry + timeout Wrap the body of "python foreachBatch process: process terminates after query is stopped" in SparkFunSuite.retry(n = 2) and failAfter(2.minutes) to bound the impact when the test hangs. The underlying hang (a blocking readInt against a Python foreachBatch worker that never sends its response) is untouched; the wrapper is best-effort, since failAfter uses Thread.interrupt, which cannot unblock a non-interruptible socket read. Co-authored-by: Isaac
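The limitation noted above can be demonstrated outside Spark. The following is a minimal, self-contained Java sketch (the actual suite is Scala; no Spark code involved) showing that interrupting a thread blocked in a plain `java.net` socket read has no effect, while closing the socket unblocks it:

```java
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

// Demonstrates why failAfter (which relies on Thread.interrupt) cannot
// unblock a thread stuck in a plain blocking socket read.
public class InterruptVsSocketRead {
  public static void main(String[] args) throws Exception {
    try (ServerSocket server = new ServerSocket(0);
         Socket client = new Socket("localhost", server.getLocalPort());
         Socket accepted = server.accept()) {
      Thread reader = new Thread(() -> {
        try {
          InputStream in = accepted.getInputStream();
          in.read(); // blocks forever: nothing is written, like the hung readInt()
        } catch (Exception e) {
          // reached only once the socket is closed, not on interrupt
        }
      });
      reader.setDaemon(true);
      reader.start();
      Thread.sleep(200);  // let the reader block
      reader.interrupt(); // what failAfter does under the hood
      reader.join(500);   // give the interrupt a chance to take effect
      System.out.println("still blocked after interrupt: " + reader.isAlive());
      accepted.close();   // closing the socket is what actually unblocks the read
      reader.join(2000);
      System.out.println("alive after close: " + reader.isAlive());
    }
  }
}
```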
Refactor-only: keeps the diff vs master minimal by moving the body out from under the retry/failAfter wrappers, avoiding re-indentation. Co-authored-by: Isaac
The successful CI run (zhengruifeng/spark actions run 24756414638) shows the test completing in 5.57s. A 1-minute cap gives ~10x margin while keeping the 3-attempt retry budget at 3 minutes worst case. Co-authored-by: Isaac
Replace failAfter(1.minute) with a fresh daemon thread + Thread.join(timeoutMillis). The test hangs inside a blocking socket read (StreamingForeachBatchHelper.scala:172 dataIn.readInt()); Thread.interrupt, which failAfter uses, cannot unblock that, so the previous wrap was ineffective. Running the body on a separate thread lets the test thread proceed to a TimeoutException after 1 minute, letting retry fire. Co-authored-by: Isaac
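The daemon-thread-plus-join pattern described above can be sketched as follows; this is an illustrative Java reduction (names like `awaitBody` are hypothetical, not the suite's actual helper):

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Run the test body on a fresh daemon thread and bound it with
// Thread.join(timeoutMillis), so the test thread can raise a
// TimeoutException even when the body is stuck in a non-interruptible
// blocking call that Thread.interrupt cannot unblock.
public class AwaitBodyInNewThread {
  static void awaitBody(Runnable body, long timeoutMillis) throws Exception {
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Thread worker = new Thread(() -> {
      try {
        body.run();
      } catch (Throwable t) {
        failure.set(t);
      }
    }, "test-body");
    worker.setDaemon(true); // a leaked body must not keep the JVM alive
    worker.start();
    worker.join(timeoutMillis);
    if (worker.isAlive()) {
      throw new TimeoutException("test body still running after " + timeoutMillis + " ms");
    }
    if (failure.get() != null) {
      throw new RuntimeException("test body failed", failure.get());
    }
  }

  public static void main(String[] args) throws Exception {
    awaitBody(() -> {}, 1000); // fast body: completes normally
    try {
      awaitBody(() -> {
        try { Thread.sleep(60_000); } catch (InterruptedException ignored) {}
      }, 300); // simulated hang: bounded by the join timeout, so retry can fire
    } catch (TimeoutException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```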
Previous commit failed the sql/connect scalafmt check (inline try/catch inside an anonymous Runnable). Switch to a SAM lambda with explicit Runnable type and multi-line try/catch. Co-authored-by: Isaac
Previous CI run showed the retry mechanism firing correctly (TimeoutException on attempt 1, RETRY #1 and RETRY #2), but attempts 2 and 3 failed with IllegalArgumentException because attempt 1's leaked thread kept q2 alive in spark.streams.active, so re-creating a query with the same name failed. Suffix query names with an atomic counter so each attempt uses fresh names, and relax the "no running query" assertion to only check this attempt's queries (a leaked query from a timed-out attempt cannot be synchronously cleaned up). Drop the listener-count assertion since leaked listeners pollute it on retry. Co-authored-by: Isaac
Replace the AtomicInteger counter with System.nanoTime(); equivalent uniqueness across retries without the extra class-level field. Co-authored-by: Isaac
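A tiny Java sketch of the naming scheme just described (the query-name prefixes here are illustrative, not the suite's actual names): suffixing with System.nanoTime() keeps each attempt's names distinct from queries leaked by a timed-out earlier attempt, without any per-suite counter state.

```java
// Suffix each attempt's query names with System.nanoTime() so a retry
// never collides with a query still registered from a leaked attempt.
public class UniqueQueryNames {
  public static void main(String[] args) {
    long suffix = System.nanoTime(); // fresh per attempt
    String q1 = "test_foreachBatch_q1_" + suffix;
    String q2 = "test_foreachBatch_q2_" + suffix;
    // names share the attempt suffix but stay distinct from each other
    System.out.println(q1.startsWith("test_foreachBatch_q1_") && !q1.equals(q2));
  }
}
```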
SparkFunSuite.retry emits its "===== RETRY #N =====" line via log4j, which in our setup only writes to target/unit-tests.log (visible only as a downloaded artifact, not in the live job log). Replace with a small local retry helper that prints to stdout and preserves the same semantics (afterEach/beforeEach reset between attempts, up to maxAttempts total). Co-authored-by: Isaac
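The shape of such a stdout-visible retry helper can be reduced to a few lines; this Java sketch mirrors the semantics described (the real Scala helper additionally re-runs afterEach/beforeEach between attempts):

```java
// A local retry helper that prints its retry notices via println so they
// appear in the live CI job log, instead of logging through log4j.
public class RetryWithVisibleLog {
  interface Body { void run() throws Exception; }

  static void retryWithVisibleLog(int maxAttempts, Body body) throws Exception {
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (attempt > 1) {
        System.out.println("===== RETRY #" + (attempt - 1) + " =====");
      }
      try {
        body.run();
        return; // success: stop retrying
      } catch (Exception e) {
        last = e; // remember the most recent failure
      }
    }
    throw last; // all attempts failed
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    retryWithVisibleLog(3, () -> {
      calls[0]++;
      if (calls[0] < 3) throw new RuntimeException("flaky");
    });
    System.out.println("succeeded on attempt " + calls[0]);
  }
}
```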
…ckets Previous CI run showed the retry mechanism bounding the foreachBatch test at 3 minutes as designed, but the (up to 3) leaked worker threads each held a running SparkConnectService, active streaming queries, and open Python foreachBatch sockets. Downstream SparkConnectServiceE2ESuite tests then hung for ~10 minutes each against that polluted session, burning the 150-min job budget. Plumb the SessionHolder to the retry wrapper so that on timeout we can call streamingForeachBatchRunnerCleanerCache.cleanUpAll(), which eventually closes the Python worker SocketChannels. That makes the hung dataIn.readInt() throw AsynchronousCloseException, which unwinds the leaked thread through its finally block (stops SparkConnectService, removes listeners). The wrapper then joins the worker for up to 30s to let the cleanup complete before the next retry attempt starts. Co-authored-by: Isaac
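The unblocking mechanism described above can be reproduced in isolation: a thread blocked in an NIO channel read is released when another thread closes the channel, surfacing as AsynchronousCloseException. In this self-contained sketch a java.nio Pipe stands in for the Python worker's SocketChannel:

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.Pipe;

// Closing a channel from another thread makes a blocked read throw
// AsynchronousCloseException, which lets the stuck thread unwind through
// its finally blocks -- the effect cleanUpAll() has on the worker socket.
public class CloseUnblocksRead {
  public static void main(String[] args) throws Exception {
    Pipe pipe = Pipe.open();
    Thread worker = new Thread(() -> {
      try {
        pipe.source().read(ByteBuffer.allocate(4)); // blocks, like dataIn.readInt()
      } catch (AsynchronousCloseException e) {
        System.out.println("unblocked by: " + e.getClass().getSimpleName());
      } catch (Exception e) {
        System.out.println("unexpected: " + e);
      }
    });
    worker.start();
    Thread.sleep(200);     // let the worker block in read()
    pipe.source().close(); // the channel close performed by the timeout cleanup
    worker.join(5000);     // bounded join, mirroring the grace period
    System.out.println("worker alive: " + worker.isAlive());
  }
}
```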
Adding the onTimeout parameter in the previous commit pushed the method signature into a layout that scalafmt 3.8.6 rejects. Put the parameter list back on a single line so scalafmt is happy again. Generated-by: Claude Code (Anthropic Claude Opus 4.7)
…agnostics Address feedback on the original change:

HIGH
- Snapshot baseline listeners before the body and capture the live SparkConnectService.server reference inside the body. The finally only stops the service when its identity still matches and only removes listeners this attempt registered, so a leaked finally from a previously timed-out attempt can no longer tear down the live service or strip listeners belonging to a concurrent attempt.
- Restore an attempt-scoped variant of the listener-count assertion: exactly one new listener (the cleaner listener) should be registered per attempt over the captured baseline.

MEDIUM
- If the body finishes during the 30s post-cleanup grace window, attach its original error as the TimeoutException's cause instead of dropping it, so a slow assertion failure is not misreported as a pure hang.
- Raise the per-attempt cap from 1 minute to 2 minutes for slow-CI headroom while still strictly bounding the original 150-minute hang (worst case 3 * (2 min + 30s grace) ~= 7.5 minutes).
- Add a TODO next to retryWithVisibleLog flagging consolidation with SparkFunSuite.retry once that helper supports console-visible retry notices.

LOW
- Use NonFatal instead of Throwable in retryWithVisibleLog so fatal errors propagate.
- Dump the worker's name, state, and stack trace on timeout for post-mortem diagnosis.
- Drop the unused default for awaitTestBodyInNewThread's onTimeout parameter; the only caller always supplies a non-trivial cleanup.
- Print suppressed onTimeout cleanUpAll errors via println instead of swallowing them silently.
- Mention the 30s grace period in the TimeoutException message.
…on timeout

CI on the previous push hit a different hang mode than the original fix targets: the worker thread was stuck inside StreamExecution.interruptAndAwaitExecutionThreadTermination -> Thread.join (via query1.stop()), not inside dataIn.readInt(). Closing the cleaner cache via onTimeout does not unblock that join, and the default spark.sql.streaming.stopTimeout is 0 ("wait forever"), so query.stop() hangs indefinitely.
Two changes:
- Wrap the test body in withSQLConf(STREAMING_STOP_TIMEOUT = 30s) so query.stop() falls through with a TimeoutException instead of waiting forever; the outer 2-minute attempt cap can then recover via retry.
- After onTimeout() in awaitTestBodyInNewThread, also call worker.interrupt() so any other interruptible blocking call (Thread.join, Object.wait, Thread.sleep) wakes up. onTimeout still handles non-interruptible socket reads via the cleaner-cache close.
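The second change relies on the fact that, unlike a plain socket read, calls such as Thread.join, Object.wait, and Thread.sleep are interruptible. A small self-contained Java sketch of that behavior:

```java
// Thread.interrupt does wake interruptible blocking calls, complementing
// the channel-close path that is needed for non-interruptible socket reads.
public class InterruptWakesSleep {
  public static void main(String[] args) throws Exception {
    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(60_000); // stands in for the Thread.join hidden inside query.stop()
      } catch (InterruptedException e) {
        System.out.println("woken by interrupt");
      }
    });
    worker.start();
    Thread.sleep(200);  // let the worker block
    worker.interrupt(); // the worker.interrupt() issued after onTimeout()
    worker.join(5000);
    System.out.println("worker alive: " + worker.isAlive());
  }
}
```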
Also fixes the scalafmt issues that the previous push tripped:
- Reorder the SQLConf import to its alphabetical position.
- Shorten one comment line to fit under maxColumn = 98.
- Align the closing scalastyle:on comment indent inside the catch.
What changes were proposed in this pull request?
Why are the changes needed?
Does this PR introduce any user-facing change?
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?