72 changes: 66 additions & 6 deletions core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -23,13 +23,17 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,11 +62,39 @@ public static CloseableIterable<FileScanTask> openManifestFile(
Snapshot snapshot,
ManifestFile manifestFile,
boolean scanAllFiles) {
return openManifestFileWithFilter(
io, specsById, caseSensitive, snapshot, manifestFile, scanAllFiles, Lists.newArrayList());
}

public static CloseableIterable<FileScanTask> openManifestFileWithFilter(
FileIO io,
Map<Integer, PartitionSpec> specsById,
boolean caseSensitive,
Snapshot snapshot,
ManifestFile manifestFile,
boolean scanAllFiles,
List<Expression> pushedFilters) {

    // 1. Split the pushed filters into partition-only and data-column filters
Expression partitionExpr = Expressions.alwaysTrue();
Expression dataExpr = Expressions.alwaysTrue();

for (Expression filter : pushedFilters) {
if (isPartitionOnly(
filter, specsById.values().iterator().next().partitionType(), caseSensitive)) {
Contributor: Tables with partition evolution will have multiple specs. The code here is grabbing an arbitrary partition spec. Take a look at ExpressionUtil.selectsPartitions() for an example. (See the sketch after the isPartitionOnly helper below.)

partitionExpr = Expressions.and(partitionExpr, filter);
} else {
dataExpr = Expressions.and(dataExpr, filter);
}
}

ManifestGroup manifestGroup =
new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
.caseSensitive(caseSensitive)
.filterPartitions(partitionExpr)
.filterData(dataExpr);

if (!scanAllFiles) {
manifestGroup =
manifestGroup
@@ -76,6 +108,20 @@ public static CloseableIterable<FileScanTask> openManifestFile(
return manifestGroup.planFiles();
}

  // 2. Helper that checks whether an expression binds using only partition columns
private static boolean isPartitionOnly(
Expression expr, Types.StructType partitionType, boolean caseSensitive) {
try {
      // If binding succeeds, the filter references only columns that are
      // present in the partition struct.
Binder.bind(partitionType, expr, caseSensitive);
return true;
Contributor: It's not a good idea to do exception-driven control flow for normal operations. The code base already has ExpressionUtil.selectsPartitions() which handles this. (See the sketch after this method.)

} catch (org.apache.iceberg.exceptions.ValidationException e) {
// Filter references columns NOT in the partition spec
return false;
}
}
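
A minimal sketch of the reviewers' suggestion, not part of this PR: replace the bind-and-catch check above with ExpressionUtil.selectsPartitions() and consult every partition spec of the table rather than an arbitrary one. The helper name splitPushedFilters is hypothetical, the ExpressionUtil.selectsPartitions(Expression, PartitionSpec, boolean) signature is assumed from Iceberg core, and an import of org.apache.iceberg.expressions.ExpressionUtil would be needed.

  // Sketch only: split pushed filters without exception-driven control flow.
  // A filter moves to the partition side only if it selects whole partitions
  // under every spec the table has ever had (safe under partition evolution);
  // otherwise it stays on the data side.
  private static Pair<Expression, Expression> splitPushedFilters(
      List<Expression> pushedFilters,
      Map<Integer, PartitionSpec> specsById,
      boolean caseSensitive) {
    Expression partitionExpr = Expressions.alwaysTrue();
    Expression dataExpr = Expressions.alwaysTrue();
    for (Expression filter : pushedFilters) {
      boolean partitionOnly =
          specsById.values().stream()
              .allMatch(spec -> ExpressionUtil.selectsPartitions(filter, spec, caseSensitive));
      if (partitionOnly) {
        partitionExpr = Expressions.and(partitionExpr, filter);
      } else {
        dataExpr = Expressions.and(dataExpr, filter);
      }
    }
    return Pair.of(partitionExpr, dataExpr);
  }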

/**
* Method to index the data files for each manifest. For example, if manifest m1 has 3 data files,
* manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2,
@@ -209,11 +255,22 @@ public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
Iterables.size(
SnapshotChanges.builderFor(snapshot, io, specsById).build().addedDataFiles()),
targetSizeInBytes,
scanAllFiles);
scanAllFiles,
Lists.newArrayList());
}

public MicroBatch generate(
long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
return generate(
startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles, Lists.newArrayList());
}

public MicroBatch generate(
long startFileIndex,
long endFileIndex,
long targetSizeInBytes,
boolean scanAllFiles,
List<Expression> pushedFilters) {
Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(
startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
@@ -225,7 +282,8 @@ public MicroBatch generate(
startFileIndex,
endFileIndex,
targetSizeInBytes,
scanAllFiles);
scanAllFiles,
pushedFilters);
}

/**
@@ -247,7 +305,8 @@ private MicroBatch generateMicroBatch(
long startFileIndex,
long endFileIndex,
long targetSizeInBytes,
boolean scanAllFiles) {
boolean scanAllFiles,
List<Expression> pushedFilters) {
if (indexedManifests.isEmpty()) {
return new MicroBatch(
snapshot.snapshotId(), startFileIndex, endFileIndex, 0L, Collections.emptyList(), true);
@@ -262,13 +321,14 @@ private MicroBatch generateMicroBatch(
currentFileIndex = indexedManifests.get(idx).second();

try (CloseableIterable<FileScanTask> taskIterable =
openManifestFile(
openManifestFileWithFilter(
io,
specsById,
caseSensitive,
snapshot,
indexedManifests.get(idx).first(),
scanAllFiles);
scanAllFiles,
pushedFilters);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
50 changes: 50 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
@@ -23,6 +23,9 @@
import java.util.Collections;
import java.util.List;
import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeEach;
@@ -141,6 +144,53 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
assertThat(batch5.lastIndexOfSnapshot()).isTrue();
}

@TestTemplate
public void testGenerateMicroBatchWithFilter() {
List<Expression> filters =
ImmutableList.of(Expressions.equal("data_bucket", 1), Expressions.equal("id", 1));
runMicroBatchWithTest(filters, true);
}

@TestTemplate
public void testGenerateMicroBatchWithFilterWithCaseInsensitive() {
List<Expression> filters =
ImmutableList.of(Expressions.equal("DATA_bucket", 1), Expressions.equal("Id", 1));
runMicroBatchWithTest(filters, false);
}

private void runMicroBatchWithTest(List<Expression> filters, boolean caseSensitive) {
    // 1. Set up data files in different partitions
DataFile fileA =
DataFiles.builder(table.spec())
.withPath("A.parquet")
.withFileSizeInBytes(10)
.withRecordCount(1)
.withPartitionPath("data_bucket=0")
.build();
DataFile fileB =
DataFiles.builder(table.spec())
.withPath("B.parquet")
.withFileSizeInBytes(10)
.withRecordCount(1)
.withPartitionPath("data_bucket=1")
.build();

table.newAppend().appendFile(fileA).appendFile(fileB).commit();

    // 2. Generate the batch with the pushed filters
    // Note: MicroBatches.from() could alternatively expose a .filter(filters)
    // builder method instead of passing filters to generate() directly
    // (see the sketch after this test).
MicroBatch batch =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.caseSensitive(caseSensitive)
.generate(0, 2, Long.MAX_VALUE, true, filters);

    // 3. Verify only file B is present despite the range covering both files
assertThat(batch.sizeInBytes()).isEqualTo(10);
filesMatch(Lists.newArrayList("B"), filesToScan(batch.tasks()));
}
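
For completeness, a rough sketch of the builder change hinted at by the note above; the filter(...) method on MicroBatches.MicroBatchBuilder is hypothetical and not part of this PR:

  // Hypothetical addition to MicroBatches.MicroBatchBuilder (sketch only):
  // retain pushed filters on the builder so callers need not pass them
  // to generate() explicitly.
  private List<Expression> pushedFilters = Lists.newArrayList();

  public MicroBatchBuilder filter(List<Expression> newFilters) {
    this.pushedFilters = newFilters; // applied later inside generate()
    return this;
  }

  // Usage would then read:
  //   MicroBatches.from(table.snapshot(1L), table.io())
  //       .specsById(table.specs())
  //       .caseSensitive(caseSensitive)
  //       .filter(filters)
  //       .generate(0, 2, Long.MAX_VALUE, true);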

private static DataFile file(String name) {
return DataFiles.builder(SPEC)
.withPath(name + ".parquet")
@@ -31,6 +31,7 @@
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -84,8 +85,9 @@ class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner implements
SparkReadConf readConf,
StreamingOffset initialOffset,
StreamingOffset maybeEndOffset,
StreamingOffset lastOffsetForTriggerAvailableNow) {
super(table, readConf);
StreamingOffset lastOffsetForTriggerAvailableNow,
List<Expression> pushedFilters) {
super(table, readConf, pushedFilters);
this.minQueuedFiles = readConf().maxFilesPerMicroBatch();
this.minQueuedRows = readConf().maxRecordsPerMicroBatch();
this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
@@ -403,7 +405,12 @@ private void addMicroBatchToQueue(
MicroBatches.from(snapshot, table().io())
.caseSensitive(readConf().caseSensitive())
.specsById(table().specs())
.generate(startFileIndex, endFileIndex, Long.MAX_VALUE, shouldScanAllFile);
.generate(
startFileIndex,
endFileIndex,
Long.MAX_VALUE,
shouldScanAllFile,
getPushedFilters());

long position = startFileIndex;
for (FileScanTask task : microBatch.tasks()) {
@@ -18,10 +18,12 @@
*/
package org.apache.iceberg.spark.source;

import java.util.List;
import java.util.Locale;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
@@ -37,10 +39,12 @@ abstract class BaseSparkMicroBatchPlanner implements SparkMicroBatchPlanner {
private static final Logger LOG = LoggerFactory.getLogger(BaseSparkMicroBatchPlanner.class);
private final Table table;
private final SparkReadConf readConf;
private final List<Expression> pushedFilters;

BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf) {
BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf, List<Expression> filters) {
this.table = table;
this.readConf = readConf;
this.pushedFilters = filters;
}

protected Table table() {
@@ -51,6 +55,10 @@ protected SparkReadConf readConf() {
return readConf;
}

protected List<Expression> getPushedFilters() {
return pushedFilters;
}

protected boolean shouldProcess(Snapshot snapshot) {
String op = snapshot.operation();
switch (op) {
@@ -34,6 +34,7 @@
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
@@ -57,6 +58,7 @@
import org.slf4j.LoggerFactory;

public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
private static final Logger LOGGER = LoggerFactory.getLogger(SparkMicroBatchStream.class);
Contributor: This class already has a logger called LOG a few lines below. This is initializing a duplicate logger.

private static final Joiner SLASH = Joiner.on("/");
private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
@@ -77,6 +79,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
private final int maxFilesPerMicroBatch;
private final int maxRecordsPerMicroBatch;
private final boolean cacheDeleteFilesOnExecutors;
private final List<Expression> filters;
private SparkMicroBatchPlanner planner;
private StreamingOffset lastOffsetForTriggerAvailableNow;

@@ -86,6 +89,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
Supplier<FileIO> fileIO,
SparkReadConf readConf,
Schema projection,
List<Expression> filters,
String checkpointLocation) {
this.table = table;
this.fileIO = fileIO;
@@ -102,11 +106,13 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors();
this.filters = filters;

InitialOffsetStore initialOffsetStore =
new InitialOffsetStore(
table, checkpointLocation, fromTimestamp, sparkContext.hadoopConfiguration());
this.initialOffset = initialOffsetStore.initialOffset();
LOGGER.error("[jalpan] creating micro batch with filter {} ", filters);
Contributor: Remove these? Also below on L137

}

@Override
@@ -128,6 +134,7 @@ public Offset latestOffset() {

@Override
public InputPartition[] planInputPartitions(Offset start, Offset end) {
LOGGER.error("[jalpan] planning input partitions micro batch with filter {} ", filters);
Preconditions.checkArgument(
end instanceof StreamingOffset, "Invalid end offset: %s is not a StreamingOffset", end);
Preconditions.checkArgument(
@@ -209,10 +216,11 @@ private void initializePlanner(StreamingOffset startOffset, StreamingOffset endOffset) {
if (readConf.asyncMicroBatchPlanningEnabled()) {
this.planner =
new AsyncSparkMicroBatchPlanner(
table, readConf, startOffset, endOffset, lastOffsetForTriggerAvailableNow);
table, readConf, startOffset, endOffset, lastOffsetForTriggerAvailableNow, filters);
} else {
this.planner =
new SyncSparkMicroBatchPlanner(table, readConf, lastOffsetForTriggerAvailableNow);
new SyncSparkMicroBatchPlanner(
table, readConf, lastOffsetForTriggerAvailableNow, filters);
}
}

@@ -185,8 +185,9 @@ public Batch toBatch() {

@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
LOG.error("[jalpan] creating micro batch stream");
return new SparkMicroBatchStream(
sparkContext, table, fileIO, readConf, projection, checkpointLocation);
sparkContext, table, fileIO, readConf, projection, filters, checkpointLocation);
}

@Override