Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,19 @@ public class BinPackRewriteFilePlanner
*/
public static final String MAX_FILES_TO_REWRITE = "max-files-to-rewrite";

/**
* Controls whether to rewrite files written with a partition spec different from the configured
* output spec.
*
* <p>This can be used to migrate files created before partition spec evolution (for example, when
* the spec evolved from month to month plus day).
*
* <p>Defaults to false.
*/
public static final String REWRITE_PARTITION_SPEC_MISMATCH = "rewrite-partition-spec-mismatch";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this justifies adding another flag. You should be able to use existing flags to achieve this

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking @nastra !
I went through the code but couldn't find any existing flag to achieve this.
We can use filters to select old data based on column values (for example, a timestamp) for the first rewrite, but that won't work if there are many files and the job fails halfway. When we rerun it, it will pick up the same files again even if we have already rewritten 50% of the files successfully. Currently, we can't filter data files based on the spec ID.


public static final boolean REWRITE_PARTITION_SPEC_MISMATCH_DEFAULT = false;

private static final Logger LOG = LoggerFactory.getLogger(BinPackRewriteFilePlanner.class);

private final Expression filter;
Expand All @@ -101,6 +114,7 @@ public class BinPackRewriteFilePlanner
private double deleteRatioThreshold;
private RewriteJobOrder rewriteJobOrder;
private Integer maxFilesToRewrite;
private boolean rewritePartitionSpecMismatch;

public BinPackRewriteFilePlanner(Table table) {
this(table, Expressions.alwaysTrue());
Expand Down Expand Up @@ -139,6 +153,7 @@ public Set<String> validOptions() {
.add(DELETE_RATIO_THRESHOLD)
.add(RewriteDataFiles.REWRITE_JOB_ORDER)
.add(MAX_FILES_TO_REWRITE)
.add(REWRITE_PARTITION_SPEC_MISMATCH)
.build();
}

Expand All @@ -154,6 +169,7 @@ public void init(Map<String, String> options) {
RewriteDataFiles.REWRITE_JOB_ORDER,
RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT));
this.maxFilesToRewrite = maxFilesToRewrite(options);
this.rewritePartitionSpecMismatch = rewritePartitionSpecMismatch(options);
}

private int deleteFileThreshold(Map<String, String> options) {
Expand Down Expand Up @@ -185,12 +201,20 @@ private Integer maxFilesToRewrite(Map<String, String> options) {
return value;
}

private boolean rewritePartitionSpecMismatch(Map<String, String> options) {
return PropertyUtil.propertyAsBoolean(
options, REWRITE_PARTITION_SPEC_MISMATCH, REWRITE_PARTITION_SPEC_MISMATCH_DEFAULT);
}

@Override
protected Iterable<FileScanTask> filterFiles(Iterable<FileScanTask> tasks) {
return Iterables.filter(
tasks,
task ->
outsideDesiredFileSizeRange(task) || tooManyDeletes(task) || tooHighDeleteRatio(task));
outsideDesiredFileSizeRange(task)
|| tooManyDeletes(task)
|| tooHighDeleteRatio(task)
|| hasPartitionSpecMismatch(task));
}

@Override
Expand All @@ -202,7 +226,8 @@ protected Iterable<List<FileScanTask>> filterFileGroups(List<List<FileScanTask>>
|| enoughContent(group)
|| tooMuchContent(group)
|| group.stream().anyMatch(this::tooManyDeletes)
|| group.stream().anyMatch(this::tooHighDeleteRatio));
|| group.stream().anyMatch(this::tooHighDeleteRatio)
|| group.stream().anyMatch(this::hasPartitionSpecMismatch));
}

@Override
Expand Down Expand Up @@ -286,6 +311,10 @@ private boolean tooHighDeleteRatio(FileScanTask task) {
return deleteRatio >= deleteRatioThreshold;
}

private boolean hasPartitionSpecMismatch(FileScanTask task) {
return rewritePartitionSpecMismatch && task.file().specId() != outputSpecId();
}

