13 changes: 13 additions & 0 deletions plugin/trino-iceberg/pom.xml
@@ -382,6 +382,12 @@
<scope>provided</scope>
</dependency>

+<dependency>
+<groupId>com.azure</groupId>
+<artifactId>azure-security-keyvault-keys</artifactId>
+<scope>runtime</scope>
+</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
@@ -406,6 +412,13 @@
</exclusions>
</dependency>

+<dependency>
+<groupId>com.google.cloud.gcs.analytics</groupId>
+<artifactId>gcs-analytics-core</artifactId>
+<version>1.2.1</version>
+<scope>runtime</scope>
+</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-client</artifactId>
@@ -87,14 +87,12 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
-import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;

public class IcebergFileWriterFactory
{
private static final Schema POSITION_DELETE_SCHEMA = pathPosSchema();
-private static final MetricsConfig FULL_METRICS_CONFIG = MetricsConfig.fromProperties(ImmutableMap.of(DEFAULT_WRITE_METRICS_MODE, "full"));
private static final Splitter COLUMN_NAMES_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();

private final TypeManager typeManager;
@@ -151,8 +149,8 @@ public IcebergFileWriter createPositionDeleteWriter(
Map<String, String> storageProperties)
{
return switch (fileFormat) {
-case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
-case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
+case PARQUET -> createParquetWriter(MetricsConfig.forPositionDelete(), fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
+case ORC -> createOrcWriter(MetricsConfig.forPositionDelete(), fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, storageProperties);
};
}
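Iceberg 1.11 appears to drop MetricsConfig.fromProperties (both call sites in this PR migrate off it); here the hand-rolled "full metrics" config for position-delete writers gives way to a dedicated factory. A minimal before/after sketch, not part of the PR, using only names that appear in this diff:

// Before (Iceberg 1.10): force full metrics through table properties
// MetricsConfig metricsConfig = MetricsConfig.fromProperties(
//         ImmutableMap.of(TableProperties.DEFAULT_WRITE_METRICS_MODE, "full"));

// After (Iceberg 1.11): factory method tailored to position-delete files
MetricsConfig metricsConfig = MetricsConfig.forPositionDelete();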
@@ -3324,7 +3324,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
switch (task.content()) {
case DATA -> dataTasks.add(task);
case POSITION_DELETES -> deleteTasks.add(task);
-case EQUALITY_DELETES -> throw new UnsupportedOperationException("Unsupported task content: " + task.content());
+case EQUALITY_DELETES, DATA_MANIFEST, DELETE_MANIFEST -> throw new UnsupportedOperationException("Unsupported task content: " + task.content());
}
}

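This switch, and two more over the same enum later in this diff, gains cases because Java's arrow-form switch over an enum must stay exhaustive: the two constants Iceberg 1.11 evidently adds to the file-content enum (DATA_MANIFEST and DELETE_MANIFEST, judging by this diff) would otherwise break compilation at every switch site. A hedged sketch of the pattern, with a hypothetical helper that is not part of the PR:

// Route the never-expected constants into the existing failure branch;
// runtime behavior for DATA and the row-level deletes is unchanged.
static boolean isRowDelete(FileContent content)
{
    return switch (content) {
        case POSITION_DELETES, EQUALITY_DELETES -> true;
        case DATA -> false;
        // DATA_MANIFEST / DELETE_MANIFEST assumed new in Iceberg 1.11;
        // manifests describe metadata, not rows
        case DATA_MANIFEST, DELETE_MANIFEST ->
                throw new IllegalArgumentException("Unexpected content: " + content);
    };
}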
@@ -167,7 +167,7 @@ public IcebergPageSink(
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.session = requireNonNull(session, "session is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
-this.metricsConfig = MetricsConfig.fromProperties(requireNonNull(storageProperties, "storageProperties is null"));
+this.metricsConfig = MetricsConfig.from(requireNonNull(storageProperties, "storageProperties is null"), null, null);
this.maxOpenWriters = maxOpenWriters;
this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(partitionColumns, partitionSpec, outputSchema));
this.targetMaxFileSize = IcebergSessionProperties.getTargetMaxFileSize(session);
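MetricsConfig.fromProperties(Map) similarly gives way to MetricsConfig.from(...), whose extra parameters let a schema and sort order influence the metric defaults; the nulls at this call site are assumed to preserve the old property-only behavior. A minimal migration sketch, assuming the three-argument signature used above:

// Iceberg 1.10 and earlier:
// this.metricsConfig = MetricsConfig.fromProperties(storageProperties);

// Iceberg 1.11: null schema and sort order assumed to mean
// "derive the config from the properties alone"
this.metricsConfig = MetricsConfig.from(storageProperties, null, null);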
@@ -363,7 +363,7 @@ private synchronized Iterator<FileScanTaskWithDomain> prepareFileTasksIterator(L
}
yield isUnconstrainedPathAndTimeDomain();
}
-case DATA -> throw new IllegalStateException("Unexpected delete file: " + deleteFile);
+case DATA, DATA_MANIFEST, DELETE_MANIFEST -> throw new IllegalStateException("Unexpected delete file: " + deleteFile);
})
.collect(toImmutableList());
scannedFiles.add(new DataFileWithDeleteFiles(wholeFileTask.file(), fullyAppliedDeletes));
@@ -85,7 +85,7 @@ public Optional<RowPredicate> getDeletePredicate(
}
}
case EQUALITY_DELETES -> equalityDeleteFiles.add(deleteFile);
-case DATA -> throw new VerifyException("DATA is not delete file type");
+case DATA, DATA_MANIFEST, DELETE_MANIFEST -> throw new VerifyException("DATA is not delete file type");
}
}

@@ -23,6 +23,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestListFile;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
@@ -149,6 +150,12 @@ public InputFile newInputFile(DeleteFile file)
return SupportsBulkOperations.super.newInputFile(file);
}

+@Override
+public InputFile newInputFile(ManifestListFile manifestList)
+{
+return SupportsBulkOperations.super.newInputFile(manifestList);
+}

private void deleteBatch(List<String> filesToDelete)
{
try {
@@ -7981,6 +7981,77 @@ public void testDescribeOutputWithVersionedTable()
}
}

@Test
public void testTimeTravelWithFilterOnRenamedColumn()
{
testTimeTravelWithFilterOnRenamedColumn(false);
testTimeTravelWithFilterOnRenamedColumn(true);
}

private void testTimeTravelWithFilterOnRenamedColumn(boolean partitioned)
{
String partition = partitioned ? "WITH (partitioning = ARRAY['part'])" : "";
try (TestTable table = newTrinoTable("time_travel_with_filter_on_rename_", "(x int, y int, part int)" + partition)) {
assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)", 3);
assertThat(query("SELECT * FROM " + table.getName()))
.matches("VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)");
long firstSnapshotId = getCurrentSnapshotId(table.getName());

assertUpdate("ALTER TABLE " + table.getName() + " RENAME COLUMN x TO renamed_x");

// generate a new version
assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 2, 3)", 1);

assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + firstSnapshotId + " WHERE x = 1"))
.matches("VALUES (1, 1, 1), (1, 2, 2)");
}
}

@Test
public void testTimeTravelWithFilterOnDroppedColumn()
{
testTimeTravelWithFilterOnDroppedColumn(false);
testTimeTravelWithFilterOnDroppedColumn(true);
}

private void testTimeTravelWithFilterOnDroppedColumn(boolean partitioned)
{
String partition = partitioned ? "WITH (partitioning = ARRAY['part'])" : "";
try (TestTable table = newTrinoTable("time_travel_with_filter_on_drop_", "(x int, y int, part int)" + partition)) {
assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)", 3);
assertThat(query("SELECT * FROM " + table.getName()))
.matches("VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)");
long firstSnapshotId = getCurrentSnapshotId(table.getName());

assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN x");

// generate a new version
assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 2)", 1);

assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + firstSnapshotId + " WHERE x = 1"))
.matches("VALUES (1, 1, 1), (1, 2, 2)");
}
}

@Test
public void testTimeTravelWithFilterOnRenamedPartitionColumn()
{
try (TestTable table = newTrinoTable("time_travel_with_filter_on_drop_", "(x int, part1 int, part2 int) WITH (partitioning = ARRAY['part1', 'part2'])")) {
assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 1, 1), (1, 1, 2), (2, 2, 2)", 3);
assertThat(query("SELECT * FROM " + table.getName()))
.matches("VALUES (1, 1, 1), (1, 1, 2), (2, 2, 2)");
long firstSnapshotId = getCurrentSnapshotId(table.getName());

assertUpdate("ALTER TABLE " + table.getName() + " RENAME COLUMN part1 TO renamed_part");

// generate a new version
assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 1, 3)", 1);

assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + firstSnapshotId + " WHERE part1 = 1"))
.matches("VALUES (1, 1, 1), (1, 1, 2)");
}
}

@Test
public void testDeleteRetainsTableHistory()
{
@@ -1781,6 +1781,45 @@ void testAnalyzeNoSnapshot()
catalog.dropTable(SESSION, schemaTableName);
}

@Test // regression test for https://github.com/trinodb/trino/issues/20511
void testRequiredField()
{
testRequiredField(true);
testRequiredField(false);
}

private void testRequiredField(boolean projectionPushdown)
{
Session projectionPushdownEnabled = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", "projection_pushdown_enabled", Boolean.toString(projectionPushdown))
.build();

String table = "test_required_field" + randomNameSuffix();
SchemaTableName schemaTableName = new SchemaTableName("tpch", table);

catalog.newCreateTableTransaction(
SESSION,
schemaTableName,
new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "struct", Types.StructType.of(
Types.NestedField.required(3, "field", Types.IntegerType.get())))),
PartitionSpec.unpartitioned(),
SortOrder.unsorted(),
Optional.ofNullable(catalog.defaultTableLocation(SESSION, schemaTableName)),
ImmutableMap.of())
.commitTransaction();

assertUpdate("INSERT INTO " + table + " VALUES (1, row(10)), (2, NULL)", 2);

assertThat(query(projectionPushdownEnabled, "SELECT id FROM " + table + " WHERE struct.field IS NOT NULL"))
.matches("VALUES 1");
assertThat(query(projectionPushdownEnabled, "SELECT id FROM " + table + " WHERE struct.field IS NULL"))
.matches("VALUES 2");

catalog.dropTable(SESSION, schemaTableName);
}

private void testHighlyNestedFieldPartitioningWithTimestampTransform(String partitioning, String partitionDirectoryRegex, Set<String> expectedPartitionDirectories)
{
String tableName = "test_highly_nested_field_partitioning_with_timestamp_transform_" + randomNameSuffix();
@@ -37,6 +37,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -704,7 +705,7 @@ void testV3WriteDefault()

BaseTable tempTable = loadTable(temp);
loadTable(tableName).newFastAppend()
-.appendFile(getOnlyElement(tempTable.currentSnapshot().addedDataFiles(tempTable.io())))
+.appendFile(getOnlyElement(SnapshotChanges.builderFor(tempTable).build().addedDataFiles()))
.commit();

// The 'value' column is missing from the data file and has no initial-default, so it should return NULL
@@ -989,7 +990,7 @@ private void assertV3InsertProducesRowLineageMetadata(String fileFormat)
long totalRecords = 0;
Long expectedLastUpdatedSequenceNumber = null;

-for (DataFile file : snapshot.addedDataFiles(table.io())) {
+for (DataFile file : SnapshotChanges.builderFor(table).build().addedDataFiles()) {
fileCount++;
totalRecords += file.recordCount();

@@ -1291,7 +1292,7 @@ void testV3RejectsEncryptionKeysInMetadata()
hadoopTableLocation.toString());

icebergTable.newFastAppend()
-.appendFile(getOnlyElement(tempTable.currentSnapshot().addedDataFiles(tempTable.io())))
+.appendFile(getOnlyElement(SnapshotChanges.builderFor(tempTable).build().addedDataFiles()))
.commit();

// Inject encryption-keys + snapshot key-id into the current metadata.json.
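Snapshot.addedDataFiles(FileIO) appears to be removed in 1.11; every call site in this file migrates to the new SnapshotChanges builder. A before/after sketch, assuming the builder defaults to the table's current snapshot when none is specified, which the unchanged call sites above seem to rely on:

// Iceberg 1.10: ask the snapshot directly, supplying the table's FileIO
// Iterable<DataFile> added = table.currentSnapshot().addedDataFiles(table.io());

// Iceberg 1.11: build a SnapshotChanges view over the table
Iterable<DataFile> added = SnapshotChanges.builderFor(table).build().addedDataFiles();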
@@ -219,7 +219,7 @@ public void testCreateTableWithTrailingSpaceInLocation()
public void testDropTableWithMissingMetadataFile()
{
assertThatThrownBy(super::testDropTableWithMissingMetadataFile)
.hasMessageMatching(".* Table '.*' does not exist");
.hasMessageMatching("Failed to load table: (.*)");
}

@Test
@@ -243,7 +243,7 @@ public void testDropTableWithMissingManifestListFile()
public void testDropTableWithNonExistentTableLocation()
{
assertThatThrownBy(super::testDropTableWithNonExistentTableLocation)
.hasMessageMatching(".* Table '.*' does not exist");
.hasMessageMatching("Failed to load table: (.*)");
}

@Test
@@ -234,7 +234,7 @@ public void testDropTableWithMissingSnapshotFile()
assertThatThrownBy(super::testDropTableWithMissingSnapshotFile)
.isInstanceOf(QueryFailedException.class)
.cause()
.hasMessageContaining("Failed to drop table")
.hasMessageMatching("Failed to open input stream for file: .*avro")
.hasNoCause();
}

@@ -180,7 +180,7 @@ public void testCreateTableWithTrailingSpaceInLocation()
public void testRenameTable()
{
assertThatThrownBy(super::testRenameTable)
.hasStackTraceContaining("Unable to process: RenameTable endpoint is not supported for Glue Catalog");
.hasStackTraceContaining("RenameTable endpoint is not supported for Glue Catalog");
}

@Test
@@ -230,7 +230,7 @@ public void testDropTableWithMissingSnapshotFile()
assertThatThrownBy(super::testDropTableWithMissingSnapshotFile)
.isInstanceOf(QueryFailedException.class)
.cause()
.hasMessageContaining("Failed to drop table")
.hasMessageMatching("Failed to open input stream for file: .*avro")
.hasNoCause();
}

@@ -23,7 +23,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.io.CharStreams;
import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
-import org.apache.iceberg.rest.RESTCatalogAdapter.Route;
Member: why wasn't this a checkstyle issue on master?

Member (author): The import is used in the current master branch. apache/iceberg#14313 extracted the Route enum from RESTCatalogAdapter.

import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.util.Pair;

10 changes: 9 additions & 1 deletion pom.xml
@@ -198,7 +198,7 @@
<dep.frontend-node.version>v24.12.0</dep.frontend-node.version>
<dep.frontend-npm.version>11.7.0</dep.frontend-npm.version>
<dep.httpcore5.version>5.4.2</dep.httpcore5.version>
-<dep.iceberg.version>1.10.1</dep.iceberg.version>
+<dep.iceberg.version>1.11.0</dep.iceberg.version>
<dep.jna.version>5.18.1</dep.jna.version>
<dep.jsonwebtoken.version>0.13.0</dep.jsonwebtoken.version>
<dep.jts.version>1.20.0</dep.jts.version>
@@ -2365,6 +2365,14 @@
</dependencies>
</dependencyManagement>

+<repositories>
+<repository>
+<id>iceberg-release-candidate</id>
+<name>Iceberg Release Candidate</name>
+<url>https://repository.apache.org/content/repositories/orgapacheiceberg-1278/</url>
+</repository>
+</repositories>

<build>
<pluginManagement>
<plugins>