
[core] Add committer-side bucket consistency check #7793

Merged
JingsongLi merged 1 commit into apache:master from Aitozi:aitozi/bucket-unorder on May 13, 2026

Conversation

@Aitozi (Contributor) commented May 9, 2026

Purpose

Add committer-side bucket consistency validation for write-only unordered append tables.

After #6741, when bucket-append-ordered=false and write-only=true, writers skip restoring previous files, so bucket-count validation can be bypassed after a bucket rescale. This change adds an internal commit-side checkSameBucket path for fixed hash bucket tables that validates the touched partitions before committing.

The check is integrated with ConflictDetection, reuses the existing conflict path when available, and uses a bounded partition cache to avoid repeatedly checking the same partition within one committer lifecycle.
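The bounded-cache idea described above can be sketched in plain Java. This is an illustrative stand-in, not the actual FileStoreCommitImpl code: the class, method, and parameter names here are hypothetical, and a simple LRU-bounded LinkedHashMap plays the role of the partition cache.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: before committing, compare each touched partition's
// bucket count against the existing layout, and use a bounded LRU cache so a
// partition is validated at most once per committer lifecycle.
class BucketConsistencyChecker {

    private static final int MAX_CACHED_PARTITIONS = 1000;

    // Bounded cache of partitions already validated by this committer.
    private final Map<String, Boolean> checkedPartitions =
            new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> e) {
                    return size() > MAX_CACHED_PARTITIONS;
                }
            };

    /**
     * @param partition identifier of the touched partition
     * @param existingBuckets bucket count read from the latest snapshot, or -1
     *     if the partition has no files yet (a new partition)
     * @param writerBuckets bucket count used by the incoming commit
     */
    void checkSameBucket(String partition, int existingBuckets, int writerBuckets) {
        if (checkedPartitions.containsKey(partition)) {
            return; // already validated within this committer lifecycle
        }
        if (existingBuckets != -1 && existingBuckets != writerBuckets) {
            throw new IllegalStateException(
                    String.format(
                            "Bucket count mismatch for partition %s: existing files use %d"
                                    + " buckets but the commit writes %d. Rescale with"
                                    + " INSERT OVERWRITE first.",
                            partition, existingBuckets, writerBuckets));
        }
        checkedPartitions.put(partition, Boolean.TRUE);
    }
}
```

Note that once a partition lands in the cache, later commits to it skip the manifest read entirely; that trade-off is exactly what the review discussion below turns on.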

Tests

  • Added core coverage for unordered write-only append:
    • prepareCommit succeeds when bucket count changes.
    • commit fails for an existing partition with mismatched bucket count.
    • writing a new partition succeeds.
    • append-style commits with both DELETE and ADD still validate bucket consistency.
  • Added Flink IT coverage:
    • INSERT INTO fails after bucket count changes.
    • INSERT OVERWRITE succeeds for rescaling.

@Aitozi Aitozi force-pushed the aitozi/bucket-unorder branch 4 times, most recently from ab6ff42 to 8384547 Compare May 9, 2026 13:15
@Aitozi Aitozi requested a review from JingsongLi May 9, 2026 14:12
try {
    Set<BinaryRow> remainingPartitions = new HashSet<>(changedPartitions);
    Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
    FileStoreScan freshScan = scanSupplier.get().dropStats();
Contributor


dropStats?

Contributor Author


Removed

.checkSameBucket(
        bucketMode() == BucketMode.HASH_FIXED
                && options.writeOnly()
                && !options.bucketAppendOrdered());
Contributor


FileStoreCommitImpl already has options, can you just check it inside?

Contributor Author


Fixed
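For reference, "checking it inside" could look roughly like the following. This is a hypothetical sketch that mirrors the condition from the snippet above; the surrounding class and its fields are stand-ins, not the real FileStoreCommitImpl or Paimon's CoreOptions.

```java
// Hypothetical sketch: the commit implementation derives the flag from the
// options it already holds, instead of the caller passing it in.
class CommitOptionsSketch {

    enum BucketMode {
        HASH_FIXED,
        BUCKET_UNAWARE
    }

    // Stand-in for the relevant table options.
    static final class Options {
        final boolean writeOnly;
        final boolean bucketAppendOrdered;

        Options(boolean writeOnly, boolean bucketAppendOrdered) {
            this.writeOnly = writeOnly;
            this.bucketAppendOrdered = bucketAppendOrdered;
        }
    }

    static boolean shouldCheckSameBucket(BucketMode mode, Options options) {
        // Only fixed-hash-bucket tables with write-only unordered appends skip
        // the writer-side restore, so only they need the commit-side check.
        return mode == BucketMode.HASH_FIXED
                && options.writeOnly
                && !options.bucketAppendOrdered;
    }
}
```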

@Aitozi Aitozi force-pushed the aitozi/bucket-unorder branch from 8384547 to 89e6214 Compare May 11, 2026 15:25
@Aitozi Aitozi force-pushed the aitozi/bucket-unorder branch from 81ebe02 to fce7662 Compare May 12, 2026 08:06
@Aitozi (Contributor Author) commented May 12, 2026

CC @JingsongLi , please take another look

@leaves12138 (Contributor) left a comment

Thanks for the PR. The goal makes sense: for fixed-bucket append tables with write-only=true and unordered appends, the committer should reject new APPEND files whose totalBuckets differs from the existing layout, unless the user first performs an INSERT OVERWRITE rescale.

I found one correctness issue that I think should be fixed before merge.

sameBucketCheckedPartitions is cached across commits, but the cache is not tied to either the latest snapshot id or the bucket number that was checked. This can skip a required check after an overwrite/rescale by another committer.

A possible sequence:

  1. A long-lived streaming committer writes partition p with bucket num 2.
  2. A later APPEND from the same committer checks p against the latest snapshot and records p in sameBucketCheckedPartitions.
  3. A batch job performs the recommended rescale flow: INSERT OVERWRITE partition/table with bucket num 4.
  4. The old streaming committer is still alive and appends to p with bucket num 2 again.

At step 4, collectUncheckedBucketPartitions skips p because it is already cached, so checkSameBucketFromSnapshot does not call readTotalBuckets and the APPEND can succeed with the old bucket num. checkBucketKeepSame has the same skip when conflict detection is enabled. That leaves files with mixed totalBuckets after an overwrite rescale, which is exactly what this PR is trying to prevent.

I think the cache should either be removed, or made safe by tying it to the snapshot/layout that was actually checked. For example, invalidate/recheck cached partitions when the latest snapshot advances in a way that may change layout, or store enough information to verify the cached bucket number against the current snapshot. The simplest safe implementation may be to always read the current total bucket for the changed partitions; readTotalBuckets already stops after finding one file per partition.
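A minimal sketch of the snapshot-tied cache idea (illustrative names only, not Paimon's API): each cache entry remembers the snapshot id and bucket number that were actually verified, and a partition is rechecked whenever the latest snapshot has advanced since its entry was recorded.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a stale cache entry cannot mask an overwrite/rescale by
// another committer, because advancing the latest snapshot forces a recheck.
class SnapshotAwareBucketCache {

    private static final class Checked {
        final long snapshotId;
        final int bucketNum;

        Checked(long snapshotId, int bucketNum) {
            this.snapshotId = snapshotId;
            this.bucketNum = bucketNum;
        }
    }

    private final Map<String, Checked> cache = new HashMap<>();

    /** Returns true if the partition must be (re)checked against the snapshot. */
    boolean needsCheck(String partition, long latestSnapshotId) {
        Checked c = cache.get(partition);
        // Recheck if never checked, or if the snapshot has advanced since the
        // last check (another committer may have rescaled the partition).
        return c == null || c.snapshotId < latestSnapshotId;
    }

    /** Returns true if the cached entry verified this exact bucket number. */
    boolean matchesCached(String partition, int bucketNum) {
        Checked c = cache.get(partition);
        return c != null && c.bucketNum == bucketNum;
    }

    void record(String partition, long latestSnapshotId, int bucketNum) {
        cache.put(partition, new Checked(latestSnapshotId, bucketNum));
    }
}
```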

Could you also add a regression test for this case? Something like: use the same StreamTableCommit to append to one partition twice so the cache is populated, use another committer/table copy to withOverwrite({"f0": "1"}) the same partition with a different bucket num, then verify the old committer's next APPEND with the old bucket num fails.

I attempted to run a focused BucketedAppendFileStoreWriteTest, but this environment could not resolve the current paimon-arrow:1.5-SNAPSHOT test dependency from the configured snapshot repository, so I could not complete local test validation here.

@Aitozi (Contributor Author) commented May 13, 2026

Hi @leaves12138, thank you for your comment.

It is certainly possible that the situation you described could happen. However, our current check logic does not cover this type of situation.

Take the writer-side bucket rescale detection as an example: it stops detecting once the WriterContainer has been initialized, so the scenario you mentioned is similarly unavoidable there.

When one job is performing a normal write while another job is performing an "insert overwrite rescale", the expected outcome is that the data will be overwritten; this is not considered a normal or intended write pattern. Based on this, I believe adding a cache on the commit side is a reasonable approach.

Its primary purpose is to prevent the commit side from repeatedly reading the manifest files, which would add extra cost.

@JingsongLi (Contributor) commented:

+1

@JingsongLi JingsongLi merged commit 1a58e5e into apache:master May 13, 2026
12 checks passed