private StructLikeMap<List<List<FileScanTask>>> planFileGroups() {
TableScan scan =
table().newScan().filter(filter).caseSensitive(caseSensitive).ignoreResiduals();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.actions;

import static org.apache.iceberg.TestBase.SPEC;
import static org.apache.iceberg.actions.BinPackRewriteFilePlanner.MAX_FILE_SIZE_DEFAULT_RATIO;
import static org.apache.iceberg.actions.RewriteDataFiles.REWRITE_JOB_ORDER;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -80,7 +81,7 @@ class TestBinPackRewriteFilePlanner {

@BeforeEach
public void setupTable() throws Exception {
this.table = TestTables.create(tableDir, "test", TestBase.SCHEMA, TestBase.SPEC, 3);
this.table = TestTables.create(tableDir, "test", TestBase.SCHEMA, SPEC, 3);
}

@AfterEach
Expand Down Expand Up @@ -292,7 +293,78 @@ void testValidOptions() {
BinPackRewriteFilePlanner.DELETE_FILE_THRESHOLD,
BinPackRewriteFilePlanner.DELETE_RATIO_THRESHOLD,
RewriteDataFiles.REWRITE_JOB_ORDER,
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE));
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE,
BinPackRewriteFilePlanner.REWRITE_PARTITION_SPEC_MISMATCH));
}

@Test
void testPartitionSpecMismatchFilteringDisabledByDefault() {
addFiles();
table
.updateSpec()
.removeField("data_bucket")
.addField(Expressions.truncate("data", 3))
.commit();
table
.newAppend()
.appendFile(newDataFileWithCurrentSpec("data_trunc_3=foo", 10))
.appendFile(newDataFileWithCurrentSpec("data_trunc_3=foo", 20))
.appendFile(newDataFileWithCurrentSpec("data_trunc_3=bar", 30))
.commit();
table.refresh();

BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table);
planner.init(
ImmutableMap.of(
BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES,
"0",
BinPackRewriteFilePlanner.MIN_INPUT_FILES,
"1"));

FileRewritePlan<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> plan = planner.plan();

assertThat(plan.totalGroupCount())
.as("all old and new spec files are filtered by default")
.isZero();
}

@Test
void testPartitionSpecMismatchFilteringEnabled() {
// add 6 files with old spec.
addFiles();
table
.updateSpec()
.removeField("data_bucket")
.addField(Expressions.truncate("data", 3))
.commit();
// add 3 new files with new spec.
table
.newAppend()
.appendFile(newDataFileWithCurrentSpec("data_trunc_3=foo", 10))
.appendFile(newDataFileWithCurrentSpec("data_trunc_3=foo", 20))
.appendFile(newDataFileWithCurrentSpec("data_trunc_3=bar", 30))
.appendFile(newDataFileWithCurrentSpec("data_trunc_3=bar", 20))
.commit();
table.refresh();
BinPackRewriteFilePlanner oldSpecPlanner = new BinPackRewriteFilePlanner(table);
oldSpecPlanner.init(
ImmutableMap.of(
BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES,
"0",
BinPackRewriteFilePlanner.MIN_INPUT_FILES,
"1",
BinPackRewriteFilePlanner.REWRITE_PARTITION_SPEC_MISMATCH,
"true"));

FileRewritePlan<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> oldSpecPlan =
oldSpecPlanner.plan();
List<RewriteFileGroup> groups = Lists.newArrayList(oldSpecPlan.groups().iterator());
assertThat(countTotalFilesInAllGroups(groups))
.as("All 6 old partition files must be present")
.isEqualTo(6);
assertThat(groups.get(0).outputSpecId())
.as("Output spec id must be the current spec id of the table")
.isEqualTo(table.spec().specId());
}

@Test
Expand Down Expand Up @@ -594,11 +666,24 @@ private void addFiles() {
}

private static DataFile newDataFile(String partitionPath, long fileSize) {
return DataFiles.builder(TestBase.SPEC)
return DataFiles.builder(SPEC)
.withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
.withFileSizeInBytes(fileSize)
.withPartitionPath(partitionPath)
.withRecordCount(1)
.build();
}

