
Support Filter Pushdown in Spark Structured Streaming #16210

Open

jalpan-randeri wants to merge 2 commits into apache:main from jalpan-randeri:jalpan/streaming-filter-pushdown

Conversation


@jalpan-randeri jalpan-randeri commented May 5, 2026

Overview

This PR introduces the core Iceberg-side support for predicate pushdown within Spark Structured Streaming.

Currently, Spark streaming workloads on Iceberg tables lack the pruning efficiencies available in batch scans. This change bridges that gap by enabling the SparkScan to propagate filter expressions to the MicroBatchStream. By moving filter evaluation from the Spark executor level to the Iceberg metadata level (during partition planning), we significantly reduce:

  1. I/O Overhead: Fewer manifests and data files are scanned.
  2. Driver Memory Pressure: Reduced metadata processing during task planning.
  3. Compute Costs: Lower scheduling overhead for micro-batches on high-cardinality partitioned tables.
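To make the planning-time vs. executor-time distinction concrete, here is a minimal, self-contained sketch of what metadata-level pruning buys. Nothing below is Iceberg's actual API: `DataFile`, `prune`, and the string-equality check are illustrative stand-ins for Iceberg's manifest entries and `Expression` evaluation against partition summaries.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: drop candidate data files at planning time using
// partition metadata, instead of scheduling every file and filtering rows
// inside Spark executors. Names are illustrative, not Iceberg APIs.
public class PlanningTimePruning {

  // A data file as recorded in table metadata: its path plus partition value.
  record DataFile(String path, String partitionValue) {}

  // Keep only files whose partition value can satisfy part = <wanted>.
  // Real Iceberg evaluates Expression trees against manifest partition
  // summaries; this collapses that idea down to a string comparison.
  public static List<DataFile> prune(List<DataFile> files, String wanted) {
    return files.stream()
        .filter(f -> f.partitionValue().equals(wanted))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<DataFile> files = List.of(
        new DataFile("data/part=active/f1.parquet", "active"),
        new DataFile("data/part=archived/f2.parquet", "archived"));
    // Only the matching partition's file is scheduled for the micro-batch;
    // the archived file is never opened.
    System.out.println(prune(files, "active"));
  }
}
```

The micro-batch then only carries input partitions for files that survive pruning, which is where the I/O, driver-memory, and scheduling savings listed above come from.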

Note on Dependencies: This commit provides the necessary interface and implementation in the Iceberg connector. To fully leverage this, a corresponding PR will be raised in the Apache Spark repository to ensure the engine correctly passes these predicates to the V2 streaming source.

Spark changes - apache/spark#55679

Fixes #15692
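For readers unfamiliar with the engine-side handshake the dependency note refers to: in DataSource V2, the engine offers filters to the source's scan builder, and the source accepts what it can and hands the rest back for post-scan evaluation. The sketch below mirrors the shape of that contract with simplified stand-in types; `EqualTo`, `IcebergScanBuilder`, and the "only `part` is pushable" rule are all assumptions for illustration, not the real Spark or Iceberg classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the Spark DSv2 filter-pushdown contract:
// the engine calls pushFilters(...) with candidate filters, the source
// keeps what it can evaluate from metadata and returns the residual
// filters that Spark must still apply after the scan.
public class PushdownHandshake {

  // Stand-in for a source filter such as EqualTo("part", "active").
  record EqualTo(String column, String value) {}

  static class IcebergScanBuilder {
    private final List<EqualTo> pushed = new ArrayList<>();

    // Returns the filters this source could NOT handle.
    List<EqualTo> pushFilters(List<EqualTo> filters) {
      List<EqualTo> residual = new ArrayList<>();
      for (EqualTo f : filters) {
        if (f.column().equals("part")) { // pretend only "part" is a partition column
          pushed.add(f);                 // in reality: converted to an Iceberg Expression
        } else {
          residual.add(f);
        }
      }
      return residual;
    }

    List<EqualTo> pushedFilters() {
      return pushed;
    }
  }

  public static void main(String[] args) {
    IcebergScanBuilder builder = new IcebergScanBuilder();
    List<EqualTo> residual = builder.pushFilters(
        List.of(new EqualTo("part", "active"), new EqualTo("id", "7")));
    System.out.println("pushed: " + builder.pushedFilters());
    System.out.println("residual: " + residual);
  }
}
```

The accepted filters are what this PR threads from SparkScan into the MicroBatchStream; the linked apache/spark PR is what makes the engine offer them to a streaming source in the first place.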

Testing & Verification

scala> val streamingDF = spark.readStream
     |   .format("iceberg")
     |   .load("local.db.filtered_stream")
     |   .filter("part = 'active'") // With this change, this filter is pushed down to Iceberg
     |
     | val query = streamingDF.writeStream
     |   .format("console")
     |   .option("checkpointLocation", s"/tmp/iceberg_stream_${System.currentTimeMillis()}")
     |   .start()

val streamingDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, part: string]
val query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.runtime.StreamingQueryWrapper@577c2d74

scala> 26/05/03 20:32:57 ERROR SparkScan: [jalpan] creating micro batch stream
26/05/03 20:32:57 ERROR SparkMicroBatchStream: [jalpan] creating micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:59 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:59 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+
| id|  part|
+---+------+
|  1|active|
+---+------+

@jalpan-randeri force-pushed the jalpan/streaming-filter-pushdown branch from 99c5b40 to 3bb865b on May 5, 2026 02:16
@singhpk234 self-requested a review May 5, 2026 15:34
@anoopj (Contributor) left a comment


Jalpan, thanks for this PR. A few comments.

Two outdated comment threads on core/src/main/java/org/apache/iceberg/MicroBatches.java
import org.slf4j.LoggerFactory;

public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
private static final Logger LOGGER = LoggerFactory.getLogger(SparkMicroBatchStream.class);

This class already has a logger called LOG a few lines below. This is initializing a duplicate logger.
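For context, the fix the reviewer is asking for is just to reuse the existing field rather than declare a second one. A minimal sketch of the convention (using java.util.logging so the snippet is self-contained; the real class uses SLF4J, and the class name here is illustrative):

```java
import java.util.logging.Logger;

public class SingleLoggerExample {
  // One logger per class, declared once and reused by every method.
  private static final Logger LOG =
      Logger.getLogger(SingleLoggerExample.class.getName());

  public static String loggerName() {
    return LOG.getName();
  }

  public void planInputPartitions() {
    LOG.info("planning input partitions"); // reuse LOG; don't add a second LOGGER field
  }
}
```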

@github-actions bot added the API label May 9, 2026
@jalpan-randeri force-pushed the jalpan/streaming-filter-pushdown branch from ff2acdf to acac84a on May 9, 2026 22:23
@jalpan-randeri requested a review from anoopj May 9, 2026 22:24


Successfully merging this pull request may close these issues.

Support Filter Pushdown for Spark Structured Streaming Reads
