
feat(datafusion): runtime filter / dynamic predicate pushdown into IcebergTableScan #2376

@jverhoeks

Description


Is your feature request related to a problem or challenge?

IcebergTableScan (the DataFusion ExecutionPlan in crates/integrations/datafusion/src/physical_plan/scan.rs) does not participate in DataFusion's runtime / dynamic filter pushdown. Filters generated by HashJoinExec build sides at execution time never reach the Iceberg scan, so lineitem-style probe-side scans cannot prune row groups using build-side join key sets. This leaves a real performance gap on TPC-H joins.

Concrete observation

In TPC-H q14 (lineitem JOIN part ON l_partkey = p_partkey) at SF10, with everything DataFusion offers turned on (enable_dynamic_filter_pushdown = true, parquet.bloom_filter_on_read = true, parquet.pushdown_filters = true, parquet.reorder_filters = true), IcebergTableScan still leaves the full lineitem scan unpruned. The post-scan FilterExec evaluates the dynamic filter row by row, which doesn't skip Parquet row groups.

For comparison, the same query on Trino's Iceberg connector finishes ~5x faster on the same data because Trino's connector receives a DynamicFilter, samples it once at split open, and feeds the result into Parquet row-group statistics + bloom filters before any data is read.

Why the existing API can't carry this

IcebergTableScan::new takes filters: &[Expr] once and converts them via convert_filters_to_predicate. TableScanBuilder::with_filter accepts a single Predicate. Both fire at scan construction time. There is no surface for a predicate that only becomes useful mid-execution (when the join build side completes).

Internally the reader is already capable of applying the predicate at all three pruning levels (see crates/iceberg/src/arrow/reader.rs):

// reader.rs:398-444 — when final_predicate is Some:
//   1. with_row_filter(row_filter)               // post-decode RowFilter
//   2. get_selected_row_group_indices(...)       // row-group min/max skip
//   3. get_row_selection_for_filter_predicate    // page-index row selection

So the missing piece is purely the entry point — there is no way to feed the latest dynamic predicate to process_file_scan_task once the scan is in flight.

Describe the solution you'd like

What I'd like is a Trino-style "sample once per file scan task" hook. Sketch:

/// User-supplied factory consulted right before each FileScanTask
/// opens its Parquet file. Returns None when the dynamic source is
/// not yet useful (e.g. join build side has not completed).
pub trait DynamicPredicate: Send + Sync {
    fn current(&self) -> Option<Predicate>;
}

impl TableScanBuilder {
    pub fn with_dynamic_predicate(self, dp: Arc<dyn DynamicPredicate>) -> Self;
}

In ArrowReader::process_file_scan_task, before the existing final_predicate block, intersect the static predicate with dynamic_predicate.current(). The combined predicate flows into the three pruning paths the reader already implements. No reader-side changes beyond the intersection.
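The intersection step above can be sketched as follows. This is a self-contained illustration with stub types — `Predicate`, `DynamicPredicate`, and `effective_predicate` mirror the names in this proposal, not the actual iceberg-rust definitions:

```rust
use std::sync::Arc;

// Stand-in for iceberg's Predicate; the real type lives in the crate.
#[derive(Clone, Debug, PartialEq)]
enum Predicate {
    Ref(&'static str), // stand-in for a real comparison predicate
    And(Box<Predicate>, Box<Predicate>),
}

impl Predicate {
    fn and(self, other: Predicate) -> Predicate {
        Predicate::And(Box::new(self), Box::new(other))
    }
}

trait DynamicPredicate: Send + Sync {
    /// Returns None until the dynamic source (e.g. the join build side)
    /// has something useful.
    fn current(&self) -> Option<Predicate>;
}

/// What process_file_scan_task would do right before the existing
/// final_predicate block: sample the dynamic source once and intersect
/// it with the static predicate.
fn effective_predicate(
    static_pred: Option<Predicate>,
    dynamic: Option<&Arc<dyn DynamicPredicate>>,
) -> Option<Predicate> {
    let dyn_pred = dynamic.and_then(|d| d.current());
    match (static_pred, dyn_pred) {
        (Some(s), Some(d)) => Some(s.and(d)),
        (Some(s), None) => Some(s),
        (None, Some(d)) => Some(d),
        (None, None) => None,
    }
}
```

Because the combined value is just another `Predicate`, it flows into the three existing pruning paths unchanged; a `None` result means the reader behaves exactly as today.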

IcebergTableScan (or any DataFusion ExecutionPlan that owns DF runtime filters) wraps Vec<Arc<dyn PhysicalExpr>> as a DynamicPredicate impl that translates each filter to Predicate via the existing expr_to_predicate paths and returns None when no filter is yet selective.
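One plausible shape for that wrapper is a shared slot the build side fills once it completes. Again a self-contained sketch with stub types (`RuntimeFilterSlot` and `publish` are hypothetical names, and `Predicate` here stands in for the real iceberg type):

```rust
use std::sync::{Arc, Mutex};

// Stand-in for iceberg's Predicate.
#[derive(Clone, Debug, PartialEq)]
struct Predicate(String);

trait DynamicPredicate: Send + Sync {
    fn current(&self) -> Option<Predicate>;
}

/// Hypothetical adapter: IcebergTableScan would own one of these and pass a
/// clone to TableScanBuilder::with_dynamic_predicate. The DataFusion side
/// publishes its translated runtime filter into the slot when the join
/// build side finishes.
#[derive(Default, Clone)]
struct RuntimeFilterSlot {
    slot: Arc<Mutex<Option<Predicate>>>,
}

impl RuntimeFilterSlot {
    /// Called once the build side has a selective filter (translated via
    /// the existing expr_to_predicate paths in the integration crate).
    fn publish(&self, p: Predicate) {
        *self.slot.lock().unwrap() = Some(p);
    }
}

impl DynamicPredicate for RuntimeFilterSlot {
    fn current(&self) -> Option<Predicate> {
        // Sampled once per FileScanTask: None until publish() has run,
        // so early tasks simply fall back to the static predicate.
        self.slot.lock().unwrap().clone()
    }
}
```

This matches the Trino timing model: tasks that open before the build side completes see `None` and scan normally; tasks that open afterward get full row-group and page-level pruning.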

Reference implementations elsewhere

| Connector | Sample point | What it prunes |
| --- | --- | --- |
| Trino (`IcebergPageSourceProvider#createPageSource`) | once per split via `DynamicFilter.getCurrentPredicate()` | file-level + row-group + Parquet bloom |
| Spark (`SupportsRuntimeV2Filtering#filter(Predicate[])`) | once per partition task, can re-plan | partition-task pruning |
| iceberg-rust today | n/a | static only |

The Trino model fits iceberg-rust's reader cleanly because the reader already has the static-predicate plumbing.

Willingness to contribute

Yes — I have a working prototype on a downstream fork that overrides gather_filters_for_pushdown / handle_child_pushdown_result on IcebergTableScan and applies runtime filters per-batch (post-decode). It produces a real win at SF1 (-21% on TPC-H) and SF10 (-9.4%), but it doesn't reach scan-time pruning since iceberg-rust's TableScan API is single-shot. The follow-up is the DynamicPredicate API above; happy to draft a PR if there's interest.

Related issue: #2363 (static Inexact -> Exact pushdown) — addresses a different bug but cleans up the same pushdown surface.
