diff --git a/src/compute-types/src/explain/text.rs b/src/compute-types/src/explain/text.rs index 04d099cc702fb..316a315778cae 100644 --- a/src/compute-types/src/explain/text.rs +++ b/src/compute-types/src/explain/text.rs @@ -290,6 +290,7 @@ impl Plan { key_val_plan, plan, mfp_after, + temporal_bucketing_strategy, } => { ctx.indent.set(); if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() { @@ -299,23 +300,36 @@ impl Plan { ctx.indent += 1; } + let temporally_bucketed = matches!( + temporal_bucketing_strategy, + ArrangementStrategy::TemporalBucketing + ); + use crate::plan::reduce::ReducePlan; match plan { ReducePlan::Distinct => { - writeln!(f, "{}→Distinct GroupAggregate{annotations}", ctx.indent)?; + write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } + writeln!(f, "Distinct GroupAggregate{annotations}")?; } ReducePlan::Accumulable(plan) => { - writeln!(f, "{}→Accumulable GroupAggregate{annotations}", ctx.indent)?; + write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } + writeln!(f, "Accumulable GroupAggregate{annotations}")?; ctx.indented(|ctx| plan.fmt_text(f, ctx))?; } ReducePlan::Hierarchical( plan @ HierarchicalPlan::Bucketed(BucketedPlan { buckets, .. }), ) => { - write!( - f, - "{}→Bucketed Hierarchical GroupAggregate (buckets: ", - ctx.indent - )?; + write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } + write!(f, "Bucketed Hierarchical GroupAggregate (buckets:")?; for bucket in buckets { write!(f, " {bucket}")?; } @@ -328,6 +342,9 @@ impl Plan { }), ) => { write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } if *must_consolidate { write!(f, "Consolidating ")?; } @@ -349,11 +366,11 @@ impl Plan { ctx.indent += 1; } } - writeln!( - f, - "{}→Non-incremental GroupAggregate{annotations}", - ctx.indent - )?; + write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } + writeln!(f, "Non-incremental GroupAggregate{annotations}")?; ctx.indented(|ctx| plan.fmt_text(f, ctx))?; ctx.indent.reset(); } @@ -374,11 +391,22 @@ impl Plan { ctx.indent.reset(); } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + temporal_bucketing_strategy, + } => { + let temporally_bucketed = matches!( + temporal_bucketing_strategy, + ArrangementStrategy::TemporalBucketing + ); use crate::plan::top_k::TopKPlan; match top_k_plan { TopKPlan::MonotonicTop1(plan) => { write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } if plan.must_consolidate { write!(f, "Consolidating ")?; } @@ -398,6 +426,9 @@ impl Plan { } TopKPlan::MonotonicTopK(plan) => { write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } if plan.must_consolidate { write!(f, "Consolidating ")?; } @@ -420,7 +451,11 @@ impl Plan { })?; } TopKPlan::Basic(plan) => { - writeln!(f, "{}→Non-monotonic TopK{annotations}", ctx.indent)?; + write!(f, "{}→", ctx.indent)?; + if temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } + writeln!(f, "Non-monotonic TopK{annotations}")?; ctx.indented(|ctx| { if plan.group_key.len() > 0 { @@ -469,8 +504,15 @@ impl Plan { Union { inputs, consolidate_output, + temporal_bucketing_strategies, } => { + let any_temporally_bucketed = temporal_bucketing_strategies + .iter() + .any(|s| matches!(s, ArrangementStrategy::TemporalBucketing)); write!(f, "{}→", ctx.indent)?; + if any_temporally_bucketed { + write!(f, "Temporally-Bucketed ")?; + } if *consolidate_output { write!(f, "Consolidating ")?; } @@ -739,6 +781,7 @@ impl Plan { key_val_plan, plan, mfp_after, + temporal_bucketing_strategy, } => { use crate::plan::reduce::ReducePlan; match plan { @@ -764,6 +807,13 @@ impl Plan { let key = CompactScalars(key); writeln!(f, "{}input_key={}", ctx.indent, key)?; } + if !matches!(temporal_bucketing_strategy, ArrangementStrategy::Direct) { + writeln!( + f, + "{}temporal_bucketing_strategy={}", + ctx.indent, temporal_bucketing_strategy + )?; + } if key_val_plan.key_plan.deref().is_identity() { writeln!(f, "{}key_plan=id", ctx.indent)?; } else { @@ -790,7 +840,11 @@ impl Plan { input.fmt_text(f, ctx) })?; } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + temporal_bucketing_strategy, + } => { use crate::plan::top_k::TopKPlan; match top_k_plan { TopKPlan::MonotonicTop1(plan) => { @@ -851,7 +905,16 @@ impl Plan { } } writeln!(f, "{}", annotations)?; - ctx.indented(|ctx| input.fmt_text(f, ctx))?; + ctx.indented(|ctx| { + if !matches!(temporal_bucketing_strategy, ArrangementStrategy::Direct) { + writeln!( + f, + "{}temporal_bucketing_strategy={}", + ctx.indent, temporal_bucketing_strategy + )?; + } + input.fmt_text(f, ctx) + })?; } Negate { input } => { writeln!(f, "{}Negate{}", ctx.indent, annotations)?; @@ -876,6 +939,7 @@ impl Plan { Union { inputs, consolidate_output, + temporal_bucketing_strategies, } => { if *consolidate_output { writeln!( @@ -887,6 +951,21 @@ impl Plan { writeln!(f, "{}Union{}", ctx.indent, annotations)?; } ctx.indented(|ctx| { + if temporal_bucketing_strategies + .iter() + .any(|s| !matches!(s, ArrangementStrategy::Direct)) + { + let strategies = temporal_bucketing_strategies + .iter() + .map(|s| format!("{}", s)) + .collect::>() + .join(", "); + writeln!( + f, + "{}temporal_bucketing_strategies=[{}]", + ctx.indent, strategies + )?; + } for input in inputs.iter() { input.fmt_text(f, ctx)?; } diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index edabc5f1c947b..23a6021996b18 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -130,6 +130,15 @@ pub enum ArrangementStrategy { TemporalBucketing, } +impl std::fmt::Display for ArrangementStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ArrangementStrategy::Direct => write!(f, "Direct"), + ArrangementStrategy::TemporalBucketing => write!(f, "TemporalBucketing"), + } + } +} + /// An identifier for an LIR node. #[derive( Clone, @@ -308,6 +317,20 @@ pub enum PlanNode { /// predicates so that it can be readily evaluated. /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]? mfp_after: MapFilterProject, + /// Strategy for forming the internal input arrangement built by `Reduce` + /// (materialized via `key_val_plan`). + /// + /// Set by the lowering from the input's `has_future_updates` flag. The + /// renderer applies it to the keyed `(key, val)` stream feeding the + /// reduce. See `render_reduce` for the rationale on why this is + /// plumbed through `Reduce` rather than handled at the arrangement site. + /// + /// Note: unrelated to the hash buckets used by hierarchical reductions + /// (e.g. `ReducePlan::Hierarchical`'s `buckets`), which are an internal + /// sharding scheme for `min`/`max`-style aggregations. Here "bucketing" + /// refers exclusively to temporal (time-domain) bucketing of + /// future-stamped updates. + temporal_bucketing_strategy: ArrangementStrategy, }, /// Key-based "Top K" operator, retaining the first K records in each group. TopK { @@ -319,6 +342,14 @@ pub enum PlanNode { /// on the properties of the reduction, and the input itself. Please check /// out the documentation for this type for more detail. top_k_plan: TopKPlan, + /// Strategy for bucketing the input collection ahead of the Top-K operator. + /// + /// Set by the lowering from the input's `has_future_updates` flag. The + /// renderer applies it to the per-row input stream at the top of + /// `render_topk`, covering all three `TopKPlan` arms uniformly. See + /// `PlanNode::Reduce::temporal_bucketing_strategy` for the underlying + /// convention. + temporal_bucketing_strategy: ArrangementStrategy, }, /// Inverts the sign of each update. Negate { @@ -350,6 +381,15 @@ pub enum PlanNode { inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, + /// Per-input bucketing strategies. Lockstep with `inputs`: index `i` is the + /// strategy applied to `inputs[i]` before concatenation. + /// + /// Set by the lowering from each input's `has_future_updates` flag. Only + /// consolidating Unions (`consolidate_output: true`) carry non-`Direct` + /// entries, because bucketing only pays off ahead of a consolidating + /// downstream operator. See `PlanNode::Reduce::temporal_bucketing_strategy` + /// for the underlying convention. + temporal_bucketing_strategies: Vec, }, /// The `input` plan, but with additional arrangements. /// @@ -513,9 +553,14 @@ impl Plan { // Subsequently, we perform plan refinements for the dataflow. Self::refine_source_mfps(&mut dataflow); - if features.enable_consolidate_after_union_negate { - Self::refine_union_negate_consolidation(&mut dataflow); - } + // Note: `consolidate_output` for `Union` and per-input + // `temporal_bucketing_strategies` are decided at lowering time (see the + // `Union` arm of `lower_mir_expr_stack_safe`). The pre-existing + // `refine_union_negate_consolidation` pass — which used to flip + // `consolidate_output` to `true` for Unions with a `Negate` child — has + // been folded into the lowering, since lowering is the only point where + // the bucketing decision (which depends on `has_future_updates`) is + // available. if dataflow.is_single_time() { Self::refine_single_time_operator_selection(&mut dataflow); @@ -634,38 +679,6 @@ impl Plan { mz_repr::explain::trace_plan(dataflow); } - /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input. - #[mz_ore::instrument( - target = "optimizer", - level = "debug", - fields(path.segment = "refine_union_negate_consolidation") - )] - fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription) { - for build_desc in dataflow.objects_to_build.iter_mut() { - let mut todo = vec![&mut build_desc.plan]; - while let Some(expression) = todo.pop() { - let node = &mut expression.node; - match node { - PlanNode::Union { - inputs, - consolidate_output, - .. - } => { - if inputs - .iter() - .any(|input| matches!(input.node, PlanNode::Negate { .. })) - { - *consolidate_output = true; - } - } - _ => {} - } - todo.extend(node.children_mut()); - } - } - mz_repr::explain::trace_plan(dataflow); - } - /// Refines the plans of objects to be built as part of `dataflow` to take advantage /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a /// one-shot SELECT query. @@ -775,6 +788,7 @@ impl CollectionPlan for PlanNode { | PlanNode::Union { inputs, consolidate_output: _, + temporal_bucketing_strategies: _, } => { for input in inputs { input.depends_on_into(out); @@ -805,10 +819,12 @@ impl CollectionPlan for PlanNode { key_val_plan: _, plan: _, mfp_after: _, + temporal_bucketing_strategy: _, } | PlanNode::TopK { input, top_k_plan: _, + temporal_bucketing_strategy: _, } | PlanNode::Negate { input } | PlanNode::Threshold { diff --git a/src/compute-types/src/plan/interpret/api.rs b/src/compute-types/src/plan/interpret/api.rs index 60b5736f01c35..033ac74b30526 100644 --- a/src/compute-types/src/plan/interpret/api.rs +++ b/src/compute-types/src/plan/interpret/api.rs @@ -393,6 +393,7 @@ where key_val_plan, plan, mfp_after, + temporal_bucketing_strategy: _, } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; @@ -406,7 +407,11 @@ where mfp_after, )) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + temporal_bucketing_strategy: _, + } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; // Interpret the current node. @@ -430,6 +435,7 @@ where Union { inputs, consolidate_output, + temporal_bucketing_strategies: _, } => { // Descend recursively into all children. let inputs = inputs @@ -676,6 +682,7 @@ where key_val_plan, plan, mfp_after, + temporal_bucketing_strategy: _, } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; @@ -693,7 +700,11 @@ where // Pass the interpretation result up. Ok(result) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + temporal_bucketing_strategy: _, + } => { // Descend recursively into all children. let input = self.apply_rec(input, rg)?; // Interpret the current node. @@ -731,6 +742,7 @@ where Union { inputs, consolidate_output, + temporal_bucketing_strategies: _, } => { // Descend recursively into all children. let inputs: Vec<_> = inputs diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 0545222982906..357b02aa35d6a 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -31,6 +31,11 @@ use crate::plan::{ArrangementStrategy, AvailableCollections, GetPlan, LirId, Pla /// Pick an [`ArrangementStrategy`] based on whether the input may contain future-stamped /// updates. Future updates are the only case where temporal bucketing pays off. +/// +/// Convention: every caller that returns `TemporalBucketing` must also clear +/// `LoweredExpr::has_future_updates` on the resulting `LoweredExpr`, so that a stack of +/// bucketing-eligible operators only buckets at the lowest one. A trailing temporal MFP +/// fused on top naturally re-arms the flag. fn strategy_from_future(has_future_updates: bool) -> ArrangementStrategy { if has_future_updates { ArrangementStrategy::TemporalBucketing @@ -62,6 +67,10 @@ pub(super) struct Context { debug_info: LirDebugInfo, /// Whether to enable fusion of MFPs in reductions. enable_reduce_mfp_fusion: bool, + /// Whether `Union`s with at least one `Negate` input should set + /// `consolidate_output = true`. Folded in from the deleted + /// `refine_union_negate_consolidation` LIR pass. + enable_consolidate_after_union_negate: bool, } impl Context { @@ -75,6 +84,7 @@ impl Context { id: GlobalId::Transient(0), }, enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion, + enable_consolidate_after_union_negate: features.enable_consolidate_after_union_negate, } } @@ -294,6 +304,12 @@ impl Context { // Even with a non-temporal MFP, we must propagate `has_future_updates` // from the underlying binding — applying an MFP doesn't drop future- // timestamped updates that already exist on the input. + // + // TODO(temporal-bucketing): `has_future_updates` is computed per + // dataflow; we don't currently propagate it across `Id::Global` + // boundaries (e.g., from an MV's dataflow to its consumer's), so a + // downstream-only `Get`-then-`ArrangeBy` won't bucket unless the + // consumer has its own local temporal MFP. let has_future_updates = self.has_future_updates.contains(id) || match &plan { GetPlan::Arrangement(_, _, mfp) | GetPlan::Collection(mfp) => { @@ -852,21 +868,34 @@ This is not expected to cause incorrect results, but could indicate a performanc AvailableCollections::new_raw(), &keys, arity, - input_future, + // `new_raw` means no arrangement, so no bucketing is needed + false, ) } else { input }; // Return the plan, and no arrangements. + // + // `TopK` itself buckets (via `apply_bucketing_strategy` at the + // top of `render_topk`) when the strategy says so, so the + // output flag is cleared whenever bucketing actually fires. + // + // Bucketing absorption: see `strategy_from_future`. + let temporal_bucketing_strategy = strategy_from_future(input_future); + let has_future_updates = match temporal_bucketing_strategy { + ArrangementStrategy::TemporalBucketing => false, + ArrangementStrategy::Direct => input_future, + }; let lir_id = self.allocate_lir_id(); LoweredExpr { plan: PlanNode::TopK { input: Box::new(input), top_k_plan, + temporal_bucketing_strategy, } .as_plan(lir_id), keys: AvailableCollections::new_raw(), - has_future_updates: input_future, + has_future_updates, } } MirRelationExpr::Negate { input } => { @@ -885,7 +914,8 @@ This is not expected to cause incorrect results, but could indicate a performanc AvailableCollections::new_raw(), &keys, arity, - input_future, + // `new_raw` means no arrangement, so no bucketing is needed + false, ) } else { input @@ -909,11 +939,17 @@ This is not expected to cause incorrect results, but could indicate a performanc } = self.lower_mir_expr(input)?; let arity = input.arity(); let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity); - let plan = if !keys + // If the conditional `arrange_by` wrap fires, the synthesized + // `ArrangeBy` builds a real arrangement (`new_arranged`) and so + // its `apply_bucketing_strategy` call at render time will + // actually bucket. If the wrap is skipped, the input already + // had a suitable arrangement and an upstream operator was + // responsible for bucketing. + let wrap_fires = !keys .arranged .iter() - .any(|(key, _, _)| key == &required_arrangement.0) - { + .any(|(key, _, _)| key == &required_arrangement.0); + let plan = if wrap_fires { self.arrange_by( plan, AvailableCollections::new_arranged(vec![required_arrangement]), @@ -925,6 +961,18 @@ This is not expected to cause incorrect results, but could indicate a performanc plan }; + // Clear the future flag whenever the wrap actually bucketed + // the future-stamped updates. + let has_future_updates = if wrap_fires + && matches!( + strategy_from_future(input_future), + ArrangementStrategy::TemporalBucketing + ) { + false + } else { + input_future + }; + let output_keys = threshold_plan.keys(); // Return the plan, and any produced keys. let lir_id = self.allocate_lir_id(); @@ -935,7 +983,7 @@ This is not expected to cause incorrect results, but could indicate a performanc } .as_plan(lir_id), keys: output_keys, - has_future_updates: input_future, + has_future_updates, } } MirRelationExpr::Union { base, inputs } => { @@ -945,14 +993,56 @@ This is not expected to cause incorrect results, but could indicate a performanc for input in inputs.iter() { lowered_inputs.push(self.lower_mir_expr(input)?); } - let any_future = lowered_inputs.iter().any(|l| l.has_future_updates); + + // Fold the deleted `refine_union_negate_consolidation` pass in + // here: a Union with any `Negate` input should consolidate its + // output. The lowering is the only place where this decision + // can be coupled with the per-input bucketing strategy. + let has_negate_input = lowered_inputs + .iter() + .any(|l| matches!(l.plan.node, PlanNode::Negate { .. })); + let consolidate_output = + self.enable_consolidate_after_union_negate && has_negate_input; + + // Per-input bucketing strategies: only meaningful when the + // Union consolidates its output, since bucketing only pays off + // ahead of a downstream consolidator. + let temporal_bucketing_strategies: Vec = if consolidate_output + { + lowered_inputs + .iter() + .map(|l| strategy_from_future(l.has_future_updates)) + .collect() + } else { + lowered_inputs + .iter() + .map(|_| ArrangementStrategy::Direct) + .collect() + }; + + // If the Union itself buckets each input (consolidating Union + // with `TemporalBucketing` strategies), the future-stamped + // updates are absorbed here and we should clear the outer + // flag for those legs. Otherwise, propagate `any_future`. + let has_future_updates = if consolidate_output { + lowered_inputs + .iter() + .zip_eq(temporal_bucketing_strategies.iter()) + .any(|(l, s)| { + l.has_future_updates + && !matches!(s, ArrangementStrategy::TemporalBucketing) + }) + } else { + lowered_inputs.iter().any(|l| l.has_future_updates) + }; + let plans = lowered_inputs .into_iter() .map( |LoweredExpr { plan, keys, - has_future_updates: future, + has_future_updates: _, }| { // We don't have an MFP here -- install an operator to permute the // input, if necessary. @@ -962,7 +1052,8 @@ This is not expected to cause incorrect results, but could indicate a performanc AvailableCollections::new_raw(), &keys, arity, - future, + // `new_raw` means no arrangement, so no bucketing is needed + false, ) } else { plan @@ -975,11 +1066,12 @@ This is not expected to cause incorrect results, but could indicate a performanc LoweredExpr { plan: PlanNode::Union { inputs: plans, - consolidate_output: false, + consolidate_output, + temporal_bucketing_strategies, } .as_plan(lir_id), keys: AvailableCollections::new_raw(), - has_future_updates: any_future, + has_future_updates, } } MirRelationExpr::ArrangeBy { input, keys } => { @@ -1031,17 +1123,24 @@ This is not expected to cause incorrect results, but could indicate a performanc // Return the plan and extended keys. let lir_id = self.allocate_lir_id(); + let strategy = strategy_from_future(future); + // Bucketing absorption: see `strategy_from_future`. If we bucket, clear + // the future-updates flag so the immediate parent is lowered as `Direct`. + let has_future_updates = match strategy { + ArrangementStrategy::TemporalBucketing => false, + ArrangementStrategy::Direct => future, + }; LoweredExpr { plan: PlanNode::ArrangeBy { input_key, input: Box::new(input), input_mfp, forms, - strategy: strategy_from_future(future), + strategy, } .as_plan(lir_id), keys: input_keys, - has_future_updates: future, + has_future_updates, } } } @@ -1203,9 +1302,21 @@ This is not expected to cause incorrect results, but could indicate a performanc ); let output_keys = reduce_plan.keys(group_key.len(), output_arity); let lir_id = self.allocate_lir_id(); + // `Reduce` builds its own input arrangement inside `render_reduce` (via `KeyValPlan`), + // bypassing `ensure_collections`. So we can't piggy-back on an upstream `ArrangeBy`'s + // strategy to request temporal bucketing on a temporal-MFP-fed input: there is no such + // `ArrangeBy`. Instead we record the strategy directly on the `Reduce` node, and + // `render_reduce` applies bucketing to the keyed `(key, val)` stream itself. + let temporal_bucketing_strategy = strategy_from_future(input_future); // `extract_mfp_after` strips temporal predicates back into `*mfp_on_top` (the residual // MFP installed above the reduce), so `mfp_after` is non-temporal and cannot introduce - // future updates. The output's future flag is just whatever the input had. + // future updates. + // + // Bucketing absorption: see `strategy_from_future`. + let has_future_updates = match temporal_bucketing_strategy { + ArrangementStrategy::TemporalBucketing => false, + ArrangementStrategy::Direct => input_future, + }; Ok(LoweredExpr { plan: PlanNode::Reduce { input_key, @@ -1213,10 +1324,11 @@ This is not expected to cause incorrect results, but could indicate a performanc key_val_plan, plan: reduce_plan, mfp_after, + temporal_bucketing_strategy, } .as_plan(lir_id), keys: output_keys, - has_future_updates: input_future, + has_future_updates, }) } diff --git a/src/compute-types/src/plan/render_plan.rs b/src/compute-types/src/plan/render_plan.rs index 09cc55da15cfa..00d43389a5f61 100644 --- a/src/compute-types/src/plan/render_plan.rs +++ b/src/compute-types/src/plan/render_plan.rs @@ -215,6 +215,9 @@ pub enum Expr { /// the key for the reduction; otherwise, the results become undefined. Additionally, the /// MFP must be free from temporal predicates so that it can be readily evaluated. mfp_after: MapFilterProject, + /// How the renderer should form the internal input arrangement built by `Reduce`. + /// Mirrors [`PlanNode::Reduce::temporal_bucketing_strategy`]. + temporal_bucketing_strategy: ArrangementStrategy, }, /// Key-based "Top K" operator, retaining the first K records in each group. TopK { @@ -226,6 +229,9 @@ pub enum Expr { /// the Top-K, and the input itself. Please check out the documentation for this type for /// more detail. top_k_plan: TopKPlan, + /// How the renderer should bucket the input collection ahead of the Top-K operator. + /// Mirrors [`PlanNode::TopK::temporal_bucketing_strategy`]. + temporal_bucketing_strategy: ArrangementStrategy, }, /// Inverts the sign of each update. Negate { @@ -256,6 +262,9 @@ pub enum Expr { inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, + /// Per-input bucketing strategies, lockstep with `inputs`. Mirrors + /// [`PlanNode::Union::temporal_bucketing_strategies`]. + temporal_bucketing_strategies: Vec, }, /// The `input` plan, but with additional arrangements. /// @@ -438,6 +447,7 @@ impl TryFrom for LetFreePlan { key_val_plan, plan, mfp_after, + temporal_bucketing_strategy, } => { let expr = Reduce { input_key, @@ -445,15 +455,21 @@ impl TryFrom for LetFreePlan { key_val_plan, plan, mfp_after, + temporal_bucketing_strategy, }; insert_node(lir_id, parent, expr, nesting); todo.push((*input, Some(lir_id), nesting.saturating_add(1))); } - PlanNode::TopK { input, top_k_plan } => { + PlanNode::TopK { + input, + top_k_plan, + temporal_bucketing_strategy, + } => { let expr = TopK { input: input.lir_id, top_k_plan, + temporal_bucketing_strategy, }; insert_node(lir_id, parent, expr, nesting); @@ -482,10 +498,12 @@ impl TryFrom for LetFreePlan { PlanNode::Union { inputs, consolidate_output, + temporal_bucketing_strategies, } => { let expr = Union { inputs: inputs.iter().map(|i| i.lir_id).collect(), consolidate_output, + temporal_bucketing_strategies, }; insert_node(lir_id, parent, expr, nesting); @@ -911,6 +929,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { key_val_plan: _key_val_plan, plan, mfp_after: _mfp_after, + temporal_bucketing_strategy: _, } => match plan { ReducePlan::Distinct => write!(f, "Distinct GroupAggregate"), ReducePlan::Accumulable(..) => write!(f, "Accumulable GroupAggregate"), @@ -939,6 +958,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { TopK { input: _, top_k_plan, + temporal_bucketing_strategy: _, } => { match top_k_plan { TopKPlan::MonotonicTop1(..) => write!(f, "Monotonic Top1")?, @@ -965,6 +985,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { Union { inputs: _, consolidate_output, + temporal_bucketing_strategies: _, } => { if *consolidate_output { write!(f, "Consolidating ")?; diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 35f96297de254..03cd5bb0adda1 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -118,6 +118,7 @@ use differential_dataflow::trace::{BatchReader, TraceReader}; use differential_dataflow::{AsCollection, Data, VecCollection}; use futures::FutureExt; use futures::channel::oneshot; +use itertools::Itertools; use mz_compute_types::dataflows::{DataflowDescription, IndexDesc}; use mz_compute_types::dyncfgs::{ COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK, @@ -1264,14 +1265,26 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { key_val_plan, plan, mfp_after, + temporal_bucketing_strategy, } => { let input = expect_input(input); let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after); - self.render_reduce(input_key, input, key_val_plan, plan, mfp_option) + self.render_reduce( + input_key, + input, + key_val_plan, + plan, + mfp_option, + temporal_bucketing_strategy, + ) } - TopK { input, top_k_plan } => { + TopK { + input, + top_k_plan, + temporal_bucketing_strategy, + } => { let input = expect_input(input); - self.render_topk(input, top_k_plan) + self.render_topk(input, top_k_plan, temporal_bucketing_strategy) } Negate { input } => { let input = expect_input(input); @@ -1288,12 +1301,22 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { Union { inputs, consolidate_output, + temporal_bucketing_strategies, } => { let mut oks = Vec::new(); let mut errs = Vec::new(); - for input in inputs.into_iter() { + for (input, strategy) in inputs.into_iter().zip_eq(temporal_bucketing_strategies) { let (os, es) = expect_input(input).as_specific_collection(None, &self.config_set); + // Apply per-input temporal bucketing. No-op for `Direct`. + // Only consolidating Unions carry non-`Direct` strategies; + // see the `Union` arm of `lower_mir_expr_stack_safe`. + let (os, _bucketed) = context::apply_bucketing_strategy( + os, + strategy, + self.as_of_frontier.clone(), + &self.config_set, + ); oks.push(os); errs.push(es); } @@ -1481,11 +1504,18 @@ pub trait RenderTimestamp: MzTimestamp + Refines { /// Total-ordered timestamps perform real bucketing; partially-ordered timestamps /// (e.g. `Product<…>` in iterative scopes) implement this as a no-op. pub trait MaybeBucketByTime: Timestamp { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, as_of: Antichain, summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff>; + ) -> VecCollection<'scope, Self, D, Diff> + where + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable; } impl RenderTimestamp for mz_repr::Timestamp { @@ -1510,11 +1540,19 @@ impl RenderTimestamp for mz_repr::Timestamp { } impl MaybeBucketByTime for mz_repr::Timestamp { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, as_of: Antichain, summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff> { + ) -> VecCollection<'scope, Self, D, Diff> + where + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable, + { stream .bucket::>(as_of, summary) .as_collection() @@ -1551,11 +1589,19 @@ impl RenderTimestamp for Product> { } impl MaybeBucketByTime for Product> { - fn maybe_apply_temporal_bucketing<'scope>( - stream: StreamVec<'scope, Self, (Row, Self, Diff)>, + fn maybe_apply_temporal_bucketing<'scope, D>( + stream: StreamVec<'scope, Self, (D, Self, Diff)>, _as_of: Antichain, _summary: mz_repr::Timestamp, - ) -> VecCollection<'scope, Self, Row, Diff> { + ) -> VecCollection<'scope, Self, D, Diff> + where + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable, + { // TODO: Implement bucketing on outer timestamp for iterative scopes. stream.as_collection() } diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 5ba42239c74b6..828adf08ff012 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -1004,7 +1004,21 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { ); } - // Track whether we applied temporal bucketing, to avoid double-bucketing. + // Track whether we already applied temporal bucketing in this call, to + // avoid bucketing the same updates twice. + // + // Stacked bucketing across operators is prevented at the LIR level: in + // `strategy_from_future` (see `src/compute-types/src/plan/lowering.rs`), + // a bucketing consumer clears `LoweredExpr::has_future_updates`, so its + // parent is lowered with `ArrangementStrategy::Direct`. + // + // This flag is a belt-and-suspenders check for the case where a single + // `ensure_collections` call would otherwise fire bucketing twice on the + // same collection: once when forming the raw collection (via + // `as_collection_core` below) and again when building an arrangement in + // `collections.arranged`, or across two arrangements in that loop. The + // same `strategy` argument applies to all sites; the flag downgrades + // every application after the first to `Direct`. let mut bucketed = false; // True iff at least one new arrangement will actually be built below. Bucketing only @@ -1023,19 +1037,17 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { // Apply temporal bucketing when the lowering selected `TemporalBucketing` and // we will build at least one arrangement. This path fires when the collection // must be formed from scratch (e.g., from an arrangement via as_collection_core). - let oks = if will_create_arrangement - && matches!(strategy, ArrangementStrategy::TemporalBucketing) - && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set) - { - let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY - .get(config_set) - .try_into() - .expect("must fit"); - bucketed = true; - T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary) - } else { - oks - }; + let (oks, applied) = apply_bucketing_strategy( + oks, + if will_create_arrangement { + strategy + } else { + ArrangementStrategy::Direct + }, + as_of.clone(), + config_set, + ); + bucketed |= applied; self.collection = Some((oks, errs)); } for (key, _, thinning) in collections.arranged { @@ -1051,19 +1063,14 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { // the bundle (e.g., from an upstream temporal Mfp or Get) and we // haven't bucketed yet. This is the common path for temporal-MFP // → ArrangeBy flows. - let oks = if !bucketed - && matches!(strategy, ArrangementStrategy::TemporalBucketing) - && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set) - { - let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY - .get(config_set) - .try_into() - .expect("must fit"); - bucketed = true; - T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary) + let effective_strategy = if bucketed { + ArrangementStrategy::Direct } else { - oks + strategy }; + let (oks, applied) = + apply_bucketing_strategy(oks, effective_strategy, as_of.clone(), config_set); + bucketed |= applied; let (oks, errs_keyed, passthrough) = Self::arrange_collection(&name, oks, key.clone(), thinning.clone()); let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into(); @@ -1353,3 +1360,44 @@ fn walk_cursor( } *fuel -= work; } + +/// Apply temporal bucketing to a per-row stream when the requested `strategy` selects it and the +/// `enable_compute_temporal_bucketing` dyncfg is on; otherwise return `oks` unchanged. +/// +/// Returns `(stream, applied)` where `applied` is true iff bucketing actually fired. Callers +/// use this flag to avoid double-bucketing the same stream within a single `ensure_collections` +/// invocation. +/// +/// Generic over the row data type `D` so the helper can serve both `Row` streams (the +/// `ArrangeBy` rendering in `ensure_collections`) and `(Row, Row)` streams (the internal +/// keyed input arrangement built by `render_reduce`). +pub(crate) fn apply_bucketing_strategy<'scope, T, D>( + oks: VecCollection<'scope, T, D, Diff>, + strategy: ArrangementStrategy, + as_of: Antichain, + config_set: &ConfigSet, +) -> (VecCollection<'scope, T, D, Diff>, bool) +where + T: RenderTimestamp + MaybeBucketByTime, + D: timely::ExchangeData + + crate::typedefs::MzData + + Ord + + Clone + + std::fmt::Debug + + differential_dataflow::Hashable, +{ + if matches!(strategy, ArrangementStrategy::TemporalBucketing) + && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set) + { + let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY + .get(config_set) + .try_into() + .expect("must fit"); + ( + T::maybe_apply_temporal_bucketing(oks.inner, as_of, summary), + true, + ) + } else { + (oks, false) + } +} diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index e165bd567c5e2..013ef6594b2ea 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -27,6 +27,7 @@ use differential_dataflow::trace::implementations::merge_batcher::container::Int use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; use itertools::Itertools; +use mz_compute_types::plan::ArrangementStrategy; use mz_compute_types::plan::reduce::{ AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan, ReducePlan, ReductionType, SingleBasicPlan, reduction_type, @@ -68,7 +69,11 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { key_val_plan: KeyValPlan, reduce_plan: ReducePlan, mfp_after: Option, - ) -> CollectionBundle<'scope, T> { + temporal_bucketing_strategy: ArrangementStrategy, + ) -> CollectionBundle<'scope, T> + where + T: crate::render::MaybeBucketByTime, + { // Convert `mfp_after` to an actionable plan. let mfp_after = mfp_after.map(|m| { m.into_plan() @@ -156,15 +161,25 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { }, ); - // Render the reduce plan - self.render_reduce_plan( - reduce_plan, + // Bucket the keyed `(key, val)` stream when lowering chose `TemporalBucketing`. + // `Reduce` builds its own arrangement via `KeyValPlan`, bypassing + // `ensure_collections`, so the strategy is plumbed through `PlanNode::Reduce` + // rather than inferred at the arrangement site. `apply_bucketing_strategy` is a + // no-op for `Direct`. + // + // Unlike `ensure_collections`, there's only one bucketing call site here, so we + // don't need to track an `already_bucketed` flag. If a second site is ever added + // in this function, it must consult `_bucketed`. + let (key_val_collection, _bucketed) = crate::render::context::apply_bucketing_strategy( key_val_input.as_collection(), - err, - key_arity, - mfp_after, - ) - .leave_region(self.scope) + temporal_bucketing_strategy, + self.as_of_frontier.clone(), + &self.config_set, + ); + + // Render the reduce plan + self.render_reduce_plan(reduce_plan, key_val_collection, err, key_arity, mfp_after) + .leave_region(self.scope) }) } diff --git a/src/compute/src/render/top_k.rs b/src/compute/src/render/top_k.rs index 28df27f0d3369..ffafacb5f1613 100644 --- a/src/compute/src/render/top_k.rs +++ b/src/compute/src/render/top_k.rs @@ -24,6 +24,7 @@ use differential_dataflow::trace::implementations::BatchContainer; use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; +use mz_compute_types::plan::ArrangementStrategy; use mz_compute_types::plan::top_k::{ BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan, }; @@ -50,14 +51,57 @@ use crate::row_spine::{ use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine}; // The implementation requires integer timestamps to be able to delay feedback for monotonic inputs. -impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> { +impl<'scope, T: crate::render::RenderTimestamp + crate::render::MaybeBucketByTime> + Context<'scope, T> +{ pub(crate) fn render_topk( &self, input: CollectionBundle<'scope, T>, top_k_plan: TopKPlan, + temporal_bucketing_strategy: ArrangementStrategy, ) -> CollectionBundle<'scope, T> { let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set); + // Bucket the per-row input stream when lowering chose `TemporalBucketing`. + // `TopK` builds its own arrangement(s) inside the variants below, bypassing + // `ensure_collections`, so the strategy is plumbed through `PlanNode::TopK` + // rather than inferred at the arrangement site. `apply_bucketing_strategy` + // is a no-op for `Direct`. + // + // Note: a `MonotonicTop1Plan`/`MonotonicTopKPlan` with `must_consolidate = + // false` together with `TemporalBucketing` here would mean we install a + // bucket operator with no downstream consolidator -- pure overhead. That + // combination cannot actually occur: `RelaxMustConsolidate` (which is the + // only writer of `must_consolidate = false`) runs only on single-time + // dataflows (one-shot peeks / `COPY TO`), and in single-time dataflows + // `ExprPrepOneShot` constant-folds `mz_now()` to the dataflow `as_of` + // before lowering, so no temporal predicates survive into LIR and + // `has_future_updates` is `false` everywhere -- meaning no operator (TopK + // included) is ever lowered with `TemporalBucketing`. The assertion below + // pins down this invariant. + if matches!( + temporal_bucketing_strategy, + ArrangementStrategy::TemporalBucketing + ) { + let must_consolidate = match &top_k_plan { + TopKPlan::MonotonicTop1(p) => p.must_consolidate, + TopKPlan::MonotonicTopK(p) => p.must_consolidate, + TopKPlan::Basic(_) => true, + }; + soft_assert_or_log!( + must_consolidate, + "TopK with `TemporalBucketing` should not have `must_consolidate = false`; \ + `RelaxMustConsolidate` only runs on single-time dataflows where \ + `mz_now()` has been const-folded and no temporal bucketing is set", + ); + } + let (ok_input, _bucketed) = crate::render::context::apply_bucketing_strategy( + ok_input, + temporal_bucketing_strategy, + self.as_of_frontier.clone(), + &self.config_set, + ); + // We create a new region to compartmentalize the topk logic. let outer_scope = ok_input.scope(); let (ok_result, err_collection) = outer_scope.clone().region_named("TopK", |inner| { diff --git a/test/sqllogictest/explain/default.slt b/test/sqllogictest/explain/default.slt index fca67c0c4a846..2e66bf1bcdb25 100644 --- a/test/sqllogictest/explain/default.slt +++ b/test/sqllogictest/explain/default.slt @@ -2201,3 +2201,78 @@ Used Indexes: Target cluster: no_replicas EOF + +# Default-format tests for the `Temporally-Bucketed` prefix on Reduce, TopK, +# and Union. The default format only emits the prefix when the operator's +# `temporal_bucketing_strategy` (or any per-input strategy for Union) is +# `TemporalBucketing`; otherwise the operator is printed unchanged. + +statement ok +CREATE TABLE events (k INT NOT NULL, event_time TIMESTAMP NOT NULL); + +query T multiline +EXPLAIN PHYSICAL PLAN AS TEXT FOR +CREATE MATERIALIZED VIEW mv_default_reduce AS +SELECT k, count(*) +FROM events +WHERE event_time + INTERVAL '45 day' > mz_now() +GROUP BY k; +---- +materialize.public.mv_default_reduce: + →Temporally-Bucketed Accumulable GroupAggregate + Simple aggregates: count(*) + →Read materialize.public.events + +Source materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + +Target cluster: no_replicas + +EOF + +query T multiline +EXPLAIN PHYSICAL PLAN AS TEXT FOR +CREATE MATERIALIZED VIEW mv_default_topk AS +SELECT k, event_time +FROM events +WHERE event_time + INTERVAL '45 day' > mz_now() +ORDER BY event_time LIMIT 5; +---- +materialize.public.mv_default_topk: + →Temporally-Bucketed Non-monotonic TopK + Order By #1 asc nulls_last + Limit 5 + →Read materialize.public.events + +Source materialize.public.events + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + +Target cluster: no_replicas + +EOF + +query T multiline +EXPLAIN PHYSICAL PLAN AS TEXT FOR +CREATE MATERIALIZED VIEW mv_default_union AS +SELECT k FROM events WHERE event_time + INTERVAL '45 day' > mz_now() +EXCEPT ALL +SELECT k FROM events WHERE event_time + INTERVAL '90 day' > mz_now(); +---- +materialize.public.mv_default_union: + →Threshold Diffs #0 + →Arrange (#0) + →Temporally-Bucketed Consolidating Union + →Fused with Child Map/Filter/Project + Project: #0 + Filter: (mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days))) + →Read materialize.public.events + →Negate Diffs + →Fused with Child Map/Filter/Project + Project: #0 + Filter: (mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 90 days))) + →Read materialize.public.events + +Target cluster: no_replicas + +EOF diff --git a/test/sqllogictest/explain/physical_plan_as_json.slt b/test/sqllogictest/explain/physical_plan_as_json.slt index c068f82066a64..d5924e6999ada 100644 --- a/test/sqllogictest/explain/physical_plan_as_json.slt +++ b/test/sqllogictest/explain/physical_plan_as_json.slt @@ -577,7 +577,8 @@ SELECT * FROM ov "arity": 2, "must_consolidate": true } - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -699,7 +700,11 @@ SELECT a FROM t EXCEPT ALL SELECT b FROM mv } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -885,7 +890,8 @@ SELECT * FROM ov "arity": 2, "must_consolidate": true } - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -1013,7 +1019,11 @@ WITH cte(x) as (SELECT a FROM t EXCEPT ALL SELECT b FROM mv) } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -1235,7 +1245,11 @@ WITH cte(x) as (SELECT a FROM t EXCEPT ALL SELECT b FROM mv) } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -1475,7 +1489,11 @@ SELECT x * 5 FROM cte WHERE x = 5 } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -1813,7 +1831,8 @@ SELECT DISTINCT a, b FROM t 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -1972,7 +1991,8 @@ GROUP BY a 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -2144,7 +2164,8 @@ FROM t 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -2275,7 +2296,11 @@ FROM t } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -2324,7 +2349,11 @@ FROM t } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -2436,7 +2465,8 @@ SELECT * FROM hierarchical_group_by 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -2568,7 +2598,8 @@ MATERIALIZED VIEW hierarchical_global_mv 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -2699,7 +2730,11 @@ MATERIALIZED VIEW hierarchical_global_mv } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -2748,7 +2783,11 @@ MATERIALIZED VIEW hierarchical_global_mv } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -2875,7 +2914,8 @@ SELECT * FROM hierarchical_global 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -3006,7 +3046,11 @@ SELECT * FROM hierarchical_global } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -3055,7 +3099,11 @@ SELECT * FROM hierarchical_global } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -3342,7 +3390,8 @@ GROUP BY a 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -3601,7 +3650,8 @@ GROUP BY a 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -3976,7 +4026,8 @@ FROM t 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -4205,7 +4256,8 @@ FROM t 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -4322,7 +4374,11 @@ FROM t } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -4371,7 +4427,11 @@ FROM t } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -4499,7 +4559,8 @@ MATERIALIZED VIEW collated_group_by_mv 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -4638,7 +4699,8 @@ MATERIALIZED VIEW collated_group_by_mv 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -4897,7 +4959,8 @@ MATERIALIZED VIEW collated_group_by_mv 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -5156,7 +5219,8 @@ MATERIALIZED VIEW collated_group_by_mv 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -5912,7 +5976,8 @@ SELECT * FROM collated_group_by 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -6051,7 +6116,8 @@ SELECT * FROM collated_group_by 2 ], "input_arity": 3 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -6310,7 +6376,8 @@ SELECT * FROM collated_group_by 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -6569,7 +6636,8 @@ SELECT * FROM collated_group_by 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -7368,7 +7436,8 @@ MATERIALIZED VIEW collated_global_mv 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -7477,7 +7546,8 @@ MATERIALIZED VIEW collated_global_mv 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -7706,7 +7776,8 @@ MATERIALIZED VIEW collated_global_mv 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -7935,7 +8006,8 @@ MATERIALIZED VIEW collated_global_mv 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -8448,7 +8520,11 @@ MATERIALIZED VIEW collated_global_mv } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -8561,7 +8637,11 @@ MATERIALIZED VIEW collated_global_mv } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } @@ -8716,7 +8796,8 @@ SELECT * FROM collated_global 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -8825,7 +8906,8 @@ SELECT * FROM collated_global 1 ], "input_arity": 2 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -9054,7 +9136,8 @@ SELECT * FROM collated_global 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } }, @@ -9283,7 +9366,8 @@ SELECT * FROM collated_global 0 ], "input_arity": 1 - } + }, + "temporal_bucketing_strategy": "Direct" } } } @@ -9796,7 +9880,11 @@ SELECT * FROM collated_global } } ], - "consolidate_output": true + "consolidate_output": true, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } }, @@ -9909,7 +9997,11 @@ SELECT * FROM collated_global } } ], - "consolidate_output": false + "consolidate_output": false, + "temporal_bucketing_strategies": [ + "Direct", + "Direct" + ] } } } diff --git a/test/sqllogictest/temporal_bucketing.slt b/test/sqllogictest/temporal_bucketing.slt new file mode 100644 index 0000000000000..ed9bf2ec7220f --- /dev/null +++ b/test/sqllogictest/temporal_bucketing.slt @@ -0,0 +1,278 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +mode cockroach + +# ----------------------------------------------------------------------------- +# Tests for LIR-level temporal bucketing applied to the *input arrangement* +# of `Reduce`. See `agent-notes/temporal-bucketing-reduce/plan.md`. +# +# Without the `input_bucketing_strategy` field, a `Temporal Filter -> +# GroupAggregate` pattern would have no LIR node to attach bucketing to, +# because `Reduce` builds its internal arrangement inside `render_reduce` +# via `KeyValPlan`, bypassing `ensure_collections`. +# ----------------------------------------------------------------------------- + +statement ok +CREATE TABLE events (k INT NOT NULL, event_time TIMESTAMP NOT NULL); + +# Firing: temporal filter directly above a GROUP BY in a materialized view. +# Lowering should set `input_bucketing_strategy=TemporalBucketing` on the Reduce. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_firing AS +SELECT k, count(*) +FROM events +WHERE event_time + INTERVAL '45 day' > mz_now() +GROUP BY k; +---- +materialize.public.mv_firing: + Reduce::Accumulable + simple_aggrs[0]=(0, count(*)) + temporal_bucketing_strategy=TemporalBucketing + key_plan=id + val_plan + project=(#1) + map=(true) + Get::Collection materialize.public.events + raw=true + +Source materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + +Target cluster: quickstart + +EOF + +# Idempotency: a temporal-filter view materialized as an index already absorbs +# the future-update flag. A downstream Reduce reading from the indexed `Get` +# must NOT re-bucket; the pretty-printer omits `input_bucketing_strategy` when +# it is the default `Direct`. +statement ok +CREATE VIEW v_recent AS +SELECT k FROM events WHERE event_time + INTERVAL '45 day' > mz_now(); + +statement ok +CREATE DEFAULT INDEX ON v_recent; + +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_idempotent AS +SELECT k, count(*) FROM v_recent GROUP BY k; +---- +materialize.public.mv_idempotent: + Reduce::Accumulable + simple_aggrs[0]=(0, count(*)) + input_key=#0{k} + key_plan=id + val_plan + project=(#1) + map=(true) + Get::PassArrangements materialize.public.v_recent + raw=false + arrangements[0]={ key=[#0{k}], permutation=id, thinning=() } + +Used Indexes: + - materialize.public.v_recent_primary_idx (*** full scan ***) + +Target cluster: quickstart + +EOF + +# Nested aggregation: an inner Reduce::Hierarchical sits directly above a +# temporal-filter MFP and gets bucketed. The outer `count(*)` Reduce is +# optimized away here, but the test still pins down that the inner Reduce +# gets `input_bucketing_strategy=TemporalBucketing`. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_rearm AS +SELECT bucket, count(*) FROM ( + SELECT k AS bucket, count(*) AS n, max(event_time) AS m + FROM events + WHERE event_time + INTERVAL '45 day' > mz_now() + GROUP BY k +) sub +WHERE m + INTERVAL '1 day' > mz_now() +GROUP BY bucket; +---- +materialize.public.mv_rearm: + Mfp + project=(#0, #2) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{m} + 1 day)))) + map=(1) + input_key=#0 + Reduce::Hierarchical + aggr_funcs=[max] + buckets=[268435456, 16777216, 1048576, 65536, 4096, 256, 16] + temporal_bucketing_strategy=TemporalBucketing + key_plan + project=(#0) + val_plan + project=(#1) + Get::Collection materialize.public.events + raw=true + +Source materialize.public.events + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + +Target cluster: quickstart + +EOF + +# ----------------------------------------------------------------------------- +# Tests for extended LIR-level temporal bucketing on TopK, Union, and +# Threshold (follow-up to the Reduce-only commit). See +# `agent-notes/temporal-bucketing-reduce/increase-coverage.md`. +# ----------------------------------------------------------------------------- + +# TopK firing: temporal filter directly under a `SELECT ... ORDER BY ... LIMIT`. +# Lowering should set `temporal_bucketing_strategy=TemporalBucketing` on the +# TopK and clear the future-updates flag at that site so the parent does not +# also bucket. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_topk_firing AS +SELECT k, event_time +FROM events +WHERE event_time + INTERVAL '45 day' > mz_now() +ORDER BY event_time +LIMIT 5; +---- +materialize.public.mv_topk_firing: + TopK::Basic order_by=[#1 asc nulls_last] limit=5 + temporal_bucketing_strategy=TemporalBucketing + Get::Collection materialize.public.events + raw=true + +Source materialize.public.events + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + +Target cluster: quickstart + +EOF + +# TopK idempotency: when the input is an already-bucketed indexed view, the +# TopK should NOT re-bucket; the pretty-printer omits the field when it is +# `Direct`. +statement ok +CREATE VIEW v_topk_input AS +SELECT k, event_time FROM events WHERE event_time + INTERVAL '45 day' > mz_now(); + +statement ok +CREATE DEFAULT INDEX ON v_topk_input; + +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_topk_idempotent AS +SELECT k, event_time FROM v_topk_input ORDER BY event_time LIMIT 5; +---- +materialize.public.mv_topk_idempotent: + TopK::Basic order_by=[#1 asc nulls_last] limit=5 + ArrangeBy + input_key=[#0{k}, #1{event_time}] + raw=true + Get::PassArrangements materialize.public.v_topk_input + raw=false + arrangements[0]={ key=[#0{k}, #1{event_time}], permutation=id, thinning=() } + +Used Indexes: + - materialize.public.v_topk_input_primary_idx (*** full scan ***) + +Target cluster: quickstart + +EOF + +# Union firing: `EXCEPT ALL` lowers to a `Union` over a `Negate` input, which +# triggers consolidation. With at least one input carrying future updates, +# the per-input `temporal_bucketing_strategies` should include +# `TemporalBucketing`. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_union_firing AS +SELECT k FROM events WHERE event_time + INTERVAL '45 day' > mz_now() +EXCEPT ALL +SELECT k FROM events WHERE event_time + INTERVAL '90 day' > mz_now(); +---- +materialize.public.mv_union_firing: + Threshold::Basic ensure_arrangement={ key=[#0], permutation=id, thinning=() } + ArrangeBy + raw=false + arrangements[0]={ key=[#0], permutation=id, thinning=() } + Union consolidate_output=true + temporal_bucketing_strategies=[TemporalBucketing, TemporalBucketing] + Get::Collection materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + raw=true + Negate + Get::Collection materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 90 days)))) + raw=true + +Target cluster: quickstart + +EOF + +# `EXCEPT` (distinct) lowers to `Distinct -> Union -> Threshold`, so each leg +# already terminates in a `Reduce::Distinct` that consumes the future-updates +# flag itself (via the precursor `Reduce`-input bucketing). By the time data +# reaches the `Union` and `Threshold`, `has_future_updates` is already cleared, +# so neither the `Union` per-input strategies nor the `Threshold` synthetic +# `ArrangeBy` wrap fire as bucketing sites here -- bucketing lands on the +# inner `Reduce::Distinct` operators instead. +# +# This means the `Threshold` flag-clear code in `lowering.rs` is currently +# unreachable from any SQL surface (`EXCEPT` strips the flag via `Distinct`; +# `EXCEPT ALL` consumes it at the consolidating `Union` per-input bucketing). +# We keep the `Threshold` flag-clear logic as defense-in-depth for future MIR +# producers, but no fixture exercises it. +query T multiline +EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR +CREATE MATERIALIZED VIEW mv_except_distinct_buckets_at_reduce AS +SELECT k FROM events WHERE event_time + INTERVAL '45 day' > mz_now() +EXCEPT +SELECT k FROM events WHERE event_time + INTERVAL '90 day' > mz_now(); +---- +materialize.public.mv_except_distinct_buckets_at_reduce: + Threshold::Basic ensure_arrangement={ key=[#0], permutation=id, thinning=() } + ArrangeBy + raw=false + arrangements[0]={ key=[#0], permutation=id, thinning=() } + Union consolidate_output=true + ArrangeBy + input_key=[#0] + raw=true + Reduce::Distinct + temporal_bucketing_strategy=TemporalBucketing + key_plan=id + val_plan + project=() + Get::Collection materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 45 days)))) + raw=true + Negate + ArrangeBy + input_key=[#0] + raw=true + Reduce::Distinct + temporal_bucketing_strategy=TemporalBucketing + key_plan=id + val_plan + project=() + Get::Collection materialize.public.events + project=(#0) + filter=((mz_now() < timestamp_to_mz_timestamp((#1{event_time} + 90 days)))) + raw=true + +Target cluster: quickstart + +EOF