From 3bb865ba2d56c28886f0eb32bd5b5a443599d9e4 Mon Sep 17 00:00:00 2001
From: Jalpan Randeri
Date: Sun, 3 May 2026 23:31:05 -0700
Subject: [PATCH] Support Filter Pushdown in Spark Structured Streaming

Spark streaming reads of Iceberg tables cannot take advantage of the
filter-based pruning that batch scans get. This change bridges that gap
by having SparkScan propagate the pushed filter expressions to the
MicroBatchStream, where they are used to prune manifests and data files
while planning each micro-batch.
---
 .../java/org/apache/iceberg/MicroBatches.java | 72 +++++++++++++++++--
 .../apache/iceberg/TestMicroBatchBuilder.java | 50 +++++++++++++
 .../source/AsyncSparkMicroBatchPlanner.java   | 13 +++-
 .../source/BaseSparkMicroBatchPlanner.java    | 10 ++-
 .../spark/source/SparkMicroBatchStream.java   | 11 +++-
 .../iceberg/spark/source/SparkScan.java       |  2 +-
 .../source/SyncSparkMicroBatchPlanner.java    | 16 +++--
 .../source/TestStructuredStreamingRead3.java  |  2 +
 8 files changed, 158 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java
index 8fede277dc58..bebfcd79425c 100644
--- a/core/src/main/java/org/apache/iceberg/MicroBatches.java
+++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -23,6 +23,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Binder;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
@@ -30,6 +33,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,11 +62,39 @@ public static CloseableIterable<FileScanTask> openManifestFile(
       Snapshot snapshot,
       ManifestFile manifestFile,
       boolean scanAllFiles) {
+    return openManifestFileWithFilter(
+        io, specsById, caseSensitive, snapshot, manifestFile, scanAllFiles, Lists.newArrayList());
+  }
+
+  public static CloseableIterable<FileScanTask> openManifestFileWithFilter(
+      FileIO io,
+      Map<Integer, PartitionSpec> specsById,
+      boolean caseSensitive,
+      Snapshot snapshot,
+      ManifestFile manifestFile,
+      boolean scanAllFiles,
+      List<Expression> pushedFilters) {
+
+    // Split the pushed filters into partition-only and data filter expressions
+    Expression partitionExpr = Expressions.alwaysTrue();
+    Expression dataExpr = Expressions.alwaysTrue();
+
+    for (Expression filter : pushedFilters) {
+      if (isPartitionOnly(
+          filter, specsById.values().iterator().next().partitionType(), caseSensitive)) {
+        partitionExpr = Expressions.and(partitionExpr, filter);
+      } else {
+        dataExpr = Expressions.and(dataExpr, filter);
+      }
+    }
     ManifestGroup manifestGroup =
         new ManifestGroup(io, ImmutableList.of(manifestFile))
             .specsById(specsById)
-            .caseSensitive(caseSensitive);
+            .caseSensitive(caseSensitive)
+            .filterPartitions(partitionExpr)
+            .filterData(dataExpr);
+
     if (!scanAllFiles) {
       manifestGroup =
           manifestGroup
@@ -76,6 +108,20 @@ public static CloseableIterable<FileScanTask> openManifestFile(
     return manifestGroup.planFiles();
   }
 
+  // Returns true if the expression references only columns present in the partition type.
+  private static boolean isPartitionOnly(
+      Expression expr, Types.StructType partitionType, boolean caseSensitive) {
+    try {
+      // If binding succeeds, the filter only uses columns that are
+      // present in the partition type.
+      Binder.bind(partitionType, expr, caseSensitive);
+      return true;
+    } catch (org.apache.iceberg.exceptions.ValidationException e) {
+      // The filter references columns that are not in the partition spec
+      return false;
+    }
+  }
+
   /**
    * Method to index the data files for each manifest. For example, if manifest m1 has 3 data files,
    * manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2,
@@ -209,11 +255,22 @@ public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean
          Iterables.size(
              SnapshotChanges.builderFor(snapshot, io, specsById).build().addedDataFiles()),
          targetSizeInBytes,
-          scanAllFiles);
+          scanAllFiles,
+          Lists.newArrayList());
    }
 
    public MicroBatch generate(
        long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
+      return generate(
+          startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles, Lists.newArrayList());
+    }
+
+    public MicroBatch generate(
+        long startFileIndex,
+        long endFileIndex,
+        long targetSizeInBytes,
+        boolean scanAllFiles,
+        List<Expression> pushedFilters) {
      Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
      Preconditions.checkArgument(
          startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
@@ -225,7 +282,8 @@ public MicroBatch generate(
          startFileIndex,
          endFileIndex,
          targetSizeInBytes,
-          scanAllFiles);
+          scanAllFiles,
+          pushedFilters);
    }
 
    /**
@@ -247,7 +305,8 @@ private MicroBatch generateMicroBatch(
        long startFileIndex,
        long endFileIndex,
        long targetSizeInBytes,
-        boolean scanAllFiles) {
+        boolean scanAllFiles,
+        List<Expression> pushedFilters) {
      if (indexedManifests.isEmpty()) {
        return new MicroBatch(
            snapshot.snapshotId(), startFileIndex, endFileIndex, 0L, Collections.emptyList(), true);
@@ -262,13 +321,14 @@ private MicroBatch generateMicroBatch(
      currentFileIndex = indexedManifests.get(idx).second();
 
      try (CloseableIterable<FileScanTask> taskIterable =
-              openManifestFile(
+              openManifestFileWithFilter(
                  io,
                  specsById,
                  caseSensitive,
                  snapshot,
                  indexedManifests.get(idx).first(),
-                  scanAllFiles);
+                  scanAllFiles,
+                  pushedFilters);
          CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
        while (taskIter.hasNext()) {
          FileScanTask task = taskIter.next();
diff --git a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
index 5b19e1ee2d34..7fa0c1155632 100644
--- a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
+++ b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
@@ -23,6 +23,9 @@
 import java.util.Collections;
 import java.util.List;
 import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.BeforeEach;
@@ -141,6 +144,53 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
     assertThat(batch5.lastIndexOfSnapshot()).isTrue();
   }
 
+  @TestTemplate
+  public void testGenerateMicroBatchWithFilter() {
+    List<Expression> filters =
+        ImmutableList.of(Expressions.equal("data_bucket", 1), Expressions.equal("id", 1));
+    runMicroBatchWithTest(filters, true);
+  }
+
+  @TestTemplate
+  public void testGenerateMicroBatchWithFilterWithCaseInsensitive() {
+    List<Expression> filters =
+        ImmutableList.of(Expressions.equal("DATA_bucket", 1), Expressions.equal("Id", 1));
+    runMicroBatchWithTest(filters, false);
+  }
+
+  private void runMicroBatchWithTest(List<Expression> filters, boolean caseSensitive) {
+    // Set up data files in two different partitions
+    DataFile fileA =
+        DataFiles.builder(table.spec())
+            .withPath("A.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withPartitionPath("data_bucket=0")
+            .build();
+    DataFile fileB =
+        DataFiles.builder(table.spec())
+            .withPath("B.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .withPartitionPath("data_bucket=1")
+            .build();
+
+    table.newAppend().appendFile(fileA).appendFile(fileB).commit();
+
+    // Generate a batch over the full file range, passing the pushed filters;
+    // files in partitions that cannot match the filters should be pruned
+    // during micro-batch planning rather than after the scan.
+    MicroBatch batch =
+        MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .caseSensitive(caseSensitive)
+            .generate(0, 2, Long.MAX_VALUE, true, filters);
+
+    // Verify that only file B is selected even though the range covers both files
+    assertThat(batch.sizeInBytes()).isEqualTo(10);
+    filesMatch(Lists.newArrayList("B"), filesToScan(batch.tasks()));
+  }
+
   private static DataFile file(String name) {
     return DataFiles.builder(SPEC)
         .withPath(name + ".parquet")
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
index 3e442f9917d4..c18f6185e598 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
@@ -31,6 +31,7 @@
 import org.apache.iceberg.MicroBatches;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -84,8 +85,9 @@ class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner implements
       SparkReadConf readConf,
       StreamingOffset initialOffset,
       StreamingOffset maybeEndOffset,
-      StreamingOffset lastOffsetForTriggerAvailableNow) {
-    super(table, readConf);
+      StreamingOffset lastOffsetForTriggerAvailableNow,
+      List<Expression> pushedFilters) {
+    super(table, readConf, pushedFilters);
     this.minQueuedFiles = readConf().maxFilesPerMicroBatch();
     this.minQueuedRows = readConf().maxRecordsPerMicroBatch();
     this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
@@ -403,7 +405,12 @@ private void addMicroBatchToQueue(
             MicroBatches.from(snapshot, table().io())
                 .caseSensitive(readConf().caseSensitive())
                 .specsById(table().specs())
-                .generate(startFileIndex, endFileIndex, Long.MAX_VALUE, shouldScanAllFile);
+                .generate(
+                    startFileIndex,
+                    endFileIndex,
+                    Long.MAX_VALUE,
+                    shouldScanAllFile,
+                    getPushedFilters());
 
     long position = startFileIndex;
     for (FileScanTask task : microBatch.tasks()) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
index 9298c2bbdfcc..f25f56cf1d72 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
@@ -18,10 +18,12 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.List;
 import java.util.Locale;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -37,10 +39,12 @@ abstract class BaseSparkMicroBatchPlanner implements SparkMicroBatchPlanner {
   private static final Logger LOG = LoggerFactory.getLogger(BaseSparkMicroBatchPlanner.class);
   private final Table table;
   private final SparkReadConf readConf;
+  private final List<Expression> pushedFilters;
 
-  BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf) {
+  BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf, List<Expression> filters) {
     this.table = table;
     this.readConf = readConf;
+    this.pushedFilters = filters;
   }
 
   protected Table table() {
@@ -51,6 +55,10 @@ protected SparkReadConf readConf() {
     return readConf;
   }
 
+  protected List<Expression> getPushedFilters() {
+    return pushedFilters;
+  }
+
   protected boolean shouldProcess(Snapshot snapshot) {
     String op = snapshot.operation();
     switch (op) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 7adf3c633cd0..a222c3baeb24 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -77,6 +78,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
   private final int maxFilesPerMicroBatch;
   private final int maxRecordsPerMicroBatch;
   private final boolean cacheDeleteFilesOnExecutors;
+  private final List<Expression> filters;
 
   private SparkMicroBatchPlanner planner;
   private StreamingOffset lastOffsetForTriggerAvailableNow;
@@ -86,6 +88,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
       Supplier<FileIO> fileIO,
       SparkReadConf readConf,
       Schema projection,
+      List<Expression> filters,
       String checkpointLocation) {
     this.table = table;
     this.fileIO = fileIO;
@@ -102,11 +105,13 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
     this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
     this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
     this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors();
+    this.filters = filters;
 
     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(
            table, checkpointLocation, fromTimestamp, sparkContext.hadoopConfiguration());
     this.initialOffset = initialOffsetStore.initialOffset();
+    LOG.debug("Created SparkMicroBatchStream with pushed filters: {}", filters);
   }
 
   @Override
@@ -128,6 +133,7 @@ public Offset latestOffset() {
 
   @Override
   public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    LOG.debug("Planning input partitions with pushed filters: {}", filters);
     Preconditions.checkArgument(
         end instanceof StreamingOffset, "Invalid end offset: %s is not a StreamingOffset", end);
     Preconditions.checkArgument(
@@ -209,10 +215,11 @@ private void initializePlanner(StreamingOffset startOffset, StreamingOffset endO
     if (readConf.asyncMicroBatchPlanningEnabled()) {
       this.planner =
           new AsyncSparkMicroBatchPlanner(
-              table, readConf, startOffset, endOffset, lastOffsetForTriggerAvailableNow);
+              table, readConf, startOffset, endOffset, lastOffsetForTriggerAvailableNow, filters);
     } else {
       this.planner =
-          new SyncSparkMicroBatchPlanner(table, readConf, lastOffsetForTriggerAvailableNow);
+          new SyncSparkMicroBatchPlanner(
+              table, readConf, lastOffsetForTriggerAvailableNow, filters);
     }
   }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 6b80199a255c..9ffc5cfe38fb 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -185,8 +185,8 @@ public Batch toBatch() {
 
   @Override
   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
     return new SparkMicroBatchStream(
-        sparkContext, table, fileIO, readConf, projection, checkpointLocation);
+        sparkContext, table, fileIO, readConf, projection, filters, checkpointLocation);
   }
 
   @Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
index f1b0029c5432..ca64a8bec3e5 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.MicroBatches.MicroBatch;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -45,8 +46,11 @@ class SyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner {
   private final StreamingOffset lastOffsetForTriggerAvailableNow;
 
   SyncSparkMicroBatchPlanner(
-      Table table, SparkReadConf readConf, StreamingOffset lastOffsetForTriggerAvailableNow) {
-    super(table, readConf);
+      Table table,
+      SparkReadConf readConf,
+      StreamingOffset lastOffsetForTriggerAvailableNow,
+      List<Expression> pushedFilters) {
+    super(table, readConf, pushedFilters);
     this.caseSensitive = readConf().caseSensitive();
     this.fromTimestamp = readConf().streamFromTimestamp();
     this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
@@ -102,7 +106,8 @@ public List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset
               currentOffset.position(),
               endFileIndex,
               Long.MAX_VALUE,
-              currentOffset.shouldScanAllFiles());
+              currentOffset.shouldScanAllFiles(),
+              getPushedFilters());
 
       fileScanTasks.addAll(latestMicroBatch.tasks());
     } while (currentOffset.snapshotId() != endOffset.snapshotId());
@@ -163,13 +168,14 @@ public StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit
       // be rest assured curPos >= startFileIndex
       curPos = indexedManifests.get(idx).second();
       try (CloseableIterable<FileScanTask> taskIterable =
-              MicroBatches.openManifestFile(
+              MicroBatches.openManifestFileWithFilter(
                  table().io(),
                  table().specs(),
                  caseSensitive,
                  curSnapshot,
                  indexedManifests.get(idx).first(),
-                  scanAllFiles);
+                  scanAllFiles,
+                  getPushedFilters());
           CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
         while (taskIter.hasNext()) {
           FileScanTask task = taskIter.next();
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 3957872be721..d4fa72136e00 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -502,6 +502,7 @@ public void testTriggerAvailableNowCapsAsyncPreloadAfterPrepare() {
                     SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT, "10"))),
             table.schema(),
+            Lists.newArrayList(),
             temp.resolve("available-now-cap-checkpoint").toString());
 
     try {
@@ -1212,6 +1213,7 @@ private SparkMicroBatchStream newMicroBatchStream(
         table::io,
         new SparkReadConf(spark, table, new CaseInsensitiveStringMap(allOptions)),
         table.schema(),
+        Lists.newArrayList(),
         temp.resolve(checkpointDirName).toString());
   }
 }
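
Usage sketch (illustrative only, not part of the diff): this is the shape of streaming read the
change is meant to speed up. The table name, column, and checkpoint path below are hypothetical;
the filter on the streaming DataFrame is what Spark pushes into SparkScan and, with this change,
what reaches the micro-batch planners so that manifests and data files that cannot match are
pruned while each micro-batch is planned.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.StreamingQueryException;

    public class FilteredIcebergStreamExample {
      public static void main(String[] args)
          throws StreamingQueryException, java.util.concurrent.TimeoutException {
        SparkSession spark =
            SparkSession.builder().appName("filtered-iceberg-stream").getOrCreate();

        // Hypothetical table demo.db.events, partitioned by a category column.
        // The filter below is pushed down by Spark; with this patch it also prunes
        // manifests and data files during micro-batch planning instead of being
        // applied only after the scan.
        Dataset<Row> events =
            spark
                .readStream()
                .format("iceberg")
                .load("demo.db.events")
                .filter("category = 'clicks'");

        StreamingQuery query =
            events
                .writeStream()
                .format("console")
                .option("checkpointLocation", "/tmp/checkpoints/events") // hypothetical path
                .start();

        query.awaitTermination();
      }
    }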