# Patch summary: add DataFrame.union() and DataFrame.union_distinct() to the
# sedonadb Python bindings.
#
# python/sedonadb/python/sedonadb/dataframe.py
#   _check_union_compatible(other, method)
#       Raises TypeError when `other` is not a DataFrame, and ValueError when
#       `self.columns != other.columns` (same names, same order required).
#       `method` is only used to name the calling method in the error text.
#   union(other)
#       Runs the check above, then wraps `self._impl.union(other._impl)` in a
#       new DataFrame bound to `self._ctx`. Documented as SQL UNION ALL
#       (duplicate rows kept).
#   union_distinct(other)
#       Same shape as union(), delegating to `self._impl.union_distinct(...)`.
#       Documented as SQL UNION (duplicate rows dropped).
#
# python/sedonadb/src/dataframe.rs
#   InternalDataFrame::union / InternalDataFrame::union_distinct
#       Clone both inner frames, call the same-named method on the inner
#       frame, and wrap the result in a new InternalDataFrame that shares
#       `self.runtime`. Errors from the inner call propagate through `?`.
#
# python/sedonadb/tests/expr/test_dataframe_union.py (new file)
#   Covers: duplicates kept by union; multi-column union; duplicates dropped
#   by union_distinct (across and within inputs); ValueError on differing
#   column names and on differing column counts; the opt-in path of aliasing
#   with select before union; the DataFrame return type; TypeError when a
#   non-DataFrame is passed to either method.
#
# NOTE(review): the Python-side check compares column names only; nothing in
#   this patch compares column types before calling into `_impl` -- presumably
#   left to the engine, confirm.
# NOTE(review): the Rust signatures below read `-> Result` with no type
#   arguments; these may have been lost when the patch text was extracted --
#   confirm against the repository.
# NOTE(review): line breaks inside the hunks appear to have been collapsed,
#   so the hunk line counts cannot be checked from this text -- regenerate
#   the patch from the branch before applying.
#
diff --git a/python/sedonadb/python/sedonadb/dataframe.py b/python/sedonadb/python/sedonadb/dataframe.py index 7bace3e2f..a0e667502 100644 --- a/python/sedonadb/python/sedonadb/dataframe.py +++ b/python/sedonadb/python/sedonadb/dataframe.py @@ -905,6 +905,93 @@ def distinct_on(self, *cols: Union[str, Expr]) -> "DataFrame": return DataFrame(self._ctx, self._impl.distinct_on(coerced)) + def _check_union_compatible(self, other: "DataFrame", method: str) -> None: + # Both inputs must line up by column name (in order) so a positional + # union can't silently misalign differently-named columns — the + # classic footgun. Callers that want a positional union of + # differently-named columns opt in by aliasing/selecting to align + # the names first. + if not isinstance(other, DataFrame): + raise TypeError( + f"{method}() expects a DataFrame as the first argument, " + f"got {type(other).__name__}" + ) + if self.columns != other.columns: + raise ValueError( + f"{method}() requires both DataFrames to have the same column " + f"names in the same order; got {self.columns} and " + f"{other.columns}. Align them first (e.g. with select) if you " + f"intend a positional union." + ) + + def union(self, other: "DataFrame") -> "DataFrame": + """Concatenate two DataFrames vertically, keeping duplicate rows. + + This is SQL `UNION ALL` (and matches PySpark's `union`): duplicate + rows are preserved. Use `union_distinct` for SQL `UNION` semantics. + Both DataFrames must have the same column names in the same order; + otherwise an error is raised. (To union differently-named columns + positionally, align the names first, e.g. with `select`.) + + Args: + other: The DataFrame to append. Must have the same column names, + in the same order, as this DataFrame. 
+ + Examples: + + >>> sd = sedona.db.connect() + >>> a = sd.sql("SELECT * FROM (VALUES (1), (2)) AS t(x)") + >>> b = sd.sql("SELECT * FROM (VALUES (2), (3)) AS t(x)") + >>> a.union(b).sort("x").show() + ┌───────┐ + │ x │ + │ int64 │ + ╞═══════╡ + │ 1 │ + ├╌╌╌╌╌╌╌┤ + │ 2 │ + ├╌╌╌╌╌╌╌┤ + │ 2 │ + ├╌╌╌╌╌╌╌┤ + │ 3 │ + └───────┘ + """ + self._check_union_compatible(other, "union") + return DataFrame(self._ctx, self._impl.union(other._impl)) + + def union_distinct(self, other: "DataFrame") -> "DataFrame": + """Concatenate two DataFrames vertically and drop duplicate rows. + + This is SQL `UNION`: the result has no duplicate rows (comparing + all columns). Use `union` to keep duplicates (`UNION ALL`). Both + DataFrames must have the same column names in the same order; + otherwise an error is raised. (To union differently-named columns + positionally, align the names first, e.g. with `select`.) + + Args: + other: The DataFrame to append. Must have the same column names, + in the same order, as this DataFrame. + + Examples: + + >>> sd = sedona.db.connect() + >>> a = sd.sql("SELECT * FROM (VALUES (1), (2)) AS t(x)") + >>> b = sd.sql("SELECT * FROM (VALUES (2), (3)) AS t(x)") + >>> a.union_distinct(b).sort("x").show() + ┌───────┐ + │ x │ + │ int64 │ + ╞═══════╡ + │ 1 │ + ├╌╌╌╌╌╌╌┤ + │ 2 │ + ├╌╌╌╌╌╌╌┤ + │ 3 │ + └───────┘ + """ + self._check_union_compatible(other, "union_distinct") + return DataFrame(self._ctx, self._impl.union_distinct(other._impl)) + def limit(self, n: Optional[int], /, *, offset: int = 0) -> "DataFrame": """Limit result to n rows starting at offset diff --git a/python/sedonadb/src/dataframe.rs b/python/sedonadb/src/dataframe.rs index ddbf87cdb..761b4f171 100644 --- a/python/sedonadb/src/dataframe.rs +++ b/python/sedonadb/src/dataframe.rs @@ -325,6 +325,23 @@ impl InternalDataFrame { Ok(InternalDataFrame::new(inner, self.runtime.clone())) } + /// `UNION ALL` — concatenate two DataFrames, preserving duplicate rows. 
+ /// The two inputs must have the same schema (matched by position); + /// DataFusion errors at plan-build time otherwise. + fn union(&self, other: &InternalDataFrame) -> Result { + let inner = self.inner.clone().union(other.inner.clone())?; + Ok(InternalDataFrame::new(inner, self.runtime.clone())) + } + + /// `UNION` — concatenate two DataFrames and drop duplicate rows. + fn union_distinct( + &self, + other: &InternalDataFrame, + ) -> Result { + let inner = self.inner.clone().union_distinct(other.inner.clone())?; + Ok(InternalDataFrame::new(inner, self.runtime.clone())) + } + fn execute<'py>(&self, py: Python<'py>) -> Result { let df = self.inner.clone(); let count = wait_for_future(py, &self.runtime, async move { diff --git a/python/sedonadb/tests/expr/test_dataframe_union.py b/python/sedonadb/tests/expr/test_dataframe_union.py new file mode 100644 index 000000000..8bc25519d --- /dev/null +++ b/python/sedonadb/tests/expr/test_dataframe_union.py @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import pandas as pd +import pandas.testing as pdt +import pytest + +from sedonadb.dataframe import DataFrame +from sedonadb.expr import col + + +def test_union_keeps_duplicates(con): + a = con.create_data_frame(pd.DataFrame({"x": [1, 2]})) + b = con.create_data_frame(pd.DataFrame({"x": [2, 3]})) + out = a.union(b).sort("x").to_pandas() + # UNION ALL: the shared value 2 appears twice. + pdt.assert_frame_equal(out, pd.DataFrame({"x": [1, 2, 2, 3]})) + + +def test_union_multi_column(con): + a = con.create_data_frame(pd.DataFrame({"x": [1], "y": ["a"]})) + b = con.create_data_frame(pd.DataFrame({"x": [2], "y": ["b"]})) + out = a.union(b).sort("x").to_pandas() + pdt.assert_frame_equal(out, pd.DataFrame({"x": [1, 2], "y": ["a", "b"]})) + + +def test_union_distinct_drops_duplicates(con): + a = con.create_data_frame(pd.DataFrame({"x": [1, 2]})) + b = con.create_data_frame(pd.DataFrame({"x": [2, 3]})) + out = a.union_distinct(b).sort("x").to_pandas() + # UNION: the shared value 2 is de-duplicated. + pdt.assert_frame_equal(out, pd.DataFrame({"x": [1, 2, 3]})) + + +def test_union_distinct_dedupes_within_inputs(con): + a = con.create_data_frame(pd.DataFrame({"x": [1, 1, 2]})) + b = con.create_data_frame(pd.DataFrame({"x": [2, 2, 3]})) + out = a.union_distinct(b).sort("x").to_pandas() + pdt.assert_frame_equal(out, pd.DataFrame({"x": [1, 2, 3]})) + + +@pytest.mark.parametrize("method", ["union", "union_distinct"]) +def test_union_different_names_raises(con, method): + # Same column count but different names: rather than silently aligning + # by position (a footgun), require matching names. A positional union of + # differently-named columns must be opted into by aligning names first. 
+ a = con.create_data_frame(pd.DataFrame({"x": [1]})) + b = con.create_data_frame(pd.DataFrame({"y": [1]})) + with pytest.raises(ValueError, match="same column names"): + getattr(a, method)(b) + + +def test_union_positional_alignment_opt_in(con): + # The opt-in path: align names with select, then union. + a = con.create_data_frame(pd.DataFrame({"x": [1]})) + b = con.create_data_frame(pd.DataFrame({"y": [2]})) + out = a.union(b.select(col("y").alias("x"))).sort("x").to_pandas() + pdt.assert_frame_equal(out, pd.DataFrame({"x": [1, 2]})) + + +def test_union_returns_lazy_dataframe(con): + a = con.create_data_frame(pd.DataFrame({"x": [1]})) + b = con.create_data_frame(pd.DataFrame({"x": [2]})) + assert isinstance(a.union(b), DataFrame) + assert isinstance(a.union_distinct(b), DataFrame) + + +def test_union_non_dataframe_raises(con): + a = con.create_data_frame(pd.DataFrame({"x": [1]})) + with pytest.raises(TypeError, match="union\\(\\) expects a DataFrame"): + a.union({"x": 1}) + + +def test_union_distinct_non_dataframe_raises(con): + a = con.create_data_frame(pd.DataFrame({"x": [1]})) + with pytest.raises(TypeError, match="union_distinct\\(\\) expects a DataFrame"): + a.union_distinct({"x": 1}) + + +@pytest.mark.parametrize("method", ["union", "union_distinct"]) +def test_union_column_count_mismatch_raises(con, method): + a = con.create_data_frame(pd.DataFrame({"x": [1]})) + b = con.create_data_frame(pd.DataFrame({"x": [1], "y": [2]})) + with pytest.raises(ValueError, match="same column names"): + getattr(a, method)(b)