Skip to content

[flink] supports agg pushdown with partitions groupby in flink#7828

Merged
JingsongLi merged 6 commits into
apache:masterfrom
steFaiz:flink_count_star_agg
May 14, 2026
Merged

[flink] supports agg pushdown with partitions groupby in flink#7828
JingsongLi merged 6 commits into
apache:masterfrom
steFaiz:flink_count_star_agg

Conversation

@steFaiz
Copy link
Copy Markdown
Contributor

@steFaiz steFaiz commented May 12, 2026

Purpose

Mirrors spark implementation.
This PR originates from #7779. In our internal case, we find it hard for us to quickly get the real records num for data evolution tables, especially for each partition.
Currently both snapshots system table and partitions system table only shows unmerged records num. We could get the accurate values by count(*) agg pushdown (probably with group by clause) through flink OLAP queries.

Tests

Unit Tests and ITCases

@steFaiz steFaiz marked this pull request as draft May 12, 2026 09:39
@steFaiz steFaiz changed the title [flink] supports MIN/MAX agg pushdown and groupby partitions in flink [wip][flink] supports MIN/MAX agg pushdown and groupby partitions in flink May 12, 2026
@steFaiz steFaiz force-pushed the flink_count_star_agg branch from 9d55733 to 07a5152 Compare May 13, 2026 02:25
@steFaiz steFaiz marked this pull request as ready for review May 13, 2026 06:44
@steFaiz steFaiz changed the title [wip][flink] supports MIN/MAX agg pushdown and groupby partitions in flink supports agg pushdown with partitions groupby in flink May 13, 2026
@steFaiz steFaiz changed the title supports agg pushdown with partitions groupby in flink [flink] supports agg pushdown with partitions groupby in flink May 13, 2026
@steFaiz steFaiz closed this May 13, 2026
@steFaiz steFaiz reopened this May 13, 2026
int originalFieldIndex =
toOriginalFieldIndex(
fieldIndexMapping,
aggregateExpression.getArgs().get(0).getFieldIndex());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extractAggregates uses aggregateExpression.getArgs().get(0).getFieldIndex() — This assumes the expression argument is a FieldReferenceExpression. If Flink ever returns a computed expression as arg (e.g., MIN(f0 + 1)), this would fail at runtime.

Can you add test to cover this?

Copy link
Copy Markdown
Contributor Author

@steFaiz steFaiz May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your remind! During implementation, I referred to Flink's original agg pushdown interface, which says it only supports pushing down simple fields:

image

For example, if the sql is:

SELECT MIN(f0 * 2) FROM paimon

The plan will be:

LocalHashAggregate(Partial_MIN($f0))
  +- Calc(select=[(f0 * 2) AS $f0])
     +- TableSourceScan(project=[f0])

Since Flink does not consider these situations, the args of AggregateExpression is hard-coded as FieldReferenceExpression:
image

I've added a test for these cases!

@JingsongLi
Copy link
Copy Markdown
Contributor

+1

@JingsongLi JingsongLi merged commit 2ba33bb into apache:master May 14, 2026
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants