diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md
index e8e4f7e3c8c1..7cbda6cd9d2a 100644
--- a/docs/docs/spark-configuration.md
+++ b/docs/docs/spark-configuration.md
@@ -206,6 +206,10 @@ val spark = SparkSession.builder()
 | spark.sql.iceberg.merge-schema | false | Enables modifying the table schema to match the write schema. Only adds missing columns |
 | spark.sql.iceberg.report-column-stats | true | Report Puffin Table Statistics if available to Spark's Cost Based Optimizer. CBO must be enabled for this to be effective |
 | spark.sql.iceberg.async-micro-batch-planning-enabled | false | Enables asynchronous microbatch planning to reduce planning latency by pre-fetching file scan tasks |
+| spark.sql.iceberg.split.target-size | Table default (134217728, 128 MB) | Target split size for scan planning |
+| spark.sql.iceberg.split.planning-lookback | Table default (10) | Planning lookback for scan task bin packing |
+| spark.sql.iceberg.split.open-file-cost | Table default (4194304, 4 MB) | File open cost used for scan planning |
+| spark.sql.iceberg.split.adaptive-size.enabled | Table default (true) | Enables adaptive split sizing for scan planning |
 
 ### Read options
 
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 36c34251c317..dbdea8bde9a1 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -156,39 +156,54 @@ public int orcBatchSize() {
   }
 
   public Long splitSizeOption() {
-    return confParser.longConf().option(SparkReadOptions.SPLIT_SIZE).parseOptional();
+    return confParser
+        .longConf()
+        .option(SparkReadOptions.SPLIT_SIZE)
+        .sessionConf(SparkSQLProperties.SPLIT_SIZE)
+        .parseOptional();
   }
 
   public long splitSize() {
     return confParser
         .longConf()
         .option(SparkReadOptions.SPLIT_SIZE)
+        .sessionConf(SparkSQLProperties.SPLIT_SIZE)
         .tableProperty(TableProperties.SPLIT_SIZE)
         .defaultValue(TableProperties.SPLIT_SIZE_DEFAULT)
         .parse();
   }
 
   public Integer splitLookbackOption() {
-    return confParser.intConf().option(SparkReadOptions.LOOKBACK).parseOptional();
+    return confParser
+        .intConf()
+        .option(SparkReadOptions.LOOKBACK)
+        .sessionConf(SparkSQLProperties.SPLIT_LOOKBACK)
+        .parseOptional();
   }
 
   public int splitLookback() {
     return confParser
         .intConf()
         .option(SparkReadOptions.LOOKBACK)
+        .sessionConf(SparkSQLProperties.SPLIT_LOOKBACK)
         .tableProperty(TableProperties.SPLIT_LOOKBACK)
         .defaultValue(TableProperties.SPLIT_LOOKBACK_DEFAULT)
         .parse();
   }
 
   public Long splitOpenFileCostOption() {
-    return confParser.longConf().option(SparkReadOptions.FILE_OPEN_COST).parseOptional();
+    return confParser
+        .longConf()
+        .option(SparkReadOptions.FILE_OPEN_COST)
+        .sessionConf(SparkSQLProperties.SPLIT_OPEN_FILE_COST)
+        .parseOptional();
   }
 
   public long splitOpenFileCost() {
     return confParser
         .longConf()
         .option(SparkReadOptions.FILE_OPEN_COST)
+        .sessionConf(SparkSQLProperties.SPLIT_OPEN_FILE_COST)
         .tableProperty(TableProperties.SPLIT_OPEN_FILE_COST)
         .defaultValue(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT)
         .parse();
@@ -279,6 +294,7 @@ public boolean aggregatePushDownEnabled() {
   public boolean adaptiveSplitSizeEnabled() {
     return confParser
         .booleanConf()
+        .sessionConf(SparkSQLProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
         .tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
         .defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
         .parse();
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 161f09d53e2c..7c1ec03a625f 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -114,4 +114,17 @@ private SparkSQLProperties() {}
   public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
       "spark.sql.iceberg.async-micro-batch-planning-enabled";
   public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = false;
+
+  // Target split size for scan planning
+  public static final String SPLIT_SIZE = "spark.sql.iceberg.split.target-size";
+
+  // Planning lookback for scan task bin packing
+  public static final String SPLIT_LOOKBACK = "spark.sql.iceberg.split.planning-lookback";
+
+  // File open cost used for scan planning
+  public static final String SPLIT_OPEN_FILE_COST = "spark.sql.iceberg.split.open-file-cost";
+
+  // Controls adaptive split sizing for scan planning
+  public static final String ADAPTIVE_SPLIT_SIZE_ENABLED =
+      "spark.sql.iceberg.split.adaptive-size.enabled";
 }
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java
new file mode 100644
index 000000000000..25b7d7267f51
--- /dev/null
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkReadConf extends TestBaseWithCatalog {
+
+  @BeforeEach
+  public void before() {
+    super.before();
+    sql("CREATE TABLE %s (id BIGINT, data STRING) USING iceberg", tableName);
+  }
+
+  @AfterEach
+  public void after() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testSplitSizePrecedence() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(new SparkReadConf(spark, table).splitSize())
+        .isEqualTo(TableProperties.SPLIT_SIZE_DEFAULT);
+
+    table.updateProperties().set(TableProperties.SPLIT_SIZE, "16777216").commit();
+    assertThat(new SparkReadConf(spark, table).splitSize()).isEqualTo(16777216L);
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.SPLIT_SIZE, "33554432"),
+        () -> {
+          assertThat(new SparkReadConf(spark, table).splitSize()).isEqualTo(33554432L);
+
+          CaseInsensitiveStringMap readOptions =
+              new CaseInsensitiveStringMap(ImmutableMap.of(SparkReadOptions.SPLIT_SIZE, "8388608"));
+          assertThat(new SparkReadConf(spark, table, readOptions).splitSize()).isEqualTo(8388608L);
+        });
+  }
+
+  @TestTemplate
+  public void testSplitLookbackPrecedence() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(new SparkReadConf(spark, table).splitLookback())
+        .isEqualTo(TableProperties.SPLIT_LOOKBACK_DEFAULT);
+
+    table.updateProperties().set(TableProperties.SPLIT_LOOKBACK, "5").commit();
+    assertThat(new SparkReadConf(spark, table).splitLookback()).isEqualTo(5);
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.SPLIT_LOOKBACK, "7"),
+        () -> {
+          assertThat(new SparkReadConf(spark, table).splitLookback()).isEqualTo(7);
+
+          CaseInsensitiveStringMap readOptions =
+              new CaseInsensitiveStringMap(ImmutableMap.of(SparkReadOptions.LOOKBACK, "9"));
+          assertThat(new SparkReadConf(spark, table, readOptions).splitLookback()).isEqualTo(9);
+        });
+  }
+
+  @TestTemplate
+  public void testSplitOpenFileCostPrecedence() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(new SparkReadConf(spark, table).splitOpenFileCost())
+        .isEqualTo(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    table.updateProperties().set(TableProperties.SPLIT_OPEN_FILE_COST, "1048576").commit();
+    assertThat(new SparkReadConf(spark, table).splitOpenFileCost()).isEqualTo(1048576L);
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.SPLIT_OPEN_FILE_COST, "2097152"),
+        () -> {
+          assertThat(new SparkReadConf(spark, table).splitOpenFileCost()).isEqualTo(2097152L);
+
+          CaseInsensitiveStringMap readOptions =
+              new CaseInsensitiveStringMap(
+                  ImmutableMap.of(SparkReadOptions.FILE_OPEN_COST, "4194304"));
+          assertThat(new SparkReadConf(spark, table, readOptions).splitOpenFileCost())
+              .isEqualTo(4194304L);
+        });
+  }
+
+  @TestTemplate
+  public void testAdaptiveSplitSizeEnabledPrecedence() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(new SparkReadConf(spark, table).adaptiveSplitSizeEnabled())
+        .isEqualTo(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT);
+
+    table.updateProperties().set(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED, "false").commit();
+    assertThat(new SparkReadConf(spark, table).adaptiveSplitSizeEnabled()).isFalse();
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.ADAPTIVE_SPLIT_SIZE_ENABLED, "true"),
+        () -> assertThat(new SparkReadConf(spark, table).adaptiveSplitSizeEnabled()).isTrue());
+  }
+
+  @TestTemplate
+  public void testSplitOptionAccessorsTreatSessionConfAsExplicitOverride() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    table
+        .updateProperties()
+        .set(TableProperties.SPLIT_SIZE, "16777216")
+        .set(TableProperties.SPLIT_LOOKBACK, "5")
+        .set(TableProperties.SPLIT_OPEN_FILE_COST, "1048576")
+        .commit();
+
+    SparkReadConf tableOnlyConf = new SparkReadConf(spark, table);
+    assertThat(tableOnlyConf.splitSizeOption()).isNull();
+    assertThat(tableOnlyConf.splitLookbackOption()).isNull();
+    assertThat(tableOnlyConf.splitOpenFileCostOption()).isNull();
+
+    withSQLConf(
+        ImmutableMap.of(
+            SparkSQLProperties.SPLIT_SIZE, "33554432",
+            SparkSQLProperties.SPLIT_LOOKBACK, "7",
+            SparkSQLProperties.SPLIT_OPEN_FILE_COST, "2097152"),
+        () -> {
+          SparkReadConf sessionConf = new SparkReadConf(spark, table);
+          assertThat(sessionConf.splitSizeOption()).isEqualTo(33554432L);
+          assertThat(sessionConf.splitLookbackOption()).isEqualTo(7);
+          assertThat(sessionConf.splitOpenFileCostOption()).isEqualTo(2097152L);
+
+          CaseInsensitiveStringMap readOptions =
+              new CaseInsensitiveStringMap(ImmutableMap.of(SparkReadOptions.SPLIT_SIZE, "8388608"));
+          assertThat(new SparkReadConf(spark, table, readOptions).splitSizeOption())
+              .isEqualTo(8388608L);
+        });
+  }
+}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index e67ec5fd62d4..d5e9563bb9b5 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -49,6 +49,7 @@
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.TestBaseWithCatalog;
 import org.apache.iceberg.types.Types;
@@ -229,6 +230,47 @@ public void testSplitOptionsOverridesTableProperties() throws IOException {
         .isEqualTo(2);
   }
 
+  @TestTemplate
+  public void testSplitSizeSessionConfigDrivesScanPartitions() throws IOException {
+    String tableLocation = temp.resolve("iceberg-table").toFile().toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SPLIT_SIZE, String.valueOf(128L * 1024 * 1024)); // 128MB
+    options.put(TableProperties.DEFAULT_FILE_FORMAT, String.valueOf(FileFormat.AVRO));
+    // Disable adaptive sizing so the configured split size is honored exactly.
+    options.put(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED, "false");
+    Table icebergTable = tables.create(SCHEMA, spec, options, tableLocation);
+
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf
+        .select("id", "data")
+        .repartition(1)
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    List<DataFile> files =
+        Lists.newArrayList(icebergTable.currentSnapshot().addedDataFiles(icebergTable.io()));
+    assertThat(files).as("Should have written 1 file").hasSize(1);
+
+    long fileSize = files.get(0).fileSizeInBytes();
+    long splitSize = LongMath.divide(fileSize, 2, RoundingMode.CEILING);
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.SPLIT_SIZE, String.valueOf(splitSize)),
+        () -> {
+          Dataset<Row> resultDf = spark.read().format("iceberg").load(tableLocation);
+          assertThat(resultDf.javaRDD().getNumPartitions())
+              .as("Session conf split size should drive Spark partitions")
+              .isEqualTo(2);
+        });
+  }
+
   @TestTemplate
   public void testIncrementalScanOptions() throws IOException {
     String tableLocation = temp.resolve("iceberg-table").toFile().toString();
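
Usage note (illustrative, not part of the diff): the new session confs sit between per-read options and table properties in the precedence chains above, so split tuning can be set once per session and still be overridden per read. Below is a minimal Java sketch; the `local[2]` master and `db.table` identifier are placeholders, and a real session would also need an Iceberg catalog configured.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SplitTuningExample {
  public static void main(String[] args) {
    // Session-level split planning defaults (property names match
    // SparkSQLProperties above); catalog setup omitted for brevity.
    SparkSession spark =
        SparkSession.builder()
            .master("local[2]") // placeholder master
            .config("spark.sql.iceberg.split.target-size", "33554432") // 32 MB
            .config("spark.sql.iceberg.split.planning-lookback", "7")
            .config("spark.sql.iceberg.split.open-file-cost", "2097152") // 2 MB
            .config("spark.sql.iceberg.split.adaptive-size.enabled", "false")
            .getOrCreate();

    // A plain read picks up the session conf over the table properties.
    Dataset<Row> df = spark.read().format("iceberg").load("db.table"); // placeholder table

    // An explicit read option still wins over the session conf.
    Dataset<Row> smallSplits =
        spark.read()
            .format("iceberg")
            .option("split-size", "8388608") // 8 MB, highest precedence
            .load("db.table");
  }
}
```

The same values can also be changed mid-session, e.g. `SET spark.sql.iceberg.split.target-size=33554432` in Spark SQL, which is what the `withSQLConf` blocks in the tests simulate.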