Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions python/sedonadb/python/sedonadb/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,93 @@ def distinct_on(self, *cols: Union[str, Expr]) -> "DataFrame":

return DataFrame(self._ctx, self._impl.distinct_on(coerced))

def _check_union_compatible(self, other: "DataFrame", method: str) -> None:
    """Validate that `other` may be unioned with this DataFrame.

    A union lines its inputs up by position, so two inputs whose columns
    carry different names would be combined without complaint and end up
    silently misaligned. To rule that out, both sides must expose the same
    column names in the same order. A caller that really does want a
    positional union of differently-named columns opts in by renaming one
    side first (for example with `select`).

    Args:
        other: The candidate right-hand side of the union.
        method: Name of the public method performing the check; it is
            only used as the prefix of the error messages.

    Raises:
        TypeError: If `other` is not a DataFrame.
        ValueError: If the two column name lists are not equal.
    """
    if not isinstance(other, DataFrame):
        received = type(other).__name__
        raise TypeError(
            f"{method}() expects a DataFrame as the first argument, got {received}"
        )

    own_columns = self.columns
    other_columns = other.columns
    if own_columns != other_columns:
        message = (
            f"{method}() requires both DataFrames to have the same column names "
            f"in the same order; got {own_columns} and {other_columns}. "
            "Align them first (e.g. with select) if you intend a positional union."
        )
        raise ValueError(message)

def union(self, other: "DataFrame") -> "DataFrame":
    """Stack the rows of `other` below this DataFrame, duplicates included.

    Equivalent to SQL `UNION ALL` (and to PySpark's `union`): a row that
    occurs on both sides, or several times on one side, is kept every
    time. For SQL `UNION` semantics use `union_distinct` instead.

    The two DataFrames must have identical column names in identical
    order, otherwise an error is raised. To combine differently-named
    columns by position, rename one side first (e.g. with `select`).

    Args:
        other: The DataFrame whose rows are appended. Its column names
            must equal this DataFrame's column names, in the same order.

    Examples:

        >>> sd = sedona.db.connect()
        >>> a = sd.sql("SELECT * FROM (VALUES (1), (2)) AS t(x)")
        >>> b = sd.sql("SELECT * FROM (VALUES (2), (3)) AS t(x)")
        >>> a.union(b).sort("x").show()
        ┌───────┐
        │   x   │
        │ int64 │
        ╞═══════╡
        │     1 │
        ├╌╌╌╌╌╌╌┤
        │     2 │
        ├╌╌╌╌╌╌╌┤
        │     2 │
        ├╌╌╌╌╌╌╌┤
        │     3 │
        └───────┘
    """
    # Reject non-DataFrames and mismatched column names before planning.
    self._check_union_compatible(other, "union")
    combined = self._impl.union(other._impl)
    return DataFrame(self._ctx, combined)

def union_distinct(self, other: "DataFrame") -> "DataFrame":
    """Stack the rows of `other` below this DataFrame and remove duplicates.

    Equivalent to SQL `UNION`: rows are compared across all columns and
    each distinct row appears once in the result. To keep duplicates
    (`UNION ALL`) use `union` instead.

    The two DataFrames must have identical column names in identical
    order, otherwise an error is raised. To combine differently-named
    columns by position, rename one side first (e.g. with `select`).

    Args:
        other: The DataFrame whose rows are appended. Its column names
            must equal this DataFrame's column names, in the same order.

    Examples:

        >>> sd = sedona.db.connect()
        >>> a = sd.sql("SELECT * FROM (VALUES (1), (2)) AS t(x)")
        >>> b = sd.sql("SELECT * FROM (VALUES (2), (3)) AS t(x)")
        >>> a.union_distinct(b).sort("x").show()
        ┌───────┐
        │   x   │
        │ int64 │
        ╞═══════╡
        │     1 │
        ├╌╌╌╌╌╌╌┤
        │     2 │
        ├╌╌╌╌╌╌╌┤
        │     3 │
        └───────┘
    """
    # Reject non-DataFrames and mismatched column names before planning.
    self._check_union_compatible(other, "union_distinct")
    deduplicated = self._impl.union_distinct(other._impl)
    return DataFrame(self._ctx, deduplicated)

