-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Spark: Support aggregate pushdown for identity partition column GROUP BY #16176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
|
|
||
| import java.io.IOException; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import org.apache.iceberg.BaseMetadataTable; | ||
| import org.apache.iceberg.BaseTable; | ||
|
|
@@ -28,6 +29,8 @@ | |
| import org.apache.iceberg.IncrementalAppendScan; | ||
| import org.apache.iceberg.MetricsConfig; | ||
| import org.apache.iceberg.MetricsModes; | ||
| import org.apache.iceberg.PartitionField; | ||
| import org.apache.iceberg.PartitionSpec; | ||
| import org.apache.iceberg.ScanTask; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.Snapshot; | ||
|
|
@@ -41,12 +44,15 @@ | |
| import org.apache.iceberg.io.CloseableIterable; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.spark.Spark3Util; | ||
| import org.apache.iceberg.spark.SparkAggregates; | ||
| import org.apache.iceberg.spark.SparkSchemaUtil; | ||
| import org.apache.iceberg.spark.SparkTableUtil; | ||
| import org.apache.iceberg.spark.SparkUtil; | ||
| import org.apache.iceberg.spark.TimeTravel; | ||
| import org.apache.iceberg.types.Type; | ||
| import org.apache.iceberg.types.Types; | ||
| import org.apache.iceberg.util.Pair; | ||
| import org.apache.spark.sql.SparkSession; | ||
| import org.apache.spark.sql.catalyst.InternalRow; | ||
|
|
@@ -157,6 +163,10 @@ public boolean pushAggregation(Aggregation aggregation) { | |
| return false; | ||
| } | ||
|
|
||
| if (aggregation.groupByExpressions().length > 0) { | ||
| return pushGroupByAggregation(aggregation, expressions); | ||
| } | ||
|
|
||
| try (CloseableIterable<FileScanTask> fileScanTasks = planFilesWithStats()) { | ||
| for (FileScanTask task : fileScanTasks) { | ||
| if (!task.deletes().isEmpty()) { | ||
|
|
@@ -186,6 +196,213 @@ public boolean pushAggregation(Aggregation aggregation) { | |
| return true; | ||
| } | ||
|
|
||
| /** | ||
| * Push down aggregation with GROUP BY on identity partition columns. When all GROUP BY columns | ||
| * are identity partition fields, aggregates can be computed from file metadata grouped by | ||
| * partition values, avoiding reading any data files. | ||
| */ | ||
| private boolean pushGroupByAggregation( | ||
| Aggregation aggregation, List<BoundAggregate<?, ?>> boundAggregates) { | ||
| PartitionSpec spec = table().spec(); | ||
| Schema tableSchema = table().schema(); | ||
|
|
||
| List<Integer> groupByPositions = Lists.newArrayList(); | ||
| List<Types.NestedField> groupByFields = Lists.newArrayList(); | ||
| if (!resolveGroupByPartitions( | ||
| aggregation, spec, tableSchema, groupByPositions, groupByFields)) { | ||
| return false; | ||
| } | ||
|
|
||
| Map<List<Object>, AggregateEvaluator> evaluatorsByPartition = | ||
| groupFilesByPartition(spec, groupByPositions, boundAggregates); | ||
| if (evaluatorsByPartition == null) { | ||
| return false; | ||
| } | ||
|
|
||
| localScan = buildGroupedLocalScan(groupByFields, evaluatorsByPartition); | ||
| return localScan != null; | ||
| } | ||
|
|
||
| private boolean resolveGroupByPartitions( | ||
| Aggregation aggregation, | ||
| PartitionSpec spec, | ||
| Schema tableSchema, | ||
| List<Integer> groupByPositions, | ||
| List<Types.NestedField> groupByFields) { | ||
| for (org.apache.spark.sql.connector.expressions.Expression groupByExpr : | ||
| aggregation.groupByExpressions()) { | ||
| String colName = | ||
| SparkUtil.toColumnName( | ||
| (org.apache.spark.sql.connector.expressions.NamedReference) groupByExpr); | ||
| Types.NestedField sourceField = tableSchema.findField(colName); | ||
| if (sourceField == null) { | ||
| LOG.info("Skipping grouped aggregate pushdown: cannot find field {}", colName); | ||
| return false; | ||
| } | ||
|
|
||
| int pos = findIdentityPartitionPosition(spec, sourceField.fieldId()); | ||
| if (pos < 0) { | ||
| LOG.info( | ||
| "Skipping grouped aggregate pushdown: {} is not an identity partition field", colName); | ||
| return false; | ||
| } | ||
|
|
||
| groupByPositions.add(pos); | ||
| groupByFields.add(sourceField); | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
  /**
   * Plans the file scan and buckets each data file's metadata into one {@link AggregateEvaluator}
   * per distinct combination of GROUP BY partition values.
   *
   * <p>Insertion order of partition keys is preserved (LinkedHashMap) so the grouped result rows
   * come out in a deterministic order.
   *
   * @param spec the partition spec whose positions {@code groupByPositions} refer to
   * @param groupByPositions positions of the GROUP BY columns within the partition tuple
   * @param boundAggregates the aggregates to evaluate against each file's metadata
   * @return evaluators keyed by GROUP BY values, or null when pushdown must be abandoned
   *     (row-level deletes, files from a different partition spec, scan planning failure,
   *     no files, or an evaluator that could not produce valid results)
   */
  private Map<List<Object>, AggregateEvaluator> groupFilesByPartition(
      PartitionSpec spec,
      List<Integer> groupByPositions,
      List<BoundAggregate<?, ?>> boundAggregates) {
    Map<List<Object>, AggregateEvaluator> evaluatorsByPartition = Maps.newLinkedHashMap();

    try (CloseableIterable<FileScanTask> fileScanTasks = planFilesWithStats()) {
      for (FileScanTask task : fileScanTasks) {
        // Row-level deletes make file-level metrics unreliable for aggregation.
        if (!task.deletes().isEmpty()) {
          LOG.info("Skipping grouped aggregate pushdown: detected row level deletes");
          return null;
        }

        // Files written under an older/different spec have a different partition tuple layout,
        // so the groupByPositions computed against the current spec would not apply to them.
        if (task.file().specId() != spec.specId()) {
          LOG.info(
              "Skipping grouped aggregate pushdown: file from different spec {}",
              task.file().specId());
          return null;
        }

        // Build the GROUP BY key from this file's partition tuple, in GROUP BY column order.
        StructLike partition = task.file().partition();
        List<Object> key = Lists.newArrayListWithCapacity(groupByPositions.size());
        for (int pos : groupByPositions) {
          key.add(partition.get(pos, Object.class));
        }

        // Lazily create an evaluator per distinct key and fold this file's metadata into it.
        evaluatorsByPartition
            .computeIfAbsent(key, k -> AggregateEvaluator.create(boundAggregates))
            .update(task.file());
      }
    } catch (IOException e) {
      // Best-effort optimization: on planning/close failure fall back to a regular scan.
      LOG.info("Skipping grouped aggregate pushdown: ", e);
      return null;
    }

    // No matching files: nothing to group, fall back rather than fabricate an empty result.
    if (evaluatorsByPartition.isEmpty()) {
      return null;
    }

    // If any group's aggregators are invalid (presumably file metrics were missing or
    // insufficient for some aggregate — TODO confirm), abandon pushdown entirely.
    for (AggregateEvaluator evaluator : evaluatorsByPartition.values()) {
      if (!evaluator.allAggregatorsValid()) {
        return null;
      }
    }

    return evaluatorsByPartition;
  }
|
|
||
| private SparkLocalScan buildGroupedLocalScan( | ||
| List<Types.NestedField> groupByFields, | ||
| Map<List<Object>, AggregateEvaluator> evaluatorsByPartition) { | ||
| AggregateEvaluator firstEvaluator = evaluatorsByPartition.values().iterator().next(); | ||
| List<Types.NestedField> resultFields = Lists.newArrayList(); | ||
| int fieldId = 0; | ||
|
|
||
| for (Types.NestedField field : groupByFields) { | ||
| resultFields.add(Types.NestedField.optional(fieldId++, field.name(), field.type())); | ||
| } | ||
|
|
||
| for (Types.NestedField field : firstEvaluator.resultType().fields()) { | ||
| resultFields.add(Types.NestedField.optional(fieldId++, field.name(), field.type())); | ||
| } | ||
|
|
||
| Types.StructType resultType = Types.StructType.of(resultFields); | ||
| List<InternalRow> resultRows = Lists.newArrayList(); | ||
|
|
||
| for (Map.Entry<List<Object>, AggregateEvaluator> entry : evaluatorsByPartition.entrySet()) { | ||
| List<Object> partitionValues = entry.getKey(); | ||
| StructLike aggResult = entry.getValue().result(); | ||
|
|
||
| Object[] combined = new Object[resultFields.size()]; | ||
| for (int i = 0; i < partitionValues.size(); i++) { | ||
| combined[i] = partitionValues.get(i); | ||
| } | ||
|
|
||
| for (int i = 0; i < aggResult.size(); i++) { | ||
| combined[partitionValues.size() + i] = aggResult.get(i, Object.class); | ||
| } | ||
|
|
||
| resultRows.add(new StructInternalRow(resultType).setStruct(new ArrayStructLike(combined))); | ||
| } | ||
|
|
||
| StructType pushedSchema = SparkSchemaUtil.convert(new Schema(resultFields)); | ||
| return new SparkLocalScan( | ||
| table(), pushedSchema, resultRows.toArray(new InternalRow[0]), filters()); | ||
| } | ||
|
|
||
| private int findIdentityPartitionPosition(PartitionSpec spec, int sourceFieldId) { | ||
| List<PartitionField> fields = spec.fields(); | ||
| for (int i = 0; i < fields.size(); i++) { | ||
| PartitionField field = fields.get(i); | ||
| if (field.sourceId() == sourceFieldId && field.transform().isIdentity()) { | ||
| return i; | ||
| } | ||
| } | ||
|
|
||
| return -1; | ||
| } | ||
|
|
||
| private boolean allGroupByAreIdentityPartitionFields(Aggregation aggregation) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Or maybe let canPushDownAggregation() allow group by and then have the checks in this merged method? What do you think? |
||
| PartitionSpec spec = table().spec(); | ||
| Schema tableSchema = table().schema(); | ||
|
|
||
| for (org.apache.spark.sql.connector.expressions.Expression groupByExpr : | ||
| aggregation.groupByExpressions()) { | ||
| if (!(groupByExpr instanceof org.apache.spark.sql.connector.expressions.NamedReference)) { | ||
| return false; | ||
| } | ||
|
|
||
| String colName = | ||
| SparkUtil.toColumnName( | ||
| (org.apache.spark.sql.connector.expressions.NamedReference) groupByExpr); | ||
| Types.NestedField sourceField = tableSchema.findField(colName); | ||
| if (sourceField == null) { | ||
| return false; | ||
| } | ||
|
|
||
| if (findIdentityPartitionPosition(spec, sourceField.fieldId()) < 0) { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| private static class ArrayStructLike implements StructLike { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use |
||
| private final Object[] values; | ||
|
|
||
| ArrayStructLike(Object[] values) { | ||
| this.values = values; | ||
| } | ||
|
|
||
| @Override | ||
| public int size() { | ||
| return values.length; | ||
| } | ||
|
|
||
| @Override | ||
| @SuppressWarnings("unchecked") | ||
| public <T> T get(int pos, Class<T> javaClass) { | ||
| return (T) values[pos]; | ||
| } | ||
|
|
||
| @Override | ||
| public <T> void set(int pos, T value) { | ||
| values[pos] = value; | ||
| } | ||
| } | ||
|
|
||
| private boolean canPushDownAggregation(Aggregation aggregation) { | ||
| if (!isMainTable()) { | ||
| return false; | ||
|
|
@@ -195,12 +412,12 @@ private boolean canPushDownAggregation(Aggregation aggregation) { | |
| return false; | ||
| } | ||
|
|
||
| // If group by expression is the same as the partition, the statistics information can still | ||
| // be used to calculate min/max/count, will enable aggregate push down in next phase. | ||
| // TODO: enable aggregate push down for partition col group by expression | ||
| if (aggregation.groupByExpressions().length > 0) { | ||
| LOG.info("Skipping aggregate pushdown: group by aggregation push down is not supported"); | ||
| return false; | ||
| if (!allGroupByAreIdentityPartitionFields(aggregation)) { | ||
| LOG.info( | ||
| "Skipping aggregate pushdown: group by columns must all be identity partition fields"); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| return true; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not confident this is correct. Also, we are only checking the current partition spec — a table can contain files written under many different partition specs that evolved across snapshots.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review, @singhpk234 — you raised a valid point. The current implementation only considers the current partition spec and bails out for files from different specs. I will look into handling spec evolution properly and update the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have handled the partition spec evolution changes — can you please review?