docs/docs/spark-configuration.md (4 additions, 0 deletions)
@@ -206,6 +206,10 @@ val spark = SparkSession.builder()
| spark.sql.iceberg.merge-schema | false | Enables modifying the table schema to match the write schema. Only adds missing columns |
| spark.sql.iceberg.report-column-stats | true | Reports Puffin table statistics, if available, to Spark's cost-based optimizer (CBO). CBO must be enabled for this to take effect |
| spark.sql.iceberg.async-micro-batch-planning-enabled | false | Enables asynchronous microbatch planning to reduce planning latency by pre-fetching file scan tasks |
| spark.sql.iceberg.split.target-size | Table default (134217728, 128 MB) | Target split size for scan planning |
| spark.sql.iceberg.split.planning-lookback | Table default (10) | Planning lookback for scan task bin packing |
| spark.sql.iceberg.split.open-file-cost | Table default (4194304, 4 MB) | File open cost used for scan planning |
| spark.sql.iceberg.split.adaptive-size.enabled | Table default (true) | Enables adaptive split sizing for scan planning |
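
These session-level properties apply to every Iceberg scan in the current Spark session and can be set before reading. A minimal Java sketch, assuming an existing `SparkSession` and a hypothetical table `db.table` (the values shown are illustrative, not recommendations):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SplitSessionConfExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("split-conf").getOrCreate();

    // Session-wide overrides for Iceberg scan planning (example values only).
    spark.conf().set("spark.sql.iceberg.split.target-size", "67108864"); // 64 MB
    spark.conf().set("spark.sql.iceberg.split.planning-lookback", "20");
    spark.conf().set("spark.sql.iceberg.split.open-file-cost", "8388608"); // 8 MB
    spark.conf().set("spark.sql.iceberg.split.adaptive-size.enabled", "false");

    // Subsequent Iceberg reads in this session pick up these values.
    Dataset<Row> df = spark.read().format("iceberg").load("db.table"); // hypothetical table
    df.show();
  }
}
```

Per-read options passed to the DataFrameReader still take precedence over these session-level values.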

### Read options

SparkReadConf.java
@@ -156,39 +156,54 @@ public int orcBatchSize() {
  }

  public Long splitSizeOption() {
-   return confParser.longConf().option(SparkReadOptions.SPLIT_SIZE).parseOptional();
+   return confParser
+       .longConf()
+       .option(SparkReadOptions.SPLIT_SIZE)
+       .sessionConf(SparkSQLProperties.SPLIT_SIZE)
+       .parseOptional();
  }

  public long splitSize() {
    return confParser
        .longConf()
        .option(SparkReadOptions.SPLIT_SIZE)
+       .sessionConf(SparkSQLProperties.SPLIT_SIZE)
        .tableProperty(TableProperties.SPLIT_SIZE)
        .defaultValue(TableProperties.SPLIT_SIZE_DEFAULT)
        .parse();
  }

  public Integer splitLookbackOption() {
-   return confParser.intConf().option(SparkReadOptions.LOOKBACK).parseOptional();
+   return confParser
+       .intConf()
+       .option(SparkReadOptions.LOOKBACK)
+       .sessionConf(SparkSQLProperties.SPLIT_LOOKBACK)
+       .parseOptional();
  }

  public int splitLookback() {
    return confParser
        .intConf()
        .option(SparkReadOptions.LOOKBACK)
+       .sessionConf(SparkSQLProperties.SPLIT_LOOKBACK)
        .tableProperty(TableProperties.SPLIT_LOOKBACK)
        .defaultValue(TableProperties.SPLIT_LOOKBACK_DEFAULT)
        .parse();
  }

  public Long splitOpenFileCostOption() {
-   return confParser.longConf().option(SparkReadOptions.FILE_OPEN_COST).parseOptional();
+   return confParser
+       .longConf()
+       .option(SparkReadOptions.FILE_OPEN_COST)
+       .sessionConf(SparkSQLProperties.SPLIT_OPEN_FILE_COST)
+       .parseOptional();
  }

  public long splitOpenFileCost() {
    return confParser
        .longConf()
        .option(SparkReadOptions.FILE_OPEN_COST)
+       .sessionConf(SparkSQLProperties.SPLIT_OPEN_FILE_COST)
        .tableProperty(TableProperties.SPLIT_OPEN_FILE_COST)
        .defaultValue(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT)
        .parse();
  }

@@ -279,6 +294,7 @@ public boolean aggregatePushDownEnabled() {
  public boolean adaptiveSplitSizeEnabled() {
    return confParser
        .booleanConf()
+       .sessionConf(SparkSQLProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
        .tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
        .defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
        .parse();
  }
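
Read through the `ConfParser` chains above, each split setting resolves in a fixed order: per-read option first, then the new session conf, then the table property, then the built-in default. A minimal sketch of that precedence in practice, assuming a hypothetical table `db.table` (sizes are illustrative):

```java
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SplitPrecedenceExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("split-precedence").getOrCreate();

    // Session conf: overrides the table property and the default for every scan.
    spark.conf().set(SparkSQLProperties.SPLIT_SIZE, "67108864"); // 64 MB

    // Per-read option: highest precedence, wins over the session conf for this read only.
    Dataset<Row> df =
        spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.SPLIT_SIZE, "33554432") // 32 MB
            .load("db.table"); // hypothetical table name

    // Fewer bytes per split generally means more input partitions.
    System.out.println(df.javaRDD().getNumPartitions());
  }
}
```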
SparkSQLProperties.java
@@ -114,4 +114,17 @@ private SparkSQLProperties() {}
  public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
      "spark.sql.iceberg.async-micro-batch-planning-enabled";
  public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = false;

  // Target split size for scan planning
  public static final String SPLIT_SIZE = "spark.sql.iceberg.split.target-size";

  // Planning lookback for scan task bin packing
  public static final String SPLIT_LOOKBACK = "spark.sql.iceberg.split.planning-lookback";

  // File open cost used for scan planning
  public static final String SPLIT_OPEN_FILE_COST = "spark.sql.iceberg.split.open-file-cost";

  // Controls adaptive split sizing for scan planning
  public static final String ADAPTIVE_SPLIT_SIZE_ENABLED =
      "spark.sql.iceberg.split.adaptive-size.enabled";
}
TestSparkReadConf.java (new file)
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkReadConf extends TestBaseWithCatalog {

  @BeforeEach
  public void before() {
    super.before();
    sql("CREATE TABLE %s (id BIGINT, data STRING) USING iceberg", tableName);
  }

  @AfterEach
  public void after() {
    sql("DROP TABLE IF EXISTS %s", tableName);
  }

  @TestTemplate
  public void testSplitSizePrecedence() {
    Table table = validationCatalog.loadTable(tableIdent);
    assertThat(new SparkReadConf(spark, table).splitSize())
        .isEqualTo(TableProperties.SPLIT_SIZE_DEFAULT);

    table.updateProperties().set(TableProperties.SPLIT_SIZE, "16777216").commit();
    assertThat(new SparkReadConf(spark, table).splitSize()).isEqualTo(16777216L);

    withSQLConf(
        ImmutableMap.of(SparkSQLProperties.SPLIT_SIZE, "33554432"),
        () -> {
          assertThat(new SparkReadConf(spark, table).splitSize()).isEqualTo(33554432L);

          CaseInsensitiveStringMap readOptions =
              new CaseInsensitiveStringMap(ImmutableMap.of(SparkReadOptions.SPLIT_SIZE, "8388608"));
          assertThat(new SparkReadConf(spark, table, readOptions).splitSize()).isEqualTo(8388608L);
        });
  }

  @TestTemplate
  public void testSplitLookbackPrecedence() {
    Table table = validationCatalog.loadTable(tableIdent);
    assertThat(new SparkReadConf(spark, table).splitLookback())
        .isEqualTo(TableProperties.SPLIT_LOOKBACK_DEFAULT);

    table.updateProperties().set(TableProperties.SPLIT_LOOKBACK, "5").commit();
    assertThat(new SparkReadConf(spark, table).splitLookback()).isEqualTo(5);

    withSQLConf(
        ImmutableMap.of(SparkSQLProperties.SPLIT_LOOKBACK, "7"),
        () -> {
          assertThat(new SparkReadConf(spark, table).splitLookback()).isEqualTo(7);

          CaseInsensitiveStringMap readOptions =
              new CaseInsensitiveStringMap(ImmutableMap.of(SparkReadOptions.LOOKBACK, "9"));
          assertThat(new SparkReadConf(spark, table, readOptions).splitLookback()).isEqualTo(9);
        });
  }

  @TestTemplate
  public void testSplitOpenFileCostPrecedence() {
    Table table = validationCatalog.loadTable(tableIdent);
    assertThat(new SparkReadConf(spark, table).splitOpenFileCost())
        .isEqualTo(TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);

    table.updateProperties().set(TableProperties.SPLIT_OPEN_FILE_COST, "1048576").commit();
    assertThat(new SparkReadConf(spark, table).splitOpenFileCost()).isEqualTo(1048576L);

    withSQLConf(
        ImmutableMap.of(SparkSQLProperties.SPLIT_OPEN_FILE_COST, "2097152"),
        () -> {
          assertThat(new SparkReadConf(spark, table).splitOpenFileCost()).isEqualTo(2097152L);

          CaseInsensitiveStringMap readOptions =
              new CaseInsensitiveStringMap(
                  ImmutableMap.of(SparkReadOptions.FILE_OPEN_COST, "4194304"));
          assertThat(new SparkReadConf(spark, table, readOptions).splitOpenFileCost())
              .isEqualTo(4194304L);
        });
  }

  @TestTemplate
  public void testAdaptiveSplitSizeEnabledPrecedence() {
    Table table = validationCatalog.loadTable(tableIdent);
    assertThat(new SparkReadConf(spark, table).adaptiveSplitSizeEnabled())
        .isEqualTo(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT);

    table.updateProperties().set(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED, "false").commit();
    assertThat(new SparkReadConf(spark, table).adaptiveSplitSizeEnabled()).isFalse();

    withSQLConf(
        ImmutableMap.of(SparkSQLProperties.ADAPTIVE_SPLIT_SIZE_ENABLED, "true"),
        () -> assertThat(new SparkReadConf(spark, table).adaptiveSplitSizeEnabled()).isTrue());
  }

  @TestTemplate
  public void testSplitOptionAccessorsTreatSessionConfAsExplicitOverride() {
    Table table = validationCatalog.loadTable(tableIdent);
    table
        .updateProperties()
        .set(TableProperties.SPLIT_SIZE, "16777216")
        .set(TableProperties.SPLIT_LOOKBACK, "5")
        .set(TableProperties.SPLIT_OPEN_FILE_COST, "1048576")
        .commit();

    SparkReadConf tableOnlyConf = new SparkReadConf(spark, table);
    assertThat(tableOnlyConf.splitSizeOption()).isNull();
    assertThat(tableOnlyConf.splitLookbackOption()).isNull();
    assertThat(tableOnlyConf.splitOpenFileCostOption()).isNull();

    withSQLConf(
        ImmutableMap.of(
            SparkSQLProperties.SPLIT_SIZE, "33554432",
            SparkSQLProperties.SPLIT_LOOKBACK, "7",
            SparkSQLProperties.SPLIT_OPEN_FILE_COST, "2097152"),
        () -> {
          SparkReadConf sessionConf = new SparkReadConf(spark, table);
          assertThat(sessionConf.splitSizeOption()).isEqualTo(33554432L);
          assertThat(sessionConf.splitLookbackOption()).isEqualTo(7);
          assertThat(sessionConf.splitOpenFileCostOption()).isEqualTo(2097152L);

          CaseInsensitiveStringMap readOptions =
              new CaseInsensitiveStringMap(ImmutableMap.of(SparkReadOptions.SPLIT_SIZE, "8388608"));
          assertThat(new SparkReadConf(spark, table, readOptions).splitSizeOption())
              .isEqualTo(8388608L);
        });
  }
}
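
These tests lean on `withSQLConf` from Iceberg's Spark test base classes. As a rough mental model only (not the project's actual implementation), such a helper applies the given confs, runs the action, and restores whatever was set before:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.SparkSession;

// Illustrative sketch of a withSQLConf-style helper; names and behavior are assumptions.
public final class SqlConfSketch {
  private SqlConfSketch() {}

  public static void withSQLConf(SparkSession spark, Map<String, String> conf, Runnable action) {
    Map<String, String> previous = new HashMap<>();
    conf.forEach(
        (key, value) -> {
          previous.put(key, spark.conf().get(key, null)); // null marks a previously unset key
          spark.conf().set(key, value);
        });
    try {
      action.run();
    } finally {
      previous.forEach(
          (key, value) -> {
            if (value == null) {
              spark.conf().unset(key);
            } else {
              spark.conf().set(key, value);
            }
          });
    }
  }
}
```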
TestDataSourceOptions.java
@@ -49,6 +49,7 @@
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.TestBaseWithCatalog;
import org.apache.iceberg.types.Types;

@@ -229,6 +230,47 @@ public void testSplitOptionsOverridesTableProperties() throws IOException {
        .isEqualTo(2);
  }

  @TestTemplate
  public void testSplitSizeSessionConfigDrivesScanPartitions() throws IOException {
    String tableLocation = temp.resolve("iceberg-table").toFile().toString();

    HadoopTables tables = new HadoopTables(CONF);
    PartitionSpec spec = PartitionSpec.unpartitioned();
    Map<String, String> options = Maps.newHashMap();
    options.put(TableProperties.SPLIT_SIZE, String.valueOf(128L * 1024 * 1024)); // 128 MB
    options.put(TableProperties.DEFAULT_FILE_FORMAT, String.valueOf(FileFormat.AVRO));
    // Disable adaptive sizing so the configured split size is honored exactly.
    options.put(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED, "false");
    Table icebergTable = tables.create(SCHEMA, spec, options, tableLocation);

    List<SimpleRecord> expectedRecords =
        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
    originalDf
        .select("id", "data")
        .repartition(1)
        .write()
        .format("iceberg")
        .mode("append")
        .save(tableLocation);

    List<DataFile> files =
        Lists.newArrayList(SnapshotChanges.builderFor(icebergTable).build().addedDataFiles());
    assertThat(files).as("Should have written 1 file").hasSize(1);

    long fileSize = files.get(0).fileSizeInBytes();
    long splitSize = LongMath.divide(fileSize, 2, RoundingMode.CEILING);

    withSQLConf(
        ImmutableMap.of(SparkSQLProperties.SPLIT_SIZE, String.valueOf(splitSize)),
        () -> {
          Dataset<Row> resultDf = spark.read().format("iceberg").load(tableLocation);
          assertThat(resultDf.javaRDD().getNumPartitions())
              .as("Session conf split size should drive Spark partitions")
              .isEqualTo(2);
        });
  }

  @TestTemplate
  public void testIncrementalScanOptions() throws IOException {
    String tableLocation = temp.resolve("iceberg-table").toFile().toString();