Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
import org.apache.iceberg.types.Types;

/** Summary information about a manifest referenced by a root manifest entry. */
Expand Down Expand Up @@ -53,6 +55,28 @@ interface ManifestInfo {
"min_sequence_number",
Types.LongType.get(),
"Minimum sequence number of files in this manifest");
/**
 * Struct type of a single partition field summary: a required {@code contains_null} flag, an
 * optional {@code contains_nan} flag, and optional binary {@code lower_bound} and {@code
 * upper_bound} values.
 */
// NOTE(review): the field IDs (509, 518, 510, 511) appear to mirror the partition summary struct
// used by ManifestFile -- confirm they are intentionally shared rather than newly assigned.
Types.StructType PARTITION_SUMMARY_TYPE =
Types.StructType.of(
Types.NestedField.required(
509,
"contains_null",
Types.BooleanType.get(),
"True if any file has a null partition value"),
Types.NestedField.optional(
518,
"contains_nan",
Types.BooleanType.get(),
"True if any file has a nan partition value"),
Types.NestedField.optional(
510, "lower_bound", Types.BinaryType.get(), "Partition lower bound for all files"),
Types.NestedField.optional(
511, "upper_bound", Types.BinaryType.get(), "Partition upper bound for all files"));
Types.NestedField PARTITION_SUMMARIES =
Comment on lines +58 to +74
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we keep partition tuples for data file entries, I don't think we necessarily need to keep partition field summaries for manifests as well; we can still use stats at this level in the tree. In fact, I largely think we shouldn't, since it adds more complexity to the structure.

Ultimately, at the manifest level we want to preserve pruning, but I think we can do that with stats, so long as we ensure that stats on source columns are collected and we have the correct rules to aggregate them. That was independent of the decision to keep the tuple or not. Let me know if I'm missing something @nastra @anoopj @rdblue

Types.NestedField.optional(
507,
"partitions",
Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE),
"Summary for each partition");
/** Optional binary field (ID 522) holding a deletion vector for manifest entries. */
Types.NestedField DV =
Types.NestedField.optional(
522, "dv", Types.BinaryType.get(), "Deletion vector for manifest entries");
Expand All @@ -74,6 +98,7 @@ static Types.StructType schema() {
DELETED_ROWS_COUNT,
REPLACED_ROWS_COUNT,
MIN_SEQUENCE_NUMBER,
PARTITION_SUMMARIES,
DV,
DV_CARDINALITY);
}
Expand Down Expand Up @@ -105,6 +130,9 @@ static Types.StructType schema() {
/** Returns the minimum sequence number of files in this manifest. */
long minSequenceNumber();

/**
 * Returns a list of {@link PartitionFieldSummary partition field summaries}, or null if this
 * manifest info carries no partition summaries.
 */
List<PartitionFieldSummary> partitions();

/** Returns the deletion vector bitmap, or null if not present. */
ByteBuffer dv();

Expand Down
38 changes: 36 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
import org.apache.iceberg.avro.SupportsIndexProjection;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -40,6 +43,7 @@ class ManifestInfoStruct extends SupportsIndexProjection implements ManifestInfo
ManifestInfo.DELETED_ROWS_COUNT,
ManifestInfo.REPLACED_ROWS_COUNT,
ManifestInfo.MIN_SEQUENCE_NUMBER,
ManifestInfo.PARTITION_SUMMARIES,
ManifestInfo.DV,
ManifestInfo.DV_CARDINALITY);

Expand All @@ -52,6 +56,7 @@ class ManifestInfoStruct extends SupportsIndexProjection implements ManifestInfo
private long deletedRowsCount = -1L;
private long replacedRowsCount = -1L;
private long minSequenceNumber = -1L;
private PartitionFieldSummary[] partitions = null;
private byte[] dv = null;
private Long dvCardinality = null;

