From 48bc8c18fee3c05ec56b7d0f446dd090c95ae1a5 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Wed, 20 May 2026 21:43:51 +0200 Subject: [PATCH 01/13] compute: extend temporal bucketing to Reduce's input arrangement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fec2af8 introduced temporal bucketing on `PlanNode::ArrangeBy`: the lowering picks an `ArrangementStrategy` from a `has_future_updates` analysis bit, and `ensure_collections` inserts a bucketing operator in front of the arrangement when the strategy is `TemporalBucketing`. Of the other bucketing-eligible operators, only `Threshold` inherits this coverage transitively, because its lowering wraps the input in a synthetic `ArrangeBy` whose target is `new_arranged([required])` (so `ensure_collections` actually inserts a bucket op there). `TopK` and `Negate` also wrap their inputs, but the wrap targets `new_raw()` (i.e., no actual arrangement), so `ensure_collections` sees `will_create_arrangement = false` and the wrap's strategy is decorative; neither operator is bucketed today. `Reduce` is left out of both hooks: its LIR node has no strategy field, and its render path (`render_reduce` via `KeyValPlan`) bypasses `ensure_collections`. As a result, a `Temporal Filter -> GroupAggregate` pattern — where no `ArrangeBy` sits between the temporal MFP and the reduce — is never bucketed today. Add a `temporal_bucketing_strategy: ArrangementStrategy` field to the `Reduce` LIR node and set it during lowering from the input's `has_future_updates` flag. At render time, `render_reduce` applies `apply_bucketing_strategy` to the `(key, val)` stream right before `render_reduce_plan` arranges it internally. The lowering also clears `LoweredExpr::has_future_updates` on bucketing absorption, so a stack of bucketing-eligible operators only buckets at the lowest one; a trailing temporal MFP fused above naturally re-arms the flag. The render-side `bucketed: bool` safety net in `ensure_collections` is still needed to prevent double-bucketing within a single call (the raw-collection site vs. the `collections.arranged` loop, or across multiple keys in that loop). TODO: Union with consolidation, TopK. See next commit. --- src/compute-types/src/explain/text.rs | 9 ++ src/compute-types/src/plan.rs | 24 ++++ src/compute-types/src/plan/interpret/api.rs | 2 + src/compute-types/src/plan/lowering.rs | 39 +++++- src/compute-types/src/plan/render_plan.rs | 6 + src/compute/src/render.rs | 51 +++++-- src/compute/src/render/context.rs | 98 ++++++++++---- src/compute/src/render/reduce.rs | 33 +++-- .../explain/physical_plan_as_json.slt | 78 +++++++---- test/sqllogictest/temporal_bucketing.slt | 127 ++++++++++++++++++ 10 files changed, 393 insertions(+), 74 deletions(-) create mode 100644 test/sqllogictest/temporal_bucketing.slt diff --git a/src/compute-types/src/explain/text.rs b/src/compute-types/src/explain/text.rs index 04d099cc702fb..0a6aab33d265a 100644 --- a/src/compute-types/src/explain/text.rs +++ b/src/compute-types/src/explain/text.rs @@ -290,6 +290,7 @@ impl Plan { key_val_plan, plan, mfp_after, + temporal_bucketing_strategy: _, } => { ctx.indent.set(); if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() { @@ -739,6 +740,7 @@ impl Plan { key_val_plan, plan, mfp_after, + temporal_bucketing_strategy, } => { use crate::plan::reduce::ReducePlan; match plan { @@ -764,6 +766,13 @@ impl Plan { let key = CompactScalars(key); writeln!(f, "{}input_key={}", ctx.indent, key)?; } + if !matches!(temporal_bucketing_strategy, ArrangementStrategy::Direct) { + writeln!( + f, + "{}temporal_bucketing_strategy={}", + ctx.indent, temporal_bucketing_strategy + )?; + } if key_val_plan.key_plan.deref().is_identity() { writeln!(f, "{}key_plan=id", ctx.indent)?; } else { diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index edabc5f1c947b..35168618f054f 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -130,6 +130,15 @@ pub enum ArrangementStrategy { TemporalBucketing, } +impl std::fmt::Display for ArrangementStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ArrangementStrategy::Direct => write!(f, "Direct"), + ArrangementStrategy::TemporalBucketing => write!(f, "TemporalBucketing"), + } + } +} + /// An identifier for an LIR node. #[derive( Clone, @@ -308,6 +317,20 @@ pub enum PlanNode { /// predicates so that it can be readily evaluated. /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]? mfp_after: MapFilterProject, + /// Strategy for forming the internal input arrangement built by `Reduce` + /// (materialized via `key_val_plan`). + /// + /// Set by the lowering from the input's `has_future_updates` flag. The + /// renderer applies it to the keyed `(key, val)` stream feeding the + /// reduce. See `render_reduce` for the rationale on why this is + /// plumbed through `Reduce` rather than handled at the arrangement site. + /// + /// Note: unrelated to the hash buckets used by hierarchical reductions + /// (e.g. `ReducePlan::Hierarchical`'s `buckets`), which are an internal + /// sharding scheme for `min`/`max`-style aggregations. Here "bucketing" + /// refers exclusively to temporal (time-domain) bucketing of + /// future-stamped updates. + temporal_bucketing_strategy: ArrangementStrategy, }, /// Key-based "Top K" operator, retaining the first K records in each group. TopK { @@ -805,6 +828,7 @@ impl CollectionPlan for PlanNode { key_val_plan: _, plan: _, mfp_after: _, + temporal_bucketing_strategy: _, } | PlanNode::TopK { input, diff --git a/src/compute-types/src/plan/interpret/api.rs b/src/compute-types/src/plan/interpret/api.rs index 60b5736f01c35..1898e542df0af 100644 --- a/src/compute-types/src/plan/interpret/api.rs +++ b/src/compute-types/src/plan/interpret/api.rs @@ -393,6 +393,7 @@ where key_val_plan, plan, mfp_after, + temporal_bucketing_strategy: _, } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; @@ -676,6 +677,7 @@ where key_val_plan, plan, mfp_after, + temporal_bucketing_strategy: _, } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 0545222982906..1662fa3f91407 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -31,6 +31,11 @@ use crate::plan::{ArrangementStrategy, AvailableCollections, GetPlan, LirId, Pla /// Pick an [`ArrangementStrategy`] based on whether the input may contain future-stamped /// updates. Future updates are the only case where temporal bucketing pays off. +/// +/// Convention: every caller that returns `TemporalBucketing` must also clear +/// `LoweredExpr::has_future_updates` on the resulting `LoweredExpr`, so that a stack of +/// bucketing-eligible operators only buckets at the lowest one. A trailing temporal MFP +/// fused on top naturally re-arms the flag. fn strategy_from_future(has_future_updates: bool) -> ArrangementStrategy { if has_future_updates { ArrangementStrategy::TemporalBucketing @@ -294,6 +299,12 @@ impl Context { // Even with a non-temporal MFP, we must propagate `has_future_updates` // from the underlying binding — applying an MFP doesn't drop future- // timestamped updates that already exist on the input. + // + // TODO(temporal-bucketing): `has_future_updates` is computed per + // dataflow; we don't currently propagate it across `Id::Global` + // boundaries (e.g., from an MV's dataflow to its consumer's), so a + // downstream-only `Get`-then-`ArrangeBy` won't bucket unless the + // consumer has its own local temporal MFP. let has_future_updates = self.has_future_updates.contains(id) || match &plan { GetPlan::Arrangement(_, _, mfp) | GetPlan::Collection(mfp) => { @@ -1031,17 +1042,24 @@ This is not expected to cause incorrect results, but could indicate a performanc // Return the plan and extended keys. let lir_id = self.allocate_lir_id(); + let strategy = strategy_from_future(future); + // Bucketing absorption: see `strategy_from_future`. If we bucket, clear + // the future-updates flag so the immediate parent is lowered as `Direct`. + let has_future_updates = match strategy { + ArrangementStrategy::TemporalBucketing => false, + ArrangementStrategy::Direct => future, + }; LoweredExpr { plan: PlanNode::ArrangeBy { input_key, input: Box::new(input), input_mfp, forms, - strategy: strategy_from_future(future), + strategy, } .as_plan(lir_id), keys: input_keys, - has_future_updates: future, + has_future_updates, } } } @@ -1203,9 +1221,21 @@ This is not expected to cause incorrect results, but could indicate a performanc ); let output_keys = reduce_plan.keys(group_key.len(), output_arity); let lir_id = self.allocate_lir_id(); + // `Reduce` builds its own input arrangement inside `render_reduce` (via `KeyValPlan`), + // bypassing `ensure_collections`. So we can't piggy-back on an upstream `ArrangeBy`'s + // strategy to request temporal bucketing on a temporal-MFP-fed input: there is no such + // `ArrangeBy`. Instead we record the strategy directly on the `Reduce` node, and + // `render_reduce` applies bucketing to the keyed `(key, val)` stream itself. + let temporal_bucketing_strategy = strategy_from_future(input_future); // `extract_mfp_after` strips temporal predicates back into `*mfp_on_top` (the residual // MFP installed above the reduce), so `mfp_after` is non-temporal and cannot introduce - // future updates. The output's future flag is just whatever the input had. + // future updates. + // + // Bucketing absorption: see `strategy_from_future`. + let has_future_updates = match temporal_bucketing_strategy { + ArrangementStrategy::TemporalBucketing => false, + ArrangementStrategy::Direct => input_future, + }; Ok(LoweredExpr { plan: PlanNode::Reduce { input_key, @@ -1213,10 +1243,11 @@ This is not expected to cause incorrect results, but could indicate a performanc key_val_plan, plan: reduce_plan, mfp_after, + temporal_bucketing_strategy, } .as_plan(lir_id), keys: output_keys, - has_future_updates: input_future, + has_future_updates, }) } diff --git a/src/compute-types/src/plan/render_plan.rs b/src/compute-types/src/plan/render_plan.rs index 09cc55da15cfa..2dc21dc133d2b 100644 --- a/src/compute-types/src/plan/render_plan.rs +++ b/src/compute-types/src/plan/render_plan.rs @@ -215,6 +215,9 @@ pub enum Expr { /// the key for the reduction; otherwise, the results become undefined. Additionally, the /// MFP must be free from temporal predicates so that it can be readily evaluated. mfp_after: MapFilterProject, + /// How the renderer should form the internal input arrangement built by `Reduce`. + /// Mirrors [`PlanNode::Reduce::temporal_bucketing_strategy`]. + temporal_bucketing_strategy: ArrangementStrategy, }, /// Key-based "Top K" operator, retaining the first K records in each group. TopK { @@ -438,6 +441,7 @@ impl TryFrom for LetFreePlan { key_val_plan, plan, mfp_after, + temporal_bucketing_strategy, } => { let expr = Reduce { input_key, @@ -445,6 +449,7 @@ impl TryFrom for LetFreePlan { key_val_plan, plan, mfp_after, + temporal_bucketing_strategy, }; insert_node(lir_id, parent, expr, nesting); @@ -911,6 +916,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { key_val_plan: _key_val_plan, plan, mfp_after: _mfp_after, + temporal_bucketing_strategy: _, } => match plan { ReducePlan::Distinct => write!(f, "Distinct GroupAggregate"), ReducePlan::Accumulable(..) => write!(f, "Accumulable GroupAggregate"), diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 35f96297de254..8041d3b513dee 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -1264,10 +1264,18 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { key_val_plan, plan, mfp_after, + temporal_bucketing_strategy, } => { let input = expect_input(input); let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after); - self.render_reduce(input_key, input, key_val_plan, plan, mfp_option) + self.render_reduce( + input_key, + input, + key_val_plan, + plan, + mfp_option, + temporal_bucketing_strategy, + ) } TopK { input, top_k_plan } => { let input = expect_input(input); @@ -1481,11 +1489,18 @@ pub trait RenderTimestamp: MzTimestamp + Refines { /// Total-ordered timestamps perform real bucketing; partially-ordered timestamps /// (e.g. `Product<…>` in iterative scopes) implement this as a no-op. pub trait MaybeBucketByTime: Timestamp { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, as_of: Antichain, summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff>; + ) -> VecCollection<'scope, Self, D, Diff> + where + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable; } impl RenderTimestamp for mz_repr::Timestamp { @@ -1510,11 +1525,19 @@ impl RenderTimestamp for mz_repr::Timestamp { } impl MaybeBucketByTime for mz_repr::Timestamp { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, as_of: Antichain, summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff> { + ) -> VecCollection<'scope, Self, D, Diff> + where + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable, + { stream .bucket::>(as_of, summary) .as_collection() @@ -1551,11 +1574,19 @@ impl RenderTimestamp for Product> { } impl MaybeBucketByTime for Product> { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, _as_of: Antichain, _summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff> { + ) -> VecCollection<'scope, Self, D, Diff> + where + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable, + { // TODO: Implement bucketing on outer timestamp for iterative scopes. stream.as_collection() } diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 5ba42239c74b6..828adf08ff012 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -1004,7 +1004,21 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { ); } - // Track whether we applied temporal bucketing, to avoid double-bucketing. + // Track whether we already applied temporal bucketing in this call, to + // avoid bucketing the same updates twice. + // + // Stacked bucketing across operators is prevented at the LIR level: in + // `strategy_from_future` (see `src/compute-types/src/plan/lowering.rs`), + // a bucketing consumer clears `LoweredExpr::has_future_updates`, so its + // parent is lowered with `ArrangementStrategy::Direct`. + // + // This flag is a belt-and-suspenders check for the case where a single + // `ensure_collections` call would otherwise fire bucketing twice on the + // same collection: once when forming the raw collection (via + // `as_collection_core` below) and again when building an arrangement in + // `collections.arranged`, or across two arrangements in that loop. The + // same `strategy` argument applies to all sites; the flag downgrades + // every application after the first to `Direct`. let mut bucketed = false; // True iff at least one new arrangement will actually be built below. Bucketing only @@ -1023,19 +1037,17 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { // Apply temporal bucketing when the lowering selected `TemporalBucketing` and // we will build at least one arrangement. This path fires when the collection // must be formed from scratch (e.g., from an arrangement via as_collection_core). - let oks = if will_create_arrangement - && matches!(strategy, ArrangementStrategy::TemporalBucketing) - && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set) - { - let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY - .get(config_set) - .try_into() - .expect("must fit"); - bucketed = true; - T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary) - } else { - oks - }; + let (oks, applied) = apply_bucketing_strategy( + oks, + if will_create_arrangement { + strategy + } else { + ArrangementStrategy::Direct + }, + as_of.clone(), + config_set, + ); + bucketed |= applied; self.collection = Some((oks, errs)); } for (key, _, thinning) in collections.arranged { @@ -1051,19 +1063,14 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { // the bundle (e.g., from an upstream temporal Mfp or Get) and we // haven't bucketed yet. This is the common path for temporal-MFP // → ArrangeBy flows. - let oks = if !bucketed - && matches!(strategy, ArrangementStrategy::TemporalBucketing) - && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set) - { - let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY - .get(config_set) - .try_into() - .expect("must fit"); - bucketed = true; - T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary) + let effective_strategy = if bucketed { + ArrangementStrategy::Direct } else { - oks + strategy }; + let (oks, applied) = + apply_bucketing_strategy(oks, effective_strategy, as_of.clone(), config_set); + bucketed |= applied; let (oks, errs_keyed, passthrough) = Self::arrange_collection(&name, oks, key.clone(), thinning.clone()); let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into(); @@ -1353,3 +1360,44 @@ fn walk_cursor( } *fuel -= work; } + +/// Apply temporal bucketing to a per-row stream when the requested `strategy` selects it and the +/// `enable_compute_temporal_bucketing` dyncfg is on; otherwise return `oks` unchanged. +/// +/// Returns `(stream, applied)` where `applied` is true iff bucketing actually fired. Callers +/// use this flag to avoid double-bucketing the same stream within a single `ensure_collections` +/// invocation. +/// +/// Generic over the row data type `D` so the helper can serve both `Row` streams (the +/// `ArrangeBy` rendering in `ensure_collections`) and `(Row, Row)` streams (the internal +/// keyed input arrangement built by `render_reduce`). +pub(crate) fn apply_bucketing_strategy<'scope, T, D>( + oks: VecCollection<'scope, T, D, Diff>, + strategy: ArrangementStrategy, + as_of: Antichain, + config_set: &ConfigSet, +) -> (VecCollection<'scope, T, D, Diff>, bool) +where + T: RenderTimestamp + MaybeBucketByTime, + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable, +{ + if matches!(strategy, ArrangementStrategy::TemporalBucketing) + && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set) + { + let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY + .get(config_set) + .try_into() + .expect("must fit"); + ( + T::maybe_apply_temporal_bucketing(oks.inner, as_of, summary), + true, + ) + } else { + (oks, false) + } +} diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index e165bd567c5e2..013ef6594b2ea 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -27,6 +27,7 @@ use differential_dataflow::trace::implementations::merge_batcher::container::Int use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; use itertools::Itertools; +use mz_compute_types::plan::ArrangementStrategy; use mz_compute_types::plan::reduce::{ AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan, ReducePlan, ReductionType, SingleBasicPlan, reduction_type, @@ -68,7 +69,11 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { key_val_plan: KeyValPlan, reduce_plan: ReducePlan, mfp_after: Option, - ) -> CollectionBundle<'scope, T> { + temporal_bucketing_strategy: ArrangementStrategy, + ) -> CollectionBundle<'scope, T> + where + T: crate::render::MaybeBucketByTime, + { // Convert `mfp_after` to an actionable plan. let mfp_after = mfp_after.map(|m| { m.into_plan() @@ -156,15 +161,25 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { }, ); - // Render the reduce plan - self.render_reduce_plan( - reduce_plan, + // Bucket the keyed `(key, val)` stream when lowering chose `TemporalBucketing`. + // `Reduce` builds its own arrangement via `KeyValPlan`, bypassing + // `ensure_collections`, so the strategy is plumbed through `PlanNode::Reduce` + // rather than inferred at the arrangement site. `apply_bucketing_strategy` is a + // no-op for `Direct`. + // + // Unlike `ensure_collections`, there's only one bucketing call site here, so we + // don't need to track an `already_bucketed` flag. If a second site is ever added + // in this function, it must consult `_bucketed`. + let (key_val_collection, _bucketed) = crate::render::context::apply_bucketing_strategy( key_val_input.as_collection(), - err, - key_arity, - mfp_after, - ) - .leave_region(self.scope) + temporal_bucketing_strategy, + self.as_of_frontier.clone(), + &self.config_set, + ); + + // Render the reduce plan + self.render_reduce_plan(reduce_plan, key_val_collection, err, key_arity, mfp_after) + .leave_region(self.scope) }) } diff --git a/test/sqllogictest/explain/physical_plan_as_json.slt b/test/sqllogictest/explain/physical_plan_as_json.slt index c068f82066a64..9490192b838d9 100644 --- a/test/sqllogictest/explain/physical_plan_as_json.slt +++ b/test/sqllogictest/explain/physical_plan_as_json.slt @@ -1813,7 +1813,8 @@ SELECT DISTINCT a, b FROM t 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -1972,7 +1973,8 @@ GROUP BY a 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -2144,7 +2146,8 @@ FROM t 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -2436,7 +2439,8 @@ SELECT * FROM hierarchical_group_by 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -2568,7 +2572,8 @@ MATERIALIZED VIEW hierarchical_global_mv 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -2875,7 +2880,8 @@ SELECT * FROM hierarchical_global 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -3342,7 +3348,8 @@ GROUP BY a 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -3601,7 +3608,8 @@ GROUP BY a 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -3976,7 +3984,8 @@ FROM t 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -4205,7 +4214,8 @@ FROM t 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -4499,7 +4509,8 @@ MATERIALIZED VIEW collated_group_by_mv 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -4638,7 +4649,8 @@ MATERIALIZED VIEW collated_group_by_mv 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -4897,7 +4909,8 @@ MATERIALIZED VIEW collated_group_by_mv 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -5156,7 +5169,8 @@ MATERIALIZED VIEW collated_group_by_mv 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -5912,7 +5926,8 @@ SELECT * FROM collated_group_by 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -6051,7 +6066,8 @@ SELECT * FROM collated_group_by 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -6310,7 +6326,8 @@ SELECT * FROM collated_group_by 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -6569,7 +6586,8 @@ SELECT * FROM collated_group_by 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -7368,7 +7386,8 @@ MATERIALIZED VIEW collated_global_mv 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -7477,7 +7496,8 @@ MATERIALIZED VIEW collated_global_mv 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -7706,7 +7726,8 @@ MATERIALIZED VIEW collated_global_mv 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -7935,7 +7956,8 @@ MATERIALIZED VIEW collated_global_mv 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -8716,7 +8738,8 @@ SELECT * FROM collated_global 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -8825,7 +8848,8 @@ SELECT * FROM collated_global 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -9054,7 +9078,8 @@ SELECT * FROM collated_global 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -9283,7 +9308,8 @@ SELECT * FROM collated_global 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } } diff --git a/test/sqllogictest/temporal_bucketing.slt b/test/sqllogictest/temporal_bucketing.slt new file mode 100644 index 0000000000000..0e72aec250c34 --- /dev/null +++ b/test/sqllogictest/temporal_bucketing.slt @@ -0,0 +1,127 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +mode cockroach + +# ----------------------------------------------------------------------------- +# Tests for LIR-level temporal bucketing applied to the *input arrangement* +# of `Reduce`. See `agent-notes/temporal-bucketing-reduce/plan.md`. +# +# Without the `input_bucketing_strategy` field, a `Temporal Filter -> +# GroupAggregate` pattern would have no LIR node to attach bucketing to, +# because `Reduce` builds its internal arrangement inside `render_reduce` +# via `KeyValPlan`, bypassing `ensure_collections`. +# ----------------------------------------------------------------------------- + +statement ok +CREATE TABLE events (k INT NOT NULL, event_time TIMESTAMP NOT NULL); + +# Firing: temporal filter directly above a GROUP BY in a materialized view. +# Lowering should set `input_bucketing_strategy=TemporalBucketing` on the Reduce. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_firing AS +SELECT k, count(*) +FROM events +WHERE event_time + INTERVAL '45 day' > mz_now() +GROUP BY k; +---- +materialize.public.mv_firing: + Reduce::Accumulable + simple_aggrs[0]=(0, count(*)) + temporal_bucketing_strategy=TemporalBucketing + key_plan=id + val_plan + project=(#1) + map=(true) + Get::Collection materialize.public.events + raw=true + +Source materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + +Target cluster: quickstart + +EOF + +# Idempotency: a temporal-filter view materialized as an index already absorbs +# the future-update flag. A downstream Reduce reading from the indexed `Get` +# must NOT re-bucket; the pretty-printer omits `input_bucketing_strategy` when +# it is the default `Direct`. +statement ok +CREATE VIEW v_recent AS +SELECT k FROM events WHERE event_time + INTERVAL '45 day' > mz_now(); + +statement ok +CREATE DEFAULT INDEX ON v_recent; + +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_idempotent AS +SELECT k, count(*) FROM v_recent GROUP BY k; +---- +materialize.public.mv_idempotent: + Reduce::Accumulable + simple_aggrs[0]=(0, count(*)) + input_key=#0{k} + key_plan=id + val_plan + project=(#1) + map=(true) + Get::PassArrangements materialize.public.v_recent + raw=false + arrangements[0]={ key=[#0{k}], permutation=id, thinning=() } + +Used Indexes: + - materialize.public.v_recent_primary_idx (*** full scan ***) + +Target cluster: quickstart + +EOF + +# Nested aggregation: an inner Reduce::Hierarchical sits directly above a +# temporal-filter MFP and gets bucketed. The outer `count(*)` Reduce is +# optimized away here, but the test still pins down that the inner Reduce +# gets `input_bucketing_strategy=TemporalBucketing`. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_rearm AS +SELECT bucket, count(*) FROM ( + SELECT k AS bucket, count(*) AS n, max(event_time) AS m + FROM events + WHERE event_time + INTERVAL '45 day' > mz_now() + GROUP BY k +) sub +WHERE m + INTERVAL '1 day' > mz_now() +GROUP BY bucket; +---- +materialize.public.mv_rearm: + Mfp + project=(#0, #2) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{m} + 1 day)))) + map=(1) + input_key=#0 + Reduce::Hierarchical + aggr_funcs=[max] + buckets=[268435456, 16777216, 1048576, 65536, 4096, 256, 16] + temporal_bucketing_strategy=TemporalBucketing + key_plan + project=(#0) + val_plan + project=(#1) + Get::Collection materialize.public.events + raw=true + +Source materialize.public.events + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + +Target cluster: quickstart + +EOF From 74ee5441a0ce3328247aef0db01d1ea8ce48c188 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Fri, 22 May 2026 12:56:13 +0200 Subject: [PATCH 02/13] Clarify new_raw arrange_by calls --- src/compute-types/src/plan/lowering.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 1662fa3f91407..430a4d91faa01 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -863,7 +863,8 @@ This is not expected to cause incorrect results, but could indicate a performanc AvailableCollections::new_raw(), &keys, arity, - input_future, + // `new_raw` means no arrangement, so no bucketing is needed + false, ) } else { input @@ -896,7 +897,8 @@ This is not expected to cause incorrect results, but could indicate a performanc AvailableCollections::new_raw(), &keys, arity, - input_future, + // `new_raw` means no arrangement, so no bucketing is needed + false, ) } else { input @@ -963,7 +965,7 @@ This is not expected to cause incorrect results, but could indicate a performanc |LoweredExpr { plan, keys, - has_future_updates: future, + has_future_updates: _, }| { // We don't have an MFP here -- install an operator to permute the // input, if necessary. @@ -973,7 +975,8 @@ This is not expected to cause incorrect results, but could indicate a performanc AvailableCollections::new_raw(), &keys, arity, - future, + // `new_raw` means no arrangement, so no bucketing is needed + false, ) } else { plan From 15c3c0786e5181b8c32f4035dd4b4f83e2b5df39 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Fri, 22 May 2026 13:19:03 +0200 Subject: [PATCH 03/13] Extend temporal bucketing to TopK, Union, and Threshold Follow-up to the Reduce-only temporal-bucketing commit: - TopK: add `temporal_bucketing_strategy: ArrangementStrategy` to `PlanNode::TopK` and `RenderPlan::Expr::TopK`. Lowering populates it via `strategy_from_future(input_future)` and clears `has_future_updates` when `TemporalBucketing` is chosen. `render_topk` calls `apply_bucketing_strategy` on the input stream before building the Top-K arrangement(s). Note that this is not ideal for `MonotonicTop1Plan { must_consolidate: false }`, where we add temporal bucketing with no downstream consolidator; accepted for now and noted in a comment. Later, we could move the temporal bucketing decision into a new LIR pass. - Union: add `temporal_bucketing_strategies: Vec` to `PlanNode::Union` and `RenderPlan::Expr::Union`, lockstep with `inputs`. The renderer applies `apply_bucketing_strategy` per leg before `concatenate`/`consolidate_named`. Per-leg strategies are non-`Direct` only when the Union consolidates its output. - Threshold: clear `has_future_updates` when the conditional `arrange_by` wrap actually fires with `TemporalBucketing`; the synthesized `ArrangeBy` (built with `new_arranged`) installs the bucket op at render time via `ensure_collections`. - Fold the `refine_union_negate_consolidation` LIR pass into lowering, because we need to know about this when the lowering is deciding about temporal bucketing. (Later, we could move the temporal bucketing decision into a new LIR pass.) - Propagate the new fields through json and `verbose text` EXPLAIN, the `PlanNode`/`RenderPlan` interpreter dispatch, and physically-monotonic interpretation. EXPLAIN tests (mirroring commit 623f53d for Reduce): - src/compute-types/src/explain/text.rs prints `temporal_bucketing_strategy=...` for TopK and `temporal_bucketing_strategies=[...]` for Union when any strategy is non-`Direct`. - test/sqllogictest/temporal_bucketing.slt: TopK firing (LIMIT over a temporal filter), TopK idempotency (over an already-bucketed indexed view), Union firing (EXCEPT ALL over two temporal filters). - test/sqllogictest/explain/physical_plan_as_json.slt: expected JSON updated to include the new fields on TopK and Union nodes. --- src/compute-types/src/explain/text.rs | 40 ++++- src/compute-types/src/plan.rs | 62 ++++--- src/compute-types/src/plan/interpret/api.rs | 14 +- src/compute-types/src/plan/lowering.rs | 94 ++++++++++- src/compute-types/src/plan/render_plan.rs | 17 +- src/compute/src/render.rs | 21 ++- src/compute/src/render/top_k.rs | 23 ++- .../explain/physical_plan_as_json.slt | 102 +++++++++--- test/sqllogictest/temporal_bucketing.slt | 151 ++++++++++++++++++ 9 files changed, 453 insertions(+), 71 deletions(-) diff --git a/src/compute-types/src/explain/text.rs b/src/compute-types/src/explain/text.rs index 0a6aab33d265a..9357ddc91501e 100644 --- a/src/compute-types/src/explain/text.rs +++ b/src/compute-types/src/explain/text.rs @@ -375,7 +375,11 @@ impl Plan { ctx.indent.reset(); } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + temporal_bucketing_strategy: _, + } => { use crate::plan::top_k::TopKPlan; match top_k_plan { TopKPlan::MonotonicTop1(plan) => { @@ -470,6 +474,7 @@ impl Plan { Union { inputs, consolidate_output, + temporal_bucketing_strategies: _, } => { write!(f, "{}→", ctx.indent)?; if *consolidate_output { @@ -799,7 +804,11 @@ impl Plan { input.fmt_text(f, ctx) })?; } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + temporal_bucketing_strategy, + } => { use crate::plan::top_k::TopKPlan; match top_k_plan { TopKPlan::MonotonicTop1(plan) => { @@ -860,7 +869,16 @@ impl Plan { } } writeln!(f, "{}", annotations)?; - ctx.indented(|ctx| input.fmt_text(f, ctx))?; + ctx.indented(|ctx| { + if !matches!(temporal_bucketing_strategy, ArrangementStrategy::Direct) { + writeln!( + f, + "{}temporal_bucketing_strategy={}", + ctx.indent, temporal_bucketing_strategy + )?; + } + input.fmt_text(f, ctx) + })?; } Negate { input } => { writeln!(f, "{}Negate{}", ctx.indent, annotations)?; @@ -885,6 +903,7 @@ impl Plan { Union { inputs, consolidate_output, + temporal_bucketing_strategies, } => { if *consolidate_output { writeln!( @@ -896,6 +915,21 @@ impl Plan { writeln!(f, "{}Union{}", ctx.indent, annotations)?; } ctx.indented(|ctx| { + if temporal_bucketing_strategies + .iter() + .any(|s| !matches!(s, ArrangementStrategy::Direct)) + { + let strategies = temporal_bucketing_strategies + .iter() + .map(|s| format!("{}", s)) + .collect::>() + .join(", "); + writeln!( + f, + "{}temporal_bucketing_strategies=[{}]", + ctx.indent, strategies + )?; + } for input in inputs.iter() { input.fmt_text(f, ctx)?; } diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index 35168618f054f..23a6021996b18 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -342,6 +342,14 @@ pub enum PlanNode { /// on the properties of the reduction, and the input itself. Please check /// out the documentation for this type for more detail. top_k_plan: TopKPlan, + /// Strategy for bucketing the input collection ahead of the Top-K operator. + /// + /// Set by the lowering from the input's `has_future_updates` flag. The + /// renderer applies it to the per-row input stream at the top of + /// `render_topk`, covering all three `TopKPlan` arms uniformly. See + /// `PlanNode::Reduce::temporal_bucketing_strategy` for the underlying + /// convention. + temporal_bucketing_strategy: ArrangementStrategy, }, /// Inverts the sign of each update. Negate { @@ -373,6 +381,15 @@ pub enum PlanNode { inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, + /// Per-input bucketing strategies. Lockstep with `inputs`: index `i` is the + /// strategy applied to `inputs[i]` before concatenation. + /// + /// Set by the lowering from each input's `has_future_updates` flag. Only + /// consolidating Unions (`consolidate_output: true`) carry non-`Direct` + /// entries, because bucketing only pays off ahead of a consolidating + /// downstream operator. See `PlanNode::Reduce::temporal_bucketing_strategy` + /// for the underlying convention. + temporal_bucketing_strategies: Vec, }, /// The `input` plan, but with additional arrangements. /// @@ -536,9 +553,14 @@ impl Plan { // Subsequently, we perform plan refinements for the dataflow. Self::refine_source_mfps(&mut dataflow); - if features.enable_consolidate_after_union_negate { - Self::refine_union_negate_consolidation(&mut dataflow); - } + // Note: `consolidate_output` for `Union` and per-input + // `temporal_bucketing_strategies` are decided at lowering time (see the + // `Union` arm of `lower_mir_expr_stack_safe`). The pre-existing + // `refine_union_negate_consolidation` pass — which used to flip + // `consolidate_output` to `true` for Unions with a `Negate` child — has + // been folded into the lowering, since lowering is the only point where + // the bucketing decision (which depends on `has_future_updates`) is + // available. if dataflow.is_single_time() { Self::refine_single_time_operator_selection(&mut dataflow); @@ -657,38 +679,6 @@ impl Plan { mz_repr::explain::trace_plan(dataflow); } - /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input. - #[mz_ore::instrument( - target = "optimizer", - level = "debug", - fields(path.segment = "refine_union_negate_consolidation") - )] - fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription) { - for build_desc in dataflow.objects_to_build.iter_mut() { - let mut todo = vec![&mut build_desc.plan]; - while let Some(expression) = todo.pop() { - let node = &mut expression.node; - match node { - PlanNode::Union { - inputs, - consolidate_output, - .. - } => { - if inputs - .iter() - .any(|input| matches!(input.node, PlanNode::Negate { .. })) - { - *consolidate_output = true; - } - } - _ => {} - } - todo.extend(node.children_mut()); - } - } - mz_repr::explain::trace_plan(dataflow); - } - /// Refines the plans of objects to be built as part of `dataflow` to take advantage /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a /// one-shot SELECT query. @@ -798,6 +788,7 @@ impl CollectionPlan for PlanNode { | PlanNode::Union { inputs, consolidate_output: _, + temporal_bucketing_strategies: _, } => { for input in inputs { input.depends_on_into(out); @@ -833,6 +824,7 @@ impl CollectionPlan for PlanNode { | PlanNode::TopK { input, top_k_plan: _, + temporal_bucketing_strategy: _, } | PlanNode::Negate { input } | PlanNode::Threshold { diff --git a/src/compute-types/src/plan/interpret/api.rs b/src/compute-types/src/plan/interpret/api.rs index 1898e542df0af..033ac74b30526 100644 --- a/src/compute-types/src/plan/interpret/api.rs +++ b/src/compute-types/src/plan/interpret/api.rs @@ -407,7 +407,11 @@ where mfp_after, )) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + temporal_bucketing_strategy: _, + } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; // Interpret the current node. @@ -431,6 +435,7 @@ where Union { inputs, consolidate_output, + temporal_bucketing_strategies: _, } => { // Descend recursively into all children. let inputs = inputs @@ -695,7 +700,11 @@ where // Pass the interpretation result up. Ok(result) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + temporal_bucketing_strategy: _, + } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; // Interpret the current node. @@ -733,6 +742,7 @@ where Union { inputs, consolidate_output, + temporal_bucketing_strategies: _, } => { // Descend recursively into all children. let inputs: Vec<_> = inputs diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 430a4d91faa01..357b02aa35d6a 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -67,6 +67,10 @@ pub(super) struct Context { debug_info: LirDebugInfo, /// Whether to enable fusion of MFPs in reductions. enable_reduce_mfp_fusion: bool, + /// Whether `Union`s with at least one `Negate` input should set + /// `consolidate_output = true`. Folded in from the deleted + /// `refine_union_negate_consolidation` LIR pass. + enable_consolidate_after_union_negate: bool, } impl Context { @@ -80,6 +84,7 @@ impl Context { id: GlobalId::Transient(0), }, enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion, + enable_consolidate_after_union_negate: features.enable_consolidate_after_union_negate, } } @@ -870,15 +875,27 @@ This is not expected to cause incorrect results, but could indicate a performanc input }; // Return the plan, and no arrangements. + // + // `TopK` itself buckets (via `apply_bucketing_strategy` at the + // top of `render_topk`) when the strategy says so, so the + // output flag is cleared whenever bucketing actually fires. + // + // Bucketing absorption: see `strategy_from_future`. + let temporal_bucketing_strategy = strategy_from_future(input_future); + let has_future_updates = match temporal_bucketing_strategy { + ArrangementStrategy::TemporalBucketing => false, + ArrangementStrategy::Direct => input_future, + }; let lir_id = self.allocate_lir_id(); LoweredExpr { plan: PlanNode::TopK { input: Box::new(input), top_k_plan, + temporal_bucketing_strategy, } .as_plan(lir_id), keys: AvailableCollections::new_raw(), - has_future_updates: input_future, + has_future_updates, } } MirRelationExpr::Negate { input } => { @@ -922,11 +939,17 @@ This is not expected to cause incorrect results, but could indicate a performanc } = self.lower_mir_expr(input)?; let arity = input.arity(); let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity); - let plan = if !keys + // If the conditional `arrange_by` wrap fires, the synthesized + // `ArrangeBy` builds a real arrangement (`new_arranged`) and so + // its `apply_bucketing_strategy` call at render time will + // actually bucket. If the wrap is skipped, the input already + // had a suitable arrangement and an upstream operator was + // responsible for bucketing. + let wrap_fires = !keys .arranged .iter() - .any(|(key, _, _)| key == &required_arrangement.0) - { + .any(|(key, _, _)| key == &required_arrangement.0); + let plan = if wrap_fires { self.arrange_by( plan, AvailableCollections::new_arranged(vec![required_arrangement]), @@ -938,6 +961,18 @@ This is not expected to cause incorrect results, but could indicate a performanc plan }; + // Clear the future flag whenever the wrap actually bucketed + // the future-stamped updates. + let has_future_updates = if wrap_fires + && matches!( + strategy_from_future(input_future), + ArrangementStrategy::TemporalBucketing + ) { + false + } else { + input_future + }; + let output_keys = threshold_plan.keys(); // Return the plan, and any produced keys. let lir_id = self.allocate_lir_id(); @@ -948,7 +983,7 @@ This is not expected to cause incorrect results, but could indicate a performanc } .as_plan(lir_id), keys: output_keys, - has_future_updates: input_future, + has_future_updates, } } MirRelationExpr::Union { base, inputs } => { @@ -958,7 +993,49 @@ This is not expected to cause incorrect results, but could indicate a performanc for input in inputs.iter() { lowered_inputs.push(self.lower_mir_expr(input)?); } - let any_future = lowered_inputs.iter().any(|l| l.has_future_updates); + + // Fold the deleted `refine_union_negate_consolidation` pass in + // here: a Union with any `Negate` input should consolidate its + // output. The lowering is the only place where this decision + // can be coupled with the per-input bucketing strategy. + let has_negate_input = lowered_inputs + .iter() + .any(|l| matches!(l.plan.node, PlanNode::Negate { .. })); + let consolidate_output = + self.enable_consolidate_after_union_negate && has_negate_input; + + // Per-input bucketing strategies: only meaningful when the + // Union consolidates its output, since bucketing only pays off + // ahead of a downstream consolidator. + let temporal_bucketing_strategies: Vec = if consolidate_output + { + lowered_inputs + .iter() + .map(|l| strategy_from_future(l.has_future_updates)) + .collect() + } else { + lowered_inputs + .iter() + .map(|_| ArrangementStrategy::Direct) + .collect() + }; + + // If the Union itself buckets each input (consolidating Union + // with `TemporalBucketing` strategies), the future-stamped + // updates are absorbed here and we should clear the outer + // flag for those legs. Otherwise, propagate `any_future`. + let has_future_updates = if consolidate_output { + lowered_inputs + .iter() + .zip_eq(temporal_bucketing_strategies.iter()) + .any(|(l, s)| { + l.has_future_updates + && !matches!(s, ArrangementStrategy::TemporalBucketing) + }) + } else { + lowered_inputs.iter().any(|l| l.has_future_updates) + }; + let plans = lowered_inputs .into_iter() .map( @@ -989,11 +1066,12 @@ This is not expected to cause incorrect results, but could indicate a performanc LoweredExpr { plan: PlanNode::Union { inputs: plans, - consolidate_output: false, + consolidate_output, + temporal_bucketing_strategies, } .as_plan(lir_id), keys: AvailableCollections::new_raw(), - has_future_updates: any_future, + has_future_updates, } } MirRelationExpr::ArrangeBy { input, keys } => { diff --git a/src/compute-types/src/plan/render_plan.rs b/src/compute-types/src/plan/render_plan.rs index 2dc21dc133d2b..00d43389a5f61 100644 --- a/src/compute-types/src/plan/render_plan.rs +++ b/src/compute-types/src/plan/render_plan.rs @@ -229,6 +229,9 @@ pub enum Expr { /// the Top-K, and the input itself. Please check out the documentation for this type for /// more detail. top_k_plan: TopKPlan, + /// How the renderer should bucket the input collection ahead of the Top-K operator. + /// Mirrors [`PlanNode::TopK::temporal_bucketing_strategy`]. + temporal_bucketing_strategy: ArrangementStrategy, }, /// Inverts the sign of each update. Negate { @@ -259,6 +262,9 @@ pub enum Expr { inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, + /// Per-input bucketing strategies, lockstep with `inputs`. Mirrors + /// [`PlanNode::Union::temporal_bucketing_strategies`]. + temporal_bucketing_strategies: Vec, }, /// The `input` plan, but with additional arrangements. /// @@ -455,10 +461,15 @@ impl TryFrom for LetFreePlan { todo.push((*input, Some(lir_id), nesting.saturating_add(1))); } - PlanNode::TopK { input, top_k_plan } => { + PlanNode::TopK { + input, + top_k_plan, + temporal_bucketing_strategy, + } => { let expr = TopK { input: input.lir_id, top_k_plan, + temporal_bucketing_strategy, }; insert_node(lir_id, parent, expr, nesting); @@ -487,10 +498,12 @@ impl TryFrom for LetFreePlan { PlanNode::Union { inputs, consolidate_output, + temporal_bucketing_strategies, } => { let expr = Union { inputs: inputs.iter().map(|i| i.lir_id).collect(), consolidate_output, + temporal_bucketing_strategies, }; insert_node(lir_id, parent, expr, nesting); @@ -945,6 +958,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { TopK { input: _, top_k_plan, + temporal_bucketing_strategy: _, } => { match top_k_plan { TopKPlan::MonotonicTop1(..) => write!(f, "Monotonic Top1")?, @@ -971,6 +985,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { Union { inputs: _, consolidate_output, + temporal_bucketing_strategies: _, } => { if *consolidate_output { write!(f, "Consolidating ")?; diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 8041d3b513dee..03cd5bb0adda1 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -118,6 +118,7 @@ use differential_dataflow::trace::{BatchReader, TraceReader}; use differential_dataflow::{AsCollection, Data, VecCollection}; use futures::FutureExt; use futures::channel::oneshot; +use itertools::Itertools; use mz_compute_types::dataflows::{DataflowDescription, IndexDesc}; use mz_compute_types::dyncfgs::{ COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK, @@ -1277,9 +1278,13 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { temporal_bucketing_strategy, ) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + temporal_bucketing_strategy, + } => { let input = expect_input(input); - self.render_topk(input, top_k_plan) + self.render_topk(input, top_k_plan, temporal_bucketing_strategy) } Negate { input } => { let input = expect_input(input); @@ -1296,12 +1301,22 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { Union { inputs, consolidate_output, + temporal_bucketing_strategies, } => { let mut oks = Vec::new(); let mut errs = Vec::new(); - for input in inputs.into_iter() { + for (input, strategy) in inputs.into_iter().zip_eq(temporal_bucketing_strategies) { let (os, es) = expect_input(input).as_specific_collection(None, &self.config_set); + // Apply per-input temporal bucketing. No-op for `Direct`. + // Only consolidating Unions carry non-`Direct` strategies; + // see the `Union` arm of `lower_mir_expr_stack_safe`. + let (os, _bucketed) = context::apply_bucketing_strategy( + os, + strategy, + self.as_of_frontier.clone(), + &self.config_set, + ); oks.push(os); errs.push(es); } diff --git a/src/compute/src/render/top_k.rs b/src/compute/src/render/top_k.rs index 28df27f0d3369..8ac2e32159d84 100644 --- a/src/compute/src/render/top_k.rs +++ b/src/compute/src/render/top_k.rs @@ -24,6 +24,7 @@ use differential_dataflow::trace::implementations::BatchContainer; use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; +use mz_compute_types::plan::ArrangementStrategy; use mz_compute_types::plan::top_k::{ BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan, }; @@ -50,14 +51,34 @@ use crate::row_spine::{ use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine}; // The implementation requires integer timestamps to be able to delay feedback for monotonic inputs. -impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { +impl<'scope, T: crate::render::RenderTimestamp + crate::render::MaybeBucketByTime> + Context<'scope, T> +{ pub(crate) fn render_topk( &self, input: CollectionBundle<'scope, T>, top_k_plan: TopKPlan, + temporal_bucketing_strategy: ArrangementStrategy, ) -> CollectionBundle<'scope, T> { let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set); + // Bucket the per-row input stream when lowering chose `TemporalBucketing`. + // `TopK` builds its own arrangement(s) inside the variants below, bypassing + // `ensure_collections`, so the strategy is plumbed through `PlanNode::TopK` + // rather than inferred at the arrangement site. `apply_bucketing_strategy` + // is a no-op for `Direct`. + // + // Note: for `MonotonicTop1Plan` with `must_consolidate: false`, bucketing + // adds a small operator overhead with no corresponding consolidation + // benefit. We accept this for now; a future refinement could gate the + // bucket op on a `has_batcher_downstream()` check. + let (ok_input, _bucketed) = crate::render::context::apply_bucketing_strategy( + ok_input, + temporal_bucketing_strategy, + self.as_of_frontier.clone(), + &self.config_set, + ); + // We create a new region to compartmentalize the topk logic. let outer_scope = ok_input.scope(); let (ok_result, err_collection) = outer_scope.clone().region_named("TopK", |inner| { diff --git a/test/sqllogictest/explain/physical_plan_as_json.slt b/test/sqllogictest/explain/physical_plan_as_json.slt index 9490192b838d9..d5924e6999ada 100644 --- a/test/sqllogictest/explain/physical_plan_as_json.slt +++ b/test/sqllogictest/explain/physical_plan_as_json.slt @@ -577,7 +577,8 @@ SELECT * FROM ov "arity": 2, "must_consolidate": true } - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -699,7 +700,11 @@ SELECT a FROM t EXCEPT ALL SELECT b FROM mv } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -885,7 +890,8 @@ SELECT * FROM ov "arity": 2, "must_consolidate": true } - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -1013,7 +1019,11 @@ WITH cte(x) as (SELECT a FROM t EXCEPT ALL SELECT b FROM mv) } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -1235,7 +1245,11 @@ WITH cte(x) as (SELECT a FROM t EXCEPT ALL SELECT b FROM mv) } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -1475,7 +1489,11 @@ SELECT x * 5 FROM cte WHERE x = 5 } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -2278,7 +2296,11 @@ FROM t } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -2327,7 +2349,11 @@ FROM t } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -2704,7 +2730,11 @@ MATERIALIZED VIEW hierarchical_global_mv } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -2753,7 +2783,11 @@ MATERIALIZED VIEW hierarchical_global_mv } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -3012,7 +3046,11 @@ SELECT * FROM hierarchical_global } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -3061,7 +3099,11 @@ SELECT * FROM hierarchical_global } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -4332,7 +4374,11 @@ FROM t } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -4381,7 +4427,11 @@ FROM t } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -8470,7 +8520,11 @@ MATERIALIZED VIEW collated_global_mv } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -8583,7 +8637,11 @@ MATERIALIZED VIEW collated_global_mv } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -9822,7 +9880,11 @@ SELECT * FROM collated_global } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -9935,7 +9997,11 @@ SELECT * FROM collated_global } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } diff --git a/test/sqllogictest/temporal_bucketing.slt b/test/sqllogictest/temporal_bucketing.slt index 0e72aec250c34..ed9bf2ec7220f 100644 --- a/test/sqllogictest/temporal_bucketing.slt +++ b/test/sqllogictest/temporal_bucketing.slt @@ -125,3 +125,154 @@ Source materialize.public.events Target cluster: quickstart EOF + +# ----------------------------------------------------------------------------- +# Tests for extended LIR-level temporal bucketing on TopK, Union, and +# Threshold (follow-up to the Reduce-only commit). See +# `agent-notes/temporal-bucketing-reduce/increase-coverage.md`. +# ----------------------------------------------------------------------------- + +# TopK firing: temporal filter directly under a `SELECT ... ORDER BY ... LIMIT`. +# Lowering should set `temporal_bucketing_strategy=TemporalBucketing` on the +# TopK and clear the future-updates flag at that site so the parent does not +# also bucket. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_topk_firing AS +SELECT k, event_time +FROM events +WHERE event_time + INTERVAL '45 day' > mz_now() +ORDER BY event_time +LIMIT 5; +---- +materialize.public.mv_topk_firing: + TopK::Basic order_by=[#1 asc nulls_last] limit=5 + temporal_bucketing_strategy=TemporalBucketing + Get::Collection materialize.public.events + raw=true + +Source materialize.public.events + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + +Target cluster: quickstart + +EOF + +# TopK idempotency: when the input is an already-bucketed indexed view, the +# TopK should NOT re-bucket; the pretty-printer omits the field when it is +# `Direct`. +statement ok +CREATE VIEW v_topk_input AS +SELECT k, event_time FROM events WHERE event_time + INTERVAL '45 day' > mz_now(); + +statement ok +CREATE DEFAULT INDEX ON v_topk_input; + +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_topk_idempotent AS +SELECT k, event_time FROM v_topk_input ORDER BY event_time LIMIT 5; +---- +materialize.public.mv_topk_idempotent: + TopK::Basic order_by=[#1 asc nulls_last] limit=5 + ArrangeBy + input_key=[#0{k}, #1{event_time}] + raw=true + Get::PassArrangements materialize.public.v_topk_input + raw=false + arrangements[0]={ key=[#0{k}, #1{event_time}], permutation=id, thinning=() } + +Used Indexes: + - materialize.public.v_topk_input_primary_idx (*** full scan ***) + +Target cluster: quickstart + +EOF + +# Union firing: `EXCEPT ALL` lowers to a `Union` over a `Negate` input, which +# triggers consolidation. With at least one input carrying future updates, +# the per-input `temporal_bucketing_strategies` should include +# `TemporalBucketing`. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_union_firing AS +SELECT k FROM events WHERE event_time + INTERVAL '45 day' > mz_now() +EXCEPT ALL +SELECT k FROM events WHERE event_time + INTERVAL '90 day' > mz_now(); +---- +materialize.public.mv_union_firing: + Threshold::Basic ensure_arrangement={ key=[#0], permutation=id, thinning=() } + ArrangeBy + raw=false + arrangements[0]={ key=[#0], permutation=id, thinning=() } + Union consolidate_output=true + temporal_bucketing_strategies=[TemporalBucketing, TemporalBucketing] + Get::Collection materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + raw=true + Negate + Get::Collection materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 90 days)))) + raw=true + +Target cluster: quickstart + +EOF + +# `EXCEPT` (distinct) lowers to `Distinct -> Union -> Threshold`, so each leg +# already terminates in a `Reduce::Distinct` that consumes the future-updates +# flag itself (via the precursor `Reduce`-input bucketing). By the time data +# reaches the `Union` and `Threshold`, `has_future_updates` is already cleared, +# so neither the `Union` per-input strategies nor the `Threshold` synthetic +# `ArrangeBy` wrap fire as bucketing sites here -- bucketing lands on the +# inner `Reduce::Distinct` operators instead. +# +# This means the `Threshold` flag-clear code in `lowering.rs` is currently +# unreachable from any SQL surface (`EXCEPT` strips the flag via `Distinct`; +# `EXCEPT ALL` consumes it at the consolidating `Union` per-input bucketing). +# We keep the `Threshold` flag-clear logic as defense-in-depth for future MIR +# producers, but no fixture exercises it. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_except_distinct_buckets_at_reduce AS +SELECT k FROM events WHERE event_time + INTERVAL '45 day' > mz_now() +EXCEPT +SELECT k FROM events WHERE event_time + INTERVAL '90 day' > mz_now(); +---- +materialize.public.mv_except_distinct_buckets_at_reduce: + Threshold::Basic ensure_arrangement={ key=[#0], permutation=id, thinning=() } + ArrangeBy + raw=false + arrangements[0]={ key=[#0], permutation=id, thinning=() } + Union consolidate_output=true + ArrangeBy + input_key=[#0] + raw=true + Reduce::Distinct + temporal_bucketing_strategy=TemporalBucketing + key_plan=id + val_plan + project=() + Get::Collection materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + raw=true + Negate + ArrangeBy + input_key=[#0] + raw=true + Reduce::Distinct + temporal_bucketing_strategy=TemporalBucketing + key_plan=id + val_plan + project=() + Get::Collection materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 90 days)))) + raw=true + +Target cluster: quickstart + +EOF From 8306cddb24ec5ae30d1f40bbf7a06de2c6de2f20 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Fri, 22 May 2026 14:38:41 +0200 Subject: [PATCH 04/13] Assert that TopK with temporal bucketing but 'must_consolidate: false' can't happen --- src/compute/src/render/top_k.rs | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/src/compute/src/render/top_k.rs b/src/compute/src/render/top_k.rs index 8ac2e32159d84..ffafacb5f1613 100644 --- a/src/compute/src/render/top_k.rs +++ b/src/compute/src/render/top_k.rs @@ -68,10 +68,33 @@ impl<'scope, T: crate::render::RenderTimestamp + crate::render::MaybeBucketByTim // rather than inferred at the arrangement site. `apply_bucketing_strategy` // is a no-op for `Direct`. // - // Note: for `MonotonicTop1Plan` with `must_consolidate: false`, bucketing - // adds a small operator overhead with no corresponding consolidation - // benefit. We accept this for now; a future refinement could gate the - // bucket op on a `has_batcher_downstream()` check. + // Note: a `MonotonicTop1Plan`/`MonotonicTopKPlan` with `must_consolidate = + // false` together with `TemporalBucketing` here would mean we install a + // bucket operator with no downstream consolidator -- pure overhead. That + // combination cannot actually occur: `RelaxMustConsolidate` (which is the + // only writer of `must_consolidate = false`) runs only on single-time + // dataflows (one-shot peeks / `COPY TO`), and in single-time dataflows + // `ExprPrepOneShot` constant-folds `mz_now()` to the dataflow `as_of` + // before lowering, so no temporal predicates survive into LIR and + // `has_future_updates` is `false` everywhere -- meaning no operator (TopK + // included) is ever lowered with `TemporalBucketing`. The assertion below + // pins down this invariant. + if matches!( + temporal_bucketing_strategy, + ArrangementStrategy::TemporalBucketing + ) { + let must_consolidate = match &top_k_plan { + TopKPlan::MonotonicTop1(p) => p.must_consolidate, + TopKPlan::MonotonicTopK(p) => p.must_consolidate, + TopKPlan::Basic(_) => true, + }; + soft_assert_or_log!( + must_consolidate, + "TopK with `TemporalBucketing` should not have `must_consolidate = false`; \ + `RelaxMustConsolidate` only runs on single-time dataflows where \ + `mz_now()` has been const-folded and no temporal bucketing is set", + ); + } let (ok_input, _bucketed) = crate::render::context::apply_bucketing_strategy( ok_input, temporal_bucketing_strategy, From f73a89129c63420a1eb6d30337d85b51bffe1107 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Fri, 22 May 2026 15:19:19 +0200 Subject: [PATCH 05/13] Add temporal bucketing to the default EXPLAIN --- src/compute-types/src/explain/text.rs | 68 ++++++++++++++++++------ test/sqllogictest/explain/default.slt | 75 +++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 16 deletions(-) diff --git a/src/compute-types/src/explain/text.rs b/src/compute-types/src/explain/text.rs index 9357ddc91501e..316a315778cae 100644 --- a/src/compute-types/src/explain/text.rs +++ b/src/compute-types/src/explain/text.rs @@ -290,7 +290,7 @@ impl Plan { key_val_plan, plan, mfp_after, - temporal_bucketing_strategy: _, + temporal_bucketing_strategy, } => { ctx.indent.set(); if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() { @@ -300,23 +300,36 @@ impl Plan { ctx.indent += 1; } + let temporally_bucketed = matches!( + temporal_bucketing_strategy, + ArrangementStrategy::TemporalBucketing + ); + use crate::plan::reduce::ReducePlan; match plan { ReducePlan::Distinct => { - writeln!(f, "{}→Distinct GroupAggregate{annotations}", ctx.indent)?; + write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } + writeln!(f, "Distinct GroupAggregate{annotations}")?; } ReducePlan::Accumulable(plan) => { - writeln!(f, "{}→Accumulable GroupAggregate{annotations}", ctx.indent)?; + write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } + writeln!(f, "Accumulable GroupAggregate{annotations}")?; ctx.indented(|ctx| plan.fmt_text(f, ctx))?; } ReducePlan::Hierarchical( plan @ HierarchicalPlan::Bucketed(BucketedPlan { buckets, .. }), ) => { - write!( - f, - "{}→Bucketed Hierarchical GroupAggregate (buckets: ", - ctx.indent - )?; + write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } + write!(f, "Bucketed Hierarchical GroupAggregate (buckets:")?; for bucket in buckets { write!(f, " {bucket}")?; } @@ -329,6 +342,9 @@ impl Plan { }), ) => { write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } if *must_consolidate { write!(f, "Consolidating ")?; } @@ -350,11 +366,11 @@ impl Plan { ctx.indent += 1; } } - writeln!( - f, - "{}→Non-incremental GroupAggregate{annotations}", - ctx.indent - )?; + write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } + writeln!(f, "Non-incremental GroupAggregate{annotations}")?; ctx.indented(|ctx| plan.fmt_text(f, ctx))?; ctx.indent.reset(); } @@ -378,12 +394,19 @@ impl Plan { TopK { input, top_k_plan, - temporal_bucketing_strategy: _, + temporal_bucketing_strategy, } => { + let temporally_bucketed = matches!( + temporal_bucketing_strategy, + ArrangementStrategy::TemporalBucketing + ); use crate::plan::top_k::TopKPlan; match top_k_plan { TopKPlan::MonotonicTop1(plan) => { write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } if plan.must_consolidate { write!(f, "Consolidating ")?; } @@ -403,6 +426,9 @@ impl Plan { } TopKPlan::MonotonicTopK(plan) => { write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } if plan.must_consolidate { write!(f, "Consolidating ")?; } @@ -425,7 +451,11 @@ impl Plan { })?; } TopKPlan::Basic(plan) => { - writeln!(f, "{}→Non-monotonic TopK{annotations}", ctx.indent)?; + write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } + writeln!(f, "Non-monotonic TopK{annotations}")?; ctx.indented(|ctx| { if plan.group_key.len() > 0 { @@ -474,9 +504,15 @@ impl Plan { Union { inputs, consolidate_output, - temporal_bucketing_strategies: _, + temporal_bucketing_strategies, } => { + let any_temporally_bucketed = temporal_bucketing_strategies + .iter() + .any(|s| matches!(s, ArrangementStrategy::TemporalBucketing)); write!(f, "{}→", ctx.indent)?; + if any_temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } if *consolidate_output { write!(f, "Consolidating ")?; } diff --git a/test/sqllogictest/explain/default.slt b/test/sqllogictest/explain/default.slt index fca67c0c4a846..2e66bf1bcdb25 100644 --- a/test/sqllogictest/explain/default.slt +++ b/test/sqllogictest/explain/default.slt @@ -2201,3 +2201,78 @@ Used Indexes: Target cluster: no_replicas EOF + +# Default-format tests for the `Temporally-Bucketed` prefix on Reduce, TopK, +# and Union. The default format only emits the prefix when the operator's +# `temporal_bucketing_strategy` (or any per-input strategy for Union) is +# `TemporalBucketing`; otherwise the operator is printed unchanged. + +statement ok +CREATE TABLE events (k INT NOT NULL, event_time TIMESTAMP NOT NULL); + +query T multiline +EXPLAIN PHYSICAL PLAN AS TEXT FOR +CREATE MATERIALIZED VIEW mv_default_reduce AS +SELECT k, count(*) +FROM events +WHERE event_time + INTERVAL '45 day' > mz_now() +GROUP BY k; +---- +materialize.public.mv_default_reduce: + →Temporally-Bucketed Accumulable GroupAggregate + Simple aggregates: count(*) + →Read materialize.public.events + +Source materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + +Target cluster: no_replicas + +EOF + +query T multiline +EXPLAIN PHYSICAL PLAN AS TEXT FOR +CREATE MATERIALIZED VIEW mv_default_topk AS +SELECT k, event_time +FROM events +WHERE event_time + INTERVAL '45 day' > mz_now() +ORDER BY event_time LIMIT 5; +---- +materialize.public.mv_default_topk: + →Temporally-Bucketed Non-monotonic TopK + Order By #1 asc nulls_last + Limit 5 + →Read materialize.public.events + +Source materialize.public.events + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + +Target cluster: no_replicas + +EOF + +query T multiline +EXPLAIN PHYSICAL PLAN AS TEXT FOR +CREATE MATERIALIZED VIEW mv_default_union AS +SELECT k FROM events WHERE event_time + INTERVAL '45 day' > mz_now() +EXCEPT ALL +SELECT k FROM events WHERE event_time + INTERVAL '90 day' > mz_now(); +---- +materialize.public.mv_default_union: + →Threshold Diffs #0 + →Arrange (#0) + →Temporally-Bucketed Consolidating Union + →Fused with Child Map/Filter/Project + Project: #0 + Filter: (mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days))) + →Read materialize.public.events + →Negate Diffs + →Fused with Child Map/Filter/Project + Project: #0 + Filter: (mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 90 days))) + →Read materialize.public.events + +Target cluster: no_replicas + +EOF From 0e5f7cb02cf62241a627014c846572f34752b585 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 24 May 2026 12:13:07 +0200 Subject: [PATCH 06/13] Simplify trait bounds on `maybe_apply_temporal_bucketing` --- src/compute/src/render.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 03cd5bb0adda1..e968c65ba9f85 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -1510,11 +1510,8 @@ pub trait MaybeBucketByTime: Timestamp { summary: mz_repr::Timestamp, ) -> VecCollection<'scope, Self, D, Diff> where - D: timely::ExchangeData + D: differential_dataflow::ExchangeData + crate::typedefs::MzData - + Ord - + Clone - + std::fmt::Debug + differential_dataflow::Hashable; } @@ -1546,11 +1543,8 @@ impl MaybeBucketByTime for mz_repr::Timestamp { summary: mz_repr::Timestamp, ) -> VecCollection<'scope, Self, D, Diff> where - D: timely::ExchangeData + D: differential_dataflow::ExchangeData + crate::typedefs::MzData - + Ord - + Clone - + std::fmt::Debug + differential_dataflow::Hashable, { stream @@ -1595,11 +1589,8 @@ impl MaybeBucketByTime for Product> { _summary: mz_repr::Timestamp, ) -> VecCollection<'scope, Self, D, Diff> where - D: timely::ExchangeData + D: differential_dataflow::ExchangeData + crate::typedefs::MzData - + Ord - + Clone - + std::fmt::Debug + differential_dataflow::Hashable, { // TODO: Implement bucketing on outer timestamp for iterative scopes. From 77399dc235d8c9bc82dd8557cd958c9532a5560f Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 24 May 2026 12:20:25 +0200 Subject: [PATCH 07/13] Delete the `enable_consolidate_after_union_negate` flag This has been turned on everywhere for a long time. --- src/compute-types/src/plan/lowering.rs | 12 ++---------- src/repr/src/optimize.rs | 4 ---- src/sql/src/plan/statement/ddl.rs | 1 - src/sql/src/plan/statement/dml.rs | 1 - src/sql/src/session/vars.rs | 5 ----- src/sql/src/session/vars/definitions.rs | 10 ---------- test/sqlancer/Dockerfile | 3 ++- test/testdrive/session.td | 1 - 8 files changed, 4 insertions(+), 33 deletions(-) diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 357b02aa35d6a..37577b541598e 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -67,10 +67,6 @@ pub(super) struct Context { debug_info: LirDebugInfo, /// Whether to enable fusion of MFPs in reductions. enable_reduce_mfp_fusion: bool, - /// Whether `Union`s with at least one `Negate` input should set - /// `consolidate_output = true`. Folded in from the deleted - /// `refine_union_negate_consolidation` LIR pass. - enable_consolidate_after_union_negate: bool, } impl Context { @@ -84,7 +80,6 @@ impl Context { id: GlobalId::Transient(0), }, enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion, - enable_consolidate_after_union_negate: features.enable_consolidate_after_union_negate, } } @@ -994,15 +989,12 @@ This is not expected to cause incorrect results, but could indicate a performanc lowered_inputs.push(self.lower_mir_expr(input)?); } - // Fold the deleted `refine_union_negate_consolidation` pass in - // here: a Union with any `Negate` input should consolidate its + // A Union with any `Negate` input should consolidate its // output. The lowering is the only place where this decision // can be coupled with the per-input bucketing strategy. - let has_negate_input = lowered_inputs + let consolidate_output = lowered_inputs .iter() .any(|l| matches!(l.plan.node, PlanNode::Negate { .. })); - let consolidate_output = - self.enable_consolidate_after_union_negate && has_negate_input; // Per-input bucketing strategies: only meaningful when the // Union consolidates its output, since bucketing only pays off diff --git a/src/repr/src/optimize.rs b/src/repr/src/optimize.rs index e7928cb9f63ee..5e18ba8696a77 100644 --- a/src/repr/src/optimize.rs +++ b/src/repr/src/optimize.rs @@ -97,10 +97,6 @@ optimizer_feature_flags!({ // Use `EquivalenceClassesWithholdingErrors` instead of raw // `EquivalenceClasses` during eq prop for joins. enable_eq_classes_withholding_errors: bool, - // Enable consolidation of unions that happen immediately after negate. - // - // The refinement happens in the LIR ⇒ LIR phase. - enable_consolidate_after_union_negate: bool, // Bound from `SystemVars::enable_eager_delta_joins`. enable_eager_delta_joins: bool, // Enable Lattice-based fixpoint iteration on LetRec nodes in the diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index a298ffd3da76a..a4112645f389d 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -4856,7 +4856,6 @@ pub fn unplan_create_cluster( }) => { let schedule = unplan_cluster_schedule(schedule); let OptimizerFeatureOverrides { - enable_consolidate_after_union_negate: _, enable_reduce_mfp_fusion: _, enable_cardinality_estimates: _, persist_fast_path_limit: _, diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index 9a9eca366ebdf..28124bdab69e6 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -622,7 +622,6 @@ impl TryFrom for ExplainConfig { enable_new_outer_join_lowering: v.enable_new_outer_join_lowering, enable_variadic_left_join_lowering: v.enable_variadic_left_join_lowering, enable_letrec_fixpoint_analysis: v.enable_letrec_fixpoint_analysis, - enable_consolidate_after_union_negate: Default::default(), enable_reduce_mfp_fusion: Default::default(), enable_cardinality_estimates: Default::default(), persist_fast_path_limit: Default::default(), diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 34fac2472525e..835826bc14883 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1196,7 +1196,6 @@ impl SystemVars { &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES, &REPLICA_STATUS_HISTORY_RETENTION_WINDOW, &ENABLE_STORAGE_SHARD_FINALIZATION, - &ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE, &ENABLE_DEFAULT_CONNECTION_VALIDATION, &DEFAULT_TIMESTAMP_INTERVAL, &MIN_TIMESTAMP_INTERVAL, @@ -2035,10 +2034,6 @@ impl SystemVars { *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION) } - pub fn enable_consolidate_after_union_negate(&self) -> bool { - *self.expect_value(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE) - } - /// Returns the `enable_default_connection_validation` configuration parameter. pub fn enable_default_connection_validation(&self) -> bool { *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION) diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 228a00a3169e6..a08c06f8526e0 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -1436,13 +1436,6 @@ pub static ENABLE_STORAGE_SHARD_FINALIZATION: VarDefinition = VarDefinition::new false, ); -pub static ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE: VarDefinition = VarDefinition::new( - "enable_consolidate_after_union_negate", - value!(bool; true), - "consolidation after Unions that have a Negated input (Materialize).", - true, -); - pub static DEFAULT_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new( "default_timestamp_interval", value!(Duration; Duration::from_millis(1000)), @@ -2244,7 +2237,6 @@ feature_flags!( impl From<&super::SystemVars> for OptimizerFeatures { fn from(vars: &super::SystemVars) -> Self { Self { - enable_consolidate_after_union_negate: vars.enable_consolidate_after_union_negate(), enable_eager_delta_joins: vars.enable_eager_delta_joins(), enable_new_outer_join_lowering: vars.enable_new_outer_join_lowering(), enable_reduce_mfp_fusion: vars.enable_reduce_mfp_fusion(), @@ -2288,7 +2280,6 @@ mod tests { let false_features = OptimizerFeatures::default(); let OptimizerFeatures { enable_eq_classes_withholding_errors, - enable_consolidate_after_union_negate, enable_eager_delta_joins, enable_letrec_fixpoint_analysis, enable_new_outer_join_lowering, @@ -2318,7 +2309,6 @@ mod tests { } set_var!(enable_eq_classes_withholding_errors); - set_var!(enable_consolidate_after_union_negate); set_var!(enable_eager_delta_joins); set_var!(enable_letrec_fixpoint_analysis); set_var!(enable_new_outer_join_lowering); diff --git a/test/sqlancer/Dockerfile b/test/sqlancer/Dockerfile index 2044bda3cfe81..503d371d93aef 100644 --- a/test/sqlancer/Dockerfile +++ b/test/sqlancer/Dockerfile @@ -23,9 +23,10 @@ RUN apt-get update && TZ=UTC DEBIAN_FRONTEND=noninteractive apt-get install -y - && rm -rf /usr/share/doc/* /usr/share/man/* /usr/share/info/* /usr/share/locale/* /var/cache/* /var/log/* # TODO: Put this back when merged: https://github.com/sqlancer/sqlancer/pull/1307 & DQP +# ggevay: But in the meantime there have been more changes to our fork, see the `main` branch there. RUN git clone https://github.com/MaterializeInc/sqlancer \ && cd sqlancer \ - && git checkout 24bccd84c7a7279fc3746c54884c9b742b6a5ff2 \ + && git checkout 933b9fa077916fa8f431612d6892f6fa4c711a78 \ && rm -rf .git \ && mvn package -DskipTests # RUN git clone --depth=1 --single-branch https://github.com/sqlancer/sqlancer \ diff --git a/test/testdrive/session.td b/test/testdrive/session.td index 638c56fa465cf..1572e4011965a 100644 --- a/test/testdrive/session.td +++ b/test/testdrive/session.td @@ -84,7 +84,6 @@ TimeZone UTC "Sets the time transaction_isolation "strict serializable" "Sets the current transaction's isolation level (PostgreSQL)." unsafe_new_transaction_wall_time "" "Sets the wall time for all new explicit or implicit transactions to control the value of `now()`. If not set, uses the system's clock." welcome_message on "Whether to send a notice with a welcome message after a successful connection (Materialize)." -enable_consolidate_after_union_negate on "consolidation after Unions that have a Negated input (Materialize)." force_source_table_syntax off "Force use of new source model (CREATE TABLE .. FROM SOURCE) and migrate existing sources" > SET application_name = 'foo' From 4a93c9c331b5777e253b93217336caaaffd4693e Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 24 May 2026 12:25:24 +0200 Subject: [PATCH 08/13] Correct comment about cross-dataflow bucketing --- src/compute-types/src/plan/lowering.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 37577b541598e..15d2706d1dbdd 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -300,11 +300,8 @@ impl Context { // from the underlying binding — applying an MFP doesn't drop future- // timestamped updates that already exist on the input. // - // TODO(temporal-bucketing): `has_future_updates` is computed per - // dataflow; we don't currently propagate it across `Id::Global` - // boundaries (e.g., from an MV's dataflow to its consumer's), so a - // downstream-only `Get`-then-`ArrangeBy` won't bucket unless the - // consumer has its own local temporal MFP. + // Note that global Gets from different dataflows can't have future updates, because + // both indexes and materialized views hold back future updates. let has_future_updates = self.has_future_updates.contains(id) || match &plan { GetPlan::Arrangement(_, _, mfp) | GetPlan::Collection(mfp) => { From b982f81aa12243daec6c594475e086a8e6442303 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 24 May 2026 13:01:27 +0200 Subject: [PATCH 09/13] Simplify Threshold --- src/compute-types/src/plan/lowering.rs | 32 +++++++------------------- 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 15d2706d1dbdd..7b487bee0d301 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -931,38 +931,22 @@ This is not expected to cause incorrect results, but could indicate a performanc } = self.lower_mir_expr(input)?; let arity = input.arity(); let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity); - // If the conditional `arrange_by` wrap fires, the synthesized - // `ArrangeBy` builds a real arrangement (`new_arranged`) and so - // its `apply_bucketing_strategy` call at render time will - // actually bucket. If the wrap is skipped, the input already - // had a suitable arrangement and an upstream operator was - // responsible for bucketing. - let wrap_fires = !keys + + let (plan, has_future_updates) = if !keys .arranged .iter() - .any(|(key, _, _)| key == &required_arrangement.0); - let plan = if wrap_fires { - self.arrange_by( + .any(|(key, _, _)| key == &required_arrangement.0) + { + let plan = self.arrange_by( plan, AvailableCollections::new_arranged(vec![required_arrangement]), &keys, arity, input_future, - ) - } else { - plan - }; - - // Clear the future flag whenever the wrap actually bucketed - // the future-stamped updates. - let has_future_updates = if wrap_fires - && matches!( - strategy_from_future(input_future), - ArrangementStrategy::TemporalBucketing - ) { - false + ); + (plan, false) } else { - input_future + (plan, input_future) }; let output_keys = threshold_plan.keys(); From 2321cab6cae7206fa68cb6bbf73b20075a806479 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 24 May 2026 13:19:42 +0200 Subject: [PATCH 10/13] Simplify TopK --- src/compute-types/src/plan/lowering.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 7b487bee0d301..ced85f72a564b 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -867,17 +867,7 @@ This is not expected to cause incorrect results, but could indicate a performanc input }; // Return the plan, and no arrangements. - // - // `TopK` itself buckets (via `apply_bucketing_strategy` at the - // top of `render_topk`) when the strategy says so, so the - // output flag is cleared whenever bucketing actually fires. - // - // Bucketing absorption: see `strategy_from_future`. let temporal_bucketing_strategy = strategy_from_future(input_future); - let has_future_updates = match temporal_bucketing_strategy { - ArrangementStrategy::TemporalBucketing => false, - ArrangementStrategy::Direct => input_future, - }; let lir_id = self.allocate_lir_id(); LoweredExpr { plan: PlanNode::TopK { @@ -887,7 +877,7 @@ This is not expected to cause incorrect results, but could indicate a performanc } .as_plan(lir_id), keys: AvailableCollections::new_raw(), - has_future_updates, + has_future_updates: false, } } MirRelationExpr::Negate { input } => { From cc277cc6edeb7a75636b781118681381c252bea0 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 24 May 2026 13:30:36 +0200 Subject: [PATCH 11/13] Simplify and future-proof Reduce --- src/compute-types/src/plan/lowering.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index ced85f72a564b..67a81b6bce44f 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -1271,15 +1271,8 @@ This is not expected to cause incorrect results, but could indicate a performanc // `ArrangeBy`. Instead we record the strategy directly on the `Reduce` node, and // `render_reduce` applies bucketing to the keyed `(key, val)` stream itself. let temporal_bucketing_strategy = strategy_from_future(input_future); - // `extract_mfp_after` strips temporal predicates back into `*mfp_on_top` (the residual - // MFP installed above the reduce), so `mfp_after` is non-temporal and cannot introduce - // future updates. - // - // Bucketing absorption: see `strategy_from_future`. - let has_future_updates = match temporal_bucketing_strategy { - ArrangementStrategy::TemporalBucketing => false, - ArrangementStrategy::Direct => input_future, - }; + // (This can't currently happen due to `extract_mfp_after` separating out any temporal part.) + let has_future_updates = mfp_after.has_temporal_predicates(); Ok(LoweredExpr { plan: PlanNode::Reduce { input_key, From df91d62d7d48c8d6910e541630f11957ec03305f Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 24 May 2026 13:40:36 +0200 Subject: [PATCH 12/13] Simplify ArrangeBy --- src/compute-types/src/plan/lowering.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 67a81b6bce44f..4b31de119e3f6 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -1042,7 +1042,7 @@ This is not expected to cause incorrect results, but could indicate a performanc let LoweredExpr { plan: input, keys: mut input_keys, - has_future_updates: future, + has_future_updates: input_has_future_updates, } = self.lower_mir_expr(input)?; // Fill the `types` in `input_keys` if not already present. let arity = input_mir.arity(); @@ -1057,7 +1057,7 @@ This is not expected to cause incorrect results, but could indicate a performanc LoweredExpr { plan: input, keys: input_keys, - has_future_updates: future, + has_future_updates: input_has_future_updates, } } else { let mut new_keys = new_keys @@ -1086,13 +1086,9 @@ This is not expected to cause incorrect results, but could indicate a performanc // Return the plan and extended keys. let lir_id = self.allocate_lir_id(); - let strategy = strategy_from_future(future); - // Bucketing absorption: see `strategy_from_future`. If we bucket, clear - // the future-updates flag so the immediate parent is lowered as `Direct`. - let has_future_updates = match strategy { - ArrangementStrategy::TemporalBucketing => false, - ArrangementStrategy::Direct => future, - }; + let strategy = strategy_from_future(input_has_future_updates); + assert!(!forms.arranged.is_empty()); // i.e., we do build an arrangement + let has_future_updates = false; LoweredExpr { plan: PlanNode::ArrangeBy { input_key, From 694d8865e7d5aa3f264cf2807373ebc94f4f6a9b Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Sun, 24 May 2026 13:48:52 +0200 Subject: [PATCH 13/13] Simplify the comment on strategy_from_future Since this function is about just applying the in-flight flag, I removed that part from the doc comment that talked about propagating the flag downstream. --- src/compute-types/src/plan/lowering.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 4b31de119e3f6..3802ba851353c 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -32,10 +32,8 @@ use crate::plan::{ArrangementStrategy, AvailableCollections, GetPlan, LirId, Pla /// Pick an [`ArrangementStrategy`] based on whether the input may contain future-stamped /// updates. Future updates are the only case where temporal bucketing pays off. /// -/// Convention: every caller that returns `TemporalBucketing` must also clear -/// `LoweredExpr::has_future_updates` on the resulting `LoweredExpr`, so that a stack of -/// bucketing-eligible operators only buckets at the lowest one. A trailing temporal MFP -/// fused on top naturally re-arms the flag. +/// Any arrangement or consolidation that absorbs data that can have future updates should be +/// guarded by a temporal bucketing operator. fn strategy_from_future(has_future_updates: bool) -> ArrangementStrategy { if has_future_updates { ArrangementStrategy::TemporalBucketing