Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public static TableMetadata replacePaths(
snapshotId,
newSnapshots,
null,
null,
metadata.snapshotLog(),
metadataLogEntries,
metadata.refs(),
Expand Down
66 changes: 53 additions & 13 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,18 +259,19 @@ public String toString() {
private final Map<Integer, Schema> schemasById;
private final Map<Integer, PartitionSpec> specsById;
private final Map<Integer, SortOrder> sortOrdersById;
private final List<HistoryEntry> snapshotLog;
private volatile List<HistoryEntry> snapshotLog;
private final List<MetadataLogEntry> previousFiles;
private final List<StatisticsFile> statisticsFiles;
private final List<PartitionStatisticsFile> partitionStatisticsFiles;
private final List<MetadataUpdate> changes;
private final long nextRowId;
private final List<EncryptedKey> encryptionKeys;
private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
@Deprecated private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
private SerializableSupplier<TableMetadata> deferredMetadataSupplier;
private volatile List<Snapshot> snapshots;
private volatile Map<Long, Snapshot> snapshotsById;
private volatile Map<String, SnapshotRef> refs;
private volatile boolean snapshotsLoaded;
private volatile boolean deferredLoaded;

@SuppressWarnings("checkstyle:CyclomaticComplexity")
TableMetadata(
Expand All @@ -292,6 +293,7 @@ public String toString() {
long currentSnapshotId,
List<Snapshot> snapshots,
SerializableSupplier<List<Snapshot>> snapshotsSupplier,
SerializableSupplier<TableMetadata> deferredMetadataSupplier,
List<HistoryEntry> snapshotLog,
List<MetadataLogEntry> previousFiles,
Map<String, SnapshotRef> refs,
Expand Down Expand Up @@ -319,6 +321,9 @@ public String toString() {
metadataFileLocation == null || changes.isEmpty(),
"Cannot create TableMetadata with a metadata location and changes");
Preconditions.checkArgument(encryptionKeys != null, "Encryption keys cannot be null");
Preconditions.checkArgument(
snapshotsSupplier == null || deferredMetadataSupplier == null,
"Cannot set both snapshotsSupplier and deferredMetadataSupplier");

this.metadataFileLocation = metadataFileLocation;
this.formatVersion = formatVersion;
Expand All @@ -338,7 +343,8 @@ public String toString() {
this.currentSnapshotId = currentSnapshotId;
this.snapshots = snapshots;
this.snapshotsSupplier = snapshotsSupplier;
this.snapshotsLoaded = snapshotsSupplier == null;
this.deferredMetadataSupplier = deferredMetadataSupplier;
this.deferredLoaded = snapshotsSupplier == null && deferredMetadataSupplier == null;
this.snapshotLog = snapshotLog;
this.previousFiles = previousFiles;
this.encryptionKeys = encryptionKeys;
Expand Down Expand Up @@ -515,7 +521,7 @@ public long propertyAsLong(String property, long defaultValue) {

public Snapshot snapshot(long snapshotId) {
if (!snapshotsById.containsKey(snapshotId)) {
ensureSnapshotsLoaded();
ensureDeferredLoaded();
}

return snapshotsById.get(snapshotId);
Expand All @@ -526,14 +532,24 @@ public Snapshot currentSnapshot() {
}

public List<Snapshot> snapshots() {
ensureSnapshotsLoaded();
ensureDeferredLoaded();

return snapshots;
}

private synchronized void ensureSnapshotsLoaded() {
if (!snapshotsLoaded) {
List<Snapshot> loadedSnapshots = Lists.newArrayList(snapshotsSupplier.get());
private synchronized void ensureDeferredLoaded() {
if (!deferredLoaded) {
List<Snapshot> loadedSnapshots;
if (deferredMetadataSupplier != null) {
TableMetadata fullMetadata = deferredMetadataSupplier.get();
loadedSnapshots = Lists.newArrayList(fullMetadata.snapshots());
this.snapshotLog = ImmutableList.copyOf(fullMetadata.snapshotLog());
this.deferredMetadataSupplier = null;
} else {
loadedSnapshots = Lists.newArrayList(snapshotsSupplier.get());
this.snapshotsSupplier = null;
}

loadedSnapshots.removeIf(s -> s.sequenceNumber() > lastSequenceNumber);

this.snapshots = ImmutableList.copyOf(loadedSnapshots);
Expand All @@ -542,8 +558,7 @@ private synchronized void ensureSnapshotsLoaded() {

this.refs = validateRefs(currentSnapshotId, refs, snapshotsById);

this.snapshotsLoaded = true;
this.snapshotsSupplier = null;
this.deferredLoaded = true;
}
}

Expand All @@ -564,6 +579,7 @@ public List<PartitionStatisticsFile> partitionStatisticsFiles() {
}

public List<HistoryEntry> snapshotLog() {
ensureDeferredLoaded();
return snapshotLog;
}

Expand Down Expand Up @@ -913,6 +929,7 @@ public static class Builder {
private long currentSnapshotId;
private List<Snapshot> snapshots;
private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
private SerializableSupplier<TableMetadata> deferredMetadataSupplier;
private final Map<String, SnapshotRef> refs;
private final Map<Long, List<StatisticsFile>> statisticsFiles;
private final Map<Long, List<PartitionStatisticsFile>> partitionStatisticsFiles;
Expand Down Expand Up @@ -993,7 +1010,7 @@ private Builder(TableMetadata base) {
this.changes = Lists.newArrayList(base.changes);
this.startingChangeCount = changes.size();

this.snapshotLog = Lists.newArrayList(base.snapshotLog);
this.snapshotLog = Lists.newArrayList(base.snapshotLog());
this.previousFileLocation = base.metadataFileLocation;
this.previousFiles = base.previousFiles;
this.refs = Maps.newHashMap(base.refs);
Expand Down Expand Up @@ -1278,11 +1295,31 @@ public Builder addSnapshot(Snapshot snapshot) {
return this;
}

/**
* Lazily loads table snapshots
*
* @param snapshotsSupplier supplier that lazily loads snapshots
* @return this for method chaining
* @deprecated use {@link #setDeferredMetadataSupplier(SerializableSupplier)} instead.
*/
@Deprecated
public Builder setSnapshotsSupplier(SerializableSupplier<List<Snapshot>> snapshotsSupplier) {
this.snapshotsSupplier = snapshotsSupplier;
return this;
}

/**
* Lazily loads complete table metadata
*
* @param deferredMetadataSupplier supplier that lazily loads complete TableMetadata
* @return this for method chaining
*/
public Builder setDeferredMetadataSupplier(
SerializableSupplier<TableMetadata> deferredMetadataSupplier) {
this.deferredMetadataSupplier = deferredMetadataSupplier;
return this;
}

public Builder setBranchSnapshot(Snapshot snapshot, String branch) {
addSnapshot(snapshot);
setBranchSnapshotInternal(snapshot, branch);
Expand Down Expand Up @@ -1385,6 +1422,7 @@ public Builder suppressHistoricalSnapshots() {
refs.values().stream().map(SnapshotRef::snapshotId).collect(Collectors.toSet());
Set<Long> suppressedSnapshotIds = Sets.difference(snapshotsById.keySet(), refSnapshotIds);
rewriteSnapshotsInternal(suppressedSnapshotIds, true);
this.snapshotLog.removeIf(entry -> !refSnapshotIds.contains(entry.snapshotId()));
return this;
}

Expand Down Expand Up @@ -1528,7 +1566,8 @@ private boolean hasChanges() {
|| (discardChanges && !changes.isEmpty())
|| metadataLocation != null
|| suppressHistoricalSnapshots
|| null != snapshotsSupplier;
|| null != snapshotsSupplier
|| null != deferredMetadataSupplier;
}

public TableMetadata build() {
Expand Down Expand Up @@ -1583,6 +1622,7 @@ public TableMetadata build() {
currentSnapshotId,
ImmutableList.copyOf(snapshots),
snapshotsSupplier,
deferredMetadataSupplier,
ImmutableList.copyOf(newSnapshotLog),
ImmutableList.copyOf(metadataHistory),
ImmutableMap.copyOf(refs),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {
currentSnapshotId,
snapshots,
null,
null,
entries.build(),
metadataEntries.build(),
refs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,11 +529,10 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
TableMetadata.buildFrom(response.tableMetadata())
.withMetadataLocation(response.metadataLocation())
.setPreviousFileLocation(null)
.setSnapshotsSupplier(
.setDeferredMetadataSupplier(
() ->
loadInternal(context, finalIdentifier, SnapshotMode.ALL, Map.of(), h -> {})
.tableMetadata()
.snapshots())
.tableMetadata())
.discardChanges()
.build();
} else {
Expand Down
134 changes: 134 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSnapshotLoading.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,140 @@ public void testBuildingNewMetadataTriggersSnapshotLoad() {
verify(snapshotsSupplierMock, times(1)).get();
}

@TestTemplate
public void testDeferredMetadataSnapshotsAreLoadedOnce() {
SerializableSupplier<TableMetadata> mock = mockDeferredMetadataSupplier();
TableMetadata deferred = deferredTableMetadata(mock);

deferred.snapshots();
deferred.snapshots();
deferred.snapshots();

verify(mock, times(1)).get();

assertThat(deferred.snapshots()).containsExactlyElementsOf(originalTableMetadata.snapshots());
}

@TestTemplate
public void testDeferredMetadataCurrentSnapshotDoesNotLoad() {
SerializableSupplier<TableMetadata> mock = mockDeferredMetadataSupplier();
TableMetadata deferred = deferredTableMetadata(mock);

deferred.currentSnapshot();
deferred.snapshot(deferred.ref(SnapshotRef.MAIN_BRANCH).snapshotId());

verify(mock, times(0)).get();
}

@TestTemplate
public void testDeferredMetadataSnapshotLogLoadedOnce() {
SerializableSupplier<TableMetadata> mock = mockDeferredMetadataSupplier();
TableMetadata deferred = deferredTableMetadata(mock);

deferred.snapshotLog();
deferred.snapshotLog();

verify(mock, times(1)).get();

assertThat(deferred.snapshotLog())
.containsExactlyElementsOf(originalTableMetadata.snapshotLog());
}

@TestTemplate
public void testDeferredMetadataSnapshotLogNotLoadedUntilAccessed() {
SerializableSupplier<TableMetadata> mock = mockDeferredMetadataSupplier();
TableMetadata deferred = deferredTableMetadata(mock);

deferred.currentSnapshot();

verify(mock, times(0)).get();

deferred.snapshotLog();

verify(mock, times(1)).get();

assertThat(deferred.snapshotLog())
.containsExactlyElementsOf(originalTableMetadata.snapshotLog());
}

@TestTemplate
public void testDeferredMetadataSnapshotsLoadAlsoPopulatesSnapshotLog() {
SerializableSupplier<TableMetadata> mock = mockDeferredMetadataSupplier();
TableMetadata deferred = deferredTableMetadata(mock);

deferred.snapshots();

verify(mock, times(1)).get();

deferred.snapshotLog();

verify(mock, times(1)).get();

assertThat(deferred.snapshotLog())
.containsExactlyElementsOf(originalTableMetadata.snapshotLog());
}

@TestTemplate
public void testDeferredMetadataBuildFromPreservesSnapshotLog() {
SerializableSupplier<TableMetadata> mock = mockDeferredMetadataSupplier();
TableMetadata deferred = deferredTableMetadata(mock);

TableMetadata rebuilt = TableMetadata.buildFrom(deferred).discardChanges().build();

verify(mock, times(1)).get();

assertThat(rebuilt.snapshotLog())
.containsExactlyElementsOf(originalTableMetadata.snapshotLog());
}

@TestTemplate
public void testSuppressHistoricalSnapshotsFiltersSnapshotLog() {
TableMetadata suppressed =
TableMetadata.buildFrom(originalTableMetadata)
.suppressHistoricalSnapshots()
.discardChanges()
.build();

assertThat(suppressed.snapshots()).hasSize(1);
assertThat(suppressed.snapshotLog()).hasSize(1);
assertThat(suppressed.snapshotLog().get(0).snapshotId())
.isEqualTo(currentSnapshot.snapshotId());
}

@TestTemplate
public void testCannotSetBothSuppliers() {
assertThatThrownBy(
() ->
TableMetadata.buildFrom(originalTableMetadata)
.setSnapshotsSupplier(ImmutableList::of)
.setDeferredMetadataSupplier(() -> originalTableMetadata)
.build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot set both snapshotsSupplier and deferredMetadataSupplier");
}

private SerializableSupplier<TableMetadata> mockDeferredMetadataSupplier() {
return Mockito.spy(
new SerializableSupplier<TableMetadata>() {
@Override
public TableMetadata get() {
return originalTableMetadata;
}
});
}

private TableMetadata deferredTableMetadata(
SerializableSupplier<TableMetadata> deferredMetadataSupplier) {
return TableMetadata.buildFrom(originalTableMetadata)
.removeSnapshots(
allSnapshots.stream()
.filter(Predicate.isEqual(currentSnapshot).negate())
.collect(Collectors.toList()))
.setDeferredMetadataSupplier(deferredMetadataSupplier)
.discardChanges()
.build();
}

private static class MetadataTableOperations implements TableOperations {
private final FileIO io;
private final TableMetadata currentMetadata;
Expand Down
Loading
Loading