_SNAPSHOT_ID: Commit snapshot id for snapshot-based merge ordering. ID =
+ * 2147483641.
*
*
* Structured type fields:
@@ -93,13 +95,17 @@ public class SpecialFields {
public static final DataField ROW_ID =
new DataField(Integer.MAX_VALUE - 5, "_ROW_ID", DataTypes.BIGINT().notNull());
+ public static final DataField SNAPSHOT_ID =
+ new DataField(Integer.MAX_VALUE - 6, "_SNAPSHOT_ID", DataTypes.BIGINT());
+
public static final Set SYSTEM_FIELD_NAMES =
Stream.of(
SEQUENCE_NUMBER.name(),
VALUE_KIND.name(),
LEVEL.name(),
ROW_KIND.name(),
- ROW_ID.name())
+ ROW_ID.name(),
+ SNAPSHOT_ID.name())
.collect(Collectors.toSet());
public static boolean isSystemField(int fieldId) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
index c314f21107a7..a0b99d876af2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
@@ -33,6 +33,7 @@
import static org.apache.paimon.table.SpecialFields.LEVEL;
import static org.apache.paimon.table.SpecialFields.SEQUENCE_NUMBER;
+import static org.apache.paimon.table.SpecialFields.SNAPSHOT_ID;
import static org.apache.paimon.table.SpecialFields.VALUE_KIND;
/**
@@ -43,6 +44,7 @@ public class KeyValue {
public static final long UNKNOWN_SEQUENCE = -1;
public static final int UNKNOWN_LEVEL = -1;
+ public static final long UNKNOWN_SNAPSHOT_ID = -1;
private InternalRow key;
// determined after written into memory table or read from file
@@ -51,6 +53,8 @@ public class KeyValue {
private InternalRow value;
// determined after read from file
private int level;
+ // determined after read from file; UNKNOWN_SNAPSHOT_ID if snapshot-ordering is not enabled
+ private long snapshotId;
public KeyValue replace(InternalRow key, RowKind valueKind, InternalRow value) {
return replace(key, UNKNOWN_SEQUENCE, valueKind, value);
@@ -63,6 +67,7 @@ public KeyValue replace(
this.valueKind = valueKind;
this.value = value;
this.level = UNKNOWN_LEVEL;
+ this.snapshotId = UNKNOWN_SNAPSHOT_ID;
return this;
}
@@ -89,6 +94,11 @@ public long sequenceNumber() {
return sequenceNumber;
}
+ public KeyValue setSequenceNumber(long sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ return this;
+ }
+
public RowKind valueKind() {
return valueKind;
}
@@ -110,6 +120,21 @@ public KeyValue setLevel(int level) {
return this;
}
+ public long snapshotId() {
+ return snapshotId;
+ }
+
+ public KeyValue setSnapshotId(long snapshotId) {
+ this.snapshotId = snapshotId;
+ return this;
+ }
+
+ public static int compareSnapshotId(KeyValue a, KeyValue b) {
+ long sa = a.snapshotId == UNKNOWN_SNAPSHOT_ID ? Long.MIN_VALUE : a.snapshotId;
+ long sb = b.snapshotId == UNKNOWN_SNAPSHOT_ID ? Long.MIN_VALUE : b.snapshotId;
+ return Long.compare(sa, sb);
+ }
+
public static RowType schema(RowType keyType, RowType valueType) {
return new RowType(false, createKeyValueFields(keyType.getFields(), valueType.getFields()));
}
@@ -120,6 +145,13 @@ public static RowType schemaWithLevel(RowType keyType, RowType valueType) {
return new RowType(fields);
}
+ public static RowType schemaWithLevelAndSnapshotId(RowType keyType, RowType valueType) {
+ List fields = new ArrayList<>(schema(keyType, valueType).getFields());
+ fields.add(LEVEL);
+ fields.add(SNAPSHOT_ID);
+ return new RowType(fields);
+ }
+
/**
* Create key-value fields.
*
@@ -173,7 +205,8 @@ public KeyValue copy(
sequenceNumber,
valueKind,
valueSerializer.copy(value))
- .setLevel(level);
+ .setLevel(level)
+ .setSnapshotId(snapshotId);
}
@VisibleForTesting
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index a7a4de0b5cbe..ecf57b77fd96 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -277,8 +277,15 @@ static DataFileMeta create(
SimpleStats valueStats();
+ /**
+ * Minimum sequence number of records in this file. When {@code sequence.snapshot-ordering} is
+ * enabled for a primary-key table, this field is repurposed to carry the commit snapshot id
+ * instead of the per-record sequence number range (see {@code
+ * FileStoreCommitImpl.assignSnapshotSequenceOrdering}).
+ */
long minSequenceNumber();
+ /** @see #minSequenceNumber() */
long maxSequenceNumber();
long schemaId();
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index 6cf08769703f..d597effadfa4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -36,12 +36,21 @@ public class KeyValueDataFileRecordReader implements FileRecordReader
private final FileRecordReader reader;
private final KeyValueSerializer serializer;
private final int level;
+ private final long snapshotId;
+ private final boolean recoverSnapshotIdFromSequence;
public KeyValueDataFileRecordReader(
- FileRecordReader reader, RowType keyType, RowType valueType, int level) {
+ FileRecordReader reader,
+ RowType keyType,
+ RowType valueType,
+ int level,
+ long snapshotId,
+ boolean recoverSnapshotIdFromSequence) {
this.reader = reader;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
+ this.snapshotId = snapshotId;
+ this.recoverSnapshotIdFromSequence = recoverSnapshotIdFromSequence;
}
@Nullable
@@ -53,10 +62,18 @@ public FileRecordIterator readBatch() throws IOException {
}
return iterator.transform(
- internalRow ->
- internalRow == null
- ? null
- : serializer.fromRow(internalRow).setLevel(level));
+ internalRow -> {
+ if (internalRow == null) {
+ return null;
+ }
+ KeyValue kv = serializer.fromRow(internalRow).setLevel(level);
+ if (recoverSnapshotIdFromSequence) {
+ kv.setSnapshotId(kv.sequenceNumber());
+ } else {
+ kv.setSnapshotId(snapshotId);
+ }
+ return kv;
+ });
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index fc505e19c270..fb84e4d1f844 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -30,6 +30,7 @@
import org.apache.paimon.format.OrcFormatReaderContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.FileRecordReader;
@@ -68,6 +69,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory {
private final long asyncThreshold;
private final boolean ignoreCorruptFiles;
private final boolean ignoreLostFiles;
+ private final boolean snapshotSequenceOrdering;
private final Map formatReaderMappings;
private final BinaryRow partition;
private final DeletionVector.Factory dvFactory;
@@ -93,6 +95,7 @@ protected KeyValueFileReaderFactory(
this.asyncThreshold = coreOptions.fileReaderAsyncThreshold().getBytes();
this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile();
this.ignoreLostFiles = coreOptions.scanIgnoreLostFile();
+ this.snapshotSequenceOrdering = coreOptions.snapshotSequenceOrdering();
this.partition = partition;
this.formatReaderMappings = new HashMap<>();
this.dvFactory = dvFactory;
@@ -168,7 +171,24 @@ private FileRecordReader createRecordReader(
new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());
}
- return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level());
+ // When snapshot-ordering is enabled, minSequenceNumber carries the commit snapshot id
+ // (stamped by FileStoreCommitImpl.stampSequenceWithSnapshotId at commit time).
+ // For compaction output files, each record's sequenceNumber already contains its
+ // snapshotId, so we recover per-record snapshotId from sequenceNumber instead of
+ // using a uniform file-level stamp.
+ boolean recoverSnapshotIdFromSequence =
+ snapshotSequenceOrdering
+ && file.fileSource().isPresent()
+ && file.fileSource().get() == FileSource.COMPACT;
+ long snapshotId =
+ snapshotSequenceOrdering ? file.minSequenceNumber() : KeyValue.UNKNOWN_SNAPSHOT_ID;
+ return new KeyValueDataFileRecordReader(
+ fileRecordReader,
+ keyType,
+ valueType,
+ file.level(),
+ snapshotId,
+ recoverSnapshotIdFromSequence);
}
public static Builder builder(
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
index 705ef5119f07..d1414904fd7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -65,6 +65,7 @@ public class MergeSorter {
private final SortEngine sortEngine;
private final int spillThreshold;
private final CompressOptions compression;
+ private final boolean snapshotSequenceOrdering;
private final CachelessSegmentPool memoryPool;
@@ -77,6 +78,7 @@ public MergeSorter(
@Nullable IOManager ioManager) {
this.sortEngine = options.sortEngine();
this.spillThreshold = options.sortSpillThreshold();
+ this.snapshotSequenceOrdering = options.snapshotSequenceOrdering();
this.compression = options.spillCompressOptions();
this.keyType = keyType;
this.valueType = valueType;
@@ -142,7 +144,12 @@ public RecordReader mergeSortNoSpill(
}
return SortMergeReader.createSortMergeReader(
- readers, keyComparator, userDefinedSeqComparator, mergeFunction, sortEngine);
+ readers,
+ keyComparator,
+ userDefinedSeqComparator,
+ mergeFunction,
+ sortEngine,
+ snapshotSequenceOrdering);
}
private RecordReader spillMergeSort(
@@ -170,7 +177,8 @@ private ReaderSupplier spill(ReaderSupplier readerSupplier)
FileIOChannel.ID channel = ioManager.createChannel();
KeyValueWithLevelNoReusingSerializer serializer =
- new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
+ new KeyValueWithLevelNoReusingSerializer(
+ keyType, valueType, snapshotSequenceOrdering);
BlockCompressionFactory compressFactory = BlockCompressionFactory.create(compression);
int compressBlock = (int) MemorySize.parse("64 kb").getBytes();
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 00b962803605..4a61d2f44dbc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -61,14 +61,16 @@ public ChangelogMergeTreeRewriter(
MergeFunctionFactory mfFactory,
MergeSorter mergeSorter,
boolean produceChangelog,
- boolean forceDropDelete) {
+ boolean forceDropDelete,
+ boolean snapshotSequenceOrdering) {
super(
readerFactory,
writerFactory,
keyComparator,
userDefinedSeqComparator,
mfFactory,
- mergeSorter);
+ mergeSorter,
+ snapshotSequenceOrdering);
this.maxLevel = maxLevel;
this.mergeEngine = mergeEngine;
this.produceChangelog = produceChangelog;
@@ -146,6 +148,9 @@ private CompactResult rewriteOrProduceChangelog(
if (compactFileWriter != null
&& keyValue != null
&& (!dropDelete || keyValue.isAdd())) {
+ if (snapshotSequenceOrdering) {
+ keyValue.setSequenceNumber(keyValue.snapshotId());
+ }
compactFileWriter.write(keyValue);
}
if (produceChangelog) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 0304819e92b5..6ed3023df371 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -53,7 +53,8 @@ public FullChangelogMergeTreeCompactRewriter(
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory mfFactory,
MergeSorter mergeSorter,
- @Nullable RecordEqualiser valueEqualiser) {
+ @Nullable RecordEqualiser valueEqualiser,
+ boolean snapshotSequenceOrdering) {
super(
maxLevel,
mergeEngine,
@@ -64,7 +65,8 @@ public FullChangelogMergeTreeCompactRewriter(
mfFactory,
mergeSorter,
true,
- false);
+ false,
+ snapshotSequenceOrdering);
this.valueEqualiser = valueEqualiser;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index 7283a3030d01..6ded4e813e37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -71,7 +71,8 @@ public LookupChangelogMergeFunctionWrapper(
@Nullable RecordEqualiser valueEqualiser,
LookupStrategy lookupStrategy,
@Nullable BucketedDvMaintainer deletionVectorsMaintainer,
- @Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
+ @Nullable UserDefinedSeqComparator userDefinedSeqComparator,
+ boolean snapshotSequenceOrdering) {
MergeFunction mergeFunction = mergeFunctionFactory.create();
checkArgument(
mergeFunction instanceof LookupMergeFunction,
@@ -87,7 +88,8 @@ public LookupChangelogMergeFunctionWrapper(
this.valueEqualiser = valueEqualiser;
this.lookupStrategy = lookupStrategy;
this.deletionVectorsMaintainer = deletionVectorsMaintainer;
- this.comparator = createSequenceComparator(userDefinedSeqComparator);
+ this.comparator =
+ createSequenceComparator(userDefinedSeqComparator, snapshotSequenceOrdering);
}
@Override
@@ -175,8 +177,17 @@ private KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue from) {
}
private Comparator createSequenceComparator(
- @Nullable FieldsComparator userDefinedSeqComparator) {
+ @Nullable FieldsComparator userDefinedSeqComparator, boolean snapshotSequenceOrdering) {
if (userDefinedSeqComparator == null) {
+ if (snapshotSequenceOrdering) {
+ return (o1, o2) -> {
+ int result = KeyValue.compareSnapshotId(o1, o2);
+ if (result != 0) {
+ return result;
+ }
+ return Long.compare(o1.sequenceNumber(), o2.sequenceNumber());
+ };
+ }
return Comparator.comparingLong(KeyValue::sequenceNumber);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index f566aca1c831..5b0bed732cef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -88,7 +88,8 @@ public LookupMergeTreeCompactRewriter(
mfFactory,
mergeSorter,
produceChangelog,
- dvMaintainer != null);
+ dvMaintainer != null,
+ options.snapshotSequenceOrdering());
this.dvMaintainer = dvMaintainer;
this.lookupLevels = lookupLevels;
this.wrapperFactory = wrapperFactory;
@@ -189,14 +190,17 @@ public static class LookupMergeFunctionWrapperFactory
@Nullable private final RecordEqualiser valueEqualiser;
private final LookupStrategy lookupStrategy;
@Nullable private final UserDefinedSeqComparator userDefinedSeqComparator;
+ private final boolean snapshotSequenceOrdering;
public LookupMergeFunctionWrapperFactory(
@Nullable RecordEqualiser valueEqualiser,
LookupStrategy lookupStrategy,
- @Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
+ @Nullable UserDefinedSeqComparator userDefinedSeqComparator,
+ boolean snapshotSequenceOrdering) {
this.valueEqualiser = valueEqualiser;
this.lookupStrategy = lookupStrategy;
this.userDefinedSeqComparator = userDefinedSeqComparator;
+ this.snapshotSequenceOrdering = snapshotSequenceOrdering;
}
@Override
@@ -217,7 +221,8 @@ public MergeFunctionWrapper create(
valueEqualiser,
lookupStrategy,
deletionVectorsMaintainer,
- userDefinedSeqComparator);
+ userDefinedSeqComparator,
+ snapshotSequenceOrdering);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
index 4c393c3815d0..19a17b4b9e91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
@@ -253,7 +253,8 @@ private MergeTreeCompactRewriter createRewriter(
userDefinedSeqComparator,
mfFactory,
mergeSorter,
- logDedupEqualSupplier.get());
+ logDedupEqualSupplier.get(),
+ options.snapshotSequenceOrdering());
} else if (lookupStrategy.needLookup) {
PersistProcessor.Factory> processorFactory;
LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory> wrapperFactory;
@@ -286,7 +287,8 @@ private MergeTreeCompactRewriter createRewriter(
new LookupMergeFunctionWrapperFactory<>(
logDedupEqualSupplier.get(),
lookupStrategy,
- UserDefinedSeqComparator.create(valueType, options));
+ UserDefinedSeqComparator.create(valueType, options),
+ options.snapshotSequenceOrdering());
}
LookupLevels> lookupLevels =
createLookupLevels(
@@ -323,7 +325,8 @@ private MergeTreeCompactRewriter createRewriter(
keyComparator,
userDefinedSeqComparator,
mfFactory,
- mergeSorter);
+ mergeSorter,
+ options.snapshotSequenceOrdering());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index 9eecce5248c4..8e8a601ea07d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -52,6 +52,7 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
@Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final MergeFunctionFactory mfFactory;
protected final MergeSorter mergeSorter;
+ protected final boolean snapshotSequenceOrdering;
@Nullable private CompactionMetrics.Reporter metricsReporter;
public MergeTreeCompactRewriter(
@@ -60,13 +61,15 @@ public MergeTreeCompactRewriter(
Comparator keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory mfFactory,
- MergeSorter mergeSorter) {
+ MergeSorter mergeSorter,
+ boolean snapshotSequenceOrdering) {
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.keyComparator = keyComparator;
this.userDefinedSeqComparator = userDefinedSeqComparator;
this.mfFactory = mfFactory;
this.mergeSorter = mergeSorter;
+ this.snapshotSequenceOrdering = snapshotSequenceOrdering;
}
@Override
@@ -88,6 +91,9 @@ protected CompactResult rewriteCompaction(
if (dropDelete) {
reader = new DropDeleteReader(reader);
}
+ if (snapshotSequenceOrdering) {
+ reader = stampSequenceWithSnapshotId(reader);
+ }
writer.write(new RecordReaderIterator<>(reader));
} catch (Exception e) {
collectedExceptions = e;
@@ -133,6 +139,15 @@ protected List notifyRewriteCompactAfter(List files)
return files;
}
+ protected static RecordReader stampSequenceWithSnapshotId(
+ RecordReader reader) {
+ return reader.transform(
+ kv -> {
+ kv.setSequenceNumber(kv.snapshotId());
+ return kv;
+ });
+ }
+
public void setMetricsReporter(@Nullable CompactionMetrics.Reporter reporter) {
this.metricsReporter = reporter;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
index 598ca2aa692c..91ae0c54dc81 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
@@ -43,14 +43,23 @@ static SortMergeReader createSortMergeReader(
Comparator userKeyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionWrapper mergeFunctionWrapper,
- SortEngine sortEngine) {
+ SortEngine sortEngine,
+ boolean snapshotSequenceOrdering) {
switch (sortEngine) {
case MIN_HEAP:
return new SortMergeReaderWithMinHeap<>(
- readers, userKeyComparator, userDefinedSeqComparator, mergeFunctionWrapper);
+ readers,
+ userKeyComparator,
+ userDefinedSeqComparator,
+ mergeFunctionWrapper,
+ snapshotSequenceOrdering);
case LOSER_TREE:
return new SortMergeReaderWithLoserTree<>(
- readers, userKeyComparator, userDefinedSeqComparator, mergeFunctionWrapper);
+ readers,
+ userKeyComparator,
+ userDefinedSeqComparator,
+ mergeFunctionWrapper,
+ snapshotSequenceOrdering);
default:
throw new UnsupportedOperationException("Unsupported sort engine: " + sortEngine);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithLoserTree.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithLoserTree.java
index 3ca3d288e05b..eecf0dc8ecd0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithLoserTree.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithLoserTree.java
@@ -40,18 +40,29 @@ public SortMergeReaderWithLoserTree(
List> readers,
Comparator userKeyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
- MergeFunctionWrapper mergeFunctionWrapper) {
+ MergeFunctionWrapper mergeFunctionWrapper,
+ boolean snapshotSequenceOrdering) {
this.mergeFunctionWrapper = mergeFunctionWrapper;
this.loserTree =
new LoserTree<>(
readers,
(e1, e2) -> userKeyComparator.compare(e2.key(), e1.key()),
- createSequenceComparator(userDefinedSeqComparator));
+ createSequenceComparator(
+ userDefinedSeqComparator, snapshotSequenceOrdering));
}
private Comparator createSequenceComparator(
- @Nullable FieldsComparator userDefinedSeqComparator) {
+ @Nullable FieldsComparator userDefinedSeqComparator, boolean snapshotSequenceOrdering) {
if (userDefinedSeqComparator == null) {
+ if (snapshotSequenceOrdering) {
+ return (e1, e2) -> {
+ int result = KeyValue.compareSnapshotId(e2, e1);
+ if (result != 0) {
+ return result;
+ }
+ return Long.compare(e2.sequenceNumber(), e1.sequenceNumber());
+ };
+ }
return (e1, e2) -> Long.compare(e2.sequenceNumber(), e1.sequenceNumber());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
index a78ef334f071..611ccef05119 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
@@ -46,7 +46,8 @@ public SortMergeReaderWithMinHeap(
List> readers,
Comparator userKeyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
- MergeFunctionWrapper mergeFunctionWrapper) {
+ MergeFunctionWrapper mergeFunctionWrapper,
+ boolean snapshotSequenceOrdering) {
this.nextBatchReaders = new ArrayList<>(readers);
this.userKeyComparator = userKeyComparator;
this.mergeFunctionWrapper = mergeFunctionWrapper;
@@ -65,6 +66,13 @@ public SortMergeReaderWithMinHeap(
if (result != 0) {
return result;
}
+ } else {
+ if (snapshotSequenceOrdering) {
+ int snapshotCmp = KeyValue.compareSnapshotId(e1.kv, e2.kv);
+ if (snapshotCmp != 0) {
+ return snapshotCmp;
+ }
+ }
}
return Long.compare(e1.kv.sequenceNumber(), e2.kv.sequenceNumber());
});
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 73f335fc0236..e0317be4f3f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -969,6 +969,10 @@ CommitResult tryCommitOnce(
deltaFiles = assigned.assignedEntries;
}
+ if (options.snapshotSequenceOrdering()) {
+ deltaFiles = stampSequenceWithSnapshotId(newSnapshotId, commitKind, deltaFiles);
+ }
+
// the added records subtract the deleted records from
long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
@@ -1229,4 +1233,38 @@ public void close() {
IOUtils.closeAllQuietly(commitCallbacks);
IOUtils.closeQuietly(snapshotCommit);
}
+
+ /**
+ * When {@code sequence.snapshot-ordering} is enabled, we stamp the commit snapshot id into
+ * {@link DataFileMeta#minSequenceNumber()} and {@link DataFileMeta#maxSequenceNumber()} at file
+ * level. This avoids adding a new field to DataFileMeta and follows the same pattern used by
+ * row-tracking tables (see {@link RowTrackingCommitUtils#assignRowTracking}). At read time,
+ * {@code KeyValueFileReaderFactory} extracts the snapshot id from {@code minSequenceNumber} and
+ * stamps it onto each {@code KeyValue}, where the sort-merge readers use it as the primary
+ * tiebreaker.
+ *
+     * <p>The per-record sequence numbers stored inside data files (the {@code _SEQUENCE_NUMBER}
+ * column in the key-value format) are unaffected for APPEND commits and still serve as a
+ * secondary tiebreaker within the same snapshot.
+ *
+     *
+     * <p>For {@link CommitKind#COMPACT} commits, the compaction rewriter has already written each
+ * record's snapshotId into the per-record {@code _SEQUENCE_NUMBER} column, so the file-level
+ * min/maxSequenceNumber (tracked by the writer from per-record values) already reflects the
+ * correct snapshot id range. We return the files unchanged.
+ */
+ private static List stampSequenceWithSnapshotId(
+ long snapshotId, CommitKind commitKind, List files) {
+ if (commitKind == CommitKind.COMPACT) {
+ return files;
+ }
+ List result = new ArrayList<>(files.size());
+ for (ManifestEntry entry : files) {
+ if (entry.kind() == FileKind.ADD) {
+ result.add(entry.assignSequenceNumber(snapshotId, snapshotId));
+ } else {
+ result.add(entry);
+ }
+ }
+ return result;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 13ee86cdd0cb..0398043a1c9e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -254,6 +254,10 @@ public static void validateTableSchema(TableSchema schema) {
validateForDeletionVectors(options);
}
+ if (options.snapshotSequenceOrdering()) {
+ validateSnapshotSequenceOrdering(schema, options);
+ }
+
// vector field names must point to vector type
Set fieldNamesSpecifiedAsVector = options.vectorField();
schema.fields()
@@ -578,6 +582,19 @@ private static void validateFileIndex(TableSchema schema) {
}
}
+ private static void validateSnapshotSequenceOrdering(TableSchema schema, CoreOptions options) {
+ checkArgument(
+ !schema.primaryKeys().isEmpty(),
+ "%s = true requires a primary-key table; append-only tables cannot use "
+ + "snapshot-based sequence ordering.",
+ CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key());
+ checkArgument(
+ options.sequenceField().isEmpty(),
+ "%s = true is mutually exclusive with %s; the snapshot id is the sole tiebreaker.",
+ CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(),
+ CoreOptions.SEQUENCE_FIELD.key());
+ }
+
private static void validateForDeletionVectors(CoreOptions options) {
checkArgument(
options.changelogProducer() == ChangelogProducer.NONE
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java b/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java
index 456dc43a0a86..bed413d36923 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java
@@ -26,35 +26,53 @@
import static org.apache.paimon.data.JoinedRow.join;
-/** Serializer for {@link KeyValue} with Level. */
+/** Serializer for {@link KeyValue} with Level and optional snapshotId. */
public class KeyValueWithLevelNoReusingSerializer extends ObjectSerializer {
private static final long serialVersionUID = 1L;
private final int keyArity;
private final int valueArity;
+ private final boolean includeSnapshotId;
public KeyValueWithLevelNoReusingSerializer(RowType keyType, RowType valueType) {
- super(KeyValue.schemaWithLevel(keyType, valueType));
+ this(keyType, valueType, false);
+ }
+ public KeyValueWithLevelNoReusingSerializer(
+ RowType keyType, RowType valueType, boolean includeSnapshotId) {
+ super(
+ includeSnapshotId
+ ? KeyValue.schemaWithLevelAndSnapshotId(keyType, valueType)
+ : KeyValue.schemaWithLevel(keyType, valueType));
this.keyArity = keyType.getFieldCount();
this.valueArity = valueType.getFieldCount();
+ this.includeSnapshotId = includeSnapshotId;
}
@Override
public InternalRow toRow(KeyValue kv) {
GenericRow meta = GenericRow.of(kv.sequenceNumber(), kv.valueKind().toByteValue());
- return join(join(join(kv.key(), meta), kv.value()), GenericRow.of(kv.level()));
+ InternalRow base = join(join(join(kv.key(), meta), kv.value()), GenericRow.of(kv.level()));
+ if (includeSnapshotId) {
+ return join(base, GenericRow.of(kv.snapshotId()));
+ }
+ return base;
}
@Override
public KeyValue fromRow(InternalRow row) {
- return new KeyValue()
- .replace(
- new OffsetRow(keyArity, 0).replace(row),
- row.getLong(keyArity),
- RowKind.fromByteValue(row.getByte(keyArity + 1)),
- new OffsetRow(valueArity, keyArity + 2).replace(row))
- .setLevel(row.getInt(keyArity + 2 + valueArity));
+ KeyValue kv =
+ new KeyValue()
+ .replace(
+ new OffsetRow(keyArity, 0).replace(row),
+ row.getLong(keyArity),
+ RowKind.fromByteValue(row.getByte(keyArity + 1)),
+ new OffsetRow(valueArity, keyArity + 2).replace(row))
+ .setLevel(row.getInt(keyArity + 2 + valueArity));
+ if (includeSnapshotId) {
+ kv.setSnapshotId(row.getLong(keyArity + 2 + valueArity + 1));
+ }
+ return kv;
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
index 02ea7309a7ff..826122a787c4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
@@ -314,7 +314,8 @@ public TestRewriter(
DeduplicateMergeFunction.factory(),
mergeSorter,
true,
- true);
+ true,
+ false);
this.rewriteChangelog = rewriteChangelog;
this.closeWithException = closeWithException;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 57d99557ca5f..307796802495 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -72,7 +72,8 @@ public void testDeduplicate(boolean changelogRowDeduplicate) {
changelogRowDeduplicate ? EQUALISER : null,
LookupStrategy.from(false, true, false, false),
null,
- null);
+ null,
+ false);
// Without level-0
function.reset();
@@ -235,7 +236,8 @@ public void testDeduplicateWithIgnoreFields() {
logDedupEqualSupplier.get(),
LookupStrategy.from(false, true, false, false),
null,
- userDefinedSeqComparator);
+ userDefinedSeqComparator,
+ false);
// With level-0 'insert' record, with level-x (x > 0) same record. Notice that the specified
// ignored
@@ -298,7 +300,8 @@ public void testSum(boolean changelogRowDeduplicate) {
changelogRowDeduplicate ? EQUALISER : null,
LookupStrategy.from(false, true, false, false),
null,
- null);
+ null,
+ false);
// Without level-0
function.reset();
@@ -392,7 +395,8 @@ public void testMergeHighLevelOrder() {
null,
UserDefinedSeqComparator.create(
RowType.builder().field("f0", DataTypes.INT()).build(),
- CoreOptions.fromMap(ImmutableMap.of("sequence.field", "f0"))));
+ CoreOptions.fromMap(ImmutableMap.of("sequence.field", "f0"))),
+ false);
// Only level-0 record and find record of higher sequence.field value in high level.
highLevel.put(row(1), new KeyValue().replace(row(1), 1, INSERT, row(3)).setLevel(3));
@@ -528,7 +532,8 @@ public void testKeepLowestHighLevel() {
null,
LookupStrategy.from(false, true, false, false),
null,
- null);
+ null,
+ false);
// Without level-0
function.reset();
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 07406baac56d..9e41400fb298 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -385,7 +385,8 @@ public void testCompactWithKeepDelete() throws Exception {
keyComparator,
null,
DeduplicateMergeFunction.factory(),
- mergeSorter);
+ mergeSorter,
+ false);
ExecutorService executor = Executors.newSingleThreadExecutor();
MergeTreeCompactManager manager =
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 83c9cd54ebe8..92f3146b07eb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -45,7 +45,8 @@ protected RecordReader createRecordReader(
KEY_COMPARATOR,
null,
new ReducerMergeFunctionWrapper(createMergeFunction()),
- sortEngine);
+ sortEngine,
+ false);
}
@ParameterizedTest
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java
new file mode 100644
index 000000000000..9ac1b257b972
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions.SortEngine;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for snapshot-ordering tiebreaker in sort-merge readers. */
+public class SortMergeSnapshotOrderingTest {
+
+ private static final Comparator<InternalRow> KEY_COMPARATOR =
+ (a, b) -> Integer.compare(a.getInt(0), b.getInt(0));
+
+ private static final RowType VALUE_TYPE = RowType.of(DataTypes.INT());
+
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testLaterSnapshotWinsOverHigherSequence(SortEngine sortEngine) throws IOException {
+ KeyValue winner = merge(sortEngine, kv(1, 100, 5, 999), kv(1, 50, 6, 1));
+ assertThat(winner.value().getInt(0)).isEqualTo(1);
+ assertThat(winner.snapshotId()).isEqualTo(6);
+ }
+
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testFallsBackToSequenceWhenSnapshotMissing(SortEngine sortEngine)
+ throws IOException {
+ KeyValue winner =
+ merge(
+ sortEngine,
+ kv(1, 100, KeyValue.UNKNOWN_SNAPSHOT_ID, 100),
+ kv(1, 50, KeyValue.UNKNOWN_SNAPSHOT_ID, 50));
+ assertThat(winner.value().getInt(0)).isEqualTo(100);
+ }
+
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testSameSnapshotFallsBackToSequence(SortEngine sortEngine) throws IOException {
+ KeyValue winner = merge(sortEngine, kv(1, 100, 7, 999), kv(1, 50, 7, 1));
+ assertThat(winner.value().getInt(0)).isEqualTo(999);
+ }
+
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testStampedAlwaysBeatsUnstamped(SortEngine sortEngine) throws IOException {
+ KeyValue winner =
+ merge(sortEngine, kv(1, 999, KeyValue.UNKNOWN_SNAPSHOT_ID, 999), kv(1, 1, 1, 1));
+ assertThat(winner.value().getInt(0)).isEqualTo(1);
+ assertThat(winner.snapshotId()).isEqualTo(1);
+ }
+
+ private static KeyValue kv(int key, long seq, long snapshotId, int value) {
+ return new KeyValue()
+ .replace(GenericRow.of(key), seq, RowKind.INSERT, GenericRow.of(value))
+ .setSnapshotId(snapshotId);
+ }
+
+ private static KeyValue merge(SortEngine sortEngine, KeyValue... kvs) throws IOException {
+ List<RecordReader<KeyValue>> readers = new ArrayList<>();
+ for (KeyValue kv : kvs) {
+ readers.add(new SingleKvReader(kv));
+ }
+
+ MergeFunctionWrapper<KeyValue> wrapper =
+ new ReducerMergeFunctionWrapper(DeduplicateMergeFunction.factory().create());
+
+ RecordReader<KeyValue> reader =
+ SortMergeReader.createSortMergeReader(
+ readers, KEY_COMPARATOR, null, wrapper, sortEngine, true);
+
+ RecordReader.RecordIterator<KeyValue> batch = reader.readBatch();
+ assertThat(batch).isNotNull();
+ KeyValue result = batch.next();
+ assertThat(result).isNotNull();
+ assertThat(batch.next()).isNull();
+ batch.releaseBatch();
+ reader.close();
+ return result;
+ }
+
+ private static class SingleKvReader implements RecordReader<KeyValue> {
+ private KeyValue kv;
+
+ SingleKvReader(KeyValue kv) {
+ this.kv = kv;
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<KeyValue> readBatch() {
+ if (kv == null) {
+ return null;
+ }
+ KeyValue toReturn = kv;
+ kv = null;
+ return new RecordIterator<KeyValue>() {
+ private boolean returned = false;
+
+ @Nullable
+ @Override
+ public KeyValue next() {
+ if (returned) {
+ return null;
+ }
+ returned = true;
+ return toReturn;
+ }
+
+ @Override
+ public void releaseBatch() {}
+ };
+ }
+
+ @Override
+ public void close() {}
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index cec075724f47..56077d611e8e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -444,4 +444,43 @@ private void validateTableSchemaWithMapField(Map options) {
validateTableSchema(
new TableSchema(1, fields, 10, emptyList(), singletonList("f1"), options, ""));
}
+
+ @Test
+ public void testSnapshotSequenceOrderingHappyPath() {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+ assertThatNoException().isThrownBy(() -> validateTableSchemaExec(options));
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingRejectsSequenceField() {
+ Map options = new HashMap<>();
+ options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+ options.put(CoreOptions.SEQUENCE_FIELD.key(), "f2");
+ assertThatThrownBy(() -> validateTableSchemaExec(options))
+ .hasMessageContaining("sequence.field");
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingRejectsNonPkTable() {
+ List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.INT()));
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+ options.put(BUCKET.key(), String.valueOf(-1));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ emptyList(),
+ emptyList(),
+ options,
+ "")))
+ .hasMessageContaining("primary-key");
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 9401cce83275..e86f4a979b45 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -2672,4 +2672,292 @@ protected FileStoreTable createFileStoreTable(Consumer configure, RowTy
""));
return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema);
}
+
+ @Test
+ public void testSnapshotSequenceOrdering() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true));
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: write pk=(1,10) many times so that the per-record sequence number is high.
+ for (int i = 0; i < 100; i++) {
+ write.write(rowData(1, 10, 999L));
+ }
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: write pk=(1,10) once with a lower value. Because the snapshot id (2)
+ // is larger than snapshot 1, this record should win even though its per-record sequence
+ // number is much lower.
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingFallsBackToSequenceWithinSnapshot() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true));
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Within a single snapshot, sequence number is the tiebreaker. The later write (999)
+ // gets a higher sequence number and should win.
+ write.write(rowData(1, 10, 1L));
+ write.write(rowData(1, 10, 999L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|999");
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingCompactionPreservesInputSnapshotId() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true));
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: write pk=(1,10) with val=100
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: write pk=(1,10) with val=200 (this should win after compaction)
+ write.write(rowData(1, 10, 200L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: write a DIFFERENT key pk=(1,20)
+ write.write(rowData(1, 20, 300L));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ // Snapshot 4: compact (processes files from snapshots 1+2+3)
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ write.close();
+ commit.close();
+
+ List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+ for (DataSplit split : splits) {
+ for (DataFileMeta file : split.dataFiles()) {
+ // The compacted file's minSequenceNumber should reflect the min snapshot id
+ // of records inside (from per-record _SEQUENCE_NUMBER values written during
+ // compaction), NOT the compaction commit's snapshot id (4).
+ assertThat(file.minSequenceNumber())
+ .as(
+ "Compacted file %s should have minSequenceNumber from per-record "
+ + "snapshot ids, not the compaction commit's snapshot id",
+ file.fileName())
+ .isLessThanOrEqualTo(3);
+ }
+ }
+
+ // Also verify the read result is correct
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, toSplits(splits), toString);
+ assertThat(result).containsExactlyInAnyOrder("1|10|200", "1|20|300");
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingCompactionNoOrderingReversal() throws Exception {
+ // Reproduces the scenario from the PR review: compaction of files from
+ // snapshot 1 and 3 must NOT cause records from snapshot 1 to win over
+ // an uncompacted file from snapshot 2.
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.BUCKET, 1);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: write pk=(1,10) with val=100
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: write SAME key pk=(1,10) with val=200 — this should win
+ write.write(rowData(1, 10, 200L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: write DIFFERENT key pk=(1,20) with val=300
+ write.write(rowData(1, 20, 300L));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ // Compact all files — after compaction, pk=(1,10) from snapshot 1 gets merged
+ // with pk=(1,20) from snapshot 3 into one output file. The key question is:
+ // does pk=(1,10) in the compacted file still correctly lose to snapshot 2's
+ // version of the same key?
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ // Write pk=(1,10) again with val=999 — snapshot 5 should definitely win
+ write.write(rowData(1, 10, 999L));
+ commit.commit(4, write.prepareCommit(false, 4));
+
+ write.close();
+ commit.close();
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ // pk=(1,10): snapshot 5 (val=999) wins over snapshot 2 (val=200) and snapshot 1 (val=100)
+ // pk=(1,20): snapshot 3 (val=300) is the only version
+ assertThat(result).containsExactlyInAnyOrder("1|10|999", "1|20|300");
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingMultiRoundCompaction() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.BUCKET, 1);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: pk=(1,10) val=100
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: pk=(1,10) val=200 — should win over snapshot 1
+ write.write(rowData(1, 10, 200L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: pk=(1,20) val=300
+ write.write(rowData(1, 20, 300L));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ // First compaction (snapshot 4)
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ // Snapshot 5: pk=(1,10) val=500 — should win over everything
+ write.write(rowData(1, 10, 500L));
+ commit.commit(4, write.prepareCommit(false, 4));
+
+ // Snapshot 6: pk=(1,30) val=600
+ write.write(rowData(1, 30, 600L));
+ commit.commit(5, write.prepareCommit(false, 5));
+
+ // Second compaction (snapshot 7) — first compaction's output is now input
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(6, write.prepareCommit(true, 6));
+
+ write.close();
+ commit.close();
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactlyInAnyOrder("1|10|500", "1|20|300", "1|30|600");
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingWithChangelogInput() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CHANGELOG_PRODUCER, ChangelogProducer.INPUT);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingWithChangelogLookup() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CHANGELOG_PRODUCER, LOOKUP);
+ });
+ StreamTableWrite write =
+ table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingDeleteFromLaterSnapshot() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true));
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).isEmpty();
+
+ write.close();
+ commit.close();
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializerSnapshotIdTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializerSnapshotIdTest.java
new file mode 100644
index 000000000000..bc45574102e8
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializerSnapshotIdTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.io.DataInputDeserializer;
+import org.apache.paimon.io.DataOutputSerializer;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link KeyValueWithLevelNoReusingSerializer} with snapshotId. */
+class KeyValueWithLevelNoReusingSerializerSnapshotIdTest {
+
+ private static final RowType KEY_TYPE =
+ RowType.of(
+ new org.apache.paimon.types.DataType[] {DataTypes.INT()}, new String[] {"k0"});
+ private static final RowType VALUE_TYPE =
+ RowType.of(
+ new org.apache.paimon.types.DataType[] {DataTypes.INT()}, new String[] {"v0"});
+
+ @Test
+ void testRoundTripWithSnapshotId() throws Exception {
+ KeyValueWithLevelNoReusingSerializer serializer =
+ new KeyValueWithLevelNoReusingSerializer(KEY_TYPE, VALUE_TYPE, true);
+
+ KeyValue kv =
+ new KeyValue()
+ .replace(GenericRow.of(1), 42L, RowKind.INSERT, GenericRow.of(100))
+ .setLevel(3)
+ .setSnapshotId(7L);
+
+ DataOutputSerializer out = new DataOutputSerializer(256);
+ serializer.serialize(kv, out);
+
+ DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+ KeyValue deserialized = serializer.deserialize(in);
+
+ assertThat(deserialized.key().getInt(0)).isEqualTo(1);
+ assertThat(deserialized.sequenceNumber()).isEqualTo(42L);
+ assertThat(deserialized.valueKind()).isEqualTo(RowKind.INSERT);
+ assertThat(deserialized.value().getInt(0)).isEqualTo(100);
+ assertThat(deserialized.level()).isEqualTo(3);
+ assertThat(deserialized.snapshotId()).isEqualTo(7L);
+ }
+
+ @Test
+ void testRoundTripWithoutSnapshotId() throws Exception {
+ KeyValueWithLevelNoReusingSerializer serializer =
+ new KeyValueWithLevelNoReusingSerializer(KEY_TYPE, VALUE_TYPE, false);
+
+ KeyValue kv =
+ new KeyValue()
+ .replace(GenericRow.of(1), 42L, RowKind.INSERT, GenericRow.of(100))
+ .setLevel(3)
+ .setSnapshotId(7L);
+
+ DataOutputSerializer out = new DataOutputSerializer(256);
+ serializer.serialize(kv, out);
+
+ DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+ KeyValue deserialized = serializer.deserialize(in);
+
+ assertThat(deserialized.level()).isEqualTo(3);
+ assertThat(deserialized.snapshotId()).isEqualTo(KeyValue.UNKNOWN_SNAPSHOT_ID);
+ }
+}