Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2237,7 +2237,7 @@ mod tests {
use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::MirScalarExpr;
use mz_expr::{Eval, MirScalarExpr};
use mz_ore::now::to_datetime;
use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
use mz_persist_client::PersistClient;
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use maplit::btreemap;
use mz_catalog::memory::objects::Cluster;
use mz_controller_types::ReplicaId;
use mz_expr::row::RowCollection;
use mz_expr::{MapFilterProject, MirRelationExpr, ResultSpec, RowSetFinishing};
use mz_expr::{Eval, MapFilterProject, MirRelationExpr, ResultSpec, RowSetFinishing};
use mz_ore::cast::CastFrom;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::stats::SnapshotPartStats;
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use mz_catalog::memory::objects::{
CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
};
use mz_expr::{
CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
CollectionPlan, Eval, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::{CollectionExt, HashSet};
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/sequencer/inner/copy_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::str::FromStr;
use std::sync::Arc;

use mz_adapter_types::connection::ConnectionId;
use mz_expr::Eval;
use mz_ore::cast::CastInto;
use mz_persist_client::Diagnostics;
use mz_persist_client::batch::ProtoBatch;
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer/inner/secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::Arc;

use mz_catalog::memory::error::ErrorKind;
use mz_catalog::memory::objects::{CatalogItem, Secret};
use mz_expr::MirScalarExpr;
use mz_expr::{Eval, MirScalarExpr};
use mz_ore::collections::CollectionExt;
use mz_ore::instrument;
use mz_repr::{CatalogItemId, Datum, RowArena};
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::Context;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use mz_expr::Eval;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Row, RowArena};
use mz_secrets::SecretsReader;
Expand Down
2 changes: 1 addition & 1 deletion src/compute-types/src/plan/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

use std::collections::BTreeMap;

use mz_expr::{MapFilterProject, MirScalarExpr};
use mz_expr::{Columns, Eval, MapFilterProject, MirScalarExpr};
use mz_repr::{Datum, Row, RowArena};
use serde::{Deserialize, Serialize};

Expand Down
4 changes: 2 additions & 2 deletions src/compute-types/src/plan/join/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
//! not create any new stateful operators.

use mz_expr::{
JoinInputCharacteristics, JoinInputMapper, MapFilterProject, MirScalarExpr, join_permutations,
permutation_for_arrangement,
Columns, JoinInputCharacteristics, JoinInputMapper, MapFilterProject, MirScalarExpr,
join_permutations, permutation_for_arrangement,
};
use serde::{Deserialize, Serialize};

Expand Down
2 changes: 1 addition & 1 deletion src/compute-types/src/plan/join/linear_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//! Planning of linear joins.

use mz_expr::{
JoinInputCharacteristics, MapFilterProject, MirScalarExpr, join_permutations,
Columns, JoinInputCharacteristics, MapFilterProject, MirScalarExpr, join_permutations,
permutation_for_arrangement,
};
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion src/compute-types/src/plan/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use columnar::Len;
use itertools::Itertools;
use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
use mz_expr::{
AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
AggregateExpr, Columns, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
};
use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use mz_compute_types::dyncfgs::{
};
use mz_compute_types::plan::{ArrangementStrategy, AvailableCollections};
use mz_dyncfg::ConfigSet;
use mz_expr::{Id, MapFilterProject, MirScalarExpr};
use mz_expr::{Eval, Id, MapFilterProject, MirScalarExpr};
use mz_ore::soft_assert_or_log;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/render/flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::collections::VecDeque;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL;
use mz_expr::MfpPlan;
use mz_expr::{Eval, MfpPlan};
use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
use mz_repr::{DatumVec, RowArena, SharedRow};
use mz_repr::{Diff, Row, Timestamp};
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/render/join/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use mz_compute_types::dyncfgs::ENABLE_HALF_JOIN2;
use mz_compute_types::plan::join::JoinClosure;
use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_expr::{Eval, MirScalarExpr};
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
use mz_timely_util::operator::{CollectionExt, StreamExt};
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/render/join/linear_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
use mz_compute_types::plan::join::JoinClosure;
use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
use mz_dyncfg::ConfigSet;
use mz_expr::Eval;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
use mz_timely_util::columnar::builder::ColumnBuilder;
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/render/top_k.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use mz_compute_types::plan::top_k::{
BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
};
use mz_expr::func::CastUint64ToInt64;
use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc, func};
use mz_expr::{BinaryFunc, Columns, Eval, EvalError, MirScalarExpr, UnaryFunc, func};
use mz_ore::cast::CastFrom;
use mz_ore::soft_assert_or_log;
use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow};
Expand Down
2 changes: 1 addition & 1 deletion src/expr/benches/case_literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
use std::hint::black_box;

