2 changes: 2 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -1045,6 +1045,7 @@ def __hash__(self):
"pyspark.pandas.tests.groupby.test_aggregate",
"pyspark.pandas.tests.groupby.test_apply_func",
"pyspark.pandas.tests.groupby.test_corr",
"pyspark.pandas.tests.groupby.test_cov",
"pyspark.pandas.tests.groupby.test_cumulative",
"pyspark.pandas.tests.groupby.test_describe",
"pyspark.pandas.tests.groupby.test_groupby",
@@ -1477,6 +1478,7 @@ def __hash__(self):
"pyspark.pandas.tests.connect.groupby.test_parity_aggregate",
"pyspark.pandas.tests.connect.groupby.test_parity_apply_func",
"pyspark.pandas.tests.connect.groupby.test_parity_corr",
"pyspark.pandas.tests.connect.groupby.test_parity_cov",
"pyspark.pandas.tests.connect.groupby.test_parity_cumulative",
"pyspark.pandas.tests.connect.groupby.test_parity_missing_data",
"pyspark.pandas.tests.connect.groupby.test_parity_split_apply",
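These registrations add the new test modules to the dev/sparktestsupport scheduler so CI runs them alongside the other groupby suites. Locally, the same modules can be run individually, e.g. python/run-tests --testnames pyspark.pandas.tests.groupby.test_cov (the usual Spark dev workflow; the exact invocation may differ in your environment).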
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.pandas/groupby.rst
@@ -97,6 +97,7 @@ The following methods are available only for `DataFrameGroupBy` objects.
.. autosummary::
   :toctree: api/

   DataFrameGroupBy.cov
   DataFrameGroupBy.describe

The following methods are available only for `SeriesGroupBy` objects.
210 changes: 210 additions & 0 deletions python/pyspark/pandas/groupby.py
@@ -4303,6 +4303,216 @@ def corr(
            )
        )

    @with_ansi_mode_context
    def cov(
        self,
        min_periods: Optional[int] = None,
        ddof: int = 1,
        numeric_only: bool = False,
    ) -> "DataFrame":
        """
        Compute pairwise covariance of columns, excluding NA/null values.

        The returned DataFrame is the covariance matrix of the columns
        of the DataFrame within each group.

        Both NA and null values are automatically excluded from the
        calculation. A threshold can be set for the minimum number of
        observations for each value created. Comparisons with observations
        below this threshold will be returned as ``NaN``.

        .. versionadded:: 4.3.0

        Parameters
        ----------
        min_periods : int, optional
            Minimum number of observations required per pair of columns
            to have a valid result.

        ddof : int, default 1
            Delta degrees of freedom. The divisor used in calculations
            is ``N - ddof``, where ``N`` represents the number of elements.

        numeric_only : bool, default False
            Include only `float`, `int` or `boolean` data.

        Returns
        -------
        DataFrame
            The covariance matrix of the series of the DataFrame within each group.

        See Also
        --------
        DataFrame.cov
        Series.cov

        Examples
        --------
        >>> df = ps.DataFrame(
        ...     {"A": [1, 1, 2, 2, 2], "B": [1, 2, 3, 4, 5], "C": [4, 6, 7, 9, 11]},
        ...     columns=["A", "B", "C"])
        >>> df.groupby("A").cov().sort_index()
               B    C
        A
        1 B  0.5  1.0
          C  1.0  2.0
        2 B  1.0  2.0
          C  2.0  4.0

        >>> df.groupby("A").cov(ddof=0).sort_index()
                    B         C
        A
        1 B  0.250000  0.500000
          C  0.500000  1.000000
        2 B  0.666667  1.333333
          C  1.333333  2.666667

        >>> df.groupby("A").cov(min_periods=3).sort_index()
               B    C
        A
        1 B  NaN  NaN
          C  NaN  NaN
        2 B  1.0  2.0
          C  2.0  4.0
        """
        if not isinstance(ddof, int):
            raise TypeError("ddof must be integer")
        if LooseVersion(pd.__version__) >= LooseVersion("3.0.0"):
            if not isinstance(numeric_only, bool):
                raise ValueError("numeric_only accepts only Boolean values")
        min_periods = 1 if min_periods is None else min_periods

        groupkey_names: List[str] = [str(key.name) for key in self._groupkeys]
        internal, _, _ = self._prepare_reduce(
            groupkey_names=groupkey_names,
            accepted_spark_types=(NumericType, BooleanType) if numeric_only else None,
            bool_to_numeric=False,
        )

        numeric_labels = [
            label
            for label in internal.column_labels
            if isinstance(internal.spark_type_for(label), (NumericType, BooleanType))
        ]
        numeric_scols: List[Column] = [
            internal.spark_column_for(label).cast("double") for label in numeric_labels
        ]
        numeric_col_names: List[str] = [name_like_string(label) for label in numeric_labels]
        num_scols = len(numeric_scols)

        sdf = internal.spark_frame
        index_1_col_name = verify_temp_column_name(sdf, "__groupby_cov_index_1_temp_column__")
        index_2_col_name = verify_temp_column_name(sdf, "__groupby_cov_index_2_temp_column__")
        value_1_col_name = verify_temp_column_name(sdf, "__groupby_cov_value_1_temp_column__")
        value_2_col_name = verify_temp_column_name(sdf, "__groupby_cov_value_2_temp_column__")
        cov_output_col_name = verify_temp_column_name(sdf, "__groupby_cov_output_temp_column__")
        count_output_col_name = verify_temp_column_name(sdf, "__groupby_cov_count_temp_column__")

        # Enumerate only the upper triangle of column pairs (i <= j); each
        # covariance is computed once and mirrored into (j, i) later.
        pair_scols: List[Column] = []
        for i in range(num_scols):
            for j in range(i, num_scols):
                pair_scols.append(
                    F.struct(
                        F.lit(i).alias(index_1_col_name),
                        F.lit(j).alias(index_2_col_name),
                        numeric_scols[i].alias(value_1_col_name),
                        numeric_scols[j].alias(value_2_col_name),
                    )
                )

        sdf = sdf.select(*[F.col(key) for key in groupkey_names], *[F.inline(F.array(*pair_scols))])

        # Null both values when either is null so pairwise non-null counts are accurate.
        sdf = sdf.select(
            *[F.col(key) for key in groupkey_names + [index_1_col_name, index_2_col_name]],
            F.when(F.isnull(value_1_col_name) | F.isnull(value_2_col_name), F.lit(None))
            .otherwise(F.col(value_1_col_name))
            .alias(value_1_col_name),
            F.when(F.isnull(value_1_col_name) | F.isnull(value_2_col_name), F.lit(None))
            .otherwise(F.col(value_2_col_name))
            .alias(value_2_col_name),
        )

        sdf = sdf.groupby(groupkey_names + [index_1_col_name, index_2_col_name]).agg(
            SF.covar(F.col(value_1_col_name), F.col(value_2_col_name), ddof).alias(
                cov_output_col_name
            ),
            F.count(F.when(~F.isnull(value_1_col_name), 1)).alias(count_output_col_name),
        )

        # Apply the min_periods threshold: pairs with too few observations become null.
        sdf = sdf.withColumn(
            cov_output_col_name,
            F.when(F.col(count_output_col_name) < min_periods, F.lit(None)).otherwise(
                F.col(cov_output_col_name)
            ),
        )

        # Mirror the (i, j) pair to (j, i) to fill in the lower triangle of the matrix.
        auxiliary_col_name = verify_temp_column_name(sdf, "__groupby_cov_auxiliary_temp_column__")
        sdf = sdf.withColumn(
            auxiliary_col_name,
            F.explode(
                F.when(
                    F.col(index_1_col_name) == F.col(index_2_col_name),
                    F.lit([0]),
                ).otherwise(F.lit([0, 1]))
            ),
        ).select(
            *[F.col(key) for key in groupkey_names],
            *[
                F.when(F.col(auxiliary_col_name) == 0, F.col(index_1_col_name))
                .otherwise(F.col(index_2_col_name))
                .alias(index_1_col_name),
                F.when(F.col(auxiliary_col_name) == 0, F.col(index_2_col_name))
                .otherwise(F.col(index_1_col_name))
                .alias(index_2_col_name),
                F.col(cov_output_col_name),
            ],
        )

        # Collect each matrix row into an array sorted by column position.
        array_col_name = verify_temp_column_name(sdf, "__groupby_cov_array_temp_column__")
        sdf = sdf.groupby(groupkey_names + [index_1_col_name]).agg(
            F.array_sort(
                F.collect_list(
                    F.struct(
                        F.col(index_2_col_name),
                        F.col(cov_output_col_name),
                    )
                )
            ).alias(array_col_name)
        )

        # Unpack the sorted array into one output column per numeric column.
        for i in range(num_scols):
            sdf = sdf.withColumn(auxiliary_col_name, F.get(F.col(array_col_name), i)).withColumn(
                numeric_col_names[i],
                F.col(f"{auxiliary_col_name}.{cov_output_col_name}"),
            )

        sdf = sdf.orderBy(groupkey_names + [index_1_col_name])

        # Map the positional index back to a column name for the inner index level.
        sdf = sdf.select(
            *[F.col(col) for col in groupkey_names + numeric_col_names],
            *[
                F.get(F.lit(numeric_col_names), F.col(index_1_col_name)).alias(auxiliary_col_name),
                F.monotonically_increasing_id().alias(NATURAL_ORDER_COLUMN_NAME),
            ],
        )

        return DataFrame(
            InternalFrame(
                spark_frame=sdf,
                index_spark_columns=[
                    scol_for(sdf, key) for key in groupkey_names + [auxiliary_col_name]
                ],
                # One name per index level: the group keys, plus the unnamed
                # inner level that holds the column names.
                index_names=(
                    [psser._column_label for psser in self._groupkeys] + [None]
                ),
                column_labels=numeric_labels,
                column_label_names=internal.column_label_names,
            )
        )


class SeriesGroupBy(GroupBy[Series]):
    @staticmethod
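A note on the technique above: only the upper triangle of column pairs (i <= j) is materialized, each pairwise covariance is computed once per group, and the (i, j) results are then mirrored into the (j, i) cells. The self-contained sketch below reproduces that shape on a toy frame. It derives the ddof-adjusted covariance from covar_pop and a pairwise count, which is an assumption about what the SF.covar helper does internally; that helper is not part of this diff.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0, 4.0), (1, 2.0, 6.0), (2, 3.0, 7.0), (2, 4.0, 9.0), (2, 5.0, 11.0)],
    ["A", "B", "C"],
)
cols, ddof = ["B", "C"], 1

# One struct per (i, j) pair with i <= j: the upper triangle only.
pairs = [
    F.struct(
        F.lit(i).alias("i"), F.lit(j).alias("j"),
        F.col(cols[i]).alias("x"), F.col(cols[j]).alias("y"),
    )
    for i in range(len(cols))
    for j in range(i, len(cols))
]

# inline() expands the array of structs into one row per pair.
exploded = df.select("A", F.inline(F.array(*pairs)))

# ddof-adjusted covariance from covar_pop and the pairwise count:
# cov = covar_pop * n / (n - ddof). (This toy data has no nulls; the real
# implementation first nulls both sides of a pair when either side is null.)
agg = exploded.groupBy("A", "i", "j").agg(
    (F.covar_pop("x", "y") * F.count("x") / (F.count("x") - F.lit(ddof))).alias("cov")
)

# Mirror (i, j) into (j, i) to fill the lower triangle.
full = agg.union(
    agg.where(F.col("i") != F.col("j"))
    .select("A", F.col("j").alias("i"), F.col("i").alias("j"), "cov")
)
full.orderBy("A", "i", "j").show()

For the docstring's sample data this prints the same values as the doctest: variances on the diagonal (0.5 and 2.0 for group 1) and the shared covariance (1.0) mirrored across it.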
1 change: 0 additions & 1 deletion python/pyspark/pandas/missing/groupby.py
@@ -42,7 +42,6 @@ class MissingPandasLikeDataFrameGroupBy:

    # Properties
    corrwith = _unsupported_property("corrwith")
    cov = _unsupported_property("cov")
    dtypes = _unsupported_property("dtypes")
    groups = _unsupported_property("groups")
    hist = _unsupported_property("hist")
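Dropping cov from MissingPandasLikeDataFrameGroupBy is what actually exposes the new method: attribute lookup no longer hits the _unsupported_property stub (which raises PandasNotImplementedError) and instead resolves to the GroupBy.cov implementation above. A quick check:

import pyspark.pandas as ps

psdf = ps.DataFrame({"A": [1, 1, 2, 2, 2], "B": [1, 2, 3, 4, 5], "C": [4, 6, 7, 9, 11]})
# Previously this attribute resolved to the stub and raised PandasNotImplementedError;
# now it reaches the GroupBy.cov implementation added in groupby.py.
print(psdf.groupby("A").cov().sort_index())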
34 changes: 34 additions & 0 deletions python/pyspark/pandas/tests/connect/groupby/test_parity_cov.py
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark.pandas.tests.groupby.test_cov import CovMixin
from pyspark.testing.connectutils import ReusedConnectTestCase
from pyspark.testing.pandasutils import PandasOnSparkTestUtils


class CovParityTests(
    CovMixin,
    PandasOnSparkTestUtils,
    ReusedConnectTestCase,
):
    pass


if __name__ == "__main__":
    from pyspark.testing import main

    main()
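The parity test reuses CovMixin from pyspark.pandas.tests.groupby.test_cov, whose body is not shown in this diff. A minimal sketch of the kind of pandas-parity check such a mixin typically performs (the data and the helper name here are illustrative, not the actual test code):

import pandas as pd
import pyspark.pandas as ps


def check_cov_parity():
    pdf = pd.DataFrame(
        {"A": [1, 1, 2, 2, 2], "B": [1, 2, 3, 4, 5], "C": [4, 6, 7, 9, 11]}
    )
    psdf = ps.from_pandas(pdf)
    # Compare pandas-on-Spark against plain pandas for a few parameter combos.
    for kwargs in ({}, {"ddof": 0}, {"min_periods": 3}):
        pd.testing.assert_frame_equal(
            psdf.groupby("A").cov(**kwargs).sort_index().to_pandas(),
            pdf.groupby("A").cov(**kwargs).sort_index(),
        )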