Expand All @@ -70,6 +75,12 @@ private ManifestInfoStruct(ManifestInfoStruct toCopy) {
this.deletedRowsCount = toCopy.deletedRowsCount;
this.replacedRowsCount = toCopy.replacedRowsCount;
this.minSequenceNumber = toCopy.minSequenceNumber;
this.partitions =
toCopy.partitions != null
? Stream.of(toCopy.partitions)
.map(PartitionFieldSummary::copy)
.toArray(PartitionFieldSummary[]::new)
: null;
this.dv = toCopy.dv != null ? Arrays.copyOf(toCopy.dv, toCopy.dv.length) : null;
this.dvCardinality = toCopy.dvCardinality;
}
Expand All @@ -84,6 +95,7 @@ private ManifestInfoStruct(
long deletedRowsCount,
long replacedRowsCount,
long minSequenceNumber,
PartitionFieldSummary[] partitions,
byte[] dv,
Long dvCardinality) {
super(BASE_TYPE, BASE_TYPE);
Expand All @@ -96,6 +108,7 @@ private ManifestInfoStruct(
this.deletedRowsCount = deletedRowsCount;
this.replacedRowsCount = replacedRowsCount;
this.minSequenceNumber = minSequenceNumber;
this.partitions = partitions;
this.dv = dv;
this.dvCardinality = dvCardinality;
}
Expand Down Expand Up @@ -145,6 +158,11 @@ public long minSequenceNumber() {
return minSequenceNumber;
}

/**
 * Returns the partition field summaries as a fixed-size list view over the stored array, or null
 * when no summaries were set.
 */
@Override
public List<PartitionFieldSummary> partitions() {
  if (partitions == null) {
    return null;
  }

  return Arrays.asList(partitions);
}

@Override
public ByteBuffer dv() {
return dv != null ? ByteBuffer.wrap(dv) : null;
Expand Down Expand Up @@ -186,8 +204,10 @@ private Object getByPos(int pos) {
case 8:
return minSequenceNumber;
case 9:
return dv();
return partitions();
case 10:
return dv();
case 11:
return dvCardinality;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
Expand Down Expand Up @@ -225,9 +245,15 @@ protected <T> void internalSet(int pos, T value) {
this.minSequenceNumber = (Long) value;
break;
case 9:
this.dv = ByteBuffers.toByteArray((ByteBuffer) value);
@SuppressWarnings("unchecked")
List<PartitionFieldSummary> summaries = (List<PartitionFieldSummary>) value;
this.partitions =
summaries != null ? summaries.toArray(new PartitionFieldSummary[0]) : null;
break;
case 10:
this.dv = ByteBuffers.toByteArray((ByteBuffer) value);
break;
case 11:
this.dvCardinality = (Long) value;
break;
default:
Expand All @@ -251,6 +277,7 @@ public String toString() {
.add("deleted_rows_count", deletedRowsCount)
.add("replaced_rows_count", replacedRowsCount)
.add("min_sequence_number", minSequenceNumber)
.add("partitions", Arrays.toString(partitions))
.add("dv", dv == null ? "null" : "(binary)")
.add("dv_cardinality", dvCardinality == null ? "null" : dvCardinality)
.toString();
Expand All @@ -266,6 +293,7 @@ static class Builder {
private long deletedRowsCount = -1L;
private long replacedRowsCount = -1L;
private long minSequenceNumber = -1L;
private PartitionFieldSummary[] partitions = null;
private byte[] dv = null;
private Long dvCardinality = null;

Expand Down Expand Up @@ -314,6 +342,11 @@ Builder minSequenceNumber(long sequenceNumber) {
return this;
}

/**
 * Sets the partition field summaries. A non-null list is copied into a new array; null clears
 * any previously set summaries.
 *
 * @param summaries partition field summaries, or null
 * @return this builder
 */
Builder partitions(List<PartitionFieldSummary> summaries) {
  if (summaries == null) {
    this.partitions = null;
  } else {
    this.partitions = summaries.toArray(new PartitionFieldSummary[0]);
  }

  return this;
}

Builder dv(ByteBuffer buffer) {
this.dv = buffer != null ? ByteBuffers.toByteArray(buffer) : null;
return this;
Expand Down Expand Up @@ -377,6 +410,7 @@ ManifestInfoStruct build() {
deletedRowsCount,
replacedRowsCount,
minSequenceNumber,
partitions,
dv,
dvCardinality);
}
Expand Down
47 changes: 45 additions & 2 deletions core/src/test/java/org/apache/iceberg/TestManifestInfoStruct.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;

Expand All @@ -32,6 +35,10 @@ class TestManifestInfoStruct {
void testFieldAccess() {
ManifestInfoStruct info = new ManifestInfoStruct(ManifestInfo.schema());

PartitionFieldSummary summary =
new GenericPartitionFieldSummary(
false, false, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}));

info.set(0, 10);
info.set(1, 20);
info.set(2, 3);
Expand All @@ -41,8 +48,9 @@ void testFieldAccess() {
info.set(6, 300L);
info.set(7, 200L);
info.set(8, 5L);
info.set(9, ByteBuffer.wrap(new byte[] {0xF}));
info.set(10, 1L);
info.set(9, ImmutableList.of(summary));
info.set(10, ByteBuffer.wrap(new byte[] {0xF}));
info.set(11, 1L);

assertThat(info.addedFilesCount()).isEqualTo(10);
assertThat(info.existingFilesCount()).isEqualTo(20);
Expand All @@ -53,12 +61,16 @@ void testFieldAccess() {
assertThat(info.deletedRowsCount()).isEqualTo(300L);
assertThat(info.replacedRowsCount()).isEqualTo(200L);
assertThat(info.minSequenceNumber()).isEqualTo(5L);
assertThat(info.partitions()).hasSize(1).first().isSameAs(summary);
assertThat(info.dv()).isNotNull();
assertThat(info.dvCardinality()).isEqualTo(1L);
}

@Test
void testCopy() {
PartitionFieldSummary summary =
new GenericPartitionFieldSummary(
false, false, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}));
ManifestInfoStruct info =
ManifestInfoStruct.builder()
.addedFilesCount(10)
Expand All @@ -70,6 +82,7 @@ void testCopy() {
.deletedRowsCount(300L)
.replacedRowsCount(200L)
.minSequenceNumber(5L)
.partitions(ImmutableList.of(summary))
.dv(new byte[] {0xF})
.dvCardinality(1L)
.build();
Expand All @@ -89,6 +102,15 @@ void testCopy() {

// verify deep copy of dv byte array
assertThat(copy.dv().array()).isNotSameAs(info.dv().array());

// verify deep copy of partitions
List<PartitionFieldSummary> copiedPartitions = copy.partitions();
assertThat(copiedPartitions).hasSize(1);
assertThat(copiedPartitions.get(0)).isNotSameAs(summary);
assertThat(copiedPartitions.get(0).containsNull()).isEqualTo(summary.containsNull());
assertThat(copiedPartitions.get(0).containsNaN()).isEqualTo(summary.containsNaN());
assertThat(copiedPartitions.get(0).lowerBound()).isEqualTo(summary.lowerBound());
assertThat(copiedPartitions.get(0).upperBound()).isEqualTo(summary.upperBound());
}

@Test
Expand All @@ -106,6 +128,7 @@ void testNullableFields() {
.minSequenceNumber(0L)
.build();

assertThat(info.partitions()).isNull();
assertThat(info.dv()).isNull();
assertThat(info.dvCardinality()).isNull();
}
Expand All @@ -132,6 +155,9 @@ void testProjectedStructLike() {

@Test
void testJavaSerializationRoundTrip() throws IOException, ClassNotFoundException {
PartitionFieldSummary summary =
new GenericPartitionFieldSummary(
true, true, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}));
ManifestInfoStruct info =
ManifestInfoStruct.builder()
.addedFilesCount(10)
Expand All @@ -143,6 +169,7 @@ void testJavaSerializationRoundTrip() throws IOException, ClassNotFoundException
.deletedRowsCount(300L)
.replacedRowsCount(200L)
.minSequenceNumber(5L)
.partitions(ImmutableList.of(summary))
.dv(new byte[] {0xF})
.dvCardinality(1L)
.build();
Expand All @@ -158,6 +185,12 @@ void testJavaSerializationRoundTrip() throws IOException, ClassNotFoundException
assertThat(deserialized.deletedRowsCount()).isEqualTo(300L);
assertThat(deserialized.replacedRowsCount()).isEqualTo(200L);
assertThat(deserialized.minSequenceNumber()).isEqualTo(5L);
assertThat(deserialized.partitions()).hasSize(1);
PartitionFieldSummary partition = deserialized.partitions().get(0);
assertThat(partition.containsNaN()).isTrue();
assertThat(partition.containsNull()).isTrue();
assertThat(partition.lowerBound()).isEqualTo(ByteBuffer.wrap(new byte[] {1}));
assertThat(partition.upperBound()).isEqualTo(ByteBuffer.wrap(new byte[] {2}));
assertThat(deserialized.dv()).isEqualTo(ByteBuffer.wrap(new byte[] {0xF}));
assertThat(deserialized.dvCardinality()).isEqualTo(1L);
}
Expand Down Expand Up @@ -357,6 +390,9 @@ void testBuilderDvPairingValidation() {

@Test
void testKryoSerializationRoundTrip() throws IOException {
PartitionFieldSummary summary =
new GenericPartitionFieldSummary(
true, true, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}));
ManifestInfoStruct info =
ManifestInfoStruct.builder()
.addedFilesCount(10)
Expand All @@ -368,6 +404,7 @@ void testKryoSerializationRoundTrip() throws IOException {
.deletedRowsCount(300L)
.replacedRowsCount(200L)
.minSequenceNumber(5L)
.partitions(ImmutableList.of(summary))
.dv(new byte[] {0xF})
.dvCardinality(1L)
.build();
Expand All @@ -383,6 +420,12 @@ void testKryoSerializationRoundTrip() throws IOException {
assertThat(deserialized.deletedRowsCount()).isEqualTo(300L);
assertThat(deserialized.replacedRowsCount()).isEqualTo(200L);
assertThat(deserialized.minSequenceNumber()).isEqualTo(5L);
assertThat(deserialized.partitions()).hasSize(1);
PartitionFieldSummary partition = deserialized.partitions().get(0);
assertThat(partition.containsNaN()).isTrue();
assertThat(partition.containsNull()).isTrue();
assertThat(partition.lowerBound()).isEqualTo(ByteBuffer.wrap(new byte[] {1}));
assertThat(partition.upperBound()).isEqualTo(ByteBuffer.wrap(new byte[] {2}));
assertThat(deserialized.dv()).isEqualTo(ByteBuffer.wrap(new byte[] {0xF}));
assertThat(deserialized.dvCardinality()).isEqualTo(1L);
}
Expand Down
Loading