feat: support stateful CometUDFs#4345
Merged
Merged
Conversation
kazuyukitanimura
approved these changes
May 15, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #.
Rationale for this change
Peeled off from #4267 to keep that PR scoped to codegen. The cache-shape change is independent of any consumer and benefits the codegen dispatcher (#4267), the regex
CometUDF(#4239), and the JSONCometUDF(#4305) equally.The current process-wide
ConcurrentHashMap<String, CometUDF>requires everyCometUDFto be strictly stateless: one shared instance services all tasks. A thread-local cache would not help because Tokio work-stealing on the scan-free execution path can move a Spark task's future between workers across batches, losing per-batch state. Keying by Spark task attempt ID gives continuity within a task and isolation across tasks regardless of which worker is polling.What changes are included in this PR?
CometUdfBridge.INSTANCESbecomesConcurrentHashMap<Long, ConcurrentHashMap<String, CometUDF>>keyed by(taskAttemptId, className).TaskCompletionListenerregistered on the first cache miss for a task evicts the per-task entry on task end.NO_TASK_ID = -1Lsentinel covers calls without aTaskContext(unit tests, direct native driver runs); that bucket is not evicted because no task-completion event fires.CometUDFScaladoc updates the contract to "may hold per-task state in fields" and documents the single-threaded-per-instance invariant (Spark runs one native future per partition, Tokio polls one future per worker at a time).evaluatepreconditions, the post-installTaskContextinvariant, and the cache-side invariants (single listener registration, non-null cache, reflective-instantiate success).How are these changes tested?
No new tests in this PR for the same reason as #4306: the Arrow shading boundary in
common/blocks unit tests that subclassCometUDF. End-to-end coverage lands with each consumer (#4267, #4239, #4305) when it drives the bridge.