private DataFile newDataFileWithCurrentSpec(String partitionPath, long fileSize) {
return DataFiles.builder(table.spec())
.withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
.withFileSizeInBytes(fileSize)
.withPartitionPath(partitionPath)
.withRecordCount(1)
.build();
}

private long countTotalFilesInAllGroups(List<RewriteFileGroup> allGroups) {
return allGroups.stream().mapToLong(RewriteGroupBase::inputFileNum).sum();
}
}
1 change: 1 addition & 0 deletions docs/docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile
| `output-spec-id` | current partition spec id | Identifier of the output partition spec. Data will be reorganized during the rewrite to align with the output partitioning. |
| `remove-dangling-deletes` | false | Remove dangling position and equality deletes after rewriting. A delete file is considered dangling if it does not apply to any live data files. Enabling this will generate an additional commit for the removal. |
| `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. |
| `rewrite-partition-spec-mismatch` | false | Rewrite data files whose partition spec differs from the configured output spec. This helps migrate files written under older specs after partition evolution. |

!!! info
Dangling delete files are removed based solely on data sequence numbers. This action does not apply to global
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() {
SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD,
SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD,
RewriteDataFiles.REWRITE_JOB_ORDER,
SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE));
SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE,
SparkShufflingDataRewritePlanner.REWRITE_PARTITION_SPEC_MISMATCH));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() {
SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD,
SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD,
RewriteDataFiles.REWRITE_JOB_ORDER,
SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE));
SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE,
SparkShufflingDataRewritePlanner.REWRITE_PARTITION_SPEC_MISMATCH));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() {
SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD,
SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD,
RewriteDataFiles.REWRITE_JOB_ORDER,
SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE));
SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE,
SparkShufflingDataRewritePlanner.REWRITE_PARTITION_SPEC_MISMATCH));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,49 @@ public void testBinPackRewriterWithSpecificOutputSpec() {
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}

@TestTemplate
public void testBinPackRewritePartitionSpecMismatchOption() {
Table table = createTable(10);
shouldHaveFiles(table, 10);

int oldSpecId = table.spec().specId();
table.updateSpec().addField(Expressions.truncate("c2", 2)).commit();
int currentSpecId = table.spec().specId();

writeRecords(2, SCALE);
table.refresh();

List<Object[]> expectedRecords = currentData();
long oldSpecFileCountBeforeRewrite =
currentDataFiles(table).stream().filter(file -> file.specId() == oldSpecId).count();

assertThat(oldSpecFileCountBeforeRewrite)
.as("Test setup must include files written with the previous partition spec")
.isGreaterThan(0);

RewriteDataFiles.Result skippedByDefault =
basicRewrite(table)
.option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0")
.binPack()
.execute();

assertThat(skippedByDefault.rewrittenDataFilesCount()).isZero();
assertThat(currentDataFiles(table).stream().filter(file -> file.specId() == oldSpecId).count())
.isEqualTo(oldSpecFileCountBeforeRewrite);

RewriteDataFiles.Result rewriteMismatchedSpecFiles =
basicRewrite(table)
.option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0")
.option(BinPackRewriteFilePlanner.REWRITE_PARTITION_SPEC_MISMATCH, "true")
.binPack()
.execute();

assertThat(rewriteMismatchedSpecFiles.rewrittenDataFilesCount())
.isEqualTo(oldSpecFileCountBeforeRewrite);
assertThat(currentDataFiles(table)).allMatch(file -> file.specId() == currentSpecId);
assertThat(currentData()).containsExactlyInAnyOrderElementsOf(expectedRecords);
}

@TestTemplate
public void testBinpackRewriteWithInvalidOutputSpecId() {
Table table = createTable(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() {
SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD,
SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD,
RewriteDataFiles.REWRITE_JOB_ORDER,
SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE));
SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE,
SparkShufflingDataRewritePlanner.REWRITE_PARTITION_SPEC_MISMATCH));
}

@Test
Expand Down