
Spark: Support aggregate pushdown for identity partition column GROUP BY #16176

Open
hemanthboyina wants to merge 4 commits into apache:main from hemanthboyina:groupby_aggregate

Conversation

@hemanthboyina
Contributor

This PR enables aggregate pushdown for queries with GROUP BY on identity partition columns. Currently, Iceberg supports pushing down aggregates (COUNT, MIN, MAX) for queries without GROUP BY, computing results from file metadata instead of reading data files. However, when a query includes GROUP BY, the pushdown is disabled even when the GROUP BY columns are identity partition fields.
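The metadata-only evaluation described above can be illustrated with a minimal sketch in plain Java. The types here (`FileStats`, the `minId`/`maxId` fields) are hypothetical stand-ins for per-file column statistics read from Iceberg manifests, not Iceberg's actual classes: COUNT, MIN, and MAX are folded per identity-partition tuple without opening any data file.

```java
import java.util.*;

public class GroupedAggregateSketch {
  // Hypothetical stand-in for per-file statistics read from manifest metadata.
  record FileStats(List<?> partition, long recordCount, long minId, long maxId) {}

  // COUNT/MIN/MAX per identity-partition tuple, computed from metadata alone:
  // no data file is opened, mirroring the pushdown described above.
  static Map<List<?>, long[]> aggregateByPartition(List<FileStats> files) {
    Map<List<?>, long[]> acc = new HashMap<>();
    for (FileStats f : files) {
      acc.merge(
          f.partition(),
          new long[] {f.recordCount(), f.minId(), f.maxId()},
          (a, b) -> new long[] {a[0] + b[0], Math.min(a[1], b[1]), Math.max(a[2], b[2])});
    }
    return acc;
  }

  public static void main(String[] args) {
    List<FileStats> files =
        List.of(
            new FileStats(List.of("eng"), 10, 1, 10),
            new FileStats(List.of("eng"), 5, 3, 20),
            new FileStats(List.of("hr"), 7, 2, 9));
    long[] eng = aggregateByPartition(files).get(List.of("eng"));
    System.out.println(eng[0] + " " + eng[1] + " " + eng[2]); // 15 1 20
  }
}
```

This works only because identity partitioning guarantees every row in a file shares the file's partition tuple, so file-level stats are exact per group.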

@github-actions github-actions Bot added the spark label Apr 30, 2026
@singhpk234 singhpk234 requested a review from huaxingao May 2, 2026 02:24
Comment on lines +216 to +217
Map<List<Object>, AggregateEvaluator> evaluatorsByPartition =
groupFilesByPartition(spec, groupByPositions, boundAggregates);
Contributor


I am not confident this is correct. Also, we are only checking the current partition spec; a table can comprise files written under many different partition specs that evolved across snapshots.

Contributor Author


Thanks for the review, @singhpk234. You raised a valid point: the current implementation only considers the current partition spec and bails out for files from different specs. I will look into handling spec evolution properly and update the PR.
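The evolution concern can be sketched in plain Java (all types here are simplified stand-ins, not Iceberg's actual classes): every data file records the ID of the partition spec it was written with, so the safety check must run against each file's own spec, not just the table's current one.

```java
import java.util.*;

public class SpecEvolutionSketch {
  // Hypothetical stand-in: each data file records the ID of the partition
  // spec it was written with.
  record DataFile(int specId) {}

  // Pushdown of a GROUP BY on groupByColumns is only safe when those columns
  // are identity partition fields in the spec of every file in the scan,
  // across all evolved specs.
  static boolean safeToPushDown(
      List<DataFile> files,
      Map<Integer, Set<String>> identityFieldsBySpec,
      Set<String> groupByColumns) {
    for (DataFile f : files) {
      Set<String> identity = identityFieldsBySpec.getOrDefault(f.specId(), Set.of());
      if (!identity.containsAll(groupByColumns)) {
        return false; // a file from an older spec lacks this partition column
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<Integer, Set<String>> specs =
        Map.of(
            0, Set.of(), // original spec: unpartitioned
            1, Set.of("dept")); // evolved spec: identity partition on dept
    List<DataFile> mixed = List.of(new DataFile(0), new DataFile(1));
    System.out.println(safeToPushDown(mixed, specs, Set.of("dept"))); // false
    System.out.println(
        safeToPushDown(List.of(new DataFile(1)), specs, Set.of("dept"))); // true
  }
}
```

A file written before the partition field existed has no partition value for it, so grouping that file's stats under the current spec would silently produce wrong results.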

Contributor Author


Handled partition spec evolution; can you please review?

Contributor

@anuragmantri left a comment


Thanks for the useful PR, @hemanthboyina. Overall, it looks good to me. I made some suggestions.

return -1;
}

private boolean allGroupByAreIdentityPartitionFields(Aggregation aggregation) {
Contributor


allGroupByAreIdentityPartitionFields() and resolveGroupByFields() look very similar, except:

  • allGroupByAreIdentityPartitionFields additionally checks instanceof NamedReference
  • resolveGroupByFields additionally collects field IDs and fields into output lists

Can we merge these two? Or maybe let canPushDownAggregation() allow group by and then have the checks in this merged method? What do you think?

Contributor Author


Done. Merged allGroupByAreIdentityPartitionFields into resolveGroupByFields, removed the separate method, and simplified canPushDownAggregation.
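A merged check-and-resolve along these lines could look like the following sketch. The types are simplified, non-Iceberg stand-ins (the real resolveGroupByFields works against Iceberg's schema and partition spec): validation and collection happen in one pass, with an empty Optional signalling "do not push down".

```java
import java.util.*;

public class ResolveGroupBySketch {
  // Hypothetical stand-in for a partition field and its transform kind.
  record PartitionField(String sourceName, boolean identity) {}

  // Merged check-and-resolve: returns the field IDs of the GROUP BY columns,
  // or empty if any column is not an identity partition field, so callers get
  // the validation and the collection in a single pass.
  static Optional<List<Integer>> resolveGroupByFields(
      List<String> groupByColumns,
      Map<String, PartitionField> specFields,
      Map<String, Integer> fieldIdsByName) {
    List<Integer> ids = new ArrayList<>();
    for (String col : groupByColumns) {
      PartitionField field = specFields.get(col);
      if (field == null || !field.identity()) {
        return Optional.empty(); // not an identity partition column: no pushdown
      }
      ids.add(fieldIdsByName.get(col));
    }
    return Optional.of(ids);
  }

  public static void main(String[] args) {
    Map<String, PartitionField> spec = Map.of("dept", new PartitionField("dept", true));
    Map<String, Integer> ids = Map.of("dept", 3, "id", 1);
    System.out.println(resolveGroupByFields(List.of("dept"), spec, ids)); // Optional[[3]]
    System.out.println(resolveGroupByFields(List.of("id"), spec, ids)); // Optional.empty
  }
}
```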

return true;
}

private static class ArrayStructLike implements StructLike {
Contributor


Can we use AggregateEvaluator.ArrayStructLike instead? May have to make it package-private.

Contributor Author


AggregateEvaluator.ArrayStructLike is private static in the api module. Since SparkScanBuilder is in the spark module, even package-private wouldn't help; we'd need to make it public. I kept the change as-is to avoid expanding the API surface. Happy to follow up separately if preferred.
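The essential property of such a class is value-based equals/hashCode over the backing array, so partition tuples from different files land in the same map bucket. A minimal sketch in the spirit of the private AggregateEvaluator.ArrayStructLike (names here are hypothetical, and the real class implements Iceberg's StructLike interface):

```java
import java.util.Arrays;

public class ArrayStructLikeSketch {
  // Minimal array-backed struct usable as a HashMap key. Value equality makes
  // equal partition tuples from different files collide into the same bucket,
  // which the per-partition evaluator map relies on.
  static final class ArrayStruct {
    private final Object[] values;

    ArrayStruct(Object[] values) {
      this.values = values;
    }

    Object get(int pos) {
      return values[pos];
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof ArrayStruct other && Arrays.equals(values, other.values);
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(values);
    }
  }

  public static void main(String[] args) {
    ArrayStruct a = new ArrayStruct(new Object[] {"eng", 2026});
    ArrayStruct b = new ArrayStruct(new Object[] {"eng", 2026});
    System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
  }
}
```

A plain Object[] would not work as a map key because arrays use identity equals/hashCode, which is why a wrapper like this is needed at all.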

@@ -568,11 +568,9 @@ public void testAggregationPushdownOnBucketedColumn() {
sql(
"CREATE TABLE %s (id BIGINT, struct_with_int STRUCT<c1:INT>) USING iceberg PARTITIONED BY (bucket(8, id))",
tableName);

Contributor


Nit: Unrelated whitespace change.

Contributor Author


Done.

@@ -909,4 +907,183 @@ public void testAggregatePushDownForIncrementalScan() {
assertEquals(
"min/max/count push down", expected2, rowsToJava(unboundedPushdownDs.collectAsList()));
}

@TestTemplate
public void testGroupByIdentityPartitionColumnCountPushDown() {
Contributor


Can we also verify that the EXPLAIN string shows the pushdown, like the other tests do?

Contributor Author


Added.
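The suggested verification boils down to a string predicate on the physical plan. A sketch of the check, detached from Spark (the marker "LocalTableScan" follows the pattern of a fully pushed-down metadata aggregate in the existing tests, but the exact explain text here is an assumption, not real Spark output):

```java
public class ExplainCheckSketch {
  // When an aggregate is fully answered from metadata, the physical plan shows
  // a local scan of precomputed rows instead of a scan over data files.
  static boolean aggregateWasPushedDown(String explainText) {
    return explainText.contains("LocalTableScan");
  }

  public static void main(String[] args) {
    String pushed = "== Physical Plan ==\nLocalTableScan [dept, count(id)]";
    String notPushed = "== Physical Plan ==\nBatchScan ...";
    System.out.println(aggregateWasPushedDown(pushed)); // true
    System.out.println(aggregateWasPushedDown(notPushed)); // false
  }
}
```

In a real test, the explain text would come from running EXPLAIN on the GROUP BY query and asserting on the collected plan string.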

}

@TestTemplate
public void testGroupByIdentityPartitionColumnWithMinMax() {
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: can we also have EXPLAIN string verification?

3 participants