diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml
index 16aac23a5683..3735367053ce 100644
--- a/.github/workflows/labeler.yml
+++ b/.github/workflows/labeler.yml
@@ -28,6 +28,6 @@ jobs:
triage:
runs-on: ubuntu-slim
steps:
- - uses: actions/labeler@634933edcd8ababfe52f92936142cc22ac488b1b # v6
+ - uses: actions/labeler@634933edcd8ababfe52f92936142cc22ac488b1b # v6.0.1
with:
sync-labels: true
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotChanges.java b/core/src/main/java/org/apache/iceberg/SnapshotChanges.java
index 38a81bb966c8..65ed9ff59336 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotChanges.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotChanges.java
@@ -29,19 +29,29 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.ThreadPools;
/**
- * Helper class for retrieving file changes in a snapshot with caching.
+ * Helper class for retrieving file changes across one or more snapshots, with caching.
*
*
This class caches the results of file change detection operations, making it efficient to
- * query multiple file change types for the same snapshot. By default, manifests are read
- * single-threaded. Use {@link Builder#executeWith(ExecutorService)} to enable parallel manifest
- * reading.
+ * query multiple file change types for the same set of snapshots. The accessors return the union of
+ * changes across all configured snapshots.
+ *
+ *
By default, manifests are read single-threaded when only one snapshot is configured. When more
+ * than one snapshot is configured the shared {@link ThreadPools#getWorkerPool()} is used so that
+ * manifest reads are parallelised across snapshot boundaries. Use {@link
+ * Builder#executeWith(ExecutorService)} to supply a custom executor in either case.
+ *
+ *
Each manifest is attributed to exactly one snapshot via {@link ManifestFile#snapshotId()}, so
+ * the multi-snapshot path never reads the same manifest twice even when the configured snapshots
+ * share an ancestor chain.
*/
public class SnapshotChanges {
- private final Snapshot snapshot;
+ private final List snapshots;
private final FileIO io;
private final Map specsById;
private final ExecutorService executorService;
@@ -52,14 +62,18 @@ public class SnapshotChanges {
private List removedDeleteFiles = null;
private SnapshotChanges(
- Snapshot snapshot,
+ List snapshots,
FileIO io,
Map specsById,
ExecutorService executorService) {
- Preconditions.checkArgument(snapshot != null, "Snapshot cannot be null");
+ Preconditions.checkArgument(snapshots != null, "Snapshots cannot be null");
+ Preconditions.checkArgument(!snapshots.isEmpty(), "Snapshots cannot be empty");
Preconditions.checkArgument(io != null, "FileIO cannot be null");
Preconditions.checkArgument(specsById != null, "Partition specs cannot be null");
- this.snapshot = snapshot;
+ for (Snapshot snapshot : snapshots) {
+ Preconditions.checkArgument(snapshot != null, "Snapshot cannot be null");
+ }
+ this.snapshots = ImmutableList.copyOf(snapshots);
this.io = io;
this.specsById = specsById;
this.executorService = executorService;
@@ -72,22 +86,42 @@ private SnapshotChanges(
* @return a new Builder
*/
public static Builder builderFor(Table table) {
- return new Builder(table.currentSnapshot(), table.io(), table.specs());
+ Builder builder = new Builder(table.io(), table.specs());
+ if (table.currentSnapshot() != null) {
+ builder.snapshot(table.currentSnapshot());
+ }
+ return builder;
+ }
+
+ /**
+ * Create a builder for SnapshotChanges over a fixed set of snapshots.
+ *
+ * @param table the table the snapshots belong to (used for {@link FileIO} and partition specs)
+ * @param snapshots the snapshots to detect file changes for; must be non-empty
+ * @return a new Builder
+ */
+ public static Builder builderFor(Table table, Iterable snapshots) {
+ return new Builder(table.io(), table.specs()).snapshots(snapshots);
}
static Builder builderFor(Snapshot snapshot, FileIO io, Map specsById) {
- return new Builder(snapshot, io, specsById);
+ return new Builder(io, specsById).snapshot(snapshot);
}
private CloseableIterable iterate(Iterable> tasks) {
if (executorService != null) {
return new ParallelIterable<>(tasks, executorService);
+ } else if (snapshots.size() > 1) {
+ // Multi-snapshot mode defaults to the shared worker pool to keep manifest reads
+ // saturated across snapshot boundaries. Single-snapshot mode keeps the historical
+ // serial-by-default behaviour to avoid surprising existing callers.
+ return new ParallelIterable<>(tasks, ThreadPools.getWorkerPool());
} else {
return CloseableIterable.concat(tasks);
}
}
- /** Returns all data files added to the table in this snapshot */
+ /** Returns all data files added across the configured snapshots. */
public Iterable addedDataFiles() {
if (addedDataFiles == null) {
cacheDataFileChanges();
@@ -96,7 +130,7 @@ public Iterable addedDataFiles() {
return addedDataFiles;
}
- /** Returns all data files removed from the table in this snapshot. */
+ /** Returns all data files removed across the configured snapshots. */
public Iterable removedDataFiles() {
if (removedDataFiles == null) {
cacheDataFileChanges();
@@ -105,7 +139,7 @@ public Iterable removedDataFiles() {
return removedDataFiles;
}
- /** Returns all delete files added to the table in this snapshot. */
+ /** Returns all delete files added across the configured snapshots. */
public Iterable addedDeleteFiles() {
if (addedDeleteFiles == null) {
cacheDeleteFileChanges();
@@ -114,7 +148,7 @@ public Iterable addedDeleteFiles() {
return addedDeleteFiles;
}
- /** Returns all delete files removed from the table in this snapshot. */
+ /** Returns all delete files removed across the configured snapshots. */
public Iterable removedDeleteFiles() {
if (removedDeleteFiles == null) {
cacheDeleteFileChanges();
@@ -127,13 +161,17 @@ private void cacheDataFileChanges() {
ImmutableList.Builder adds = ImmutableList.builder();
ImmutableList.Builder deletes = ImmutableList.builder();
- Iterable relevantDataManifests =
- Iterables.filter(
- snapshot.dataManifests(io),
- manifest -> Objects.equals(manifest.snapshotId(), snapshot.snapshotId()));
-
Iterable>> manifestReadTasks =
- Iterables.transform(relevantDataManifests, this::readDataManifest);
+ Iterables.concat(
+ Iterables.transform(
+ snapshots,
+ snapshot ->
+ Iterables.transform(
+ Iterables.filter(
+ snapshot.dataManifests(io),
+ manifest ->
+ Objects.equals(manifest.snapshotId(), snapshot.snapshotId())),
+ this::readDataManifest)));
try (CloseableIterable> changedDataFiles =
iterate(manifestReadTasks)) {
@@ -178,13 +216,17 @@ private void cacheDeleteFileChanges() {
ImmutableList.Builder adds = ImmutableList.builder();
ImmutableList.Builder deletes = ImmutableList.builder();
- Iterable relevantDeleteManifests =
- Iterables.filter(
- snapshot.deleteManifests(io),
- manifest -> Objects.equals(manifest.snapshotId(), snapshot.snapshotId()));
-
Iterable>> manifestReadTasks =
- Iterables.transform(relevantDeleteManifests, this::readDeleteManifest);
+ Iterables.concat(
+ Iterables.transform(
+ snapshots,
+ snapshot ->
+ Iterables.transform(
+ Iterables.filter(
+ snapshot.deleteManifests(io),
+ manifest ->
+ Objects.equals(manifest.snapshotId(), snapshot.snapshotId())),
+ this::readDeleteManifest)));
try (CloseableIterable> changedDeleteFiles =
iterate(manifestReadTasks)) {
@@ -226,30 +268,46 @@ private CloseableIterable> readDeleteMani
}
public static class Builder {
- private Snapshot snapshot;
private final FileIO io;
private final Map specsById;
+ private final List snapshots = Lists.newArrayList();
private ExecutorService executorService = null;
- private Builder(Snapshot snapshot, FileIO io, Map specsById) {
- this.snapshot = snapshot;
+ private Builder(FileIO io, Map specsById) {
this.io = io;
this.specsById = specsById;
}
/**
- * Set the snapshot to detect file changes for, overriding the default.
+ * Set the snapshot to detect file changes for, replacing any previously configured snapshots.
+ *
+ * @param snapshot the snapshot to use
+ * @return this builder for method chaining
+ */
+ public Builder snapshot(Snapshot snapshot) {
+ this.snapshots.clear();
+ this.snapshots.add(snapshot);
+ return this;
+ }
+
+ /**
+ * Set the snapshots to detect file changes for, replacing any previously configured snapshots.
+ * The accessors on the resulting {@link SnapshotChanges} return the union of changes across all
+ * of these snapshots.
*
- * @param snapshotOverride the snapshot to use
+ * @param newSnapshots the snapshots to use; must be non-empty
* @return this builder for method chaining
*/
- public Builder snapshot(Snapshot snapshotOverride) {
- this.snapshot = snapshotOverride;
+ public Builder snapshots(Iterable newSnapshots) {
+ Preconditions.checkArgument(newSnapshots != null, "Snapshots cannot be null");
+ this.snapshots.clear();
+ Iterables.addAll(this.snapshots, newSnapshots);
return this;
}
/**
- * Configure an executor service to use for parallel manifest reading.
+ * Configure an executor service to use for parallel manifest reading. When unset and more than
+ * one snapshot is configured, the shared {@link ThreadPools#getWorkerPool()} is used.
*
* @param executor the executor service to use for parallel execution
* @return this builder for method chaining
@@ -265,7 +323,7 @@ public Builder executeWith(ExecutorService executor) {
* @return a new SnapshotChanges instance
*/
public SnapshotChanges build() {
- return new SnapshotChanges(snapshot, io, specsById, executorService);
+ return new SnapshotChanges(snapshots, io, specsById, executorService);
}
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotChanges.java b/core/src/test/java/org/apache/iceberg/TestSnapshotChanges.java
index 337fcedfda39..9d575f6302ff 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotChanges.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotChanges.java
@@ -22,7 +22,19 @@
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -146,4 +158,166 @@ public void testSnapshotChangesCaching() {
// Both calls should return the same reference (cached)
assertThat(firstCallResult).isSameAs(secondCallResult);
}
+
+ @Test
+ public void testSingleSnapshotBackCompatThroughMultiSnapshotFactory() {
+ DataFile file = newDataFile("/path/to/single-snap.parquet");
+ table.newFastAppend().appendFile(file).commit();
+ Snapshot snapshot = table.currentSnapshot();
+
+ Iterable viaSingle =
+ SnapshotChanges.builderFor(table).snapshot(snapshot).build().addedDataFiles();
+ Iterable viaMulti =
+ SnapshotChanges.builderFor(table, ImmutableList.of(snapshot)).build().addedDataFiles();
+
+ assertThat(paths(viaMulti)).containsExactlyInAnyOrderElementsOf(paths(viaSingle));
+ assertThat(viaMulti).hasSize(1);
+ }
+
+ @Test
+ public void testMultiSnapshotUnionForDataAndDeleteFiles() {
+ DataFile fileA = newDataFile("/path/to/A.parquet");
+ DataFile fileB = newDataFile("/path/to/B.parquet");
+ DataFile fileC = newDataFile("/path/to/C.parquet");
+
+ table.newFastAppend().appendFile(fileA).commit();
+ Snapshot snap1 = table.currentSnapshot();
+
+ table.newFastAppend().appendFile(fileB).appendFile(fileC).commit();
+ Snapshot snap2 = table.currentSnapshot();
+
+ table.newDelete().deleteFile(fileA).commit();
+ Snapshot snap3 = table.currentSnapshot();
+
+ SnapshotChanges union =
+ SnapshotChanges.builderFor(table, ImmutableList.of(snap1, snap2, snap3)).build();
+
+ assertThat(paths(union.addedDataFiles()))
+ .containsExactlyInAnyOrder(
+ fileA.path().toString(), fileB.path().toString(), fileC.path().toString());
+ assertThat(paths(union.removedDataFiles())).containsExactly(fileA.path().toString());
+ }
+
+ @Test
+ public void testMultiSnapshotUnionForDeleteFiles() {
+ DataFile fileA = newDataFile("/path/to/A.parquet");
+ table.newFastAppend().appendFile(fileA).commit();
+ Snapshot dataSnap = table.currentSnapshot();
+
+ DeleteFile delA = newDeleteFile("/path/to/A-deletes.parquet");
+ DeleteFile delB = newDeleteFile("/path/to/B-deletes.parquet");
+ table.newRowDelta().addDeletes(delA).commit();
+ Snapshot snap1 = table.currentSnapshot();
+ table.newRowDelta().addDeletes(delB).commit();
+ Snapshot snap2 = table.currentSnapshot();
+
+ SnapshotChanges union =
+ SnapshotChanges.builderFor(table, ImmutableList.of(dataSnap, snap1, snap2)).build();
+
+ assertThat(paths(union.addedDeleteFiles()))
+ .containsExactlyInAnyOrder(delA.path().toString(), delB.path().toString());
+ }
+
+ @Test
+ public void testMultiSnapshotCachingReturnsSameInstance() {
+ DataFile fileA = newDataFile("/path/to/A.parquet");
+ DataFile fileB = newDataFile("/path/to/B.parquet");
+
+ table.newFastAppend().appendFile(fileA).commit();
+ Snapshot snap1 = table.currentSnapshot();
+ table.newFastAppend().appendFile(fileB).commit();
+ Snapshot snap2 = table.currentSnapshot();
+
+ SnapshotChanges changes =
+ SnapshotChanges.builderFor(table, ImmutableList.of(snap1, snap2)).build();
+
+ Iterable first = changes.addedDataFiles();
+ Iterable second = changes.addedDataFiles();
+ assertThat(first).isSameAs(second);
+ assertThat(paths(first))
+ .containsExactlyInAnyOrder(fileA.path().toString(), fileB.path().toString());
+ }
+
+ @Test
+ public void testWithCustomExecutor() throws Exception {
+ DataFile fileA = newDataFile("/path/to/A.parquet");
+ DataFile fileB = newDataFile("/path/to/B.parquet");
+ DataFile fileC = newDataFile("/path/to/C.parquet");
+
+ table.newFastAppend().appendFile(fileA).commit();
+ Snapshot snap1 = table.currentSnapshot();
+ table.newFastAppend().appendFile(fileB).commit();
+ Snapshot snap2 = table.currentSnapshot();
+ table.newFastAppend().appendFile(fileC).commit();
+ Snapshot snap3 = table.currentSnapshot();
+
+ ExecutorService executor = Executors.newFixedThreadPool(3);
+ try {
+ SnapshotChanges changes =
+ SnapshotChanges.builderFor(table, ImmutableList.of(snap1, snap2, snap3))
+ .executeWith(executor)
+ .build();
+
+ assertThat(paths(changes.addedDataFiles()))
+ .containsExactlyInAnyOrder(
+ fileA.path().toString(), fileB.path().toString(), fileC.path().toString());
+ } finally {
+ executor.shutdownNow();
+ assertThat(executor.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
+ }
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testEquivalenceWithDeprecatedNewFilesBetween() throws Exception {
+ DataFile fileA = newDataFile("/path/to/A.parquet");
+ DataFile fileB = newDataFile("/path/to/B.parquet");
+ DataFile fileC = newDataFile("/path/to/C.parquet");
+
+ table.newFastAppend().appendFile(fileA).commit();
+ Snapshot snap1 = table.currentSnapshot();
+ table.newFastAppend().appendFile(fileB).commit();
+ table.newFastAppend().appendFile(fileC).commit();
+ Snapshot snap3 = table.currentSnapshot();
+
+ Set viaDeprecated = Sets.newHashSet();
+ try (CloseableIterable deprecated =
+ SnapshotUtil.newFilesBetween(
+ snap1.snapshotId(), snap3.snapshotId(), table::snapshot, table.io())) {
+ for (DataFile f : deprecated) {
+ viaDeprecated.add(f.path().toString());
+ }
+ }
+
+ List ancestorsAfterSnap1 =
+ Lists.newArrayList(
+ SnapshotUtil.ancestorsBetween(snap3.snapshotId(), snap1.snapshotId(), table::snapshot));
+
+ SnapshotChanges changes = SnapshotChanges.builderFor(table, ancestorsAfterSnap1).build();
+
+ assertThat(paths(changes.addedDataFiles())).containsExactlyInAnyOrderElementsOf(viaDeprecated);
+ }
+
+ private static DataFile newDataFile(String path) {
+ return DataFiles.builder(SPEC)
+ .withPath(path)
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ }
+
+ private static DeleteFile newDeleteFile(String path) {
+ return FileMetadata.deleteFileBuilder(SPEC)
+ .ofPositionDeletes()
+ .withPath(path)
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ }
+
+ private static Set paths(Iterable extends ContentFile>> files) {
+ return StreamSupport.stream(files.spliterator(), false)
+ .map(f -> f.path().toString())
+ .collect(Collectors.toSet());
+ }
}