Flink: Centralize dynamic-sink record routing on DynamicRecord#16231
Closed
jordepic wants to merge 0 commit intoapache:mainfrom
Closed
Flink: Centralize dynamic-sink record routing on DynamicRecord#16231jordepic wants to merge 0 commit intoapache:mainfrom
jordepic wants to merge 0 commit intoapache:mainfrom
Conversation
mxm
reviewed
May 7, 2026
Contributor
mxm
left a comment
There was a problem hiding this comment.
Hey Jordan! Thanks for the PR! Do you think we could factor out the fixes before we attempt the refactor? Otherwise this could be a bit tricky to review. For ease for review, we also typically remove all Flink versions but the latest.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Move per-record overrides (resolved equality fields, forward eligibility, distribution-mode resolution, and key-selector construction) onto DynamicRecord itself, and centralize how DynamicRecordProcessor builds and emits a DynamicRecordInternal behind a single DynamicRecordPlan abstraction with three implementations (CachedRecordPlan, DeferredUpdateRecordPlan, ImmediateUpdateRecordPlan). HashKeyGenerator keeps its selector cache and now delegates dispatch to DynamicRecord.keySelector. Existing equality-field-id resolution (user-supplied set, falling back to schema identifier fields) is reused unchanged.
Existing routing functionality is preserved with one exception: forward records (null distribution mode) that resolve to a non-empty equality-field set no longer take the forward path. Forwarding such records can land rows sharing an equality key on different writer subtasks, where independent equality deletes can leave duplicates behind. They are now hash-distributed by their equality fields, matching the rest of the dynamic sink. The Forward Mode section of docs/docs/flink-writes.md notes this carve-out.
Also corrects the DynamicRecord field reference in the docs: UpsertMode is a per-record write-time flag and does not override the table's write.upsert.enabled property.