Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions core/src/main/java/org/apache/iceberg/PartitionsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ private static StaticDataTask.Row convertPartition(Partition partition) {

private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
Types.StructType partitionType = Partitioning.partitionType(table);
PartitionData partitionDataTemplate = new PartitionData(partitionType);

StructLikeMap<Partition> partitions =
StructLikeMap.create(partitionType, new PartitionComparator(partitionType));
Expand All @@ -180,7 +181,7 @@ private static Iterable<Partition> partitions(Table table, StaticTableScan scan)
PartitionUtil.coercePartition(
partitionType, table.specs().get(file.specId()), file.partition());
partitions
.computeIfAbsent(key, () -> new Partition(key, partitionType))
.computeIfAbsent(key, () -> new Partition(key, partitionDataTemplate))
.update(file, snapshot);
}
} catch (IOException e) {
Expand Down Expand Up @@ -282,8 +283,8 @@ static class Partition {
private Long lastUpdatedAt;
private Long lastUpdatedSnapshotId;

Partition(StructLike key, Types.StructType keyType) {
this.partitionData = toPartitionData(key, keyType);
Partition(StructLike key, PartitionData partitionDataTemplate) {
this.partitionData = toPartitionData(key, partitionDataTemplate);
this.specId = 0;
this.dataRecordCount = 0L;
this.dataFileCount = 0;
Expand Down Expand Up @@ -326,9 +327,9 @@ void update(ContentFile<?> file, Snapshot snapshot) {
}

/** Needed because StructProjection is not serializable */
private static PartitionData toPartitionData(StructLike key, Types.StructType keyType) {
PartitionData keyTemplate = new PartitionData(keyType);
return keyTemplate.copyFor(key);
private static PartitionData toPartitionData(
StructLike key, PartitionData partitionDataTemplate) {
return partitionDataTemplate.copyFor(key);
}
}
}
29 changes: 29 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -521,6 +523,33 @@ public void testPartitionsTableScanWithProjection() {
validateSingleFieldPartition(entries, 3);
}

@TestTemplate
public void testPartitionsTableReusesPartitionDataSchema() throws Exception {
preparePartitionedTable();

Table partitionsTable = new PartitionsTable(table);
StaticTableScan scan = (StaticTableScan) partitionsTable.newScan();
List<org.apache.avro.Schema> partitionSchemas = Lists.newArrayList();

Method partitionsMethod =
PartitionsTable.class.getDeclaredMethod("partitions", Table.class, StaticTableScan.class);
partitionsMethod.setAccessible(true);
@SuppressWarnings("unchecked")
Iterable<PartitionsTable.Partition> partitions =
(Iterable<PartitionsTable.Partition>) partitionsMethod.invoke(null, table, scan);
Field partitionDataField = PartitionsTable.Partition.class.getDeclaredField("partitionData");
partitionDataField.setAccessible(true);

for (PartitionsTable.Partition partition : partitions) {
PartitionData partitionData = (PartitionData) partitionDataField.get(partition);
partitionSchemas.add(partitionData.getSchema());
}

assertThat(partitionSchemas).hasSize(4);
org.apache.avro.Schema expectedSchema = partitionSchemas.get(0);
assertThat(partitionSchemas).allSatisfy(schema -> assertThat(schema).isSameAs(expectedSchema));
}

@TestTemplate
public void testPartitionsTableScanNoStats() {
table.newFastAppend().appendFile(FILE_WITH_STATS).commit();
Expand Down
Loading