diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java
index 8fede277dc58..bebfcd79425c 100644
--- a/core/src/main/java/org/apache/iceberg/MicroBatches.java
+++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -23,6 +23,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Binder;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
@@ -30,6 +34,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,11 +63,42 @@ public static CloseableIterable<FileScanTask> openManifestFile(
       Snapshot snapshot,
       ManifestFile manifestFile,
       boolean scanAllFiles) {
+    return openManifestFileWithFilter(
+        io, specsById, caseSensitive, snapshot, manifestFile, scanAllFiles, Lists.newArrayList());
+  }
+
+  public static CloseableIterable<FileScanTask> openManifestFileWithFilter(
+      FileIO io,
+      Map<Integer, PartitionSpec> specsById,
+      boolean caseSensitive,
+      Snapshot snapshot,
+      ManifestFile manifestFile,
+      boolean scanAllFiles,
+      List<Expression> pushedFilters) {
+
+    // Split the pushed filters: filters that bind against this manifest's partition
+    // type can prune whole partitions; everything else is applied as a data filter.
+    Types.StructType partitionType =
+        specsById.get(manifestFile.partitionSpecId()).partitionType();
+    Expression partitionExpr = Expressions.alwaysTrue();
+    Expression dataExpr = Expressions.alwaysTrue();
+
+    for (Expression filter : pushedFilters) {
+      if (isPartitionOnly(filter, partitionType, caseSensitive)) {
+        partitionExpr = Expressions.and(partitionExpr, filter);
+      } else {
+        dataExpr = Expressions.and(dataExpr, filter);
+      }
+    }
+
     ManifestGroup manifestGroup =
         new ManifestGroup(io, ImmutableList.of(manifestFile))
             .specsById(specsById)
-            .caseSensitive(caseSensitive);
+            .caseSensitive(caseSensitive)
+            .filterPartitions(partitionExpr)
+            .filterData(dataExpr);
+
     if (!scanAllFiles) {
       manifestGroup =
           manifestGroup
@@ -76,6 +112,20 @@ public static CloseableIterable<FileScanTask> openManifestFile(
     return manifestGroup.planFiles();
   }
 
+  // Helper that checks whether a filter references only partition columns.
+  private static boolean isPartitionOnly(
+      Expression expr, Types.StructType partitionType, boolean caseSensitive) {
+    try {
+      // Binding succeeds only if every column the filter references exists in the
+      // partition struct.
+      Binder.bind(partitionType, expr, caseSensitive);
+      return true;
+    } catch (ValidationException e) {
+      // The filter references at least one column that is not in the partition spec.
+      return false;
+    }
+  }
+
   /**
    * Method to index the data files for each manifest. For example, if manifest m1 has 3 data files,
    * manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2,
@@ -209,11 +259,22 @@ public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean
         Iterables.size(
             SnapshotChanges.builderFor(snapshot, io, specsById).build().addedDataFiles()),
         targetSizeInBytes,
-        scanAllFiles);
+        scanAllFiles,
+        Lists.newArrayList());
   }
 
   public MicroBatch generate(
       long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
+    return generate(
+        startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles, Lists.newArrayList());
+  }
+
+  public MicroBatch generate(
+      long startFileIndex,
+      long endFileIndex,
+      long targetSizeInBytes,
+      boolean scanAllFiles,
+      List<Expression> pushedFilters) {
     Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
     Preconditions.checkArgument(
         startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
@@ -225,7 +286,8 @@ public MicroBatch generate(
         startFileIndex,
         endFileIndex,
         targetSizeInBytes,
-        scanAllFiles);
+        scanAllFiles,
+        pushedFilters);
   }
 
   /**
@@ -247,7 +309,8 @@ private MicroBatch generateMicroBatch(
       long startFileIndex,
       long endFileIndex,
       long targetSizeInBytes,
-      boolean scanAllFiles) {
+      boolean scanAllFiles,
+      List<Expression> pushedFilters) {
     if (indexedManifests.isEmpty()) {
       return new MicroBatch(
           snapshot.snapshotId(), startFileIndex, endFileIndex, 0L, Collections.emptyList(), true);
@@ -262,13 +325,14 @@ private MicroBatch generateMicroBatch(
     currentFileIndex = indexedManifests.get(idx).second();
 
     try (CloseableIterable<FileScanTask> taskIterable =
-            openManifestFile(
+            openManifestFileWithFilter(
                 io,
                 specsById,
                 caseSensitive,
                 snapshot,
                 indexedManifests.get(idx).first(),
-                scanAllFiles);
+                scanAllFiles,
+                pushedFilters);
         CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
       while (taskIter.hasNext()) {
         FileScanTask task = taskIter.next();
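The partition/data split above hinges on `Binder.bind`: binding succeeds only when every column a filter references resolves against the supplied struct type, so a successful bind against the partition struct marks the filter as partition-only. A minimal, self-contained sketch of that check, using a hypothetical partition struct standing in for a `bucket(16, data)` spec whose partition field is named `data_bucket` (real partition field IDs start at 1000):

```java
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;

public class PartitionOnlyCheck {
  public static void main(String[] args) {
    // Hypothetical partition struct with a single bucket field named "data_bucket".
    Types.StructType partitionType =
        Types.StructType.of(
            Types.NestedField.optional(1000, "data_bucket", Types.IntegerType.get()));

    // true: "data_bucket" resolves against the partition struct
    System.out.println(isPartitionOnly(Expressions.equal("data_bucket", 1), partitionType, true));

    // false: "id" is a data column, so Binder.bind throws ValidationException
    System.out.println(isPartitionOnly(Expressions.equal("id", 1), partitionType, true));
  }

  private static boolean isPartitionOnly(
      Expression expr, Types.StructType partitionType, boolean caseSensitive) {
    try {
      Binder.bind(partitionType, expr, caseSensitive);
      return true;
    } catch (ValidationException e) {
      return false;
    }
  }
}
```

Note that the split is per-filter and all-or-nothing: a predicate that mixes partition and non-partition columns fails to bind and falls through to `filterData`.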
diff --git a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
index 5b19e1ee2d34..7fa0c1155632 100644
--- a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
+++ b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
@@ -23,6 +23,9 @@
 import java.util.Collections;
 import java.util.List;
 import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.BeforeEach;
@@ -141,6 +144,52 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
     assertThat(batch5.lastIndexOfSnapshot()).isTrue();
   }
 
+  @TestTemplate
+  public void testGenerateMicroBatchWithFilter() {
+    List<Expression> filters =
+        ImmutableList.of(Expressions.equal("data_bucket", 1), Expressions.equal("id", 1));
+    runMicroBatchFilterTest(filters, true);
+  }
+
+  @TestTemplate
+  public void testGenerateMicroBatchWithFilterWithCaseInsensitive() {
+    List<Expression> filters =
+        ImmutableList.of(Expressions.equal("DATA_bucket", 1), Expressions.equal("Id", 1));
+    runMicroBatchFilterTest(filters, false);
+  }
+
+  private void runMicroBatchFilterTest(List<Expression> filters, boolean caseSensitive) {
+    // 1. Set up data files in two different partitions
+    DataFile fileA =
+        DataFiles.builder(table.spec())
+            .withPath("A.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withPartitionPath("data_bucket=0")
+            .build();
+    DataFile fileB =
+        DataFiles.builder(table.spec())
+            .withPath("B.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withPartitionPath("data_bucket=1")
+            .build();
+
+    table.newAppend().appendFile(fileA).appendFile(fileB).commit();
+
+    // 2. Generate a batch over a file range that covers both files
+    MicroBatch batch =
+        MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .caseSensitive(caseSensitive)
+            .generate(0, 2, Long.MAX_VALUE, true, filters);
+
+    // 3. Verify that only file B is planned even though the range covers both files
+    assertThat(batch.sizeInBytes()).isEqualTo(10);
+    filesMatch(Lists.newArrayList("B"), filesToScan(batch.tasks()));
+  }
+
   private static DataFile file(String name) {
     return DataFiles.builder(SPEC)
         .withPath(name + ".parquet")
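For context on where the `List<Expression>` originates on the Spark side of the change below: `SparkScan` holds the expressions Spark pushed down during scan building. A hedged sketch of that conversion step, assuming the Spark module's `SparkV2Filters.convert` helper and its convention of returning null for predicates it cannot translate; the surrounding class and method names here are hypothetical:

```java
import java.util.List;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkV2Filters;
import org.apache.spark.sql.connector.expressions.filter.Predicate;

class PushedFilterConversion {
  // Convert Spark V2 predicates to Iceberg expressions; anything that cannot be
  // converted is skipped here and left for Spark to evaluate after the scan.
  static List<Expression> toIcebergFilters(Predicate[] predicates) {
    List<Expression> converted = Lists.newArrayList();
    for (Predicate predicate : predicates) {
      Expression expr = SparkV2Filters.convert(predicate);
      if (expr != null) {
        converted.add(expr);
      }
    }
    return converted;
  }
}
```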
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
index 3e442f9917d4..c18f6185e598 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
@@ -31,6 +31,7 @@
 import org.apache.iceberg.MicroBatches;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -84,8 +85,9 @@ class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner implements
       SparkReadConf readConf,
       StreamingOffset initialOffset,
       StreamingOffset maybeEndOffset,
-      StreamingOffset lastOffsetForTriggerAvailableNow) {
-    super(table, readConf);
+      StreamingOffset lastOffsetForTriggerAvailableNow,
+      List<Expression> pushedFilters) {
+    super(table, readConf, pushedFilters);
     this.minQueuedFiles = readConf().maxFilesPerMicroBatch();
     this.minQueuedRows = readConf().maxRecordsPerMicroBatch();
     this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
@@ -403,7 +405,12 @@ private void addMicroBatchToQueue(
         MicroBatches.from(snapshot, table().io())
             .caseSensitive(readConf().caseSensitive())
             .specsById(table().specs())
-            .generate(startFileIndex, endFileIndex, Long.MAX_VALUE, shouldScanAllFile);
+            .generate(
+                startFileIndex,
+                endFileIndex,
+                Long.MAX_VALUE,
+                shouldScanAllFile,
+                pushedFilters());
 
     long position = startFileIndex;
     for (FileScanTask task : microBatch.tasks()) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
index 9298c2bbdfcc..f25f56cf1d72 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
@@ -18,10 +18,12 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.List;
 import java.util.Locale;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -37,10 +39,12 @@ abstract class BaseSparkMicroBatchPlanner implements SparkMicroBatchPlanner {
   private static final Logger LOG = LoggerFactory.getLogger(BaseSparkMicroBatchPlanner.class);
   private final Table table;
   private final SparkReadConf readConf;
+  private final List<Expression> pushedFilters;
 
-  BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf) {
+  BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf, List<Expression> filters) {
     this.table = table;
     this.readConf = readConf;
+    this.pushedFilters = filters;
   }
 
   protected Table table() {
@@ -51,6 +55,10 @@ protected SparkReadConf readConf() {
     return readConf;
   }
 
+  protected List<Expression> pushedFilters() {
+    return pushedFilters;
+  }
+
   protected boolean shouldProcess(Snapshot snapshot) {
     String op = snapshot.operation();
     switch (op) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 7adf3c633cd0..a222c3baeb24 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -77,6 +78,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
   private final int maxFilesPerMicroBatch;
   private final int maxRecordsPerMicroBatch;
   private final boolean cacheDeleteFilesOnExecutors;
+  private final List<Expression> filters;
 
   private SparkMicroBatchPlanner planner;
   private StreamingOffset lastOffsetForTriggerAvailableNow;
@@ -86,6 +88,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
       Supplier<FileIO> fileIO,
       SparkReadConf readConf,
       Schema projection,
+      List<Expression> filters,
       String checkpointLocation) {
     this.table = table;
     this.fileIO = fileIO;
@@ -102,11 +105,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
     this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
     this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
     this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors();
+    this.filters = filters;
 
     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(
             table, checkpointLocation, fromTimestamp, sparkContext.hadoopConfiguration());
     this.initialOffset = initialOffsetStore.initialOffset();
   }
 
   @Override
@@ -209,10 +213,11 @@ private void initializePlanner(StreamingOffset startOffset, StreamingOffset endO
     if (readConf.asyncMicroBatchPlanningEnabled()) {
       this.planner =
           new AsyncSparkMicroBatchPlanner(
-              table, readConf, startOffset, endOffset, lastOffsetForTriggerAvailableNow);
+              table, readConf, startOffset, endOffset, lastOffsetForTriggerAvailableNow, filters);
     } else {
       this.planner =
-          new SyncSparkMicroBatchPlanner(table, readConf, lastOffsetForTriggerAvailableNow);
+          new SyncSparkMicroBatchPlanner(
+              table, readConf, lastOffsetForTriggerAvailableNow, filters);
     }
   }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 6b80199a255c..9ffc5cfe38fb 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -185,8 +185,8 @@ public Batch toBatch() {
   @Override
   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
     return new SparkMicroBatchStream(
-        sparkContext, table, fileIO, readConf, projection, checkpointLocation);
+        sparkContext, table, fileIO, readConf, projection, filters, checkpointLocation);
   }
 
   @Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
index f1b0029c5432..ca64a8bec3e5 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.MicroBatches.MicroBatch;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -45,8 +46,11 @@ class SyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner {
   private final StreamingOffset lastOffsetForTriggerAvailableNow;
 
   SyncSparkMicroBatchPlanner(
-      Table table, SparkReadConf readConf, StreamingOffset lastOffsetForTriggerAvailableNow) {
-    super(table, readConf);
+      Table table,
+      SparkReadConf readConf,
+      StreamingOffset lastOffsetForTriggerAvailableNow,
+      List<Expression> pushedFilters) {
+    super(table, readConf, pushedFilters);
     this.caseSensitive = readConf().caseSensitive();
     this.fromTimestamp = readConf().streamFromTimestamp();
     this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
@@ -102,7 +106,8 @@ public List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset
           currentOffset.position(),
           endFileIndex,
           Long.MAX_VALUE,
-          currentOffset.shouldScanAllFiles());
+          currentOffset.shouldScanAllFiles(),
+          pushedFilters());
 
       fileScanTasks.addAll(latestMicroBatch.tasks());
     } while (currentOffset.snapshotId() != endOffset.snapshotId());
@@ -163,13 +168,14 @@ public StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit
       // be rest assured curPos >= startFileIndex
       curPos = indexedManifests.get(idx).second();
       try (CloseableIterable<FileScanTask> taskIterable =
-              MicroBatches.openManifestFile(
+              MicroBatches.openManifestFileWithFilter(
                   table().io(),
                   table().specs(),
                   caseSensitive,
                   curSnapshot,
                   indexedManifests.get(idx).first(),
-                  scanAllFiles);
+                  scanAllFiles,
+                  pushedFilters());
           CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
         while (taskIter.hasNext()) {
           FileScanTask task = taskIter.next();
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 3957872be721..d4fa72136e00 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -502,6 +502,7 @@ public void testTriggerAvailableNowCapsAsyncPreloadAfterPrepare() {
                     SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT, "10"))),
             table.schema(),
+            Lists.newArrayList(),
             temp.resolve("available-now-cap-checkpoint").toString());
 
     try {
@@ -1212,6 +1213,7 @@ private SparkMicroBatchStream newMicroBatchStream(
         table::io,
         new SparkReadConf(spark, table, new CaseInsensitiveStringMap(allOptions)),
         table.schema(),
+        Lists.newArrayList(),
         temp.resolve(checkpointDirName).toString());
   }
 }
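To close the loop, a hedged end-to-end sketch of the user-visible effect: with the filters now plumbed from `SparkScan` into micro-batch planning, a pushed streaming predicate can prune files while batches are planned rather than only after tasks are created. The table name, partitioning, and checkpoint path below are hypothetical; this assumes a table partitioned by identity(category):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class FilteredStreamRead {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("filtered-stream-read").getOrCreate();

    // Hypothetical table "db.events" partitioned by identity(category); the filter
    // is pushed into SparkScan and now reaches MicroBatches.generate() at planning time.
    Dataset<Row> events =
        spark.readStream().format("iceberg").load("db.events").filter("category = 'A'");

    StreamingQuery query =
        events
            .writeStream()
            .format("console")
            .option("checkpointLocation", "/tmp/checkpoints/events") // hypothetical path
            .start();

    query.awaitTermination();
  }
}
```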