diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java b/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java index ee768fcde460..caac0779f09c 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java +++ b/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java @@ -91,6 +91,19 @@ public class BinPackRewriteFilePlanner */ public static final String MAX_FILES_TO_REWRITE = "max-files-to-rewrite"; + /** + * Controls whether to rewrite files written with a partition spec different from the configured + * output spec. + * + *

<p>This can be used to migrate files created before partition spec evolution (for example, when + * the spec evolved from month to month plus day). + * + *
<p>
Defaults to false. + */ + public static final String REWRITE_PARTITION_SPEC_MISMATCH = "rewrite-partition-spec-mismatch"; + + public static final boolean REWRITE_PARTITION_SPEC_MISMATCH_DEFAULT = false; + private static final Logger LOG = LoggerFactory.getLogger(BinPackRewriteFilePlanner.class); private final Expression filter; @@ -101,6 +114,7 @@ public class BinPackRewriteFilePlanner private double deleteRatioThreshold; private RewriteJobOrder rewriteJobOrder; private Integer maxFilesToRewrite; + private boolean rewritePartitionSpecMismatch; public BinPackRewriteFilePlanner(Table table) { this(table, Expressions.alwaysTrue()); @@ -139,6 +153,7 @@ public Set validOptions() { .add(DELETE_RATIO_THRESHOLD) .add(RewriteDataFiles.REWRITE_JOB_ORDER) .add(MAX_FILES_TO_REWRITE) + .add(REWRITE_PARTITION_SPEC_MISMATCH) .build(); } @@ -154,6 +169,7 @@ public void init(Map options) { RewriteDataFiles.REWRITE_JOB_ORDER, RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT)); this.maxFilesToRewrite = maxFilesToRewrite(options); + this.rewritePartitionSpecMismatch = rewritePartitionSpecMismatch(options); } private int deleteFileThreshold(Map options) { @@ -185,12 +201,20 @@ private Integer maxFilesToRewrite(Map options) { return value; } + private boolean rewritePartitionSpecMismatch(Map options) { + return PropertyUtil.propertyAsBoolean( + options, REWRITE_PARTITION_SPEC_MISMATCH, REWRITE_PARTITION_SPEC_MISMATCH_DEFAULT); + } + @Override protected Iterable filterFiles(Iterable tasks) { return Iterables.filter( tasks, task -> - outsideDesiredFileSizeRange(task) || tooManyDeletes(task) || tooHighDeleteRatio(task)); + outsideDesiredFileSizeRange(task) + || tooManyDeletes(task) + || tooHighDeleteRatio(task) + || hasPartitionSpecMismatch(task)); } @Override @@ -202,7 +226,8 @@ protected Iterable> filterFileGroups(List> || enoughContent(group) || tooMuchContent(group) || group.stream().anyMatch(this::tooManyDeletes) - || group.stream().anyMatch(this::tooHighDeleteRatio)); + || 
group.stream().anyMatch(this::tooHighDeleteRatio) + || group.stream().anyMatch(this::hasPartitionSpecMismatch)); } @Override @@ -286,6 +311,10 @@ private boolean tooHighDeleteRatio(FileScanTask task) { return deleteRatio >= deleteRatioThreshold; } + private boolean hasPartitionSpecMismatch(FileScanTask task) { + return rewritePartitionSpecMismatch && task.file().specId() != outputSpecId(); + } + private StructLikeMap>> planFileGroups() { TableScan scan = table().newScan().filter(filter).caseSensitive(caseSensitive).ignoreResiduals(); diff --git a/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java b/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java index aa65140c0b89..ed173c8db100 100644 --- a/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java +++ b/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.actions; +import static org.apache.iceberg.TestBase.SPEC; import static org.apache.iceberg.actions.BinPackRewriteFilePlanner.MAX_FILE_SIZE_DEFAULT_RATIO; import static org.apache.iceberg.actions.RewriteDataFiles.REWRITE_JOB_ORDER; import static org.assertj.core.api.Assertions.assertThat; @@ -80,7 +81,7 @@ class TestBinPackRewriteFilePlanner { @BeforeEach public void setupTable() throws Exception { - this.table = TestTables.create(tableDir, "test", TestBase.SCHEMA, TestBase.SPEC, 3); + this.table = TestTables.create(tableDir, "test", TestBase.SCHEMA, SPEC, 3); } @AfterEach @@ -292,7 +293,78 @@ void testValidOptions() { BinPackRewriteFilePlanner.DELETE_FILE_THRESHOLD, BinPackRewriteFilePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE)); + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, + BinPackRewriteFilePlanner.REWRITE_PARTITION_SPEC_MISMATCH)); + } + + @Test + void testPartitionSpecMismatchFilteringDisabledByDefault() { + addFiles(); + 
table + .updateSpec() + .removeField("data_bucket") + .addField(Expressions.truncate("data", 3)) + .commit(); + table + .newAppend() + .appendFile(newDataFileWithCurrentSpec("data_trunc_3=foo", 10)) + .appendFile(newDataFileWithCurrentSpec("data_trunc_3=foo", 20)) + .appendFile(newDataFileWithCurrentSpec("data_trunc_3=bar", 30)) + .commit(); + table.refresh(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init( + ImmutableMap.of( + BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, + "0", + BinPackRewriteFilePlanner.MIN_INPUT_FILES, + "1")); + + FileRewritePlan plan = planner.plan(); + + assertThat(plan.totalGroupCount()) + .as("all old and new spec files are filtered by default") + .isZero(); + } + + @Test + void testPartitionSpecMismatchFilteringEnabled() { + // add 6 files with old spec. + addFiles(); + table + .updateSpec() + .removeField("data_bucket") + .addField(Expressions.truncate("data", 3)) + .commit(); + // add 4 new files with new spec. 
+ table + .newAppend() + .appendFile(newDataFileWithCurrentSpec("data_trunc_3=foo", 10)) + .appendFile(newDataFileWithCurrentSpec("data_trunc_3=foo", 20)) + .appendFile(newDataFileWithCurrentSpec("data_trunc_3=bar", 30)) + .appendFile(newDataFileWithCurrentSpec("data_trunc_3=bar", 20)) + .commit(); + table.refresh(); + BinPackRewriteFilePlanner oldSpecPlanner = new BinPackRewriteFilePlanner(table); + oldSpecPlanner.init( + ImmutableMap.of( + BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, + "0", + BinPackRewriteFilePlanner.MIN_INPUT_FILES, + "1", + BinPackRewriteFilePlanner.REWRITE_PARTITION_SPEC_MISMATCH, + "true")); + + FileRewritePlan oldSpecPlan = + oldSpecPlanner.plan(); + List groups = Lists.newArrayList(oldSpecPlan.groups().iterator()); + assertThat(countTotalFilesInAllGroups(groups)) + .as("All 6 old partition files must be present") + .isEqualTo(6); + assertThat(groups.get(0).outputSpecId()) + .as("Output spec id must be the current spec id of the table") + .isEqualTo(table.spec().specId()); } @Test @@ -594,11 +666,24 @@ private void addFiles() { } private static DataFile newDataFile(String partitionPath, long fileSize) { - return DataFiles.builder(TestBase.SPEC) + return DataFiles.builder(SPEC) + .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet") + .withFileSizeInBytes(fileSize) + .withPartitionPath(partitionPath) + .withRecordCount(1) + .build(); + } + + private DataFile newDataFileWithCurrentSpec(String partitionPath, long fileSize) { + return DataFiles.builder(table.spec()) .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet") .withFileSizeInBytes(fileSize) .withPartitionPath(partitionPath) .withRecordCount(1) .build(); } + + private long countTotalFilesInAllGroups(List allGroups) { + return allGroups.stream().mapToLong(RewriteGroupBase::inputFileNum).sum(); + } } diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index 8e594caa12d4..fd777cb9f2fc 100644 --- a/docs/docs/spark-procedures.md +++ 
b/docs/docs/spark-procedures.md @@ -415,6 +415,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile | `output-spec-id` | current partition spec id | Identifier of the output partition spec. Data will be reorganized during the rewrite to align with the output partitioning. | | `remove-dangling-deletes` | false | Remove dangling position and equality deletes after rewriting. A delete file is considered dangling if it does not apply to any live data files. Enabling this will generate an additional commit for the removal. | | `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. | +| `rewrite-partition-spec-mismatch` | false | Rewrite data files whose partition spec differs from the configured output spec. This helps migrate files written under older specs after partition evolution. | !!! info Dangling delete files are removed based solely on data sequence numbers. 
This action does not apply to global diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..8c18c9cc1160 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + SparkShufflingDataRewritePlanner.REWRITE_PARTITION_SPEC_MISMATCH)); } @Test diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..8c18c9cc1160 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + SparkShufflingDataRewritePlanner.REWRITE_PARTITION_SPEC_MISMATCH)); } @Test diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..8c18c9cc1160 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + SparkShufflingDataRewritePlanner.REWRITE_PARTITION_SPEC_MISMATCH)); } @Test diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 9524b0e7167d..02eb4ad6e072 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1971,6 +1971,49 @@ public void testBinPackRewriterWithSpecificOutputSpec() { shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); } + @TestTemplate + public void testBinPackRewritePartitionSpecMismatchOption() { + Table table = createTable(10); + shouldHaveFiles(table, 10); + + int oldSpecId = table.spec().specId(); + table.updateSpec().addField(Expressions.truncate("c2", 2)).commit(); + int currentSpecId = table.spec().specId(); + + writeRecords(2, SCALE); + table.refresh(); + + List expectedRecords = currentData(); + long oldSpecFileCountBeforeRewrite = + currentDataFiles(table).stream().filter(file -> file.specId() == oldSpecId).count(); + + assertThat(oldSpecFileCountBeforeRewrite) + .as("Test setup must include files written with 
the previous partition spec") + .isGreaterThan(0); + + RewriteDataFiles.Result skippedByDefault = + basicRewrite(table) + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .binPack() + .execute(); + + assertThat(skippedByDefault.rewrittenDataFilesCount()).isZero(); + assertThat(currentDataFiles(table).stream().filter(file -> file.specId() == oldSpecId).count()) + .isEqualTo(oldSpecFileCountBeforeRewrite); + + RewriteDataFiles.Result rewriteMismatchedSpecFiles = + basicRewrite(table) + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(BinPackRewriteFilePlanner.REWRITE_PARTITION_SPEC_MISMATCH, "true") + .binPack() + .execute(); + + assertThat(rewriteMismatchedSpecFiles.rewrittenDataFilesCount()) + .isEqualTo(oldSpecFileCountBeforeRewrite); + assertThat(currentDataFiles(table)).allMatch(file -> file.specId() == currentSpecId); + assertThat(currentData()).containsExactlyInAnyOrderElementsOf(expectedRecords); + } + @TestTemplate public void testBinpackRewriteWithInvalidOutputSpecId() { Table table = createTable(10); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..8c18c9cc1160 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + SparkShufflingDataRewritePlanner.REWRITE_PARTITION_SPEC_MISMATCH)); } @Test