Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ not supported by Comet will fall back to regular Spark execution.
| ExpandExec | Yes | |
| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
| FilterExec | Yes | |
| GenerateExec | Yes | Supports `explode` generator only. |
| GenerateExec | Yes | Supports `explode` and `posexplode` generators (arrays only, `_outer` variants are incompatible). |
| GlobalLimitExec | Yes | |
| HashAggregateExec | Yes | |
| InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. |
Expand Down
30 changes: 15 additions & 15 deletions docs/source/user-guide/latest/understanding-comet-plans.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,21 @@ by role. Names match what is shown in the plan output.
These run natively in DataFusion. When several appear consecutively in a plan,
they execute as a single fused native block.

| Node | Spark equivalent |
| ---------------------------- | ---------------------------------------------- |
| `CometProject` | `ProjectExec` |
| `CometFilter` | `FilterExec` |
| `CometSort` | `SortExec` |
| `CometLocalLimit` | `LocalLimitExec` |
| `CometGlobalLimit` | `GlobalLimitExec` |
| `CometExpand` | `ExpandExec` |
| `CometExplode` | `GenerateExec` (for `explode` only) |
| `CometHashAggregate` | `HashAggregateExec`, `ObjectHashAggregateExec` |
| `CometHashJoin` | `ShuffledHashJoinExec` |
| `CometBroadcastHashJoin` | `BroadcastHashJoinExec` |
| `CometSortMergeJoin` | `SortMergeJoinExec` |
| `CometWindow` | `WindowExec` |
| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` |
| Node | Spark equivalent |
| ---------------------------- | ----------------------------------------------- |
| `CometProject` | `ProjectExec` |
| `CometFilter` | `FilterExec` |
| `CometSort` | `SortExec` |
| `CometLocalLimit` | `LocalLimitExec` |
| `CometGlobalLimit` | `GlobalLimitExec` |
| `CometExpand` | `ExpandExec` |
| `CometExplode` | `GenerateExec` (for `explode` and `posexplode`) |
| `CometHashAggregate` | `HashAggregateExec`, `ObjectHashAggregateExec` |
| `CometHashJoin` | `ShuffledHashJoinExec` |
| `CometBroadcastHashJoin` | `BroadcastHashJoinExec` |
| `CometSortMergeJoin` | `SortMergeJoinExec` |
| `CometWindow` | `WindowExec` |
| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` |

### JVM-Side Operators

Expand Down
140 changes: 140 additions & 0 deletions native/core/src/execution/expressions/list_positions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array, ListArray, RecordBatch};
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use datafusion::common::{exec_err, Result as DataFusionResult};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ColumnarValue;

/// A `PhysicalExpr` that takes a `List<T>` input and produces a `List<Int32>` where each row's
/// values are `[0, 1, ..., len - 1]`. Offsets and the null bitmap are inherited from the input,
/// so when the resulting list is unnested in parallel with the original list it produces the
/// `pos` column expected by Spark's `posexplode`.
#[derive(Debug, Clone)]
pub struct ListPositionsExpr {
    // List-valued child expression whose per-row lengths drive the positions.
    child: Arc<dyn PhysicalExpr>,
    // Cached output field: a nullable `List<Int32>`, built once in `new`.
    field: FieldRef,
}

impl ListPositionsExpr {
    /// Creates a positions expression over `child`, which must evaluate to a
    /// `List<T>` array at execution time.
    pub fn new(child: Arc<dyn PhysicalExpr>) -> Self {
        // The output type is fixed regardless of the input element type: a
        // nullable List<Int32>, using Arrow's conventional "item" element name.
        let element = Field::new("item", DataType::Int32, true);
        let list_type = DataType::List(Arc::new(element));
        let field = Arc::new(Field::new("item", list_type, true));
        Self { child, field }
    }
}

impl Display for ListPositionsExpr {
    /// Renders as `list_positions(<child>)` in plan output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("list_positions({})", self.child))
    }
}

