diff --git a/core/src/main/java/org/apache/iceberg/ManifestInfo.java b/core/src/main/java/org/apache/iceberg/ManifestInfo.java index e87287911426..826c5c8f6995 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestInfo.java +++ b/core/src/main/java/org/apache/iceberg/ManifestInfo.java @@ -19,6 +19,8 @@ package org.apache.iceberg; import java.nio.ByteBuffer; +import java.util.List; +import org.apache.iceberg.ManifestFile.PartitionFieldSummary; import org.apache.iceberg.types.Types; /** Summary information about a manifest referenced by a root manifest entry. */ @@ -53,6 +55,28 @@ interface ManifestInfo { "min_sequence_number", Types.LongType.get(), "Minimum sequence number of files in this manifest"); + Types.StructType PARTITION_SUMMARY_TYPE = + Types.StructType.of( + Types.NestedField.required( + 509, + "contains_null", + Types.BooleanType.get(), + "True if any file has a null partition value"), + Types.NestedField.optional( + 518, + "contains_nan", + Types.BooleanType.get(), + "True if any file has a nan partition value"), + Types.NestedField.optional( + 510, "lower_bound", Types.BinaryType.get(), "Partition lower bound for all files"), + Types.NestedField.optional( + 511, "upper_bound", Types.BinaryType.get(), "Partition upper bound for all files")); + Types.NestedField PARTITION_SUMMARIES = + Types.NestedField.optional( + 507, + "partitions", + Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE), + "Summary for each partition"); Types.NestedField DV = Types.NestedField.optional( 522, "dv", Types.BinaryType.get(), "Deletion vector for manifest entries"); @@ -74,6 +98,7 @@ static Types.StructType schema() { DELETED_ROWS_COUNT, REPLACED_ROWS_COUNT, MIN_SEQUENCE_NUMBER, + PARTITION_SUMMARIES, DV, DV_CARDINALITY); } @@ -105,6 +130,9 @@ static Types.StructType schema() { /** Returns the minimum sequence number of files in this manifest. */ long minSequenceNumber(); + /** Returns a list of {@link PartitionFieldSummary partition field summaries}, or null. */ + List partitions(); + /** Returns the deletion vector bitmap, or null if not present. */ ByteBuffer dv(); diff --git a/core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java b/core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java index 922047bffedd..35914142bdd1 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java +++ b/core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java @@ -21,6 +21,9 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; +import org.apache.iceberg.ManifestFile.PartitionFieldSummary; import org.apache.iceberg.avro.SupportsIndexProjection; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -40,6 +43,7 @@ class ManifestInfoStruct extends SupportsIndexProjection implements ManifestInfo ManifestInfo.DELETED_ROWS_COUNT, ManifestInfo.REPLACED_ROWS_COUNT, ManifestInfo.MIN_SEQUENCE_NUMBER, + ManifestInfo.PARTITION_SUMMARIES, ManifestInfo.DV, ManifestInfo.DV_CARDINALITY); @@ -52,6 +56,7 @@ class ManifestInfoStruct extends SupportsIndexProjection implements ManifestInfo private long deletedRowsCount = -1L; private long replacedRowsCount = -1L; private long minSequenceNumber = -1L; + private PartitionFieldSummary[] partitions = null; private byte[] dv = null; private Long dvCardinality = null; @@ -70,6 +75,12 @@ private ManifestInfoStruct(ManifestInfoStruct toCopy) { this.deletedRowsCount = toCopy.deletedRowsCount; this.replacedRowsCount = toCopy.replacedRowsCount; this.minSequenceNumber = toCopy.minSequenceNumber; + this.partitions = + toCopy.partitions != null + ? Stream.of(toCopy.partitions) + .map(PartitionFieldSummary::copy) + .toArray(PartitionFieldSummary[]::new) + : null; this.dv = toCopy.dv != null ? Arrays.copyOf(toCopy.dv, toCopy.dv.length) : null; this.dvCardinality = toCopy.dvCardinality; } @@ -84,6 +95,7 @@ private ManifestInfoStruct( long deletedRowsCount, long replacedRowsCount, long minSequenceNumber, + PartitionFieldSummary[] partitions, byte[] dv, Long dvCardinality) { super(BASE_TYPE, BASE_TYPE); @@ -96,6 +108,7 @@ private ManifestInfoStruct( this.deletedRowsCount = deletedRowsCount; this.replacedRowsCount = replacedRowsCount; this.minSequenceNumber = minSequenceNumber; + this.partitions = partitions; this.dv = dv; this.dvCardinality = dvCardinality; } @@ -145,6 +158,11 @@ public long minSequenceNumber() { return minSequenceNumber; } + @Override + public List partitions() { + return partitions != null ? Arrays.asList(partitions) : null; + } + @Override public ByteBuffer dv() { return dv != null ? ByteBuffer.wrap(dv) : null; @@ -186,8 +204,10 @@ private Object getByPos(int pos) { case 8: return minSequenceNumber; case 9: - return dv(); + return partitions(); case 10: + return dv(); + case 11: return dvCardinality; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); @@ -225,9 +245,15 @@ protected void internalSet(int pos, T value) { this.minSequenceNumber = (Long) value; break; case 9: - this.dv = ByteBuffers.toByteArray((ByteBuffer) value); + @SuppressWarnings("unchecked") + List summaries = (List) value; + this.partitions = + summaries != null ? summaries.toArray(new PartitionFieldSummary[0]) : null; break; case 10: + this.dv = ByteBuffers.toByteArray((ByteBuffer) value); + break; + case 11: this.dvCardinality = (Long) value; break; default: @@ -251,6 +277,7 @@ public String toString() { .add("deleted_rows_count", deletedRowsCount) .add("replaced_rows_count", replacedRowsCount) .add("min_sequence_number", minSequenceNumber) + .add("partitions", Arrays.toString(partitions)) .add("dv", dv == null ? "null" : "(binary)") .add("dv_cardinality", dvCardinality == null ? "null" : dvCardinality) .toString(); @@ -266,6 +293,7 @@ static class Builder { private long deletedRowsCount = -1L; private long replacedRowsCount = -1L; private long minSequenceNumber = -1L; + private PartitionFieldSummary[] partitions = null; private byte[] dv = null; private Long dvCardinality = null; @@ -314,6 +342,11 @@ Builder minSequenceNumber(long sequenceNumber) { return this; } + Builder partitions(List summaries) { + this.partitions = summaries != null ? summaries.toArray(new PartitionFieldSummary[0]) : null; + return this; + } + Builder dv(ByteBuffer buffer) { this.dv = buffer != null ? ByteBuffers.toByteArray(buffer) : null; return this; @@ -377,6 +410,7 @@ ManifestInfoStruct build() { deletedRowsCount, replacedRowsCount, minSequenceNumber, + partitions, dv, dvCardinality); } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestInfoStruct.java b/core/src/test/java/org/apache/iceberg/TestManifestInfoStruct.java index 3a694f1a38f2..38e4b79db5e2 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestInfoStruct.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestInfoStruct.java @@ -23,6 +23,9 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; +import org.apache.iceberg.ManifestFile.PartitionFieldSummary; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; @@ -32,6 +35,10 @@ class TestManifestInfoStruct { void testFieldAccess() { ManifestInfoStruct info = new ManifestInfoStruct(ManifestInfo.schema()); + PartitionFieldSummary summary = + new GenericPartitionFieldSummary( + false, false, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2})); + info.set(0, 10); info.set(1, 20); info.set(2, 3); @@ -41,8 +48,9 @@ void testFieldAccess() { info.set(6, 300L); info.set(7, 200L); info.set(8, 5L); - info.set(9, ByteBuffer.wrap(new byte[] {0xF})); - info.set(10, 1L); + info.set(9, ImmutableList.of(summary)); + info.set(10, ByteBuffer.wrap(new byte[] {0xF})); + info.set(11, 1L); assertThat(info.addedFilesCount()).isEqualTo(10); assertThat(info.existingFilesCount()).isEqualTo(20); @@ -53,12 +61,16 @@ void testFieldAccess() { assertThat(info.deletedRowsCount()).isEqualTo(300L); assertThat(info.replacedRowsCount()).isEqualTo(200L); assertThat(info.minSequenceNumber()).isEqualTo(5L); + assertThat(info.partitions()).hasSize(1).first().isSameAs(summary); assertThat(info.dv()).isNotNull(); assertThat(info.dvCardinality()).isEqualTo(1L); } @Test void testCopy() { + PartitionFieldSummary summary = + new GenericPartitionFieldSummary( + false, false, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2})); ManifestInfoStruct info = ManifestInfoStruct.builder() .addedFilesCount(10) @@ -70,6 +82,7 @@ void testCopy() { .deletedRowsCount(300L) .replacedRowsCount(200L) .minSequenceNumber(5L) + .partitions(ImmutableList.of(summary)) .dv(new byte[] {0xF}) .dvCardinality(1L) .build(); @@ -89,6 +102,15 @@ void testCopy() { // verify deep copy of dv byte array assertThat(copy.dv().array()).isNotSameAs(info.dv().array()); + + // verify deep copy of partitions + List copiedPartitions = copy.partitions(); + assertThat(copiedPartitions).hasSize(1); + assertThat(copiedPartitions.get(0)).isNotSameAs(summary); + assertThat(copiedPartitions.get(0).containsNull()).isEqualTo(summary.containsNull()); + assertThat(copiedPartitions.get(0).containsNaN()).isEqualTo(summary.containsNaN()); + assertThat(copiedPartitions.get(0).lowerBound()).isEqualTo(summary.lowerBound()); + assertThat(copiedPartitions.get(0).upperBound()).isEqualTo(summary.upperBound()); } @Test @@ -106,6 +128,7 @@ void testNullableFields() { .minSequenceNumber(0L) .build(); + assertThat(info.partitions()).isNull(); assertThat(info.dv()).isNull(); assertThat(info.dvCardinality()).isNull(); } @@ -132,6 +155,9 @@ void testProjectedStructLike() { @Test void testJavaSerializationRoundTrip() throws IOException, ClassNotFoundException { + PartitionFieldSummary summary = + new GenericPartitionFieldSummary( + true, true, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2})); ManifestInfoStruct info = ManifestInfoStruct.builder() .addedFilesCount(10) @@ -143,6 +169,7 @@ void testJavaSerializationRoundTrip() throws IOException, ClassNotFoundException .deletedRowsCount(300L) .replacedRowsCount(200L) .minSequenceNumber(5L) + .partitions(ImmutableList.of(summary)) .dv(new byte[] {0xF}) .dvCardinality(1L) .build(); @@ -158,6 +185,12 @@ void testJavaSerializationRoundTrip() throws IOException, ClassNotFoundException assertThat(deserialized.deletedRowsCount()).isEqualTo(300L); assertThat(deserialized.replacedRowsCount()).isEqualTo(200L); assertThat(deserialized.minSequenceNumber()).isEqualTo(5L); + assertThat(deserialized.partitions()).hasSize(1); + PartitionFieldSummary partition = deserialized.partitions().get(0); + assertThat(partition.containsNaN()).isTrue(); + assertThat(partition.containsNull()).isTrue(); + assertThat(partition.lowerBound()).isEqualTo(ByteBuffer.wrap(new byte[] {1})); + assertThat(partition.upperBound()).isEqualTo(ByteBuffer.wrap(new byte[] {2})); assertThat(deserialized.dv()).isEqualTo(ByteBuffer.wrap(new byte[] {0xF})); assertThat(deserialized.dvCardinality()).isEqualTo(1L); } @@ -357,6 +390,9 @@ void testBuilderDvPairingValidation() { @Test void testKryoSerializationRoundTrip() throws IOException { + PartitionFieldSummary summary = + new GenericPartitionFieldSummary( + true, true, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2})); ManifestInfoStruct info = ManifestInfoStruct.builder() .addedFilesCount(10) @@ -368,6 +404,7 @@ void testKryoSerializationRoundTrip() throws IOException { .deletedRowsCount(300L) .replacedRowsCount(200L) .minSequenceNumber(5L) + .partitions(ImmutableList.of(summary)) .dv(new byte[] {0xF}) .dvCardinality(1L) .build(); @@ -383,6 +420,12 @@ void testKryoSerializationRoundTrip() throws IOException { assertThat(deserialized.deletedRowsCount()).isEqualTo(300L); assertThat(deserialized.replacedRowsCount()).isEqualTo(200L); assertThat(deserialized.minSequenceNumber()).isEqualTo(5L); + assertThat(deserialized.partitions()).hasSize(1); + PartitionFieldSummary partition = deserialized.partitions().get(0); + assertThat(partition.containsNaN()).isTrue(); + assertThat(partition.containsNull()).isTrue(); + assertThat(partition.lowerBound()).isEqualTo(ByteBuffer.wrap(new byte[] {1})); + assertThat(partition.upperBound()).isEqualTo(ByteBuffer.wrap(new byte[] {2})); assertThat(deserialized.dv()).isEqualTo(ByteBuffer.wrap(new byte[] {0xF})); assertThat(deserialized.dvCardinality()).isEqualTo(1L); }