Skip to content
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,12 @@
<td><p>Enum</p></td>
<td>Specify the order of sequence.field.<br /><br />Possible values:<ul><li>"ascending": specifies sequence.field sort order is ascending.</li><li>"descending": specifies sequence.field sort order is descending.</li></ul></td>
</tr>
<tr>
<td><h5>sequence.snapshot-ordering</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When enabled, merge uses the commit snapshot id as the primary tiebreaker for primary-key conflicts: records from later snapshots always win, with the existing sequence number used as a secondary tiebreaker. Designed for multi-writer scenarios on the same primary-key table where wall-clock sequence numbers cannot be globally ordered. Mutually exclusive with sequence.field. Requires a primary-key table.</td>
</tr>
<tr>
<td><h5>sink.process-time-zone</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
18 changes: 18 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,20 @@ public InlineElement getDescription() {
.defaultValue(SortOrder.ASCENDING)
.withDescription("Specify the order of sequence.field.");

@Immutable
public static final ConfigOption<Boolean> SEQUENCE_SNAPSHOT_ORDERING =
key("sequence.snapshot-ordering")
.booleanType()
.defaultValue(false)
.withDescription(
"When enabled, merge uses the commit snapshot id as the primary "
+ "tiebreaker for primary-key conflicts: records from later "
+ "snapshots always win, with the existing sequence number used "
+ "as a secondary tiebreaker. Designed for multi-writer "
+ "scenarios on the same primary-key table where wall-clock "
+ "sequence numbers cannot be globally ordered. Mutually "
+ "exclusive with sequence.field. Requires a primary-key table.");

@Immutable
public static final ConfigOption<Boolean> AGGREGATION_REMOVE_RECORD_ON_DELETE =
key("aggregation.remove-record-on-delete")
Expand Down Expand Up @@ -3341,6 +3355,10 @@ public boolean sequenceFieldSortOrderIsAscending() {
return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
}

public boolean snapshotSequenceOrdering() {
return options.get(SEQUENCE_SNAPSHOT_ORDERING);
}

public Optional<String> rowkindField() {
return options.getOptional(ROWKIND_FIELD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
* ID = 2147483645.
* <li><code>_LEVEL</code>: Which LSM tree level does this key-value stay in. ID = 2147483644.
* <li><code>rowkind</code>: THw rowkind field in audit-log system tables. ID = 2147483643.
* <li><code>_SNAPSHOT_ID</code>: Commit snapshot id for snapshot-based merge ordering. ID =
* 2147483641.
* </ul>
*
* <p><b>Structured type fields</b>:
Expand Down Expand Up @@ -93,13 +95,17 @@ public class SpecialFields {
public static final DataField ROW_ID =
new DataField(Integer.MAX_VALUE - 5, "_ROW_ID", DataTypes.BIGINT().notNull());

public static final DataField SNAPSHOT_ID =
new DataField(Integer.MAX_VALUE - 6, "_SNAPSHOT_ID", DataTypes.BIGINT());

public static final Set<String> SYSTEM_FIELD_NAMES =
Stream.of(
SEQUENCE_NUMBER.name(),
VALUE_KIND.name(),
LEVEL.name(),
ROW_KIND.name(),
ROW_ID.name())
ROW_ID.name(),
SNAPSHOT_ID.name())
.collect(Collectors.toSet());

public static boolean isSystemField(int fieldId) {
Expand Down
35 changes: 34 additions & 1 deletion paimon-core/src/main/java/org/apache/paimon/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import static org.apache.paimon.table.SpecialFields.LEVEL;
import static org.apache.paimon.table.SpecialFields.SEQUENCE_NUMBER;
import static org.apache.paimon.table.SpecialFields.SNAPSHOT_ID;
import static org.apache.paimon.table.SpecialFields.VALUE_KIND;

/**
Expand All @@ -43,6 +44,7 @@ public class KeyValue {

public static final long UNKNOWN_SEQUENCE = -1;
public static final int UNKNOWN_LEVEL = -1;
public static final long UNKNOWN_SNAPSHOT_ID = -1;

private InternalRow key;
// determined after written into memory table or read from file
Expand All @@ -51,6 +53,8 @@ public class KeyValue {
private InternalRow value;
// determined after read from file
private int level;
// determined after read from file; UNKNOWN_SNAPSHOT_ID if snapshot-ordering is not enabled
private long snapshotId;

public KeyValue replace(InternalRow key, RowKind valueKind, InternalRow value) {
return replace(key, UNKNOWN_SEQUENCE, valueKind, value);
Expand All @@ -63,6 +67,7 @@ public KeyValue replace(
this.valueKind = valueKind;
this.value = value;
this.level = UNKNOWN_LEVEL;
this.snapshotId = UNKNOWN_SNAPSHOT_ID;
return this;
}

Expand All @@ -89,6 +94,11 @@ public long sequenceNumber() {
return sequenceNumber;
}

public KeyValue setSequenceNumber(long sequenceNumber) {
this.sequenceNumber = sequenceNumber;
return this;
}

public RowKind valueKind() {
return valueKind;
}
Expand All @@ -110,6 +120,21 @@ public KeyValue setLevel(int level) {
return this;
}

public long snapshotId() {
return snapshotId;
}

public KeyValue setSnapshotId(long snapshotId) {
this.snapshotId = snapshotId;
return this;
}

public static int compareSnapshotId(KeyValue a, KeyValue b) {
long sa = a.snapshotId == UNKNOWN_SNAPSHOT_ID ? Long.MIN_VALUE : a.snapshotId;
long sb = b.snapshotId == UNKNOWN_SNAPSHOT_ID ? Long.MIN_VALUE : b.snapshotId;
return Long.compare(sa, sb);
}

public static RowType schema(RowType keyType, RowType valueType) {
return new RowType(false, createKeyValueFields(keyType.getFields(), valueType.getFields()));
}
Expand All @@ -120,6 +145,13 @@ public static RowType schemaWithLevel(RowType keyType, RowType valueType) {
return new RowType(fields);
}

public static RowType schemaWithLevelAndSnapshotId(RowType keyType, RowType valueType) {
List<DataField> fields = new ArrayList<>(schema(keyType, valueType).getFields());
fields.add(LEVEL);
fields.add(SNAPSHOT_ID);
return new RowType(fields);
}

/**
* Create key-value fields.
*
Expand Down Expand Up @@ -173,7 +205,8 @@ public KeyValue copy(
sequenceNumber,
valueKind,
valueSerializer.copy(value))
.setLevel(level);
.setLevel(level)
.setSnapshotId(snapshotId);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,15 @@ static DataFileMeta create(

SimpleStats valueStats();

/**
* Minimum sequence number of records in this file. When {@code sequence.snapshot-ordering} is
* enabled for a primary-key table, this field is repurposed to carry the commit snapshot id
* instead of the per-record sequence number range (see {@code
* FileStoreCommitImpl.assignSnapshotSequenceOrdering}).
*/
long minSequenceNumber();

/** @see #minSequenceNumber() */
long maxSequenceNumber();

long schemaId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,21 @@ public class KeyValueDataFileRecordReader implements FileRecordReader<KeyValue>
private final FileRecordReader<InternalRow> reader;
private final KeyValueSerializer serializer;
private final int level;
private final long snapshotId;
private final boolean recoverSnapshotIdFromSequence;

public KeyValueDataFileRecordReader(
FileRecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
FileRecordReader<InternalRow> reader,
RowType keyType,
RowType valueType,
int level,
long snapshotId,
boolean recoverSnapshotIdFromSequence) {
this.reader = reader;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
this.snapshotId = snapshotId;
this.recoverSnapshotIdFromSequence = recoverSnapshotIdFromSequence;
}

@Nullable
Expand All @@ -53,10 +62,18 @@ public FileRecordIterator<KeyValue> readBatch() throws IOException {
}

return iterator.transform(
internalRow ->
internalRow == null
? null
: serializer.fromRow(internalRow).setLevel(level));
internalRow -> {
if (internalRow == null) {
return null;
}
KeyValue kv = serializer.fromRow(internalRow).setLevel(level);
if (recoverSnapshotIdFromSequence) {
kv.setSnapshotId(kv.sequenceNumber());
} else {
kv.setSnapshotId(snapshotId);
}
return kv;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.format.OrcFormatReaderContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.FileRecordReader;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
private final long asyncThreshold;
private final boolean ignoreCorruptFiles;
private final boolean ignoreLostFiles;
private final boolean snapshotSequenceOrdering;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final BinaryRow partition;
private final DeletionVector.Factory dvFactory;
Expand All @@ -93,6 +95,7 @@ protected KeyValueFileReaderFactory(
this.asyncThreshold = coreOptions.fileReaderAsyncThreshold().getBytes();
this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile();
this.ignoreLostFiles = coreOptions.scanIgnoreLostFile();
this.snapshotSequenceOrdering = coreOptions.snapshotSequenceOrdering();
this.partition = partition;
this.formatReaderMappings = new HashMap<>();
this.dvFactory = dvFactory;
Expand Down Expand Up @@ -168,7 +171,24 @@ private FileRecordReader<KeyValue> createRecordReader(
new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());
}

return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level());
// When snapshot-ordering is enabled, minSequenceNumber carries the commit snapshot id
// (stamped by FileStoreCommitImpl.stampSequenceWithSnapshotId at commit time).
// For compaction output files, each record's sequenceNumber already contains its
// snapshotId, so we recover per-record snapshotId from sequenceNumber instead of
// using a uniform file-level stamp.
boolean recoverSnapshotIdFromSequence =
snapshotSequenceOrdering
&& file.fileSource().isPresent()
&& file.fileSource().get() == FileSource.COMPACT;
long snapshotId =
snapshotSequenceOrdering ? file.minSequenceNumber() : KeyValue.UNKNOWN_SNAPSHOT_ID;
return new KeyValueDataFileRecordReader(
fileRecordReader,
keyType,
valueType,
file.level(),
snapshotId,
recoverSnapshotIdFromSequence);
}

public static Builder builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class MergeSorter {
private final SortEngine sortEngine;
private final int spillThreshold;
private final CompressOptions compression;
private final boolean snapshotSequenceOrdering;

private final CachelessSegmentPool memoryPool;

Expand All @@ -77,6 +78,7 @@ public MergeSorter(
@Nullable IOManager ioManager) {
this.sortEngine = options.sortEngine();
this.spillThreshold = options.sortSpillThreshold();
this.snapshotSequenceOrdering = options.snapshotSequenceOrdering();
this.compression = options.spillCompressOptions();
this.keyType = keyType;
this.valueType = valueType;
Expand Down Expand Up @@ -142,7 +144,12 @@ public <T> RecordReader<T> mergeSortNoSpill(
}

return SortMergeReader.createSortMergeReader(
readers, keyComparator, userDefinedSeqComparator, mergeFunction, sortEngine);
readers,
keyComparator,
userDefinedSeqComparator,
mergeFunction,
sortEngine,
snapshotSequenceOrdering);
}

private <T> RecordReader<T> spillMergeSort(
Expand Down Expand Up @@ -170,7 +177,8 @@ private ReaderSupplier<KeyValue> spill(ReaderSupplier<KeyValue> readerSupplier)

FileIOChannel.ID channel = ioManager.createChannel();
KeyValueWithLevelNoReusingSerializer serializer =
new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
new KeyValueWithLevelNoReusingSerializer(
keyType, valueType, snapshotSequenceOrdering);
BlockCompressionFactory compressFactory = BlockCompressionFactory.create(compression);
int compressBlock = (int) MemorySize.parse("64 kb").getBytes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,16 @@ public ChangelogMergeTreeRewriter(
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
boolean produceChangelog,
boolean forceDropDelete) {
boolean forceDropDelete,
boolean snapshotSequenceOrdering) {
super(
readerFactory,
writerFactory,
keyComparator,
userDefinedSeqComparator,
mfFactory,
mergeSorter);
mergeSorter,
snapshotSequenceOrdering);
this.maxLevel = maxLevel;
this.mergeEngine = mergeEngine;
this.produceChangelog = produceChangelog;
Expand Down Expand Up @@ -146,6 +148,9 @@ private CompactResult rewriteOrProduceChangelog(
if (compactFileWriter != null
&& keyValue != null
&& (!dropDelete || keyValue.isAdd())) {
if (snapshotSequenceOrdering) {
keyValue.setSequenceNumber(keyValue.snapshotId());
}
compactFileWriter.write(keyValue);
}
if (produceChangelog) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public FullChangelogMergeTreeCompactRewriter(
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
@Nullable RecordEqualiser valueEqualiser) {
@Nullable RecordEqualiser valueEqualiser,
boolean snapshotSequenceOrdering) {
super(
maxLevel,
mergeEngine,
Expand All @@ -64,7 +65,8 @@ public FullChangelogMergeTreeCompactRewriter(
mfFactory,
mergeSorter,
true,
false);
false,
snapshotSequenceOrdering);
this.valueEqualiser = valueEqualiser;
}

Expand Down
Loading