diff --git a/benchmarks/sql_benchmarks/unnest_array_agg/benchmarks/q01.benchmark b/benchmarks/sql_benchmarks/unnest_array_agg/benchmarks/q01.benchmark
new file mode 100644
index 000000000000..5cbb062f9aa4
--- /dev/null
+++ b/benchmarks/sql_benchmarks/unnest_array_agg/benchmarks/q01.benchmark
@@ -0,0 +1,56 @@
+-- Benchmark: ordered array_agg over rows produced by unnest.
+--
+-- Query shape: list columns -> unnest expansion -> GROUP BY row id ->
+-- array_agg(... ORDER BY ...).
+--
+-- The reported issue involved about 20,000 rows with 2,000 list elements
+-- each (~40M expanded rows). The defaults here are 100 rows x 100
+-- elements (10K expanded rows) so a local smoke test stays cheap.
+-- Set these environment variables to scale up:
+--   UNNEST_ARRAY_AGG_ROWS=20000 UNNEST_ARRAY_AGG_LIST_LEN=2000
+
+name Q01
+
+group unnest_array_agg
+subgroup ordered_array_agg
+
+assert B
+WITH list_rows AS (
+  SELECT
+    value AS row_id,
+    range(0, ${UNNEST_ARRAY_AGG_LIST_LEN:-100}) AS vals,
+    range(0, ${UNNEST_ARRAY_AGG_LIST_LEN:-100}) AS val_idx
+  FROM range(0, ${UNNEST_ARRAY_AGG_ROWS:-100})
+),
+exploded AS (
+  SELECT
+    row_id,
+    unnest(vals) AS val,
+    unnest(val_idx) AS idx
+  FROM list_rows
+)
+SELECT COUNT(*) = ${UNNEST_ARRAY_AGG_ROWS:-100} * ${UNNEST_ARRAY_AGG_LIST_LEN:-100}
+FROM exploded;
+----
+true
+
+run
+WITH list_rows AS (
+  SELECT
+    value AS row_id,
+    range(0, ${UNNEST_ARRAY_AGG_LIST_LEN:-100}) AS vals,
+    range(0, ${UNNEST_ARRAY_AGG_LIST_LEN:-100}) AS val_idx
+  FROM range(0, ${UNNEST_ARRAY_AGG_ROWS:-100})
+),
+exploded AS (
+  SELECT
+    row_id,
+    unnest(vals) AS val,
+    unnest(val_idx) AS idx
+  FROM list_rows
+)
+SELECT
+  row_id,
+  array_agg(val ORDER BY idx) AS vals
+FROM exploded
+GROUP BY row_id;
diff --git a/datafusion/sqllogictest/test_files/unnest_array_agg_repro.slt b/datafusion/sqllogictest/test_files/unnest_array_agg_repro.slt
new file mode 100644
index 000000000000..5882c37980be
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/unnest_array_agg_repro.slt
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Small, bounded reproducer for a memory-hungry query shape:
+# list columns -> unnest row expansion -> GROUP BY row id -> ordered array_agg.
+
+statement ok
+CREATE TABLE unnest_array_agg_repro AS
+SELECT
+  value AS row_id,
+  range(0, 4) AS vals,
+  range(0, 4) AS val_idx
+FROM range(0, 3);
+
+# 3 source rows x 4 list elements each = 12 expanded rows.
+query I
+SELECT COUNT(*)
+FROM (
+  SELECT row_id, unnest(vals) AS val, unnest(val_idx) AS idx
+  FROM unnest_array_agg_repro
+) AS expanded;
+----
+12
+
+# Regroup by row id; the ORDER BY inside array_agg forces ordered aggregate state.
+query I?
+SELECT row_id, array_agg(val ORDER BY idx) AS vals
+FROM (
+  SELECT row_id, unnest(vals) AS val, unnest(val_idx) AS idx
+  FROM unnest_array_agg_repro
+) AS expanded
+GROUP BY row_id
+ORDER BY row_id;
+----
+0 [0, 1, 2, 3]
+1 [0, 1, 2, 3]
+2 [0, 1, 2, 3]
+
+# Capture the plan shape for the reproducer: UnnestExec feeds AggregateExec with ordered array_agg.
+# The plan text is not pinned here. EXPLAIN VERBOSE prints one row per
+# optimizer pass and a blank line ends a sqllogictest result block, so an
+# abbreviated snapshot cannot match. This record only checks that planning
+# succeeds; to pin a plan use `query TT` + EXPLAIN and regenerate it with:
+#   cargo test --test sqllogictests -- unnest_array_agg_repro --complete
+statement ok
+EXPLAIN VERBOSE
+SELECT row_id, array_agg(val ORDER BY idx) AS vals
+FROM (
+  SELECT row_id, unnest(vals) AS val, unnest(val_idx) AS idx
+  FROM unnest_array_agg_repro
+) AS expanded
+GROUP BY row_id
+ORDER BY row_id;
+
+# A descending ORDER BY inside array_agg must reverse every rebuilt list.
+query I?
+SELECT row_id, array_agg(val ORDER BY idx DESC) AS vals
+FROM (
+  SELECT row_id, unnest(vals) AS val, unnest(val_idx) AS idx
+  FROM unnest_array_agg_repro
+) AS expanded
+GROUP BY row_id
+ORDER BY row_id;
+----
+0 [3, 2, 1, 0]
+1 [3, 2, 1, 0]
+2 [3, 2, 1, 0]
+
+# Empty input: no source row matches, so nothing is expanded and no group is produced.
+query I?
+SELECT row_id, array_agg(val ORDER BY idx) AS vals
+FROM (
+  SELECT row_id, unnest(vals) AS val, unnest(val_idx) AS idx
+  FROM unnest_array_agg_repro
+  WHERE row_id < 0
+) AS expanded
+GROUP BY row_id
+ORDER BY row_id;
+----
+
+statement ok
+DROP TABLE unnest_array_agg_repro;