diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 664b0d81840ad..693d43f10f57d 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -1045,6 +1045,7 @@ def __hash__(self): "pyspark.pandas.tests.groupby.test_aggregate", "pyspark.pandas.tests.groupby.test_apply_func", "pyspark.pandas.tests.groupby.test_corr", + "pyspark.pandas.tests.groupby.test_cov", "pyspark.pandas.tests.groupby.test_cumulative", "pyspark.pandas.tests.groupby.test_describe", "pyspark.pandas.tests.groupby.test_groupby", @@ -1477,6 +1478,7 @@ def __hash__(self): "pyspark.pandas.tests.connect.groupby.test_parity_aggregate", "pyspark.pandas.tests.connect.groupby.test_parity_apply_func", "pyspark.pandas.tests.connect.groupby.test_parity_corr", + "pyspark.pandas.tests.connect.groupby.test_parity_cov", "pyspark.pandas.tests.connect.groupby.test_parity_cumulative", "pyspark.pandas.tests.connect.groupby.test_parity_missing_data", "pyspark.pandas.tests.connect.groupby.test_parity_split_apply", diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst b/python/docs/source/reference/pyspark.pandas/groupby.rst index f86a7572666b4..02b9174ad0abd 100644 --- a/python/docs/source/reference/pyspark.pandas/groupby.rst +++ b/python/docs/source/reference/pyspark.pandas/groupby.rst @@ -97,6 +97,7 @@ The following methods are available only for `DataFrameGroupBy` objects. .. autosummary:: :toctree: api/ + DataFrameGroupBy.cov DataFrameGroupBy.describe The following methods are available only for `SeriesGroupBy` objects. diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index c5b81f05cc578..2cfa1ae2316b2 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -4303,6 +4303,216 @@ def corr( ) ) + @with_ansi_mode_context + def cov( + self, + min_periods: Optional[int] = None, + ddof: int = 1, + numeric_only: bool = False, + ) -> "DataFrame": + """ + Compute pairwise covariance of columns, excluding NA/null values. + + The returned DataFrame is the covariance matrix of the columns + of the DataFrame within each group. + + Both NA and null values are automatically excluded from the + calculation. A threshold can be set for the minimum number of + observations for each value created. Comparisons with observations + below this threshold will be returned as ``NaN``. + + .. versionadded:: 4.3.0 + + Parameters + ---------- + min_periods : int, optional + Minimum number of observations required per pair of columns + to have a valid result. + + ddof : int, default 1 + Delta degrees of freedom. The divisor used in calculations + is ``N - ddof``, where ``N`` represents the number of elements. + + numeric_only : bool, default False + Include only `float`, `int` or `boolean` data. + + Returns + ------- + DataFrame + The covariance matrix of the series of the DataFrame within each group. + + See Also + -------- + DataFrame.cov + Series.cov + + Examples + -------- + >>> df = ps.DataFrame( + ... {"A": [1, 1, 2, 2, 2], "B": [1, 2, 3, 4, 5], "C": [4, 6, 7, 9, 11]}, + ... 
columns=["A", "B", "C"]) + >>> df.groupby("A").cov().sort_index() + B C + A + 1 B 0.5 1.0 + C 1.0 2.0 + 2 B 1.0 2.0 + C 2.0 4.0 + + >>> df.groupby("A").cov(ddof=0).sort_index() + B C + A + 1 B 0.250000 0.500000 + C 0.500000 1.000000 + 2 B 0.666667 1.333333 + C 1.333333 2.666667 + + >>> df.groupby("A").cov(min_periods=3).sort_index() + B C + A + 1 B NaN NaN + C NaN NaN + 2 B 1.0 2.0 + C 2.0 4.0 + """ + if not isinstance(ddof, int): + raise TypeError("ddof must be integer") + if LooseVersion(pd.__version__) >= "3.0.0": + if not isinstance(numeric_only, bool): + raise ValueError("numeric_only accepts only Boolean values") + min_periods = 1 if min_periods is None else min_periods + + groupkey_names: List[str] = [str(key.name) for key in self._groupkeys] + internal, _, _ = self._prepare_reduce( + groupkey_names=groupkey_names, + accepted_spark_types=(NumericType, BooleanType) if numeric_only else None, + bool_to_numeric=False, + ) + + numeric_labels = [ + label + for label in internal.column_labels + if isinstance(internal.spark_type_for(label), (NumericType, BooleanType)) + ] + numeric_scols: List[Column] = [ + internal.spark_column_for(label).cast("double") for label in numeric_labels + ] + numeric_col_names: List[str] = [name_like_string(label) for label in numeric_labels] + num_scols = len(numeric_scols) + + sdf = internal.spark_frame + index_1_col_name = verify_temp_column_name(sdf, "__groupby_cov_index_1_temp_column__") + index_2_col_name = verify_temp_column_name(sdf, "__groupby_cov_index_2_temp_column__") + value_1_col_name = verify_temp_column_name(sdf, "__groupby_cov_value_1_temp_column__") + value_2_col_name = verify_temp_column_name(sdf, "__groupby_cov_value_2_temp_column__") + cov_output_col_name = verify_temp_column_name(sdf, "__groupby_cov_output_temp_column__") + count_output_col_name = verify_temp_column_name(sdf, "__groupby_cov_count_temp_column__") + + pair_scols: List[Column] = [] + for i in range(0, num_scols): + for j in range(i, num_scols): + pair_scols.append( + F.struct( + F.lit(i).alias(index_1_col_name), + F.lit(j).alias(index_2_col_name), + numeric_scols[i].alias(value_1_col_name), + numeric_scols[j].alias(value_2_col_name), + ) + ) + + sdf = sdf.select(*[F.col(key) for key in groupkey_names], *[F.inline(F.array(*pair_scols))]) + + # Null both values when either is null so pairwise non-null counts are accurate. + sdf = sdf.select( + *[F.col(key) for key in groupkey_names + [index_1_col_name, index_2_col_name]], + F.when(F.isnull(value_1_col_name) | F.isnull(value_2_col_name), F.lit(None)) + .otherwise(F.col(value_1_col_name)) + .alias(value_1_col_name), + F.when(F.isnull(value_1_col_name) | F.isnull(value_2_col_name), F.lit(None)) + .otherwise(F.col(value_2_col_name)) + .alias(value_2_col_name), + ) + + sdf = sdf.groupby(groupkey_names + [index_1_col_name, index_2_col_name]).agg( + SF.covar(F.col(value_1_col_name), F.col(value_2_col_name), ddof).alias( + cov_output_col_name + ), + F.count(F.when(~F.isnull(value_1_col_name), 1)).alias(count_output_col_name), + ) + + sdf = sdf.withColumn( + cov_output_col_name, + F.when(F.col(count_output_col_name) < min_periods, F.lit(None)).otherwise( + F.col(cov_output_col_name) + ), + ) + + # Mirror the (i, j) pair to (j, i) to fill in the lower triangle of the matrix. 
+ auxiliary_col_name = verify_temp_column_name(sdf, "__groupby_cov_auxiliary_temp_column__") + sdf = sdf.withColumn( + auxiliary_col_name, + F.explode( + F.when( + F.col(index_1_col_name) == F.col(index_2_col_name), + F.lit([0]), + ).otherwise(F.lit([0, 1])) + ), + ).select( + *[F.col(key) for key in groupkey_names], + *[ + F.when(F.col(auxiliary_col_name) == 0, F.col(index_1_col_name)) + .otherwise(F.col(index_2_col_name)) + .alias(index_1_col_name), + F.when(F.col(auxiliary_col_name) == 0, F.col(index_2_col_name)) + .otherwise(F.col(index_1_col_name)) + .alias(index_2_col_name), + F.col(cov_output_col_name), + ], + ) + + array_col_name = verify_temp_column_name(sdf, "__groupby_cov_array_temp_column__") + sdf = sdf.groupby(groupkey_names + [index_1_col_name]).agg( + F.array_sort( + F.collect_list( + F.struct( + F.col(index_2_col_name), + F.col(cov_output_col_name), + ) + ) + ).alias(array_col_name) + ) + + for i in range(0, num_scols): + sdf = sdf.withColumn(auxiliary_col_name, F.get(F.col(array_col_name), i)).withColumn( + numeric_col_names[i], + F.col(f"{auxiliary_col_name}.{cov_output_col_name}"), + ) + + sdf = sdf.orderBy(groupkey_names + [index_1_col_name]) + + sdf = sdf.select( + *[F.col(col) for col in groupkey_names + numeric_col_names], + *[ + F.get(F.lit(numeric_col_names), F.col(index_1_col_name)).alias(auxiliary_col_name), + F.monotonically_increasing_id().alias(NATURAL_ORDER_COLUMN_NAME), + ], + ) + + return DataFrame( + InternalFrame( + spark_frame=sdf, + index_spark_columns=[ + scol_for(sdf, key) for key in groupkey_names + [auxiliary_col_name] + ], + index_names=( + [psser._column_label for psser in self._groupkeys] + + self._psdf._internal.index_names + ), + column_labels=numeric_labels, + column_label_names=internal.column_label_names, + ) + ) + class SeriesGroupBy(GroupBy[Series]): @staticmethod diff --git a/python/pyspark/pandas/missing/groupby.py b/python/pyspark/pandas/missing/groupby.py index 04891006dee75..9e655384dfacd 100644 --- a/python/pyspark/pandas/missing/groupby.py +++ b/python/pyspark/pandas/missing/groupby.py @@ -42,7 +42,6 @@ class MissingPandasLikeDataFrameGroupBy: # Properties corrwith = _unsupported_property("corrwith") - cov = _unsupported_property("cov") dtypes = _unsupported_property("dtypes") groups = _unsupported_property("groups") hist = _unsupported_property("hist") diff --git a/python/pyspark/pandas/tests/connect/groupby/test_parity_cov.py b/python/pyspark/pandas/tests/connect/groupby/test_parity_cov.py new file mode 100644 index 0000000000000..389a50ff6e969 --- /dev/null +++ b/python/pyspark/pandas/tests/connect/groupby/test_parity_cov.py @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from pyspark.pandas.tests.groupby.test_cov import CovMixin +from pyspark.testing.connectutils import ReusedConnectTestCase +from pyspark.testing.pandasutils import PandasOnSparkTestUtils + + +class CovParityTests( + CovMixin, + PandasOnSparkTestUtils, + ReusedConnectTestCase, +): + pass + + +if __name__ == "__main__": + from pyspark.testing import main + + main() diff --git a/python/pyspark/pandas/tests/groupby/test_cov.py b/python/pyspark/pandas/tests/groupby/test_cov.py new file mode 100644 index 0000000000000..849e469b05000 --- /dev/null +++ b/python/pyspark/pandas/tests/groupby/test_cov.py @@ -0,0 +1,124 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import numpy as np +import pandas as pd + +from pyspark import pandas as ps +from pyspark.loose_version import LooseVersion +from pyspark.testing.pandasutils import PandasOnSparkTestCase +from pyspark.testing.sqlutils import SQLTestUtils + + +class CovMixin: + @property + def pdf(self): + return pd.DataFrame( + { + "A": [1, 1, 2, 2, 2, 3], + "B": [-1, 2, 3, 5, 6, 0], + "C": [4, 6, 5, 1, 3, 0], + }, + columns=["A", "B", "C"], + ) + + @property + def psdf(self): + return ps.from_pandas(self.pdf) + + def test_cov(self): + for c in ["A", "B", "C"]: + self.assert_eq( + self.pdf.groupby(c).cov().sort_index(), + self.psdf.groupby(c).cov().sort_index(), + almost=True, + ) + + def test_ddof(self): + # Use a dataset with enough rows per group to keep N - ddof > 0, + # since pandas and Spark diverge on inf/NaN handling when the divisor is non-positive. 
+ pdf = pd.DataFrame( + { + "A": [1, 1, 1, 1, 2, 2, 2, 2], + "B": [1, 2, 3, 4, 5, 6, 7, 8], + "C": [4, 6, 5, 1, 3, 0, 9, 2], + }, + columns=["A", "B", "C"], + ) + psdf = ps.from_pandas(pdf) + for ddof in [0, 1, 2]: + self.assert_eq( + pdf.groupby("A").cov(ddof=ddof).sort_index(), + psdf.groupby("A").cov(ddof=ddof).sort_index(), + almost=True, + ) + + def test_min_periods(self): + pdf = pd.DataFrame( + { + "A": [1, 1, 1, 1, 2, 2], + "B": [1.0, 2.0, np.nan, 4.0, 5.0, 6.0], + "C": [4.0, 6.0, 5.0, 7.0, 1.0, 3.0], + }, + columns=["A", "B", "C"], + ) + psdf = ps.from_pandas(pdf) + for m in [1, 2, 3, 4]: + self.assert_eq( + pdf.groupby("A").cov(min_periods=m).sort_index(), + psdf.groupby("A").cov(min_periods=m).sort_index(), + almost=True, + ) + + def test_numeric_only(self): + pdf = pd.DataFrame( + { + "A": [1, 1, 2, 2, 2], + "B": [1, 2, 3, 4, 5], + "C": [4, 6, 5, 1, 3], + "D": ["x", "y", "z", "w", "v"], + }, + columns=["A", "B", "C", "D"], + ) + psdf = ps.from_pandas(pdf) + self.assert_eq( + pdf.groupby("A").cov(numeric_only=True).sort_index(), + psdf.groupby("A").cov(numeric_only=True).sort_index(), + almost=True, + ) + + def test_invalid_args(self): + with self.assertRaisesRegex(TypeError, "ddof must be integer"): + self.psdf.groupby("A").cov(ddof="1") + + if LooseVersion(pd.__version__) >= "3.0.0": + with self.assertRaisesRegex(ValueError, "numeric_only accepts only Boolean values"): + self.psdf.groupby("A").cov(numeric_only="True") + + +class CovTests( + CovMixin, + PandasOnSparkTestCase, + SQLTestUtils, +): + pass + + +if __name__ == "__main__": + from pyspark.testing import main + + main()
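
A minimal usage sketch (not part of the patch): it reuses the data from the new docstring example and cross-checks the added DataFrameGroupBy.cov against plain pandas, which is the same comparison the new test_cov.py performs through assert_eq. It assumes a running Spark session that pyspark.pandas can pick up implicitly.

import pandas as pd
import pyspark.pandas as ps

pdf = pd.DataFrame(
    {"A": [1, 1, 2, 2, 2], "B": [1, 2, 3, 4, 5], "C": [4, 6, 7, 9, 11]},
    columns=["A", "B", "C"],
)
psdf = ps.from_pandas(pdf)

# Both should yield the per-group covariance matrix shown in the docstring,
# e.g. var(B) = 0.5 and cov(B, C) = 1.0 within group A == 1 (ddof defaults to 1).
print(pdf.groupby("A").cov().sort_index())
print(psdf.groupby("A").cov().sort_index())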
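
A standalone sketch of the core idea in the groupby.py change, assuming only a local SparkSession: every (i, j) column pair becomes its own row via inline(array(struct(...))), so a single groupBy over (group key, i, j) computes all pairwise covariances at once. It uses covar_samp, which corresponds to ddof=1; the patch itself goes through SF.covar to honor other ddof values and adds null-pair masking, min_periods handling, and lower-triangle mirroring on top.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, 1.0, 4.0), (1, 2.0, 6.0), (2, 3.0, 7.0), (2, 4.0, 9.0), (2, 5.0, 11.0)],
    ["A", "B", "C"],
)
cols = ["B", "C"]

# One struct per (i, j) pair with i <= j, carrying both column values.
pairs = [
    F.struct(
        F.lit(i).alias("i"),
        F.lit(j).alias("j"),
        F.col(cols[i]).alias("x"),
        F.col(cols[j]).alias("y"),
    )
    for i in range(len(cols))
    for j in range(i, len(cols))
]

# inline() expands the array of structs into one row per pair.
exploded = df.select("A", F.inline(F.array(*pairs)))

# A single aggregation yields the upper triangle of each group's covariance
# matrix: 0.5, 1.0, 2.0 for A == 1 and 1.0, 2.0, 4.0 for A == 2, matching the
# docstring example above.
exploded.groupBy("A", "i", "j").agg(F.covar_samp("x", "y").alias("cov")).orderBy(
    "A", "i", "j"
).show()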