use criterion::{Criterion, criterion_group, criterion_main};
use mz_expr::MirScalarExpr;
use mz_expr::func::{BinaryFunc, Eq};
use mz_expr::{Eval, MirScalarExpr};
use mz_repr::{Datum, ReprColumnType, ReprRelationType, ReprScalarType, RowArena};
use mz_transform::Transform;
use mz_transform::case_literal::CaseLiteralTransform;
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/interpret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use std::fmt::Debug;

use mz_repr::{Datum, ReprColumnType, ReprRelationType, ReprScalarType, Row, RowArena};

use crate::func::Eval;
use crate::scalar::func::variadic::And;
use crate::{
BinaryFunc, EvalError, MapFilterProject, MfpPlan, MirScalarExpr, UnaryFunc,
UnmaterializableFunc, VariadicFunc, func,
};

/// An inclusive range of non-null datum values.
#[derive(Clone, Eq, PartialEq, Debug)]
enum Values<'a> {
Expand Down
3 changes: 2 additions & 1 deletion src/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ pub use relation::{
};
pub use scalar::func::{self, BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
pub use scalar::{
EvalError, FilterCharacteristics, MirScalarExpr, ProtoDomainLimit, ProtoEvalError, like_pattern,
Columns, Eval, EvalError, FilterCharacteristics, MirScalarExpr, ProtoDomainLimit,
ProtoEvalError, like_pattern,
};

/// A [`MirRelationExpr`] that claims to have been optimized, e.g., by an
Expand Down
16 changes: 11 additions & 5 deletions src/expr/src/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::fmt::Display;
use mz_repr::{Datum, Row};
use serde::{Deserialize, Serialize};

use crate::scalar::columns::Columns;
use crate::visit::Visit;
use crate::{MirRelationExpr, MirScalarExpr};

Expand Down Expand Up @@ -1509,6 +1510,7 @@ pub mod util {
use std::collections::BTreeMap;

use crate::MirScalarExpr;
use crate::scalar::columns::Columns;

#[allow(dead_code)]
/// A triple of actions that map from rows to (key, val) pairs and back again.
Expand All @@ -1524,8 +1526,9 @@ pub mod util {
/// Derive supporting logic to support transforming rows to (key, val) pairs,
/// and back again.
///
/// We are given as input a list of key expressions and an input arity, and the
/// requirement the produced key should be the application of the key expressions.
/// We are given as input a list of mappings from columns to key indices, a key length,
/// and an input arity. (The produced key should be the application of the key expressions.)
///
/// To produce the `val` output, we will identify those input columns not found in
/// the key expressions, and name all other columns.
/// To reconstitute the original row, we identify the sequence of columns from the
Expand All @@ -1536,18 +1539,20 @@ pub mod util {
/// of a row that should become the value associated with its key.
///
/// The permutations and thinning expressions generated here will be tracked in
/// `dataflow::plan::AvailableCollections`; see the
/// `compute_types::plan::AvailableCollections`; see the
/// documentation there for more details.
pub fn permutation_for_arrangement(
key: &[MirScalarExpr],
key: &[impl Columns],
unthinned_arity: usize,
) -> (Vec<usize>, Vec<usize>) {
let columns_in_key: BTreeMap<_, _> = key
.iter()
.enumerate()
.filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
.collect();
let mut input_cursor = key.len();
let key_len = key.len();

let mut input_cursor = key_len;
let permutation = (0..unthinned_arity)
.map(|c| {
if let Some(c) = columns_in_key.get(&c) {
Expand Down Expand Up @@ -1604,6 +1609,7 @@ pub mod plan {
use mz_repr::{Datum, Diff, Row, RowArena};
use serde::{Deserialize, Serialize};

use crate::func::Eval;
use crate::{BinaryFunc, EvalError, MapFilterProject, MirScalarExpr, UnaryFunc, func};

/// A wrapper type which indicates it is safe to simply evaluate all expressions.
Expand Down
1 change: 1 addition & 0 deletions src/expr/src/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::Id::Local;
use crate::explain::{HumanizedExpr, HumanizerMode};
use crate::relation::func::{AggregateFunc, LagLeadType, TableFunc};
use crate::row::{RowCollection, RowCollectionIter};
use crate::scalar::columns::Columns;
use crate::scalar::func::variadic::{
JsonbBuildArray, JsonbBuildObject, ListCreate, ListIndex, MapBuild, RecordCreate,
};
Expand Down
33 changes: 15 additions & 18 deletions src/expr/src/relation/join_input_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::ops::Range;
use itertools::Itertools;
use mz_repr::ReprRelationType;

use crate::scalar::columns::Columns;
use crate::scalar::func::variadic::{And, Or};
use crate::visit::Visit;
use crate::{MirRelationExpr, MirScalarExpr, VariadicFunc};
Expand Down Expand Up @@ -180,23 +181,19 @@ impl JoinInputMapper {
/// Takes an expression from the global context and creates a new version
/// where column references have been remapped to the local context.
/// Assumes that all columns in `expr` are from the same input.
pub fn map_expr_to_local(&self, mut expr: MirScalarExpr) -> MirScalarExpr {
expr.visit_pre_mut(|e| {
if let MirScalarExpr::Column(c, _name) = e {
*c -= self.prior_arities[self.input_relation[*c]];
}
pub fn map_expr_to_local<C: Columns + Sized>(&self, mut expr: C) -> C {
expr.visit_columns(|c| {
*c -= self.prior_arities[self.input_relation[*c]];
});
expr
}

/// Takes an expression from the local context of the `index`th input and
/// creates a new version where column references have been remapped to the
/// global context.
pub fn map_expr_to_global(&self, mut expr: MirScalarExpr, index: usize) -> MirScalarExpr {
expr.visit_pre_mut(|e| {
if let MirScalarExpr::Column(c, _name) = e {
*c += self.prior_arities[index];
}
pub fn map_expr_to_global<C: Columns + Sized>(&self, mut expr: C, index: usize) -> C {
expr.visit_columns(|c| {
*c += self.prior_arities[index];
});
expr
}
Expand Down Expand Up @@ -229,7 +226,7 @@ impl JoinInputMapper {
}

/// Find the sorted, dedupped set of inputs an expression references
pub fn lookup_inputs(&self, expr: &MirScalarExpr) -> impl Iterator<Item = usize> + use<> {
pub fn lookup_inputs<C: Columns>(&self, expr: &C) -> impl Iterator<Item = usize> + use<C> {
expr.support()
.iter()
.map(|c| self.input_relation[*c])
Expand All @@ -238,7 +235,7 @@ impl JoinInputMapper {
}

/// Returns the index of the only input referenced in the given expression.
pub fn single_input(&self, expr: &MirScalarExpr) -> Option<usize> {
pub fn single_input(&self, expr: &impl Columns) -> Option<usize> {
let mut inputs = self.lookup_inputs(expr);
if let Some(first_input) = inputs.next() {
if inputs.next().is_none() {
Expand All @@ -249,7 +246,7 @@ impl JoinInputMapper {
}

/// Returns whether the given expr refers to columns of only the `index`th input.
pub fn is_localized(&self, expr: &MirScalarExpr, index: usize) -> bool {
pub fn is_localized(&self, expr: &impl Columns, index: usize) -> bool {
if let Some(single_input) = self.single_input(expr) {
if single_input == index {
return true;
Expand Down Expand Up @@ -296,16 +293,16 @@ impl JoinInputMapper {
/// input_mapper.find_bound_expr(&MirScalarExpr::column(0), &[1], &equivalences)
/// );
/// ```
pub fn find_bound_expr(
pub fn find_bound_expr<C: Columns + Clone + Eq>(
&self,
expr: &MirScalarExpr,
expr: &C,
bound_inputs: &[usize],
equivalences: &[Vec<MirScalarExpr>],
) -> Option<MirScalarExpr> {
equivalences: &[Vec<C>],
) -> Option<C> {
if let Some(equivalence) = equivalences.iter().find(|equivs| equivs.contains(expr)) {
if let Some(bound_expr) = equivalence
.iter()
.find(|expr| self.lookup_inputs(expr).all(|i| bound_inputs.contains(&i)))
.find(|expr| self.lookup_inputs(*expr).all(|i| bound_inputs.contains(&i)))
{
return Some(bound_expr.clone());
}
Expand Down
Loading
Loading