def limit(self, n: Optional[int], /, *, offset: int = 0) -> "DataFrame":
"""Limit result to n rows starting at offset

Expand Down
17 changes: 17 additions & 0 deletions python/sedonadb/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,23 @@ impl InternalDataFrame {
Ok(InternalDataFrame::new(inner, self.runtime.clone()))
}

/// Append the rows of `other` to this DataFrame, keeping duplicates
/// (`UNION ALL`).
///
/// Both inputs are expected to share a schema, matched by position;
/// otherwise DataFusion reports an error while building the plan, which
/// is propagated to the caller as a `PySedonaError`.
fn union(&self, other: &InternalDataFrame) -> Result<InternalDataFrame, PySedonaError> {
    // The underlying `union` takes both operands by value, so work on clones.
    let lhs = self.inner.clone();
    let rhs = other.inner.clone();
    let combined = lhs.union(rhs)?;
    Ok(InternalDataFrame::new(combined, self.runtime.clone()))
}

/// Append the rows of `other` to this DataFrame and drop duplicate rows
/// (`UNION`).
///
/// Any error raised while building the combined plan is propagated to
/// the caller as a `PySedonaError`.
fn union_distinct(
    &self,
    other: &InternalDataFrame,
) -> Result<InternalDataFrame, PySedonaError> {
    // The underlying `union_distinct` takes both operands by value, so work on clones.
    let lhs = self.inner.clone();
    let rhs = other.inner.clone();
    let deduplicated = lhs.union_distinct(rhs)?;
    Ok(InternalDataFrame::new(deduplicated, self.runtime.clone()))
}

fn execute<'py>(&self, py: Python<'py>) -> Result<usize, PySedonaError> {
let df = self.inner.clone();
let count = wait_for_future(py, &self.runtime, async move {
Expand Down
99 changes: 99 additions & 0 deletions python/sedonadb/tests/expr/test_dataframe_union.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pandas as pd
import pandas.testing as pdt
import pytest

from sedonadb.dataframe import DataFrame
from sedonadb.expr import col


def test_union_keeps_duplicates(con):
    """`union` is UNION ALL: a value present on both sides appears twice."""
    left = con.create_data_frame(pd.DataFrame({"x": [1, 2]}))
    right = con.create_data_frame(pd.DataFrame({"x": [2, 3]}))

    result = left.union(right).sort("x").to_pandas()

    # The shared value 2 must survive once per input.
    expected = pd.DataFrame({"x": [1, 2, 2, 3]})
    pdt.assert_frame_equal(result, expected)


def test_union_multi_column(con):
    """`union` stacks rows of inputs that have more than one column."""
    left = con.create_data_frame(pd.DataFrame({"x": [1], "y": ["a"]}))
    right = con.create_data_frame(pd.DataFrame({"x": [2], "y": ["b"]}))

    result = left.union(right).sort("x").to_pandas()

    expected = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]})
    pdt.assert_frame_equal(result, expected)


def test_union_distinct_drops_duplicates(con):
    """`union_distinct` is UNION: a value shared by both sides appears once."""
    left = con.create_data_frame(pd.DataFrame({"x": [1, 2]}))
    right = con.create_data_frame(pd.DataFrame({"x": [2, 3]}))

    result = left.union_distinct(right).sort("x").to_pandas()

    # The shared value 2 must be collapsed to a single row.
    expected = pd.DataFrame({"x": [1, 2, 3]})
    pdt.assert_frame_equal(result, expected)


def test_union_distinct_dedupes_within_inputs(con):
    """`union_distinct` also removes duplicates that occur inside one input."""
    left = con.create_data_frame(pd.DataFrame({"x": [1, 1, 2]}))
    right = con.create_data_frame(pd.DataFrame({"x": [2, 2, 3]}))

    result = left.union_distinct(right).sort("x").to_pandas()

    expected = pd.DataFrame({"x": [1, 2, 3]})
    pdt.assert_frame_equal(result, expected)


@pytest.mark.parametrize("method", ["union", "union_distinct"])
def test_union_different_names_raises(con, method):
    """Inputs with equal column counts but different names are rejected.

    Aligning such inputs by position without being asked would be a
    footgun, so matching names are required; a positional union has to be
    requested explicitly by renaming one side first.
    """
    left = con.create_data_frame(pd.DataFrame({"x": [1]}))
    right = con.create_data_frame(pd.DataFrame({"y": [1]}))
    union_method = getattr(left, method)

    with pytest.raises(ValueError, match="same column names"):
        union_method(right)


def test_union_positional_alignment_opt_in(con):
    """Renaming one side with `select` makes a positional union possible."""
    left = con.create_data_frame(pd.DataFrame({"x": [1]}))
    right = con.create_data_frame(pd.DataFrame({"y": [2]}))

    # Opt in: give the right-hand column the left-hand name, then union.
    renamed_right = right.select(col("y").alias("x"))
    result = left.union(renamed_right).sort("x").to_pandas()

    expected = pd.DataFrame({"x": [1, 2]})
    pdt.assert_frame_equal(result, expected)


def test_union_returns_lazy_dataframe(con):
    """Both union methods hand back a `DataFrame` instance."""
    left = con.create_data_frame(pd.DataFrame({"x": [1]}))
    right = con.create_data_frame(pd.DataFrame({"x": [2]}))

    unioned = left.union(right)
    assert isinstance(unioned, DataFrame)

    deduplicated = left.union_distinct(right)
    assert isinstance(deduplicated, DataFrame)


def test_union_non_dataframe_raises(con):
    """Passing something other than a DataFrame to `union` is a TypeError."""
    frame = con.create_data_frame(pd.DataFrame({"x": [1]}))
    not_a_frame = {"x": 1}

    with pytest.raises(TypeError, match="union\\(\\) expects a DataFrame"):
        frame.union(not_a_frame)


def test_union_distinct_non_dataframe_raises(con):
    """Passing something other than a DataFrame to `union_distinct` is a TypeError."""
    frame = con.create_data_frame(pd.DataFrame({"x": [1]}))
    not_a_frame = {"x": 1}

    with pytest.raises(TypeError, match="union_distinct\\(\\) expects a DataFrame"):
        frame.union_distinct(not_a_frame)


@pytest.mark.parametrize("method", ["union", "union_distinct"])
def test_union_column_count_mismatch_raises(con, method):
    """Inputs with a different number of columns are rejected with a ValueError."""
    narrow = con.create_data_frame(pd.DataFrame({"x": [1]}))
    wide = con.create_data_frame(pd.DataFrame({"x": [1], "y": [2]}))
    union_method = getattr(narrow, method)

    with pytest.raises(ValueError, match="same column names"):
        union_method(wide)