Add DoFnRunner::finishKey() method#38454
Conversation
Moving the processTimers call from finish() to finishKey(). In upcoming changes there'll be multiple streaming work items in a single beam bundle. With multiple work items, we've to process elements and timers of each work item before moving to the next work items. finishKey() will be called by the NativeIterator classes after iterating through all elements from a work item. Batch processes timers in BatchModeUngroupingParDoFn and does not rely on the processTimers() in ParDoOperation::finish(). So removing the processTimers() call from ParDoOperation::finish() is safe. Batch also does not use the new finishKey() method.
In upcoming changes there'll be multiple dataflow streaming work items in a single beam bundle. With multiple work items, we've to process elements and timers of each work item before moving to the next work items. The new finishKey method allows the DoFnRunners to cleanup/persist state (that should not be carried over) before switching work items on multi key bundles. Streaming SideInputDoFnRunners are the only classes using the finishKey method right now. The finishKey() is not exposed to DoFns and is not visible in user apis.
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a finishKey() method to the DoFnRunner interface to facilitate the processing of multiple dataflow streaming work items within a single bundle. By allowing runners to perform cleanup or state persistence before switching keys, this change ensures robust state management in multi-key bundles. The method is internal to the runner infrastructure and is not exposed to user-facing APIs. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
|
R: @scwhittle @reuvenlax @kennknowles PTAL. This is on top of #38430 |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a finishKey() lifecycle method across DoFnRunner and Operation interfaces to facilitate per-key cleanup, specifically moving side input persistence to this method in streaming runners. StreamingModeExecutionContext and several iterators were updated to invoke this method when key processing concludes. The review feedback suggests enhancing robustness by making finishKey() idempotent and adding guards in WindmillReaderIteratorBase to prevent redundant calls if iterators are advanced past exhaustion.
| public void finishKey() { | ||
| checkState(!finishKeyCalled, "finishKey was already called"); | ||
| checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()"); | ||
| try { | ||
| workExecutor.finishKey(); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| this.finishKeyCalled = true; | ||
| } |
There was a problem hiding this comment.
The current implementation of finishKey() throws an IllegalStateException if called more than once. However, iterators often call advance() multiple times after exhaustion, which would trigger multiple calls to this method. Making finishKey() idempotent is safer and prevents unexpected crashes in the worker. Additionally, providing a descriptive message when wrapping the exception improves debuggability.
public void finishKey() {
if (finishKeyCalled) {
return;
}
checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()");
try {
workExecutor.finishKey();
} catch (Exception e) {
throw new RuntimeException("Failed to finish key processing", e);
}
this.finishKeyCalled = true;
}| if (bundleIndex >= work.getMessageBundlesCount()) { | ||
| current = null; | ||
| context.finishKey(); | ||
| return false; | ||
| } |
There was a problem hiding this comment.
To avoid redundant calls to context.finishKey() if advance() is invoked after the iterator is already exhausted, consider adding a guard. This ensures the lifecycle method is only triggered once per key, even if the iterator contract is not strictly followed by the caller.
if (bundleIndex >= work.getMessageBundlesCount()) {
if (bundleIndex == work.getMessageBundlesCount()) {
current = null;
context.finishKey();
bundleIndex++;
}
return false;
}
In upcoming changes there'll be multiple dataflow streaming work items
in a single beam bundle. With multiple work items, we've to process elements
and timers of each work item before moving to the next work items.
The new finishKey method allows the DoFnRunners to
cleanup/persist state (that should not be carried over) before switching work items
on multi key bundles.
Dataflow streaming SideInputDoFnRunners are the only classes using the finishKey
method right now.
The finishKey() is not exposed to DoFns and is not visible in user apis.
This is on top of #38430