// Equality and hashing consider only the child expression: `field` is fully
// determined at construction (always a nullable List<Int32>) and carries no
// independent state, so comparing it would be redundant.
impl PartialEq for ListPositionsExpr {
    fn eq(&self, other: &Self) -> bool {
        // Delegates to DataFusion's equality for `Arc<dyn PhysicalExpr>`.
        self.child.eq(&other.child)
    }
}

impl Eq for ListPositionsExpr {}

impl Hash for ListPositionsExpr {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Must stay consistent with `PartialEq`: hash exactly what `eq` compares.
        self.child.hash(state);
    }
}

impl PhysicalExpr for ListPositionsExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(self, f)
    }

    /// Always `List<Int32>`, independent of the input element type.
    fn data_type(&self, _input_schema: &Schema) -> DataFusionResult<DataType> {
        Ok(self.field.data_type().clone())
    }

    /// The result inherits the input's null bitmap, so it may contain nulls.
    fn nullable(&self, _input_schema: &Schema) -> DataFusionResult<bool> {
        Ok(true)
    }

    /// Evaluates the child to a `ListArray` and returns a `ListArray` with the
    /// same offsets and null bitmap whose values are `[0, 1, ..., len - 1]`
    /// per row. Errors if the child does not produce a `List` array.
    fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
        let value = self.child.evaluate(batch)?;
        let array = value.into_array(batch.num_rows())?;

        let list = match array.as_any().downcast_ref::<ListArray>() {
            Some(list) => list,
            None => {
                return exec_err!(
                    "ListPositionsExpr expected List input, got {}",
                    array.data_type()
                );
            }
        };

        let offsets = list.offsets();
        // A sliced ListArray can have a non-zero first offset. Since the
        // result reuses the input offsets verbatim, the values buffer must be
        // addressable up to the *last* offset (ListArray::new validates
        // `offsets.last() <= values.len()`). Pad the unreferenced prefix with
        // zeros; those slots are never read through the offsets.
        let first = *offsets.first().unwrap() as usize;
        let last = *offsets.last().unwrap() as usize;
        let mut values: Vec<i32> = vec![0; first];
        values.reserve(last - first);
        for window in offsets.windows(2) {
            // Positions restart at 0 for every row.
            values.extend(0..(window[1] - window[0]));
        }

        let element_field = Arc::new(Field::new("item", DataType::Int32, true));
        let result = ListArray::new(
            element_field,
            offsets.clone(),
            Arc::new(Int32Array::from(values)),
            // Inherit the input null bitmap so null rows stay null.
            list.nulls().cloned(),
        );

        Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef))
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.child]
    }

    /// Rebuilds the expression over a replacement child; errors unless exactly
    /// one child is supplied.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
        if children.len() != 1 {
            return exec_err!(
                "ListPositionsExpr expects exactly 1 child, got {}",
                children.len()
            );
        }
        Ok(Arc::new(ListPositionsExpr::new(Arc::clone(&children[0]))))
    }
}
1 change: 1 addition & 0 deletions native/core/src/execution/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
pub mod arithmetic;
pub mod bitwise;
pub mod comparison;
pub mod list_positions;
pub mod logical;
pub mod nullcheck;
pub mod partition;
Expand Down
51 changes: 30 additions & 21 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod operator_registry;
use crate::execution::operators::init_csv_datasource_exec;
use crate::execution::operators::IcebergScanExec;
use crate::execution::{
expressions::list_positions::ListPositionsExpr,
expressions::subquery::Subquery,
operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec, ShuffleScanExec},
planner::expression_registry::ExpressionRegistry,
Expand Down Expand Up @@ -1643,12 +1644,8 @@ impl PhysicalPlanner {
.map(|expr| self.create_expr(expr, child.schema()))
.collect::<Result<Vec<_>, _>>()?;

// For UnnestExec, we need to add a projection to put the columns in the right order:
// 1. First add all projection columns
// 2. Then add the array column to be exploded
// Then UnnestExec will unnest the last column

// Use return_field() to get the proper column names from the expressions
// For posexplode, a parallel List<Int32> positions column is added before the
// array column so UnnestExec can unnest both in parallel.
let child_schema = child.schema();
let mut project_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = projections
.iter()
Expand All @@ -1661,34 +1658,43 @@ impl PhysicalPlanner {
})
.collect();

