Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ jobs:
triage:
runs-on: ubuntu-slim
steps:
- uses: actions/labeler@634933edcd8ababfe52f92936142cc22ac488b1b # v6
- uses: actions/labeler@634933edcd8ababfe52f92936142cc22ac488b1b # v6.0.1
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolves the zizmor ref-version-mismatch finding that was failing the workflows lint job.

with:
sync-labels: true
128 changes: 93 additions & 35 deletions core/src/main/java/org/apache/iceberg/SnapshotChanges.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,29 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;

/**
* Helper class for retrieving file changes in a snapshot with caching.
* Helper class for retrieving file changes across one or more snapshots, with caching.
*
* <p>This class caches the results of file change detection operations, making it efficient to
* query multiple file change types for the same snapshot. By default, manifests are read
* single-threaded. Use {@link Builder#executeWith(ExecutorService)} to enable parallel manifest
* reading.
* query multiple file change types for the same set of snapshots. The accessors return the union of
* changes across all configured snapshots.
*
* <p>By default, manifests are read single-threaded when only one snapshot is configured. When more
* than one snapshot is configured the shared {@link ThreadPools#getWorkerPool()} is used so that
* manifest reads are parallelised across snapshot boundaries. Use {@link
* Builder#executeWith(ExecutorService)} to supply a custom executor in either case.
*
* <p>Each manifest is attributed to exactly one snapshot via {@link ManifestFile#snapshotId()}, so
* the multi-snapshot path does not read the same manifest twice even when the configured snapshots
* share an ancestor chain, provided each snapshot is configured only once.
*/
public class SnapshotChanges {
private final Snapshot snapshot;
private final List<Snapshot> snapshots;
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ExecutorService executorService;
Expand All @@ -52,14 +62,18 @@ public class SnapshotChanges {
private List<DeleteFile> removedDeleteFiles = null;

private SnapshotChanges(
Snapshot snapshot,
List<Snapshot> snapshots,
FileIO io,
Map<Integer, PartitionSpec> specsById,
ExecutorService executorService) {
Preconditions.checkArgument(snapshot != null, "Snapshot cannot be null");
Preconditions.checkArgument(snapshots != null, "Snapshots cannot be null");
Preconditions.checkArgument(!snapshots.isEmpty(), "Snapshots cannot be empty");
Preconditions.checkArgument(io != null, "FileIO cannot be null");
Preconditions.checkArgument(specsById != null, "Partition specs cannot be null");
this.snapshot = snapshot;
for (Snapshot snapshot : snapshots) {
Preconditions.checkArgument(snapshot != null, "Snapshot cannot be null");
}
this.snapshots = ImmutableList.copyOf(snapshots);
this.io = io;
this.specsById = specsById;
this.executorService = executorService;
Expand All @@ -72,22 +86,42 @@ private SnapshotChanges(
* @return a new Builder
*/
public static Builder builderFor(Table table) {
return new Builder(table.currentSnapshot(), table.io(), table.specs());
Builder builder = new Builder(table.io(), table.specs());
if (table.currentSnapshot() != null) {
builder.snapshot(table.currentSnapshot());
}
return builder;
}

/**
 * Create a builder for SnapshotChanges that covers an explicit collection of snapshots.
 *
 * @param table the table that owns the snapshots; supplies the {@link FileIO} and partition specs
 * @param snapshots the snapshots whose file changes should be detected; must be non-empty
 * @return a new Builder configured with the given snapshots
 */
public static Builder builderFor(Table table, Iterable<Snapshot> snapshots) {
  Builder builder = new Builder(table.io(), table.specs());
  builder.snapshots(snapshots);
  return builder;
}

static Builder builderFor(Snapshot snapshot, FileIO io, Map<Integer, PartitionSpec> specsById) {
return new Builder(snapshot, io, specsById);
return new Builder(io, specsById).snapshot(snapshot);
}

/**
 * Combines the per-manifest read tasks into a single iterable.
 *
 * <p>An explicitly configured executor always takes precedence. Without one, a single configured
 * snapshot is read by concatenating the tasks, while several configured snapshots are read
 * through the shared worker pool.
 *
 * @param tasks one iterable of entries per manifest to read
 * @return a closeable iterable over the entries produced by all tasks
 */
private <T> CloseableIterable<T> iterate(Iterable<CloseableIterable<T>> tasks) {
  boolean singleSnapshot = snapshots.size() <= 1;
  if (executorService == null && singleSnapshot) {
    // No executor was supplied and only one snapshot is configured: keep the historical
    // serial-by-default behaviour so existing callers are not surprised.
    return CloseableIterable.concat(tasks);
  }

  // Multi-snapshot mode falls back to the shared worker pool so manifest reads stay
  // saturated across snapshot boundaries; the pool is only looked up when it is needed.
  ExecutorService pool = executorService != null ? executorService : ThreadPools.getWorkerPool();
  return new ParallelIterable<>(tasks, pool);
}

/** Returns all data files added to the table in this snapshot */
/** Returns all data files added across the configured snapshots. */
public Iterable<DataFile> addedDataFiles() {
if (addedDataFiles == null) {
cacheDataFileChanges();
Expand All @@ -96,7 +130,7 @@ public Iterable<DataFile> addedDataFiles() {
return addedDataFiles;
}

/** Returns all data files removed from the table in this snapshot. */
/** Returns all data files removed across the configured snapshots. */
public Iterable<DataFile> removedDataFiles() {
if (removedDataFiles == null) {
cacheDataFileChanges();
Expand All @@ -105,7 +139,7 @@ public Iterable<DataFile> removedDataFiles() {
return removedDataFiles;
}

/** Returns all delete files added to the table in this snapshot. */
/** Returns all delete files added across the configured snapshots. */
public Iterable<DeleteFile> addedDeleteFiles() {
if (addedDeleteFiles == null) {
cacheDeleteFileChanges();
Expand All @@ -114,7 +148,7 @@ public Iterable<DeleteFile> addedDeleteFiles() {
return addedDeleteFiles;
}

/** Returns all delete files removed from the table in this snapshot. */
/** Returns all delete files removed across the configured snapshots. */
public Iterable<DeleteFile> removedDeleteFiles() {
if (removedDeleteFiles == null) {
cacheDeleteFileChanges();
Expand All @@ -127,13 +161,17 @@ private void cacheDataFileChanges() {
ImmutableList.Builder<DataFile> adds = ImmutableList.builder();
ImmutableList.Builder<DataFile> deletes = ImmutableList.builder();

Iterable<ManifestFile> relevantDataManifests =
Iterables.filter(
snapshot.dataManifests(io),
manifest -> Objects.equals(manifest.snapshotId(), snapshot.snapshotId()));

Iterable<CloseableIterable<Pair<ManifestEntry.Status, DataFile>>> manifestReadTasks =
Iterables.transform(relevantDataManifests, this::readDataManifest);
Iterables.concat(
Iterables.transform(
snapshots,
snapshot ->
Iterables.transform(
Iterables.filter(
snapshot.dataManifests(io),
manifest ->
Objects.equals(manifest.snapshotId(), snapshot.snapshotId())),
this::readDataManifest)));

try (CloseableIterable<Pair<ManifestEntry.Status, DataFile>> changedDataFiles =
iterate(manifestReadTasks)) {
Expand Down Expand Up @@ -178,13 +216,17 @@ private void cacheDeleteFileChanges() {
ImmutableList.Builder<DeleteFile> adds = ImmutableList.builder();
ImmutableList.Builder<DeleteFile> deletes = ImmutableList.builder();

Iterable<ManifestFile> relevantDeleteManifests =
Iterables.filter(
snapshot.deleteManifests(io),
manifest -> Objects.equals(manifest.snapshotId(), snapshot.snapshotId()));

Iterable<CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>>> manifestReadTasks =
Iterables.transform(relevantDeleteManifests, this::readDeleteManifest);
Iterables.concat(
Iterables.transform(
snapshots,
snapshot ->
Iterables.transform(
Iterables.filter(
snapshot.deleteManifests(io),
manifest ->
Objects.equals(manifest.snapshotId(), snapshot.snapshotId())),
this::readDeleteManifest)));

try (CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>> changedDeleteFiles =
iterate(manifestReadTasks)) {
Expand Down Expand Up @@ -226,30 +268,46 @@ private CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>> readDeleteMani
}

public static class Builder {
private Snapshot snapshot;
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final List<Snapshot> snapshots = Lists.newArrayList();
private ExecutorService executorService = null;

private Builder(Snapshot snapshot, FileIO io, Map<Integer, PartitionSpec> specsById) {
this.snapshot = snapshot;
private Builder(FileIO io, Map<Integer, PartitionSpec> specsById) {
this.io = io;
this.specsById = specsById;
}

/**
* Set the snapshot to detect file changes for, overriding the default.
* Set the snapshot to detect file changes for, replacing any previously configured snapshots.
*
* @param snapshot the snapshot to use
* @return this builder for method chaining
*/
public Builder snapshot(Snapshot snapshot) {
this.snapshots.clear();
this.snapshots.add(snapshot);
return this;
}

/**
* Set the snapshots to detect file changes for, replacing any previously configured snapshots.
* The accessors on the resulting {@link SnapshotChanges} return the union of changes across all
* of these snapshots.
*
* @param snapshotOverride the snapshot to use
* @param newSnapshots the snapshots to use; must be non-empty
* @return this builder for method chaining
*/
public Builder snapshot(Snapshot snapshotOverride) {
this.snapshot = snapshotOverride;
public Builder snapshots(Iterable<Snapshot> newSnapshots) {
Preconditions.checkArgument(newSnapshots != null, "Snapshots cannot be null");
this.snapshots.clear();
Iterables.addAll(this.snapshots, newSnapshots);
return this;
}

/**
* Configure an executor service to use for parallel manifest reading.
* Configure an executor service to use for parallel manifest reading. When unset and more than
* one snapshot is configured, the shared {@link ThreadPools#getWorkerPool()} is used.
*
* @param executor the executor service to use for parallel execution
* @return this builder for method chaining
Expand All @@ -265,7 +323,7 @@ public Builder executeWith(ExecutorService executor) {
* @return a new SnapshotChanges instance
*/
public SnapshotChanges build() {
return new SnapshotChanges(snapshot, io, specsById, executorService);
return new SnapshotChanges(snapshots, io, specsById, executorService);
}
}
}
Loading