feat: canonical H2O groundwork — top/bot/pearson_corr/pow + 4 fixes #202

Merged
singaraiona merged 27 commits into master from feat/canonical-h2o
May 17, 2026

Conversation


@ser-vasilich ser-vasilich commented May 15, 2026

Summary

Foundational primitives and bug fixes required to run the canonical H2O benchmark suite. 9 commits. No perf work here — that lands in a follow-up PR against this branch.

Aggregate / arithmetic primitives

  • `feat(sort): add top / bot — partial top-N / bottom-N` (3a9d70f)
  • `feat(agg): add pearson_corr — Pearson correlation coefficient` (8f974a6)
  • `feat(arith): add pow as binary atomic` (a4bdba6)

Bug fixes uncovered while building the canonical H2O suite

  • `fix(collection): atom_eq RAY_LIST does structural compare, not memcmp on pointers` (bf7d560) — composite-LIST group keys never matched
  • `fix(query): multi-key + non-agg routes through eval-level group` (bbd2c72)
  • `fix(query): nested aggregates in non-agg expr evaluate per group` (2647ea6) — q7 `(- (max v1) (min v2))` was collapsing globally
  • `fix(group): O(1) lookup + grow for LIST path` (3c6e5c0) + regression test (27a85ea) — >1024 unique LIST keys

Test + bench files

  • `bench(h2o): add q9 — pearson² per group (id2, id4)` (4d6d9ae)

Related

Companion PRs:

Test plan

  • `make test` passes (2406/2407, 1 skipped)
  • Reviewer: `make check LOCAL=1` in rayforce-bench → `pass — 665/665` (requires all 3 companion branches checked out)

… on pointers

For RAY_LIST a->type fell through atom_eq's default branch which does
memcmp on ray_data — i.e. on the ray_t** pointer array, not the
elements.  Two structurally-identical lists with different element
pointers (the common case after construction) compared not-equal,
silently breaking ray_group_fn / ray_dict / distinct fallback for any
code that built composite-list keys.  Concretely: (group (list (list
1 2) (list 1 2) (list 3 4))) returned three buckets instead of two,
and the eval-level multi-key group-by path (used for non-agg
expressions) put every row in its own group.

Add a RAY_LIST case that recurses element-wise.  Vector LIST keys
are still bounded by ngroups (caller-side).

Tests in test/test_atom.c cover:
  - basic same-shape compare across different pointers
  - mixed-type elements (i64 + f64 + str)
  - nested LIST-of-LIST
  - per-element null short-circuit
  - empty lists
  - sym-atom rows (the q6 multi-key composite-key shape)
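The recursive element-wise compare can be sketched as follows — a minimal stand-alone model with a hypothetical simplified `atom` type (the real `ray_t` layout and `atom_eq` live in the engine source; only the shape of the fix is shown):

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical simplified atom — stand-in for the engine's ray_t. */
typedef enum { A_I64, A_F64, A_LIST } atom_type;

typedef struct atom {
    atom_type type;
    union {
        long long i64;
        double f64;
        struct { struct atom **elems; size_t len; } list;
    } u;
} atom;

/* Structural compare: the LIST case recurses element-wise instead of
 * falling through to a memcmp over the element-pointer array. */
bool atom_eq(const atom *a, const atom *b) {
    if (a->type != b->type) return false;
    switch (a->type) {
    case A_I64:  return a->u.i64 == b->u.i64;
    case A_F64:  return a->u.f64 == b->u.f64;
    case A_LIST:
        if (a->u.list.len != b->u.list.len) return false;
        for (size_t i = 0; i < a->u.list.len; i++)
            if (!atom_eq(a->u.list.elems[i], b->u.list.elems[i]))
                return false;   /* per-element short-circuit */
        return true;
    }
    return false;
}
```

With this shape, two structurally identical lists built from different element pointers compare equal, which is the property the group / dict / distinct paths rely on.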

Two related bugs blocked canonical H2O groupby queries on multi-key
by-clauses:

  1. The planner had a guard that rejected non-agg expressions with
     multi-key by outright (nyi error).  The eval-level multi-key
     path already implements grouping correctly — drop the guard and
     let the path take it.  Closes q6 (median + multi-key) and the
     multi-key shape of q7 (arith-of-aggregates).

  2. bind_col_slice resolved per-group slices via ray_at_fn, which
     boxes (typed-vec idx-vec) into a RAY_LIST of atoms.  desc/asc/
     take then refused with "type: desc expects a vector".  Slice
     directly via gather_by_idx for typed-vec + I64-idx-vec; fall
     back to ray_at_fn for LIST inputs and other shapes the gather
     kernel doesn't cover.  Unblocks q8 (per-group top-N via
     `(take (desc v) n)`).

Single-key (- (max v1) (min v2)) by id3 still broadcasts globally —
that path goes through the DAG fast-scatter, not eval, and the
arith-of-aggregates handling there is a separate fix.

Updated test/test_lang.c::test_eval_select_by_multi_nonagg from
asserting the nyi error to asserting the new working behaviour, and
added test/rfl/integration/canonical_h2o.rfl with q6 / q7 (multi-key
shape) / q8 / atom_eq composite-key regressions.

(- (max v1) (min v2)) inside a single-key (by:) projection collapsed
both inner aggregates globally — the post-DAG scatter ran one full-
table ray_eval, got an atom (= global max - global min), then
broadcast that atom to every group cell.

The classifier expr_refs_row_column short-circuits on is_agg_expr
subtrees because aggregating a column collapses it to a scalar.
That's correct for `(max col)` standalone but masks "non-agg outer
+ agg inner" shapes from the row-alignment check, which then takes
the constant-broadcast branch.

Add expr_contains_agg (recursive walk) and route any non-agg expr
that contains an aggregate subexpression through
nonagg_eval_per_group_buf — the same per-group eval path the eval-
level multi-key fix uses.  Each nested agg now reduces inside its
group's slice.
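The classifier change can be sketched with a hypothetical miniature expression node (the real walk operates on the engine's s-expressions; names here are illustrative): unlike an `is_agg_expr` check on the head, the recursive walk keeps descending, so a non-agg outer with agg inners is detected.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical mini expression tree — stand-in for the engine's
 * s-expression atoms. */
typedef struct expr {
    const char   *head;     /* operator / symbol name */
    struct expr **args;
    size_t        nargs;
} expr;

static bool is_agg_head(const char *h) {
    static const char *aggs[] = {"max", "min", "sum", "avg", "med"};
    for (size_t i = 0; i < sizeof aggs / sizeof *aggs; i++)
        if (h && strcmp(h, aggs[i]) == 0) return true;
    return false;
}

/* Recursive walk: true if any subexpression is an aggregate, so
 * "(- (max v1) (min v2))" is routed to per-group eval instead of
 * the constant-broadcast branch. */
bool expr_contains_agg(const expr *e) {
    if (!e) return false;
    if (is_agg_head(e->head)) return true;
    for (size_t i = 0; i < e->nargs; i++)
        if (expr_contains_agg(e->args[i])) return true;
    return false;
}
```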

Updates test_eval_select_by_nonagg_with_agg_subexpr from asserting
the old broadcast (m=[211,211]) to the canonical SQL/k semantic
(m=[91,121] for (+ 1 (sum p))), and adds a single-key q7 case in
test/rfl/integration/canonical_h2o.rfl.

Engine has sqrt/log/exp but no pow.  Needed for q9 (pearson_corr
manual reconstruction) and generally useful — closes the gap with
polars/numpy/pandas Column.pow().

Returns F64 regardless of input types; libm pow() handles fractional
exponents (e.g. (pow 2 0.5) → 1.41…).  Null in either operand
propagates to typed F64 null.  Registered as RAY_FN_ATOMIC so vec
broadcasts go through the existing per-element dispatch — no DAG
opcode yet (perf follow-up).

Tests in test/rfl/arith/pow.rfl cover atom/atom, vec/atom, atom/vec,
vec/vec, null propagation, the (pow x 2) ≡ (* x x) identity, the
pow-then-root round-trip, and type-error paths.

(top v n) returns the n largest values from v in descending order;
(bot v n) returns the n smallest in ascending order.  Per-group use
inside select closes q8 (largest-N per id6) without sorting the full
group:

  (select {top2: (top v 2) by: id6 from: t})

Implementation routes through topk_indices_single — the same
bounded-heap O(N log K) path that powers ray_topk_table, falling
back to ray_desc/ray_asc + take for STR/GUID/LIST/SYM and the n>=len
edge case.  Output type matches the input type.

Tests in test/rfl/arith/top_bot.rfl cover narrow ints (I16/I32/U8),
F64, negative values, n-edge cases (0, len, > len, negative), the
(top v 1) == (max v) and (top v len) == (desc v) identities, the
prefix invariant against full sort, per-group usage with single and
multi-key by-clauses, and the type-error path.
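The bounded-heap idea can be sketched as follows — a simplified value-returning version (assumption: the real topk_indices_single returns indices and handles all input types; this sketch covers F64 only). A size-K min-heap keeps the current top-K with the smallest at the root, so each of the N elements costs at most O(log K):

```c
#include <stddef.h>
#include <stdlib.h>

/* Restore the min-heap property at index i of heap h[0..n). */
static void sift_down(double *h, size_t n, size_t i) {
    for (;;) {
        size_t l = 2 * i + 1, r = l + 1, m = i;
        if (l < n && h[l] < h[m]) m = l;
        if (r < n && h[r] < h[m]) m = r;
        if (m == i) return;
        double t = h[i]; h[i] = h[m]; h[m] = t;
        i = m;
    }
}

static int cmp_desc(const void *a, const void *b) {
    double x = *(const double *)a, y = *(const double *)b;
    return (x < y) - (x > y);
}

/* Writes the k largest values of v[0..n) into out[0..k), descending. */
void top_k(const double *v, size_t n, double *out, size_t k) {
    size_t filled = 0;
    for (size_t i = 0; i < n; i++) {
        if (filled < k) {
            out[filled++] = v[i];
            if (filled == k)               /* heapify once the buffer fills */
                for (size_t j = k / 2; j-- > 0;) sift_down(out, k, j);
        } else if (v[i] > out[0]) {
            out[0] = v[i];                 /* evict the current minimum */
            sift_down(out, k, 0);
        }
    }
    qsort(out, filled, sizeof(double), cmp_desc); /* final descending order */
}
```

(bot v n) is the mirror image with a max-heap and ascending output.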

(pearson_corr x y) returns the Pearson correlation coefficient between
two numeric vectors of equal length:

  r = (n·Σxy − Σx·Σy) / sqrt((n·Σx² − (Σx)²)(n·Σy² − (Σy)²))

Single-pass formulation with F64 accumulators; nulls in either side
skip the row from BOTH sums (pairwise complete-case deletion, matching
polars / pandas pearson_corr default).  Returns F64 NaN when n < 2 or
either column has zero variance (correlation undefined).

Per-group usage routes through the eval-level scatter — the planner
sees a non-agg expression with column refs that collapses to a scalar
on the full table, and the non-row-aligned fallback re-runs per group.
This unblocks q9 of the canonical H2O benchmark:

  (select {r2: (pow (pearson_corr v1 v2) 2) by: {id2 id4} from: df})

Tests in test/rfl/agg/pearson_corr.rfl cover perfect ±1 cases, F64
return type, narrow integer coercion (I32/I16/U8), n<2 / zero-variance
NaN paths, symmetry r(x,y)==r(y,x), self-correlation == 1.0, the
|r| <= 1.0 bound, error paths (length / type / non-numeric), and the
canonical q9 single-key + multi-key shapes end-to-end.

First-pass implementation goes through collection_elem + as_f64 for
type-agnostic numeric reads — type-specialised inner loops are a perf
follow-up.

The LIST path in ray_group_fn historically capped unique groups at the
initial 1024-slot kblock with no resizing — once `ngroups >= max_groups`
it returned `error: limit`.  Multi-key non-agg select-by builds a
composite-LIST-of-LISTs key and routes through this path; on H2O K=100
datasets the cartesian product of two 100-cardinality keys reaches up
to 10k groups, so the cap fires on real workloads.

Add direct-call coverage:
  (group <2000 unique 2-element list keys>)
plus a select-shaped variant on a 1500-row table whose (k1, k2) pairs
are all unique.  Both fail with `error: limit` against the unfixed
cap; the fix in the next commit makes them pass.

ray_group_fn's RAY_LIST branch had two coupled limitations that bit
multi-key non-agg select-by on H2O-scale workloads:

  1. Hard 1024 cap on unique groups (`max_groups = n < 1024 ? n : 1024`,
     no resizing) — returned `error: limit` once exceeded.
  2. O(N²) linear scan: every row probed every existing group key via
     atom_eq.  At 10M rows × 10k groups this is ~10^11 atom_eq calls —
     8+ minutes per call even before the cap was hit; with the cap
     lifted, real data would simply run that slowly.

Replace the linear scan with an open-addressed hash table on
atom_hash, mirroring the scalar / RAY_GUID paths.  atom_hash is a new
helper that walks an atom recursively and produces a hash consistent
with atom_eq's structural compare — multi-key composites hash by
combining their cell hashes via ray_hash_combine, so two structurally
equal [A, 7] composites land in the same slot and the equality check
there disambiguates.

Existing patterns this aligns with:
  - ray_hash_* from ops/hash.h (wyhash) — same as pivot.c, datalog.c,
    join.c, collection.c::hs_hash_row.
  - group_ht_t open-addressing — same shape as the GUID and scalar
    paths in the same function (group_ht_init / _grow / _free, GHT_EMPTY
    sentinel, load factor 0.5 grow trigger).
  - group_grow_listkeys mirrors group_grow but also resizes the
    ray_t* keys block; replaces the previous limit-error.

Note collection.c::hs_hash_row's RAY_LIST branch handles atom kinds at
one level only — its default case folds nested-list rows to the same
hash, so distinct/intersect over list-of-lists is also degenerate.
That's outside this fix's scope; this commit only changes ray_group_fn.
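The key invariant is that the structural hash agrees with the structural compare: equal composites must hash equal. A minimal sketch, with hypothetical simplified types and mixers standing in for the wyhash-based ray_hash_* / ray_hash_combine helpers:

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical simplified atom for illustration. */
typedef enum { H_I64, H_LIST } hatom_type;

typedef struct hatom {
    hatom_type type;
    union {
        int64_t i64;
        struct { struct hatom **elems; size_t len; } list;
    } u;
} hatom;

/* Stand-in for ray_hash_combine (boost-style mix). */
static uint64_t hash_combine(uint64_t h, uint64_t v) {
    return h ^ (v + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2));
}

/* Stand-in scalar mixer (fmix64-style). */
static uint64_t hash_i64(int64_t v) {
    uint64_t x = (uint64_t)v;
    x ^= x >> 33; x *= 0xff51afd7ed558ccdULL; x ^= x >> 33;
    return x;
}

/* Recursive structural hash: a LIST hashes by combining its cell
 * hashes, so two structurally equal composites probe the same slot
 * and the slot's equality check disambiguates true collisions. */
uint64_t atom_hash(const hatom *a) {
    switch (a->type) {
    case H_I64:  return hash_i64(a->u.i64);
    case H_LIST: {
        uint64_t h = 0x1234ULL;           /* list-kind seed */
        for (size_t i = 0; i < a->u.list.len; i++)
            h = hash_combine(h, atom_hash(a->u.list.elems[i]));
        return h;
    }
    }
    return 0;
}
```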

Measured on bench/h2o/q9.rfl (G1_1e7_1e2, 10M rows, by {id2 id4} →
10000 groups, pearson_corr v1 v2):

  pre-fix (cap):       error: limit after 2.6s
  pre-fix (cap-grow):  484.7s per query (linear scan)
  this fix (HT):         4.8s per query — ~100× faster

Makes the test added in 27a85ea pass.

Canonical H2O benchmark q9: regression metric per (id2, id4) group.
Polars reference:
  df.groupby(["id2","id4"]).agg((pl.pearson_corr("v1","v2")**2).alias("r2"))

Engine equivalent:
  (select {r2: (pow (pearson_corr v1 v2) 2) by: {id2: id2 id4: id4}
           from: df})

Closes the q9 gap in REQUIREMENTS_CANONICAL_H2O.md — needed
pearson_corr (added 8f974a6), pow (a4bdba6), and the group LIST-
path hash fix (3c6e5c0) to run end-to-end on the K=100 dataset
(100×100 = 10k unique groups exceeds the historical 1024 cap).

Same harness shape as q1/q2/q3/q5/q7: 3 warmup iterations, 5 timed
runs via timeit, exit.

ser-vasilich and others added 16 commits May 17, 2026 12:07

Foundation only — group.c hash-agg path not yet implemented.  ray_group2
+ OP_PEARSON_CORR DAG nodes are emitted by the planner for `(select
(pearson_corr x y) by ...)` shapes, but exec_group will panic on the
unknown opcode until Phase B lands.

Files:
- src/ops/ops.h: OP_PEARSON_CORR=79, agg_ins2 field in OP_GROUP ext
- src/ops/internal.h: GHT_NEED_PEARSON, off_sum_y/off_sumsq_y/off_sumxy,
  agg_is_binary in ght_layout_t
- src/lang/eval.c: pearson_corr promoted to RAY_FN_AGGR | RAY_FN_LAZY_AWARE
- src/ops/graph.c: ray_pearson_corr DAG-builder, ray_group2 (variant
  accepting agg_ins2 sibling array), pointer-fixup for agg_ins2
- src/ops/query.c: resolve_agg_opcode("pearson_corr"); two planner sites
  collect agg_ins2 and dispatch to ray_group2 when any agg is binary
- src/ops/dump.c + test/test_dump.c: opcode name "PEARSON_CORR"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

…CORR

Additive changes only — compiles cleanly, no behavioural impact for
existing code paths (no agg uses GHT_NEED_PEARSON yet because the
phase1 packing, accumulator update, and phase3 finalize sites are
still to-do).

- ght_compute_layout: detect OP_PEARSON_CORR via agg_ops, set
  agg_is_binary bit, allocate two consecutive agg_vals slots per binary
  agg (x at s, y at s+1), allocate off_sum_y/off_sumsq_y/off_sumxy
  blocks when GHT_NEED_PEARSON is set.
- ht_path ght_need computation: OP_PEARSON_CORR sets SUM | SUMSQ |
  PEARSON.

Remaining Phase B sites (chain is interdependent — must land together):
  * agg input resolution: read ext->agg_ins2[a] → agg_vecs2[a]
  * radix_phase1_ctx_t.agg_vecs2 + dispatch ctx plumbing
  * radix_phase1_fn + group_rows_range: pack y after x in entry agg_vals
  * init_accum_from_entry + accum_from_entry: write Σy, Σy², Σxy
  * radix phase3 finalize: OP_PEARSON_CORR arm → r = (n·Σxy − Σx·Σy) /
    sqrt((n·Σx² − (Σx)²)(n·Σy² − (Σy)²))
  * dense-array bypass: route OP_PEARSON_CORR → ht_path
  * exec.c scalar dispatch (n_keys=0) or lower to OP_GROUP

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Wires OP_PEARSON_CORR into the radix-partitioned + single-HT group-by
pipeline.  Single-pass two-moments formula matches ray_pearson_corr_fn
(see comment).  All 2406 existing tests pass; pearson_corr.rfl groupby
+ multi-key cases pass through the new opcode path.

Touch list:
- ght_compute_layout: detect OP_PEARSON_CORR via agg_ops, set
  agg_is_binary bit, reserve 2 consecutive agg_vals slots per binary
  agg (x at s, y at s+1); allocate off_sum_y/off_sumsq_y/off_sumxy
  blocks when GHT_NEED_PEARSON.
- ht_path ght_need: OP_PEARSON_CORR → SUM|SUMSQ|PEARSON.
- Agg input resolution: read ext->agg_ins2[a] via the same OP_SCAN /
  OP_CONST / expr_compile ladder used for the x-side.
- All 7 agg_vecs cleanup sites: release agg_vecs2[a] alongside.
- radix_phase1_ctx_t: new agg_vecs2 field, plumbed through both
  call sites + single-HT group_rows_range signature update.
- radix_phase1_fn + group_rows_range: pack y after x in entry agg_vals.
- init_accum_from_entry: seed Σy, Σy², Σxy (both f64 and i64 inputs).
- accum_from_entry: incremental update of Σy, Σy², Σxy in both branches.
- Radix phase-3 finalize: OP_PEARSON_CORR arm —
    r = (n·Σxy − Σx·Σy) / sqrt((n·Σx² − (Σx)²)(n·Σy² − (Σy)²))
  Emits NaN for n<2 or constant-side; canonicalize folds → null.
- Dense-array bypass: OP_PEARSON_CORR forces ht_path (da_accum_t
  doesn't have per-worker y-side state yet).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Adds OP_PEARSON_CORR to two more finalize sites missed in the earlier
Phase B pass: the single-HT (non-radix) path's per-group emit at
group.c:4915 and the two out_type switches at 4644/4861.  Without
these the single-HT code path falls through to `default: v = 0.0`
which is why `make check` saw r²=0 instead of 1.0 for groups where
n>=2 but the planner chose single-HT over radix.

Still WIP — q9 bench at 10m hasn't been re-run since this commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

…dbl_inplace

Adds aggr_med_per_group_buf in query.c that recognises `(med col)` in
the eval-fallback path and replaces the per-group ray_at_fn slice +
ray_med_fn scratch allocations with a single reusable scratch buffer
(sized at max_grp_cnt) and an exported in-place quickselect helper
ray_median_dbl_inplace in agg.c.

Skips two ray-vector allocations per group; for q6's 10k-group case
the allocator savings dominate (median compute itself is O(n) and
unchanged).  Reverts to aggr_unary_per_group_buf for non-numeric
inputs (LIST/STR/etc).

OP_MEDIAN opcode + ray_median DAG-builder + prototype are added too,
but not yet wired into the planner — that's a follow-up if we want
median in the OP_GROUP fast path; for now `med` continues to land in
the eval-fallback streaming branch where the new fast path picks it up.
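The in-place helper's core can be sketched as a quickselect over a reusable scratch buffer — a minimal sketch (hypothetical simplified signature; the exported ray_median_dbl_inplace lives in agg.c) that mutates its input, which is exactly the point: no per-group vector allocations.

```c
#include <stddef.h>

static void swap_d(double *a, double *b) { double t = *a; *a = *b; *b = t; }

/* Iterative quickselect (Hoare partition): after return, v[k] holds
 * the k-th smallest and everything left of k is <= v[k]. */
static double qselect(double *v, size_t n, size_t k) {
    size_t lo = 0, hi = n - 1;
    while (lo < hi) {
        double pivot = v[(lo + hi) / 2];
        size_t i = lo, j = hi;
        while (i <= j) {
            while (v[i] < pivot) i++;
            while (v[j] > pivot) j--;
            if (i <= j) {
                swap_d(&v[i], &v[j]);
                i++;
                if (j) j--; else break;   /* guard unsigned underflow */
            }
        }
        if (k <= j) hi = j;
        else if (k >= i) lo = i;
        else return v[k];
    }
    return v[k];
}

/* Median of v[0..n), destroying the buffer's order — fine for a
 * reusable scratch sized at max_grp_cnt.  O(n) expected, no allocs. */
double median_dbl_inplace(double *v, size_t n) {
    size_t mid = n / 2;
    double hi = qselect(v, n, mid);
    if (n % 2) return hi;
    double lo = v[0];                     /* even n: max of lower half */
    for (size_t i = 1; i < mid; i++) if (v[i] > lo) lo = v[i];
    return (lo + hi) / 2.0;
}
```

The even-length case exploits quickselect's partition invariant (the lower half already sits left of the midpoint), so it costs one extra linear scan instead of a second select.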

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Mirrors the bucket-scatter median pattern from query.c:3582 into the
second non-agg eval site at line 4028.  Modest improvement on q6
(9023→7253ms on 10m); the dominant cost is now per-group random
access into the 80MB v3 column (10000 groups × ~1000 cache-missing
reads each).  Closing the gap with DuckDB needs a real bucket-scatter
OP_MEDIAN that materialises group values into contiguous memory
before quickselect — a separate epic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

The hist/cursor matrices, sized [n_tasks * n_groups], and the serial
cumsum that walks them scale with the dispatch grain, not the worker count.
With 10M rows × 100k groups (q8) the default 8K-morsel grain inflated
hist to ~1GB and the cumsum to ~120M cache-strided ops (~1.4s). Cap
n_tasks at total_workers via ray_pool_dispatch_n; q8 1540ms→162ms,
q6 241ms→121ms, both now faster than DuckDB.

ray_pool_dispatch_n silently clamps task count at MAX_RING_CAP=65536,
so per-group median/topk on >65k groups dropped the tail.  q8 at 10M
rows × 100k id6 groups returned 65536 cells instead of 100000.  Fall
back to elements-based ray_pool_dispatch above the cap (auto-grows
grain), keep dispatch_n below it (best parallelism for small per-group
work).

Pairwise concat loop was O(N²) — for 100k LIST<F64>[2] cells (q8
post-explode) it spent 2s allocating and copying cumulatively-sized
intermediates.  Pre-size one output vector and memcpy each item's
data when all inputs are same-typed fixed-width numerics with no
nulls; q8 explode 2200ms→52ms.
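The fast path can be sketched as below — simplified to raw double buffers (assumption: the real code works on typed vectors and only takes this path when all inputs are same-typed fixed-width numerics with no nulls). One size pass, one allocation, one memcpy per input replaces the cumulatively-resized pairwise concat:

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Concatenate nparts buffers into one pre-sized output.
 * Total work is O(total elements); the pairwise loop it replaces
 * re-copied the growing prefix on every step, i.e. O(N^2). */
double *concat_presized(const double *const *parts, const size_t *lens,
                        size_t nparts, size_t *out_len) {
    size_t total = 0;
    for (size_t i = 0; i < nparts; i++) total += lens[i];
    double *out = malloc(total * sizeof *out);
    if (!out) return NULL;
    size_t off = 0;
    for (size_t i = 0; i < nparts; i++) {
        memcpy(out + off, parts[i], lens[i] * sizeof *out);
        off += lens[i];
    }
    *out_len = total;
    return out;
}
```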

@singaraiona singaraiona merged commit 07dc9e8 into master May 17, 2026
4 checks passed