// Add the array column as the last column
let array_field = child_expr
.return_field(&child_schema)
.expect("Failed to get field from array expression");
let array_col_name = array_field.name().to_string();

if explode.position {
let positions_expr: Arc<dyn PhysicalExpr> =
Arc::new(ListPositionsExpr::new(Arc::clone(&child_expr)));
project_exprs.push((positions_expr, "pos".to_string()));
}
project_exprs.push((Arc::clone(&child_expr), array_col_name.clone()));

// Create a projection to arrange columns as needed
let project_exec = Arc::new(ProjectionExec::try_new(
project_exprs,
Arc::clone(&child.native_plan),
)?);

// Get the input schema from the projection
let project_schema = project_exec.schema();

// Build the output schema for UnnestExec
// The output schema replaces the list column with its element type
let mut output_fields: Vec<Field> = Vec::new();

// Add all projection columns (non-array columns)
for i in 0..projections.len() {
output_fields.push(project_schema.field(i).clone());
}

// Add the unnested array element field
let array_input_index = if explode.position {
// pos is non-nullable since outer=true is rejected at planning time.
output_fields.push(Field::new("pos", DataType::Int32, false));
projections.len() + 1
} else {
projections.len()
};

// Extract the element type from the list/array type
let array_field = project_schema.field(projections.len());
let array_field = project_schema.field(array_input_index);
let element_type = match array_field.data_type() {
DataType::List(field) => field.data_type().clone(),
dt => {
Expand All @@ -1699,8 +1705,6 @@ impl PhysicalPlanner {
}
};

// The output column has the same name as the input array column
// but with the element type instead of the list type
output_fields.push(Field::new(
array_field.name(),
element_type,
Expand All @@ -1709,12 +1713,17 @@ impl PhysicalPlanner {

let output_schema = Arc::new(Schema::new(output_fields));

// Use UnnestExec to explode the last column (the array column)
// ListUnnest specifies which column to unnest and the depth (1 for single level)
let list_unnest = ListUnnest {
index_in_input_schema: projections.len(), // Index of the array column to unnest
depth: 1, // Unnest one level (explode single array)
};
let mut list_unnests = Vec::with_capacity(2);
if explode.position {
list_unnests.push(ListUnnest {
index_in_input_schema: projections.len(),
depth: 1,
});
}
list_unnests.push(ListUnnest {
index_in_input_schema: array_input_index,
depth: 1,
});

let unnest_options = UnnestOptions {
preserve_nulls: explode.outer,
Expand All @@ -1723,7 +1732,7 @@ impl PhysicalPlanner {

let unnest_exec = Arc::new(UnnestExec::new(
project_exec,
vec![list_unnest],
list_unnests,
vec![], // No struct columns to unnest
output_schema,
unnest_options,
Expand Down
2 changes: 2 additions & 0 deletions native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ message Explode {
bool outer = 2;
// Expressions for other columns to project alongside the exploded values
repeated spark.spark_expression.Expr project_list = 3;
// Whether to emit a position column alongside the exploded values (posexplode)
bool position = 4;
}

message HashJoin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,8 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] {
if (op.generator.children.length != 1) {
return Unsupported(Some("generators with multiple inputs are not supported"))
}
if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") {
val nodeName = op.generator.nodeName.toLowerCase(Locale.ROOT)
if (nodeName != "explode" && nodeName != "posexplode") {
return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}"))
}
if (op.outer) {
Expand Down Expand Up @@ -1262,10 +1263,13 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] {
return None
}

val isPosExplode = op.generator.nodeName.toLowerCase(Locale.ROOT) == "posexplode"

val explodeBuilder = OperatorOuterClass.Explode
.newBuilder()
.setChild(childExprProto.get)
.setOuter(op.outer)
.setPosition(isPosExplode)
.addAllProjectList(projectExprs.map(_.get).asJava)

Some(builder.setExplode(explodeBuilder).build())
Expand Down
Loading
Loading