diff --git a/src/ops/dump.c b/src/ops/dump.c index cd97a5d8..1c9e3535 100644 --- a/src/ops/dump.c +++ b/src/ops/dump.c @@ -95,6 +95,10 @@ const char* ray_opcode_name(uint16_t op) { case OP_GROUP: return "GROUP"; case OP_GROUP_TOPK_ROWFORM: return "GROUP_TOPK_ROWFORM"; case OP_GROUP_BOTK_ROWFORM: return "GROUP_BOTK_ROWFORM"; + case OP_GROUP_PEARSON_ROWFORM: return "GROUP_PEARSON_ROWFORM"; + case OP_GROUP_MAXMIN_ROWFORM: return "GROUP_MAXMIN_ROWFORM"; + case OP_GROUP_MEDIAN_STDDEV_ROWFORM: return "GROUP_MEDIAN_STDDEV_ROWFORM"; + case OP_GROUP_SUM_COUNT_ROWFORM: return "GROUP_SUM_COUNT_ROWFORM"; case OP_FILTERED_GROUP:return "FILTERED_GROUP"; case OP_PIVOT: return "PIVOT"; case OP_ANTIJOIN: return "ANTIJOIN"; diff --git a/src/ops/exec.c b/src/ops/exec.c index caa28511..a3ec646d 100644 --- a/src/ops/exec.c +++ b/src/ops/exec.c @@ -1240,6 +1240,18 @@ static ray_t* exec_node_inner(ray_graph_t* g, ray_op_t* op) { case OP_GROUP_BOTK_ROWFORM: return exec_group_topk_rowform(g, op); + case OP_GROUP_PEARSON_ROWFORM: + return exec_group_pearson_rowform(g, op); + + case OP_GROUP_MAXMIN_ROWFORM: + return exec_group_maxmin_rowform(g, op); + + case OP_GROUP_MEDIAN_STDDEV_ROWFORM: + return exec_group_median_stddev_rowform(g, op); + + case OP_GROUP_SUM_COUNT_ROWFORM: + return exec_group_sum_count_rowform(g, op); + case OP_PIVOT: { ray_t* tbl = g->table; ray_t* owned_tbl = NULL; diff --git a/src/ops/graph.c b/src/ops/graph.c index 022fcd87..f6856d4d 100644 --- a/src/ops/graph.c +++ b/src/ops/graph.c @@ -911,6 +911,187 @@ ray_op_t* ray_group_topk_rowform(ray_graph_t* g, ray_op_t* key, return &g->nodes[ext->base.id]; } +/* Dedicated per-group Pearson² with row-form emission. Mirrors the + * OP_GROUP ext-node layout (1-2 keys + 2 agg inputs x, y) so downstream + * optimiser passes can introspect ext->keys / ext->agg_ins the same + * way they do for OP_GROUP, but with a distinct opcode that exec.c + * routes to exec_group_pearson_rowform. 
*/
+ray_op_t* ray_group_pearson_rowform(ray_graph_t* g, ray_op_t** keys,
+                                    uint8_t n_keys, ray_op_t* x, ray_op_t* y) {
+    /* Need a graph, both value inputs, and one or two non-NULL keys. */
+    if (g == NULL || keys == NULL || x == NULL || y == NULL) return NULL;
+    if (n_keys < 1 || n_keys > 2) return NULL;
+    for (uint8_t i = 0; i < n_keys; i++) {
+        if (keys[i] == NULL) return NULL;
+    }
+
+    /* Trailing payload: [keys][agg_ops][agg_ins][agg_ins2]; each section
+     * starts at an offset rounded up to its element size. */
+    const size_t op_sz  = sizeof(uint16_t);
+    const size_t ptr_sz = sizeof(ray_op_t*);
+    size_t off_ops  = ((size_t)n_keys * ptr_sz + op_sz - 1) & ~(op_sz - 1);
+    size_t off_ins  = (off_ops + op_sz + ptr_sz - 1) & ~(ptr_sz - 1);
+    size_t off_ins2 = (off_ins + ptr_sz + ptr_sz - 1) & ~(ptr_sz - 1);
+
+    ray_op_ext_t* node = graph_alloc_ext_node_ex(g, off_ins2 + ptr_sz);
+    if (node == NULL) return NULL;
+
+    /* Base node: table-valued, row estimate taken from the first key. */
+    node->base.opcode    = OP_GROUP_PEARSON_ROWFORM;
+    node->base.arity     = 0;
+    node->base.out_type  = RAY_TABLE;
+    node->base.est_rows  = keys[0]->est_rows;
+    node->base.inputs[0] = keys[0];
+
+    /* Point the ext arrays into the trailing payload. */
+    char* payload = EXT_TRAIL(node);
+    node->keys     = (ray_op_t**)payload;
+    node->agg_ops  = (uint16_t*)(payload + off_ops);
+    node->agg_ins  = (ray_op_t**)(payload + off_ins);
+    node->agg_ins2 = (ray_op_t**)(payload + off_ins2);
+    node->agg_k    = NULL;
+    node->n_keys   = n_keys;
+    node->n_aggs   = 1;
+
+    /* One aggregate: OP_PEARSON_CORR over (x, y). */
+    for (uint8_t i = 0; i < n_keys; i++) node->keys[i] = keys[i];
+    node->agg_ops[0]  = OP_PEARSON_CORR;
+    node->agg_ins[0]  = x;
+    node->agg_ins2[0] = y;
+
+    /* Publish the finished base record into the graph's node array. */
+    g->nodes[node->base.id] = node->base;
+    return &g->nodes[node->base.id];
+}
+
+/* Dedicated per-group max(x) + min(y) with row-form emission.  Mirror
+ * of ray_group_pearson_rowform with 1 key and 2 fixed-state aggs
+ * (OP_MAX and OP_MIN).
*/
+ray_op_t* ray_group_maxmin_rowform(ray_graph_t* g, ray_op_t* key,
+                                   ray_op_t* x, ray_op_t* y) {
+    if (g == NULL || key == NULL || x == NULL || y == NULL) return NULL;
+
+    /* Trailing payload: [1 key][2 agg_ops][2 agg_ins], each section
+     * rounded up to its element size. */
+    const size_t op_sz  = sizeof(uint16_t);
+    const size_t ptr_sz = sizeof(ray_op_t*);
+    size_t off_ops = (ptr_sz + op_sz - 1) & ~(op_sz - 1);
+    size_t off_ins = (off_ops + 2 * op_sz + ptr_sz - 1) & ~(ptr_sz - 1);
+
+    ray_op_ext_t* node = graph_alloc_ext_node_ex(g, off_ins + 2 * ptr_sz);
+    if (node == NULL) return NULL;
+
+    node->base.opcode    = OP_GROUP_MAXMIN_ROWFORM;
+    node->base.arity     = 0;
+    node->base.out_type  = RAY_TABLE;
+    node->base.est_rows  = key->est_rows;
+    node->base.inputs[0] = key;
+
+    char* payload = EXT_TRAIL(node);
+    node->keys     = (ray_op_t**)payload;
+    node->agg_ops  = (uint16_t*)(payload + off_ops);
+    node->agg_ins  = (ray_op_t**)(payload + off_ins);
+    node->agg_ins2 = NULL;
+    node->agg_k    = NULL;
+    node->n_keys   = 1;
+    node->n_aggs   = 2;
+
+    /* Aggregate 0 is max over x, aggregate 1 is min over y. */
+    node->keys[0]    = key;
+    node->agg_ops[0] = OP_MAX;
+    node->agg_ins[0] = x;
+    node->agg_ops[1] = OP_MIN;
+    node->agg_ins[1] = y;
+
+    g->nodes[node->base.id] = node->base;
+    return &g->nodes[node->base.id];
+}
+
+/* Dedicated per-group median(v) + std(v) with row-form emission.  Two
+ * keys; OP_MEDIAN and OP_STDDEV both read the value column v.  A
+ * non-zero with_count appends a third aggregate, OP_COUNT, over the
+ * same column.  Returns NULL when an argument is missing or the ext
+ * node cannot be allocated. */
+ray_op_t* ray_group_median_stddev_rowform(ray_graph_t* g, ray_op_t** keys,
+                                          ray_op_t* v, int with_count) {
+    if (g == NULL || keys == NULL) return NULL;
+    if (keys[0] == NULL || keys[1] == NULL || v == NULL) return NULL;
+
+    uint8_t agg_count = 2;
+    if (with_count) agg_count = 3;
+
+    /* Trailing payload: [2 keys][agg_count ops][agg_count inputs]. */
+    const size_t op_sz  = sizeof(uint16_t);
+    const size_t ptr_sz = sizeof(ray_op_t*);
+    size_t off_ops = (2 * ptr_sz + op_sz - 1) & ~(op_sz - 1);
+    size_t off_ins = (off_ops + (size_t)agg_count * op_sz + ptr_sz - 1)
+                     & ~(ptr_sz - 1);
+
+    ray_op_ext_t* node =
+        graph_alloc_ext_node_ex(g, off_ins + (size_t)agg_count * ptr_sz);
+    if (node == NULL) return NULL;
+
+    node->base.opcode    = OP_GROUP_MEDIAN_STDDEV_ROWFORM;
+    node->base.arity     = 0;
+    node->base.out_type  = RAY_TABLE;
+    node->base.est_rows  = keys[0]->est_rows;
+    node->base.inputs[0] = keys[0];
+
+    char* payload = EXT_TRAIL(node);
+    node->keys     = (ray_op_t**)payload;
+    node->agg_ops  = (uint16_t*)(payload + off_ops);
+    node->agg_ins  = (ray_op_t**)(payload + off_ins);
+    node->agg_ins2 = NULL;
+    node->agg_k    = NULL;
+    node->n_keys   = 2;
+    node->n_aggs   = agg_count;
+
+    node->keys[0] = keys[0];
+    node->keys[1] = keys[1];
+
+    /* Every aggregate reads the same value column. */
+    node->agg_ops[0] = OP_MEDIAN;
+    node->agg_ops[1] = OP_STDDEV;
+    node->agg_ins[0] = v;
+    node->agg_ins[1] = v;
+    if (with_count) {
+        node->agg_ops[2] = OP_COUNT;
+        node->agg_ins[2] = v;
+    }
+
+    g->nodes[node->base.id] = node->base;
+    return &g->nodes[node->base.id];
+}
+
+/* Dedicated multi-key per-group sum(v) + count(v) with row-form
+ * emission.  N keys (3..8) packed inline in the ext node trail.  Mirror
+ * of ray_group_median_stddev_rowform extended for variadic keys.
*/
+ray_op_t* ray_group_sum_count_rowform(ray_graph_t* g, ray_op_t** keys,
+                                      uint8_t n_keys, ray_op_t* v) {
+    if (g == NULL || keys == NULL || v == NULL) return NULL;
+    if (n_keys < 3 || n_keys > 8) return NULL;
+    for (uint8_t i = 0; i < n_keys; i++) {
+        if (keys[i] == NULL) return NULL;
+    }
+
+    /* Trailing payload: [n_keys keys][2 agg_ops][2 agg_ins], each
+     * section rounded up to its element size. */
+    const size_t op_sz  = sizeof(uint16_t);
+    const size_t ptr_sz = sizeof(ray_op_t*);
+    size_t off_ops = ((size_t)n_keys * ptr_sz + op_sz - 1) & ~(op_sz - 1);
+    size_t off_ins = (off_ops + 2 * op_sz + ptr_sz - 1) & ~(ptr_sz - 1);
+
+    ray_op_ext_t* node = graph_alloc_ext_node_ex(g, off_ins + 2 * ptr_sz);
+    if (node == NULL) return NULL;
+
+    node->base.opcode    = OP_GROUP_SUM_COUNT_ROWFORM;
+    node->base.arity     = 0;
+    node->base.out_type  = RAY_TABLE;
+    node->base.est_rows  = keys[0]->est_rows;
+    node->base.inputs[0] = keys[0];
+
+    char* payload = EXT_TRAIL(node);
+    node->keys     = (ray_op_t**)payload;
+    node->agg_ops  = (uint16_t*)(payload + off_ops);
+    node->agg_ins  = (ray_op_t**)(payload + off_ins);
+    node->agg_ins2 = NULL;
+    node->agg_k    = NULL;
+    node->n_keys   = n_keys;
+    node->n_aggs   = 2;
+
+    for (uint8_t i = 0; i < n_keys; i++) node->keys[i] = keys[i];
+
+    /* Both aggregates read the same value column. */
+    node->agg_ops[0] = OP_SUM;
+    node->agg_ops[1] = OP_COUNT;
+    node->agg_ins[0] = v;
+    node->agg_ins[1] = v;
+
+    g->nodes[node->base.id] = node->base;
+    return &g->nodes[node->base.id];
+}
+
 ray_op_t* ray_pivot_op(ray_graph_t* g,
                        ray_op_t** index_cols, uint8_t n_index,
                        ray_op_t* pivot_col,
diff --git a/src/ops/group.c b/src/ops/group.c
index d4b083b4..e3a563b2 100644
--- a/src/ops/group.c
+++ b/src/ops/group.c
@@ -9268,3 +9268,2228 @@ ray_t* exec_group_topk_rowform(ray_graph_t* g, ray_op_t* op) {
 
     return result;
 }
+
+/* ════════════════════════════════════════════════════════════════════════
+ * exec_group_pearson_rowform — dedicated single-pass per-group Pearson²
+ * with row-form emission.
Bypasses Anton's master-merge regression in
+ * the shared radix HT path by using a private morsel-scatter +
+ * per-partition open-addressing HT pipeline that mirrors
+ * exec_group_topk_rowform.
+ *
+ * Algorithm:
+ *   Phase 1: morsel-parallel scan reads (k0[,k1], x, y) per row,
+ *            composes hash from key(s), scatters fat entries into
+ *            per-(worker, partition) buffers — no contention.
+ *   Phase 2: RADIX_P parallel tasks build a per-partition HT.  Each
+ *            entry holds the fixed Pearson state (Σx, Σy, Σx², Σy²,
+ *            Σxy, cnt).  Each scatter entry probes/inserts and
+ *            accumulates in-place.
+ *   Phase 3: walk all partition HTs, compute r² from state, emit
+ *            (key0[, key1], r²) row form.
+ *
+ * Per-row scatter stride: 40 B (hash + 2×key + 2×val).  1-key shape
+ * writes 0 in key1 slot (wastes 8 B per row — acceptable; the
+ * dominant cost is HT work in phase 2, not scatter bandwidth).
+ * ════════════════════════════════════════════════════════════════════════ */
+
+/* Bytes per scattered row: five 8-byte fields laid out back to back as
+ * hash, key0, key1, x, y (see grpc_scat_push for the exact offsets). */
+#define GRPC_SCATTER_STRIDE 40u
+
+/* One growable scatter buffer; phase 1 owns one per (worker, partition)
+ * pair, so a worker only ever appends to its own buffers. */
+typedef struct {
+    char*    data;     /* [count * GRPC_SCATTER_STRIDE] */
+    uint32_t count;    /* rows appended so far */
+    uint32_t cap;      /* rows the current allocation can hold */
+    bool     oom;      /* set by grpc_scat_push when growing data failed */
+    ray_t*   _hdr;     /* scratch handle backing data; passed to
+                        * scratch_realloc / scratch_free */
+} grpc_scat_buf_t;
+
+/* Per-group Pearson accumulator state.  All sums kept as doubles
+ * (mirrors OP_PEARSON_CORR finalize formula in radix_phase3_fn).
*/
+typedef struct {
+    int64_t key0;
+    int64_t key1;      /* 0 when the query has a single key */
+    double  sum_x;
+    double  sum_y;
+    double  sumsq_x;
+    double  sumsq_y;
+    double  sumxy;
+    int64_t cnt;       /* rows accumulated into this group */
+} grpc_entry_t;
+
+/* Open-addressing hash table with linear probing.  Slots hold a packed
+ * (salt, entry index) word; the group state itself lives in a separate
+ * dense entries array in insertion order. */
+typedef struct {
+    uint32_t*     slots;       /* [cap]: packed (salt:8 | idx:24); GRPC_EMPTY = UINT32_MAX */
+    grpc_entry_t* entries;     /* [entry_cap] */
+    uint32_t      count;       /* entries in use */
+    uint32_t      cap;         /* slot count, power of 2 */
+    uint32_t      entry_cap;   /* entries allocated */
+    bool          oom;         /* set when an allocation or the index limit fails */
+    ray_t*        _slots_hdr;
+    ray_t*        _entries_hdr;
+} grpc_ht_t;
+
+#define GRPC_EMPTY        UINT32_MAX
+#define GRPC_PACK(salt, idx) (((uint32_t)(uint8_t)(salt) << 24) | ((idx) & 0xFFFFFF))
+#define GRPC_IDX(s)       ((s) & 0xFFFFFF)
+#define GRPC_SALT(s)      ((uint8_t)((s) >> 24))
+#define GRPC_HASH_SALT(h) ((uint8_t)((h) >> 56))
+/* Most entries one table may hold.  The slot word carries a 24-bit
+ * index, and index 0xFFFFFF is excluded because GRPC_PACK(0xFF,
+ * 0xFFFFFF) would be indistinguishable from GRPC_EMPTY. */
+#define GRPC_MAX_ENTRIES  0xFFFFFFu
+
+/* Allocate a table with at least init_cap slots (rounded up to a power
+ * of two, minimum 32) and half as many entries (minimum 16).  Returns
+ * false and sets ht->oom on allocation failure; whatever was allocated
+ * stays attached to ht so grpc_ht_free can release it. */
+static bool grpc_ht_init(grpc_ht_t* ht, uint32_t init_cap) {
+    memset(ht, 0, sizeof(*ht));
+    if (init_cap < 32) init_cap = 32;
+    uint32_t cap = 1;
+    while (cap < init_cap) cap <<= 1;
+    ht->cap = cap;
+    ht->entry_cap = cap / 2;
+    if (ht->entry_cap < 16) ht->entry_cap = 16;
+
+    ht->slots = (uint32_t*)scratch_alloc(&ht->_slots_hdr, (size_t)cap * 4);
+    if (!ht->slots) { ht->oom = true; return false; }
+    memset(ht->slots, 0xFF, (size_t)cap * 4);   /* every slot = GRPC_EMPTY */
+
+    ht->entries = (grpc_entry_t*)scratch_alloc(&ht->_entries_hdr,
+        (size_t)ht->entry_cap * sizeof(grpc_entry_t));
+    if (!ht->entries) { ht->oom = true; return false; }
+    return true;
+}
+
+/* Release both arrays (if present) and reset ht to all-zero, so calling
+ * it on a never-initialised, zero-filled table is a no-op. */
+static void grpc_ht_free(grpc_ht_t* ht) {
+    if (ht->_slots_hdr)   scratch_free(ht->_slots_hdr);
+    if (ht->_entries_hdr) scratch_free(ht->_entries_hdr);
+    memset(ht, 0, sizeof(*ht));
+}
+
+/* Double the slot array (or create a 32-slot one for a zero-capacity
+ * table) and re-insert every existing entry.  Returns false and sets
+ * ht->oom when the new array cannot be allocated; the old array is
+ * left untouched in that case. */
+static bool grpc_ht_grow_slots(grpc_ht_t* ht) {
+    uint32_t old_cap = ht->cap;
+    /* A zero-capacity table must not "double" to zero: the probe mask
+     * would become UINT32_MAX and index far outside the array. */
+    uint32_t new_cap = old_cap ? old_cap * 2 : 32;
+    ray_t* new_hdr = NULL;
+    uint32_t* new_slots = (uint32_t*)scratch_alloc(&new_hdr, (size_t)new_cap * 4);
+    if (!new_slots) { ht->oom = true; return false; }
+    memset(new_slots, 0xFF, (size_t)new_cap * 4);
+
+    uint32_t mask = new_cap - 1;
+    for (uint32_t i = 0; i < ht->count; i++) {
+        grpc_entry_t* e = &ht->entries[i];
+        /* Recompute hash from canonical keys. */
+        uint64_t h = ray_hash_i64(e->key0);
+        h = ray_hash_combine(h, ray_hash_i64(e->key1));
+        uint32_t p = (uint32_t)(h & mask);
+        uint8_t salt = GRPC_HASH_SALT(h);
+        for (;;) {
+            if (new_slots[p] == GRPC_EMPTY) {
+                new_slots[p] = GRPC_PACK(salt, i);
+                break;
+            }
+            p = (p + 1) & mask;
+        }
+    }
+    if (ht->_slots_hdr) scratch_free(ht->_slots_hdr);
+    ht->_slots_hdr = new_hdr;
+    ht->slots = new_slots;
+    ht->cap = new_cap;
+    return true;
+}
+
+/* Double the entries array (or create a 16-entry one for a table that
+ * has none yet).  Returns false and sets ht->oom on failure. */
+static bool grpc_ht_grow_entries(grpc_ht_t* ht) {
+    uint32_t new_ecap = ht->entry_cap ? ht->entry_cap * 2 : 16;
+    grpc_entry_t* new_e;
+    if (ht->entry_cap == 0) {
+        /* Nothing to carry over: allocate fresh instead of reallocating
+         * from an empty handle. */
+        new_e = (grpc_entry_t*)scratch_alloc(&ht->_entries_hdr,
+            (size_t)new_ecap * sizeof(grpc_entry_t));
+    } else {
+        new_e = (grpc_entry_t*)scratch_realloc(&ht->_entries_hdr,
+            (size_t)ht->entry_cap * sizeof(grpc_entry_t),
+            (size_t)new_ecap * sizeof(grpc_entry_t));
+    }
+    if (!new_e) { ht->oom = true; return false; }
+    ht->entries = new_e;
+    ht->entry_cap = new_ecap;
+    return true;
+}
+
+/* Probe-or-insert: returns the entry for (k0, k1), creating it with
+ * all sums and cnt zeroed on a miss.  hash must be the value produced
+ * from the same keys by the hashing in grpc_ht_grow_slots, otherwise
+ * entries become unreachable after a resize.
+ * Returns NULL with ht->oom set when growth fails or when the table
+ * already holds GRPC_MAX_ENTRIES groups. */
+static inline grpc_entry_t*
+grpc_ht_get(grpc_ht_t* ht, uint64_t hash, int64_t k0, int64_t k1) {
+    /* Grow up front so the probe below always finds a free slot and a
+     * free entry; load factor is kept at or below 1/2. */
+    if (ht->cap == 0 || (ht->count + 1) * 2 > ht->cap) {
+        if (!grpc_ht_grow_slots(ht)) return NULL;
+    }
+    if (ht->count >= ht->entry_cap) {
+        if (!grpc_ht_grow_entries(ht)) return NULL;
+    }
+    uint32_t mask = ht->cap - 1;
+    uint32_t p = (uint32_t)(hash & mask);
+    uint8_t salt = GRPC_HASH_SALT(hash);
+    for (;;) {
+        uint32_t s = ht->slots[p];
+        if (s == GRPC_EMPTY) {
+            /* Refuse to hand out an index that does not fit the 24-bit
+             * slot field; wrapping would alias an existing group. */
+            if (ht->count >= GRPC_MAX_ENTRIES) {
+                ht->oom = true;
+                return NULL;
+            }
+            uint32_t idx = ht->count++;
+            ht->slots[p] = GRPC_PACK(salt, idx);
+            grpc_entry_t* e = &ht->entries[idx];
+            memset(e, 0, sizeof(*e));
+            e->key0 = k0;
+            e->key1 = k1;
+            return e;
+        }
+        /* Salt is a cheap pre-filter; the keys decide. */
+        if (GRPC_SALT(s) == salt) {
+            grpc_entry_t* e = &ht->entries[GRPC_IDX(s)];
+            if (e->key0 == k0 && e->key1 == k1)
+                return e;
+        }
+        p = (p + 1) & mask;
+    }
+}
+
+/* ─── Phase 1 ──────────────────────────────────────────────────────────
+ * Per-worker scan: read (k0[, k1], x, y) per row, hash, scatter into
+ * partition buckets.  Skips rows with null x, y, or any key.
*/
+
+/* Read-only inputs and per-worker output buffers shared by every
+ * grpc_phase1_fn invocation. */
+typedef struct {
+    const void*      k0_data;
+    const void*      k1_data;     /* NULL if n_keys == 1 */
+    const void*      x_data;
+    const void*      y_data;
+    int8_t           k0_type;
+    int8_t           k1_type;
+    int8_t           x_type;
+    int8_t           y_type;
+    uint8_t          k0_attrs;
+    uint8_t          k1_attrs;
+    const uint8_t*   k0_null_bm;  /* NULL when the column has no nulls */
+    const uint8_t*   k1_null_bm;
+    const uint8_t*   x_null_bm;
+    const uint8_t*   y_null_bm;
+    uint8_t          n_keys;
+    uint8_t          x_is_f64;
+    uint8_t          y_is_f64;
+    grpc_scat_buf_t* bufs;        /* [n_workers * RADIX_P] */
+    uint32_t         n_workers;
+} grpc_phase1_ctx_t;
+
+/* True when bit `row` of the bitmap is set (LSB-first within a byte). */
+static inline bool grpc_is_null(const uint8_t* nbm, int64_t row) {
+    return (nbm[row >> 3] >> (row & 7)) & 1;
+}
+
+/* Read element `row` as a double: a raw 8-byte load when is_f64 is
+ * set, otherwise the integer value from read_col_i64 converted. */
+static inline double grpc_val_read_dbl(const void* base, int8_t t, int64_t row,
+                                       int is_f64) {
+    if (is_f64) {
+        double v; memcpy(&v, (const char*)base + (size_t)row*8, 8); return v;
+    }
+    /* Cast int → double (matches OP_PEARSON_CORR finalize). */
+    return (double)read_col_i64(base, row, t, 0);
+}
+
+/* Append one (hash, k0, k1, x, y) row to buf, doubling its capacity
+ * when full (an empty buffer grows to 128 rows).  On failure the row
+ * is dropped and buf->oom is set; the caller is expected to check the
+ * flag after the scan.  Once oom is set every later push is a no-op. */
+static inline void grpc_scat_push(grpc_scat_buf_t* buf, uint64_t hash,
+                                  int64_t k0, int64_t k1,
+                                  double x, double y) {
+    /* A failed buffer already invalidates the whole run; do not retry
+     * the allocation for every remaining row. */
+    if (__builtin_expect(buf->oom, 0)) return;
+    if (__builtin_expect(buf->count >= buf->cap, 0)) {
+        /* Doubling a uint32_t capacity above 2^31 would wrap to zero,
+         * shrink the allocation and let the memcpy below run past it. */
+        if (buf->cap > UINT32_MAX / 2) { buf->oom = true; return; }
+        uint32_t old_cap = buf->cap ? buf->cap : 64;
+        uint32_t new_cap = old_cap * 2;
+        char* new_data = (char*)scratch_realloc(&buf->_hdr,
+            (size_t)buf->cap * GRPC_SCATTER_STRIDE,
+            (size_t)new_cap * GRPC_SCATTER_STRIDE);
+        if (!new_data) { buf->oom = true; return; }
+        buf->data = new_data;
+        buf->cap = new_cap;
+    }
+    char* dst = buf->data + (size_t)buf->count * GRPC_SCATTER_STRIDE;
+    memcpy(dst,      &hash, 8);
+    memcpy(dst + 8,  &k0, 8);
+    memcpy(dst + 16, &k1, 8);
+    memcpy(dst + 24, &x, 8);
+    memcpy(dst + 32, &y, 8);
+    buf->count++;
+}
+
+/* Phase-1 task body: scan rows [start, end), skip any row whose x, y
+ * or key is null, and scatter the rest into this worker's buffer for
+ * the partition selected by RADIX_PART(hash). */
+static void grpc_phase1_fn(void* ctx_v, uint32_t worker_id,
+                           int64_t start, int64_t end) {
+    grpc_phase1_ctx_t* c = (grpc_phase1_ctx_t*)ctx_v;
+    grpc_scat_buf_t* my_bufs = &c->bufs[(size_t)worker_id * RADIX_P];
+
+    for (int64_t r = start; r < end; r++) {
+        if (c->x_null_bm && grpc_is_null(c->x_null_bm, r)) continue;
+        if (c->y_null_bm && grpc_is_null(c->y_null_bm, r)) continue;
+        if (c->k0_null_bm && grpc_is_null(c->k0_null_bm, r)) continue;
+        if (c->n_keys == 2 && c->k1_null_bm && grpc_is_null(c->k1_null_bm, r))
+            continue;
+        int64_t k0 = read_col_i64(c->k0_data, r, c->k0_type, c->k0_attrs);
+        int64_t k1 = 0;
+        uint64_t h = ray_hash_i64(k0);
+        if (c->n_keys == 2) {
+            k1 = read_col_i64(c->k1_data, r, c->k1_type, c->k1_attrs);
+            h = ray_hash_combine(h, ray_hash_i64(k1));
+        } else {
+            /* Combine with hash(0) so the value matches what
+             * grpc_ht_grow_slots recomputes from (key0, key1 = 0). */
+            h = ray_hash_combine(h, ray_hash_i64(0));
+        }
+        double x = grpc_val_read_dbl(c->x_data, c->x_type, r, c->x_is_f64);
+        double y = grpc_val_read_dbl(c->y_data, c->y_type, r, c->y_is_f64);
+        uint32_t part = RADIX_PART(h);
+        grpc_scat_push(&my_bufs[part], h, k0, k1, x, y);
+    }
+}
+
+/* ─── Phase 2 ──────────────────────────────────────────────────────────
+ * RADIX_P tasks.  Each builds a partition HT and accumulates Pearson
+ * state from the scatter entries in its partition.
*/
+
+/* Shared state for the phase-2 tasks. */
+typedef struct {
+    grpc_scat_buf_t* bufs;
+    uint32_t         n_workers;
+    grpc_ht_t*       part_hts;
+    int64_t*         part_emit_rows;  /* [RADIX_P]: grp count per partition */
+} grpc_phase2_ctx_t;
+
+/* Fold every row of one scatter buffer into partition table ht.
+ * Returns false as soon as grpc_ht_get cannot supply an entry. */
+static bool grpc_phase2_absorb(grpc_ht_t* ht, const grpc_scat_buf_t* src) {
+    enum { PF_DIST = 8 };
+    const char* rows = src->data;
+    const uint32_t n_rows = src->count;
+
+    /* Stride-ahead prefetch on slot array: warm the first PF_DIST rows
+     * using the capacity the table has on entry. */
+    const uint32_t entry_mask = ht->cap - 1;
+    const uint32_t warm = (n_rows < PF_DIST) ? n_rows : PF_DIST;
+    for (uint32_t j = 0; j < warm; j++) {
+        uint64_t hj;
+        memcpy(&hj, rows + (size_t)j * GRPC_SCATTER_STRIDE, 8);
+        __builtin_prefetch(&ht->slots[(uint32_t)(hj & entry_mask)], 0, 1);
+    }
+
+    for (uint32_t i = 0; i < n_rows; i++) {
+        /* The table may have grown, so the mask is re-derived here. */
+        if (i + PF_DIST < n_rows) {
+            uint64_t h_next;
+            memcpy(&h_next,
+                   rows + (size_t)(i + PF_DIST) * GRPC_SCATTER_STRIDE, 8);
+            __builtin_prefetch(&ht->slots[(uint32_t)(h_next & (ht->cap - 1))], 0, 1);
+        }
+
+        /* Decode the 40-byte row: hash, key0, key1, x, y. */
+        const char* rec = rows + (size_t)i * GRPC_SCATTER_STRIDE;
+        uint64_t hash;
+        int64_t  key0, key1;
+        double   xv, yv;
+        memcpy(&hash, rec,      8);
+        memcpy(&key0, rec + 8,  8);
+        memcpy(&key1, rec + 16, 8);
+        memcpy(&xv,   rec + 24, 8);
+        memcpy(&yv,   rec + 32, 8);
+
+        grpc_entry_t* grp = grpc_ht_get(ht, hash, key0, key1);
+        if (grp == NULL) return false;
+        grp->sum_x   += xv;
+        grp->sum_y   += yv;
+        grp->sumsq_x += xv * xv;
+        grp->sumsq_y += yv * yv;
+        grp->sumxy   += xv * yv;
+        grp->cnt     += 1;
+    }
+    return true;
+}
+
+/* Phase-2 task body: for each partition in [start, end) build its hash
+ * table from every worker's scatter buffer for that partition, then
+ * record the group count.  Stops early (leaving the failing table's
+ * oom flag set) when a table cannot be created or grown. */
+static void grpc_phase2_fn(void* ctx_v, uint32_t worker_id,
+                           int64_t start, int64_t end) {
+    (void)worker_id;
+    grpc_phase2_ctx_t* ctx = (grpc_phase2_ctx_t*)ctx_v;
+
+    for (int64_t part = start; part < end; part++) {
+        const uint32_t p = (uint32_t)part;
+        grpc_ht_t* table = &ctx->part_hts[p];
+        if (!grpc_ht_init(table, 8192)) return;
+
+        for (uint32_t w = 0; w < ctx->n_workers; w++) {
+            const grpc_scat_buf_t* src = &ctx->bufs[(size_t)w * RADIX_P + p];
+            if (src->data == NULL || src->oom) continue;
+            if (!grpc_phase2_absorb(table, src)) return;
+        }
+
+        ctx->part_emit_rows[p] = (int64_t)table->count;
+    }
+}
+
+/* Public entry: invoked from exec.c on OP_GROUP_PEARSON_ROWFORM.
*/
+ray_t* exec_group_pearson_rowform(ray_graph_t* g, ray_op_t* op);
+
+/* Free every scatter buffer that was allocated, then the array that
+ * holds them.  Slots that were never allocated have a NULL _hdr. */
+static void grpc_release_scatter(grpc_scat_buf_t* bufs, size_t n_bufs,
+                                 ray_t* bufs_hdr) {
+    for (size_t j = 0; j < n_bufs; j++)
+        if (bufs[j]._hdr) scratch_free(bufs[j]._hdr);
+    scratch_free(bufs_hdr);
+}
+
+/* Free all partition hash tables and the two phase-2 arrays.  Each
+ * argument may be NULL when its allocation failed. */
+static void grpc_release_parts(grpc_ht_t* part_hts, ray_t* phts_hdr,
+                               ray_t* per_hdr) {
+    if (part_hts)
+        for (uint32_t i = 0; i < RADIX_P; i++) grpc_ht_free(&part_hts[i]);
+    if (phts_hdr) scratch_free(phts_hdr);
+    if (per_hdr)  scratch_free(per_hdr);
+}
+
+/* Value column types this operator can read. */
+static inline int grpc_val_type_ok(int8_t t) {
+    return t == RAY_I64 || t == RAY_I32 || t == RAY_I16 ||
+           t == RAY_U8  || t == RAY_BOOL || t == RAY_F64;
+}
+
+/* New zero-capacity or sized key vector matching the source column. */
+static inline ray_t* grpc_new_key_vec(int8_t type, uint8_t attrs, int64_t n) {
+    return (type == RAY_SYM) ? ray_sym_vec_new(attrs & RAY_SYM_W_MASK, n)
+                             : ray_vec_new(type, n);
+}
+
+/* r² for one group from its running sums.  NaN when the group has
+ * fewer than two rows or either variance term is not strictly
+ * positive. */
+static inline double grpc_entry_r2(const grpc_entry_t* e) {
+    const double nan_v = 0.0 / 0.0;
+    if (e->cnt < 2) return nan_v;
+    double n   = (double)e->cnt;
+    double num = n * e->sumxy   - e->sum_x * e->sum_y;
+    double dx  = n * e->sumsq_x - e->sum_x * e->sum_x;
+    double dy  = n * e->sumsq_y - e->sum_y * e->sum_y;
+    if (!(dx > 0.0 && dy > 0.0)) return nan_v;
+    double r = num / sqrt(dx * dy);
+    return r * r;
+}
+
+/* Assemble the output table from already-filled columns (k1_out may be
+ * NULL for the 1-key shape) and drop the caller's references to them.
+ * Returns the table, or an error value when it cannot be created. */
+static ray_t* grpc_build_table(uint8_t n_keys, const int64_t* k_syms,
+                               int64_t r2_sym, ray_t* k0_out,
+                               ray_t* k1_out, ray_t* r2_out) {
+    ray_t* result = ray_table_new((int64_t)n_keys + 1);
+    if (!result || RAY_IS_ERR(result)) {
+        ray_release(k0_out);
+        if (k1_out) ray_release(k1_out);
+        ray_release(r2_out);
+        return result ? result : ray_error("oom", NULL);
+    }
+    result = ray_table_add_col(result, k_syms[0], k0_out);
+    if (k1_out)
+        result = ray_table_add_col(result, k_syms[1], k1_out);
+    result = ray_table_add_col(result, r2_sym, r2_out);
+    ray_release(k0_out);
+    if (k1_out) ray_release(k1_out);
+    ray_release(r2_out);
+    return result;
+}
+
+/* Allocate the key column(s) and the r² column with `n` rows each.
+ * Returns false, with nothing left allocated, if any of them fails. */
+static bool grpc_alloc_out_cols(uint8_t n_keys, const int8_t* k_types,
+                                const uint8_t* k_attrs, int64_t n,
+                                ray_t** k0_out, ray_t** k1_out,
+                                ray_t** r2_out) {
+    *k0_out = grpc_new_key_vec(k_types[0], k_attrs[0], n);
+    *k1_out = (n_keys == 2) ? grpc_new_key_vec(k_types[1], k_attrs[1], n)
+                            : NULL;
+    *r2_out = ray_vec_new(RAY_F64, n);
+    if (!*k0_out || !*r2_out || (n_keys == 2 && !*k1_out)) {
+        if (*k0_out) ray_release(*k0_out);
+        if (*k1_out) ray_release(*k1_out);
+        if (*r2_out) ray_release(*r2_out);
+        *k0_out = *k1_out = *r2_out = NULL;
+        return false;
+    }
+    return true;
+}
+
+/* Execute OP_GROUP_PEARSON_ROWFORM: group g->table by one or two key
+ * columns and emit one row per group holding the key(s) and the
+ * squared Pearson correlation of the two value columns.
+ *
+ * Requirements checked here: the ext node has 1-2 keys and exactly one
+ * aggregate with both inputs set; keys and values are OP_SCAN nodes
+ * over columns of g->table; key and value column types are among the
+ * ones listed below.  Rows where x, y or any key is null are ignored.
+ *
+ * Returns the result table, g->table itself when it is NULL or an
+ * error, or a ray_error ("domain", "nyi" or "oom") on failure.  The r²
+ * column is named by the interned symbol "r" and holds NaN for groups
+ * with fewer than two rows or zero variance; no null bitmap is
+ * attached to it. */
+ray_t* exec_group_pearson_rowform(ray_graph_t* g, ray_op_t* op) {
+    ray_op_ext_t* ext = find_ext(g, op->id);
+    if (!ext || ext->n_keys < 1 || ext->n_keys > 2 || ext->n_aggs != 1
+        || !ext->agg_ins || !ext->agg_ins2)
+        return ray_error("domain", "group_pearson_rowform: bad shape");
+
+    ray_t* tbl = g->table;
+    if (!tbl || RAY_IS_ERR(tbl)) return tbl;
+
+    /* Resolve key vectors. */
+    ray_t*  k_vecs[2]  = { NULL, NULL };
+    int64_t k_syms[2]  = { 0, 0 };
+    int8_t  k_types[2] = { 0, 0 };
+    uint8_t k_attrs[2] = { 0, 0 };
+    for (uint8_t k = 0; k < ext->n_keys; k++) {
+        ray_op_ext_t* kext = find_ext(g, ext->keys[k]->id);
+        if (!kext || kext->base.opcode != OP_SCAN)
+            return ray_error("domain", "group_pearson_rowform: non-scan key");
+        k_vecs[k] = ray_table_get_col(tbl, kext->sym);
+        if (!k_vecs[k])
+            return ray_error("domain", "group_pearson_rowform: key column missing");
+        k_syms[k]  = kext->sym;
+        k_types[k] = k_vecs[k]->type;
+        k_attrs[k] = k_vecs[k]->attrs;
+        int8_t kt = k_types[k];
+        if (kt != RAY_I64 && kt != RAY_I32 && kt != RAY_I16 && kt != RAY_U8 &&
+            kt != RAY_BOOL && kt != RAY_DATE && kt != RAY_TIME &&
+            kt != RAY_TIMESTAMP && kt != RAY_SYM)
+            return ray_error("nyi", "group_pearson_rowform: key type");
+    }
+
+    /* Resolve x, y. */
+    ray_op_ext_t* xext = find_ext(g, ext->agg_ins[0]->id);
+    ray_op_ext_t* yext = find_ext(g, ext->agg_ins2[0]->id);
+    if (!xext || !yext || xext->base.opcode != OP_SCAN || yext->base.opcode != OP_SCAN)
+        return ray_error("domain", "group_pearson_rowform: non-scan val");
+    ray_t* x_vec = ray_table_get_col(tbl, xext->sym);
+    ray_t* y_vec = ray_table_get_col(tbl, yext->sym);
+    if (!x_vec || !y_vec)
+        return ray_error("domain", "group_pearson_rowform: val column missing");
+    int8_t xt = x_vec->type, yt = y_vec->type;
+    if (!grpc_val_type_ok(xt) || !grpc_val_type_ok(yt))
+        return ray_error("nyi", "group_pearson_rowform: val type");
+
+    int64_t nrows = k_vecs[0]->len;
+    /* Fixed name for the r² column.  NOTE(review): the original comment
+     * says the planner renames it via the surrounding select — confirm. */
+    int64_t r2_sym = ray_sym_intern("r", 1);
+
+    /* Empty input: emit an empty table with the same column layout. */
+    if (nrows == 0) {
+        ray_t *k0_e, *k1_e, *r2_e;
+        if (!grpc_alloc_out_cols(ext->n_keys, k_types, k_attrs, 0,
+                                 &k0_e, &k1_e, &r2_e))
+            return ray_error("oom", NULL);
+        return grpc_build_table(ext->n_keys, k_syms, r2_sym, k0_e, k1_e, r2_e);
+    }
+
+    /* Small inputs run on the calling thread with a single buffer set. */
+    ray_pool_t* pool = ray_pool_get();
+    uint32_t n_workers = pool ? ray_pool_total_workers(pool) : 1;
+    bool parallel = pool && nrows >= 16384;
+    if (!parallel) n_workers = 1;
+
+    size_t n_bufs = (size_t)n_workers * RADIX_P;
+    ray_t* bufs_hdr = NULL;
+    grpc_scat_buf_t* bufs = (grpc_scat_buf_t*)scratch_calloc(&bufs_hdr,
+        n_bufs * sizeof(grpc_scat_buf_t));
+    if (!bufs) return ray_error("oom", NULL);
+
+    uint32_t init_cap = 256;
+    for (size_t i = 0; i < n_bufs; i++) {
+        bufs[i].data = (char*)scratch_alloc(&bufs[i]._hdr,
+            (size_t)init_cap * GRPC_SCATTER_STRIDE);
+        if (!bufs[i].data) {
+            grpc_release_scatter(bufs, n_bufs, bufs_hdr);
+            return ray_error("oom", NULL);
+        }
+        bufs[i].cap = init_cap;
+    }
+
+    /* Phase 1: scatter rows into per-(worker, partition) buffers. */
+    grpc_phase1_ctx_t p1 = {
+        .k0_data    = ray_data(k_vecs[0]),
+        .k1_data    = (ext->n_keys == 2) ? ray_data(k_vecs[1]) : NULL,
+        .x_data     = ray_data(x_vec),
+        .y_data     = ray_data(y_vec),
+        .k0_type    = k_types[0],
+        .k1_type    = k_types[1],
+        .x_type     = xt,
+        .y_type     = yt,
+        .k0_attrs   = k_attrs[0],
+        .k1_attrs   = k_attrs[1],
+        .k0_null_bm = (k_vecs[0]->attrs & RAY_ATTR_HAS_NULLS)
+                      ? ray_vec_nullmap_bytes(k_vecs[0], NULL, NULL) : NULL,
+        .k1_null_bm = (ext->n_keys == 2 && (k_vecs[1]->attrs & RAY_ATTR_HAS_NULLS))
+                      ? ray_vec_nullmap_bytes(k_vecs[1], NULL, NULL) : NULL,
+        .x_null_bm  = (x_vec->attrs & RAY_ATTR_HAS_NULLS)
+                      ? ray_vec_nullmap_bytes(x_vec, NULL, NULL) : NULL,
+        .y_null_bm  = (y_vec->attrs & RAY_ATTR_HAS_NULLS)
+                      ? ray_vec_nullmap_bytes(y_vec, NULL, NULL) : NULL,
+        .n_keys     = ext->n_keys,
+        .x_is_f64   = (xt == RAY_F64) ? 1 : 0,
+        .y_is_f64   = (yt == RAY_F64) ? 1 : 0,
+        .bufs       = bufs,
+        .n_workers  = n_workers,
+    };
+
+    if (parallel) {
+        ray_pool_dispatch(pool, grpc_phase1_fn, &p1, nrows);
+    } else {
+        grpc_phase1_fn(&p1, 0, 0, nrows);
+    }
+
+    /* A buffer that failed to grow dropped rows; the result would be
+     * wrong, so fail the whole operation. */
+    for (size_t i = 0; i < n_bufs; i++) {
+        if (bufs[i].oom) {
+            grpc_release_scatter(bufs, n_bufs, bufs_hdr);
+            return ray_error("oom", NULL);
+        }
+    }
+
+    /* Phase 2: one hash table per partition. */
+    ray_t* phts_hdr = NULL;
+    grpc_ht_t* part_hts = (grpc_ht_t*)scratch_calloc(&phts_hdr,
+        (size_t)RADIX_P * sizeof(grpc_ht_t));
+    ray_t* per_hdr = NULL;
+    int64_t* part_emit_rows = (int64_t*)scratch_calloc(&per_hdr,
+        (size_t)RADIX_P * sizeof(int64_t));
+    if (!part_hts || !part_emit_rows) {
+        grpc_release_parts(part_hts, phts_hdr, per_hdr);
+        grpc_release_scatter(bufs, n_bufs, bufs_hdr);
+        return ray_error("oom", NULL);
+    }
+    grpc_phase2_ctx_t p2 = {
+        .bufs           = bufs,
+        .n_workers      = n_workers,
+        .part_hts       = part_hts,
+        .part_emit_rows = part_emit_rows,
+    };
+    if (parallel) {
+        ray_pool_dispatch_n(pool, grpc_phase2_fn, &p2, RADIX_P);
+    } else {
+        grpc_phase2_fn(&p2, 0, 0, RADIX_P);
+    }
+
+    for (uint32_t p = 0; p < RADIX_P; p++) {
+        if (part_hts[p].oom) {
+            grpc_release_parts(part_hts, phts_hdr, per_hdr);
+            grpc_release_scatter(bufs, n_bufs, bufs_hdr);
+            return ray_error("oom", NULL);
+        }
+    }
+
+    /* Phase 3 — emit row form.  Allocate output columns sized to total
+     * entries, fill sequentially by walking partitions in order. */
+    int64_t total_rows = 0;
+    for (uint32_t p = 0; p < RADIX_P; p++) total_rows += part_emit_rows[p];
+
+    ray_t *k0_out, *k1_out, *r2_out;
+    if (!grpc_alloc_out_cols(ext->n_keys, k_types, k_attrs, total_rows,
+                             &k0_out, &k1_out, &r2_out)) {
+        grpc_release_parts(part_hts, phts_hdr, per_hdr);
+        grpc_release_scatter(bufs, n_bufs, bufs_hdr);
+        return ray_error("oom", NULL);
+    }
+    k0_out->len = total_rows;
+    if (k1_out) k1_out->len = total_rows;
+    r2_out->len = total_rows;
+
+    void*   k0_data_out = ray_data(k0_out);
+    void*   k1_data_out = k1_out ? ray_data(k1_out) : NULL;
+    double* r2_data     = (double*)ray_data(r2_out);
+
+    int64_t row = 0;
+    for (uint32_t p = 0; p < RADIX_P; p++) {
+        grpc_ht_t* ph = &part_hts[p];
+        for (uint32_t i = 0; i < ph->count; i++) {
+            grpc_entry_t* e = &ph->entries[i];
+            write_col_i64(k0_data_out, row, e->key0, k_types[0], k_attrs[0]);
+            if (k1_data_out)
+                write_col_i64(k1_data_out, row, e->key1, k_types[1], k_attrs[1]);
+            r2_data[row] = grpc_entry_r2(e);
+            row++;
+        }
+    }
+
+    /* Build output table.  Columns: keys then r².  The helper releases
+     * the column references on both success and failure. */
+    ray_t* result = grpc_build_table(ext->n_keys, k_syms, r2_sym,
+                                     k0_out, k1_out, r2_out);
+
+    grpc_release_parts(part_hts, phts_hdr, per_hdr);
+    grpc_release_scatter(bufs, n_bufs, bufs_hdr);
+
+    return result;
+}
+
+/* ════════════════════════════════════════════════════════════════════════
+ * exec_group_maxmin_rowform — dedicated per-group max(x) + min(y) with
+ * row-form emission.  Same morsel-scatter + per-partition HT pattern
+ * as exec_group_pearson_rowform.  Closes q7's first stage
+ * (max(v1) + min(v2) by id3).  Integer x/y; SYM/I64 key.
+ * ════════════════════════════════════════════════════════════════════════ */ + +#define GRPMM_SCATTER_STRIDE 32u + +typedef struct { + char* data; + uint32_t count; + uint32_t cap; + bool oom; + ray_t* _hdr; +} grpmm_scat_buf_t; + +typedef struct { + int64_t key; + int64_t max_x; + int64_t min_y; + int64_t cnt; +} grpmm_entry_t; + +typedef struct { + uint32_t* slots; + grpmm_entry_t* entries; + uint32_t count; + uint32_t cap; + uint32_t entry_cap; + bool oom; + ray_t* _slots_hdr; + ray_t* _entries_hdr; +} grpmm_ht_t; + +#define GRPMM_EMPTY UINT32_MAX +#define GRPMM_PACK(salt, idx) (((uint32_t)(uint8_t)(salt) << 24) | ((idx) & 0xFFFFFF)) +#define GRPMM_IDX(s) ((s) & 0xFFFFFF) +#define GRPMM_SALT(s) ((uint8_t)((s) >> 24)) +#define GRPMM_HASH_SALT(h) ((uint8_t)((h) >> 56)) + +static bool grpmm_ht_init(grpmm_ht_t* ht, uint32_t init_cap) { + memset(ht, 0, sizeof(*ht)); + if (init_cap < 32) init_cap = 32; + uint32_t cap = 1; + while (cap < init_cap) cap <<= 1; + ht->cap = cap; + ht->entry_cap = cap / 2; + if (ht->entry_cap < 16) ht->entry_cap = 16; + ht->slots = (uint32_t*)scratch_alloc(&ht->_slots_hdr, (size_t)cap * 4); + if (!ht->slots) { ht->oom = true; return false; } + memset(ht->slots, 0xFF, (size_t)cap * 4); + ht->entries = (grpmm_entry_t*)scratch_alloc(&ht->_entries_hdr, + (size_t)ht->entry_cap * sizeof(grpmm_entry_t)); + if (!ht->entries) { ht->oom = true; return false; } + return true; +} + +static void grpmm_ht_free(grpmm_ht_t* ht) { + if (ht->_slots_hdr) scratch_free(ht->_slots_hdr); + if (ht->_entries_hdr) scratch_free(ht->_entries_hdr); + memset(ht, 0, sizeof(*ht)); +} + +static bool grpmm_ht_grow_slots(grpmm_ht_t* ht) { + uint32_t old_cap = ht->cap, new_cap = old_cap * 2; + ray_t* new_hdr = NULL; + uint32_t* new_slots = (uint32_t*)scratch_alloc(&new_hdr, (size_t)new_cap * 4); + if (!new_slots) { ht->oom = true; return false; } + memset(new_slots, 0xFF, (size_t)new_cap * 4); + uint32_t mask = new_cap - 1; + for (uint32_t i = 0; i < ht->count; i++) { + 
grpmm_entry_t* e = &ht->entries[i]; + uint64_t h = ray_hash_i64(e->key); + uint32_t p = (uint32_t)(h & mask); + uint8_t salt = GRPMM_HASH_SALT(h); + for (;;) { + if (new_slots[p] == GRPMM_EMPTY) { + new_slots[p] = GRPMM_PACK(salt, i); break; + } + p = (p + 1) & mask; + } + } + scratch_free(ht->_slots_hdr); + ht->_slots_hdr = new_hdr; ht->slots = new_slots; ht->cap = new_cap; + return true; +} + +static bool grpmm_ht_grow_entries(grpmm_ht_t* ht) { + uint32_t new_ecap = ht->entry_cap * 2; + grpmm_entry_t* new_e = (grpmm_entry_t*)scratch_realloc(&ht->_entries_hdr, + (size_t)ht->entry_cap * sizeof(grpmm_entry_t), + (size_t)new_ecap * sizeof(grpmm_entry_t)); + if (!new_e) { ht->oom = true; return false; } + ht->entries = new_e; ht->entry_cap = new_ecap; + return true; +} + +static inline grpmm_entry_t* +grpmm_ht_get(grpmm_ht_t* ht, uint64_t hash, int64_t k) { + if (ht->cap == 0 || (ht->count + 1) * 2 > ht->cap) { + if (!grpmm_ht_grow_slots(ht)) return NULL; + } + if (ht->count >= ht->entry_cap) { + if (!grpmm_ht_grow_entries(ht)) return NULL; + } + uint32_t mask = ht->cap - 1; + uint32_t p = (uint32_t)(hash & mask); + uint8_t salt = GRPMM_HASH_SALT(hash); + for (;;) { + uint32_t s = ht->slots[p]; + if (s == GRPMM_EMPTY) { + uint32_t idx = ht->count++; + ht->slots[p] = GRPMM_PACK(salt, idx); + grpmm_entry_t* e = &ht->entries[idx]; + e->key = k; + e->max_x = INT64_MIN; + e->min_y = INT64_MAX; + e->cnt = 0; + return e; + } + if (GRPMM_SALT(s) == salt) { + grpmm_entry_t* e = &ht->entries[GRPMM_IDX(s)]; + if (e->key == k) return e; + } + p = (p + 1) & mask; + } +} + +typedef struct { + const void* k_data; + const void* x_data; + const void* y_data; + int8_t k_type; + int8_t x_type; + int8_t y_type; + uint8_t k_attrs; + const uint8_t* k_null_bm; + const uint8_t* x_null_bm; + const uint8_t* y_null_bm; + grpmm_scat_buf_t* bufs; + uint32_t n_workers; +} grpmm_phase1_ctx_t; + +static inline void grpmm_scat_push(grpmm_scat_buf_t* buf, uint64_t hash, + int64_t k, int64_t x, int64_t 
y) { + if (__builtin_expect(buf->count >= buf->cap, 0)) { + uint32_t old_cap = buf->cap ? buf->cap : 64; + uint32_t new_cap = old_cap * 2; + char* new_data = (char*)scratch_realloc(&buf->_hdr, + (size_t)buf->cap * GRPMM_SCATTER_STRIDE, + (size_t)new_cap * GRPMM_SCATTER_STRIDE); + if (!new_data) { buf->oom = true; return; } + buf->data = new_data; buf->cap = new_cap; + } + char* dst = buf->data + (size_t)buf->count * GRPMM_SCATTER_STRIDE; + memcpy(dst, &hash, 8); + memcpy(dst + 8, &k, 8); + memcpy(dst + 16, &x, 8); + memcpy(dst + 24, &y, 8); + buf->count++; +} + +static void grpmm_phase1_fn(void* ctx_v, uint32_t worker_id, + int64_t start, int64_t end) { + grpmm_phase1_ctx_t* c = (grpmm_phase1_ctx_t*)ctx_v; + grpmm_scat_buf_t* my_bufs = &c->bufs[(size_t)worker_id * RADIX_P]; + + for (int64_t r = start; r < end; r++) { + if (c->x_null_bm && (c->x_null_bm[r >> 3] >> (r & 7)) & 1) continue; + if (c->y_null_bm && (c->y_null_bm[r >> 3] >> (r & 7)) & 1) continue; + if (c->k_null_bm && (c->k_null_bm[r >> 3] >> (r & 7)) & 1) continue; + int64_t k = read_col_i64(c->k_data, r, c->k_type, c->k_attrs); + int64_t x = read_col_i64(c->x_data, r, c->x_type, 0); + int64_t y = read_col_i64(c->y_data, r, c->y_type, 0); + uint64_t h = ray_hash_i64(k); + uint32_t part = RADIX_PART(h); + grpmm_scat_push(&my_bufs[part], h, k, x, y); + } +} + +typedef struct { + grpmm_scat_buf_t* bufs; + uint32_t n_workers; + grpmm_ht_t* part_hts; + int64_t* part_emit_rows; +} grpmm_phase2_ctx_t; + +static void grpmm_phase2_fn(void* ctx_v, uint32_t worker_id, + int64_t start, int64_t end) { + (void)worker_id; + grpmm_phase2_ctx_t* c = (grpmm_phase2_ctx_t*)ctx_v; + for (int64_t pi = start; pi < end; pi++) { + uint32_t p = (uint32_t)pi; + grpmm_ht_t* ph = &c->part_hts[p]; + if (!grpmm_ht_init(ph, 8192)) return; + for (uint32_t w = 0; w < c->n_workers; w++) { + grpmm_scat_buf_t* buf = &c->bufs[(size_t)w * RADIX_P + p]; + if (!buf->data || buf->oom) continue; + uint32_t nbuf = buf->count; + const char* base = 
buf->data; + enum { PF_DIST = 8 }; + uint32_t pf_end = (nbuf < PF_DIST) ? nbuf : PF_DIST; + uint32_t mask = ph->cap - 1; + for (uint32_t j = 0; j < pf_end; j++) { + uint64_t h; memcpy(&h, base + (size_t)j * GRPMM_SCATTER_STRIDE, 8); + __builtin_prefetch(&ph->slots[(uint32_t)(h & mask)], 0, 1); + } + for (uint32_t i = 0; i < nbuf; i++) { + if (i + PF_DIST < nbuf) { + uint64_t hpf; + memcpy(&hpf, base + (size_t)(i + PF_DIST) * GRPMM_SCATTER_STRIDE, 8); + __builtin_prefetch(&ph->slots[(uint32_t)(hpf & (ph->cap - 1))], 0, 1); + } + uint64_t h; + int64_t k, x, y; + const char* e = base + (size_t)i * GRPMM_SCATTER_STRIDE; + memcpy(&h, e, 8); + memcpy(&k, e + 8, 8); + memcpy(&x, e + 16, 8); + memcpy(&y, e + 24, 8); + grpmm_entry_t* me = grpmm_ht_get(ph, h, k); + if (!me) return; + if (me->cnt == 0) { + me->max_x = x; + me->min_y = y; + } else { + if (x > me->max_x) me->max_x = x; + if (y < me->min_y) me->min_y = y; + } + me->cnt++; + } + } + c->part_emit_rows[p] = (int64_t)ph->count; + } +} + +ray_t* exec_group_maxmin_rowform(ray_graph_t* g, ray_op_t* op) { + ray_op_ext_t* ext = find_ext(g, op->id); + if (!ext || ext->n_keys != 1 || ext->n_aggs != 2 || !ext->agg_ins) + return ray_error("domain", "group_maxmin_rowform: bad shape"); + + ray_t* tbl = g->table; + if (!tbl || RAY_IS_ERR(tbl)) return tbl; + + ray_op_ext_t* kext = find_ext(g, ext->keys[0]->id); + ray_op_ext_t* xext = find_ext(g, ext->agg_ins[0]->id); + ray_op_ext_t* yext = find_ext(g, ext->agg_ins[1]->id); + if (!kext || !xext || !yext || + kext->base.opcode != OP_SCAN || + xext->base.opcode != OP_SCAN || + yext->base.opcode != OP_SCAN) + return ray_error("domain", "group_maxmin_rowform: non-scan child"); + + ray_t* k_vec = ray_table_get_col(tbl, kext->sym); + ray_t* x_vec = ray_table_get_col(tbl, xext->sym); + ray_t* y_vec = ray_table_get_col(tbl, yext->sym); + if (!k_vec || !x_vec || !y_vec) + return ray_error("domain", "group_maxmin_rowform: column missing"); + + int8_t kt = k_vec->type, xt = x_vec->type, yt 
= y_vec->type; + int kt_ok = (kt == RAY_I64 || kt == RAY_I32 || kt == RAY_I16 || + kt == RAY_U8 || kt == RAY_BOOL || kt == RAY_DATE || + kt == RAY_TIME || kt == RAY_TIMESTAMP || kt == RAY_SYM); + int xt_int = (xt == RAY_I64 || xt == RAY_I32 || xt == RAY_I16 || + xt == RAY_U8 || xt == RAY_BOOL); + int yt_int = (yt == RAY_I64 || yt == RAY_I32 || yt == RAY_I16 || + yt == RAY_U8 || yt == RAY_BOOL); + if (!kt_ok || !xt_int || !yt_int) + return ray_error("nyi", "group_maxmin_rowform: type"); + + int64_t nrows = k_vec->len; + if (nrows == 0) { + ray_t* out = ray_table_new(3); + ray_t* k0 = (kt == RAY_SYM) + ? ray_sym_vec_new(k_vec->attrs & RAY_SYM_W_MASK, 0) + : ray_vec_new(kt, 0); + ray_t* x0 = ray_vec_new(xt, 0); + ray_t* y0 = ray_vec_new(yt, 0); + out = ray_table_add_col(out, kext->sym, k0); + out = ray_table_add_col(out, xext->sym, x0); + out = ray_table_add_col(out, yext->sym, y0); + ray_release(k0); ray_release(x0); ray_release(y0); + return out; + } + + ray_pool_t* pool = ray_pool_get(); + uint32_t n_workers = pool ? ray_pool_total_workers(pool) : 1; + bool parallel = pool && nrows >= 16384; + if (!parallel) n_workers = 1; + + size_t n_bufs = (size_t)n_workers * RADIX_P; + ray_t* bufs_hdr = NULL; + grpmm_scat_buf_t* bufs = (grpmm_scat_buf_t*)scratch_calloc(&bufs_hdr, + n_bufs * sizeof(grpmm_scat_buf_t)); + if (!bufs) return ray_error("oom", NULL); + uint32_t init_cap = 256; + for (size_t i = 0; i < n_bufs; i++) { + bufs[i].data = (char*)scratch_alloc(&bufs[i]._hdr, + (size_t)init_cap * GRPMM_SCATTER_STRIDE); + if (!bufs[i].data) { + for (size_t j = 0; j <= i; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + bufs[i].cap = init_cap; + } + + grpmm_phase1_ctx_t p1 = { + .k_data = ray_data(k_vec), + .x_data = ray_data(x_vec), + .y_data = ray_data(y_vec), + .k_type = kt, .x_type = xt, .y_type = yt, + .k_attrs = k_vec->attrs, + .k_null_bm = (k_vec->attrs & RAY_ATTR_HAS_NULLS) + ? 
ray_vec_nullmap_bytes(k_vec, NULL, NULL) : NULL, + .x_null_bm = (x_vec->attrs & RAY_ATTR_HAS_NULLS) + ? ray_vec_nullmap_bytes(x_vec, NULL, NULL) : NULL, + .y_null_bm = (y_vec->attrs & RAY_ATTR_HAS_NULLS) + ? ray_vec_nullmap_bytes(y_vec, NULL, NULL) : NULL, + .bufs = bufs, + .n_workers = n_workers, + }; + if (parallel) + ray_pool_dispatch(pool, grpmm_phase1_fn, &p1, nrows); + else + grpmm_phase1_fn(&p1, 0, 0, nrows); + + for (size_t i = 0; i < n_bufs; i++) { + if (bufs[i].oom) { + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + } + + ray_t* phts_hdr = NULL; + grpmm_ht_t* part_hts = (grpmm_ht_t*)scratch_calloc(&phts_hdr, + (size_t)RADIX_P * sizeof(grpmm_ht_t)); + ray_t* per_hdr = NULL; + int64_t* part_emit_rows = (int64_t*)scratch_calloc(&per_hdr, + (size_t)RADIX_P * sizeof(int64_t)); + if (!part_hts || !part_emit_rows) { + if (phts_hdr) scratch_free(phts_hdr); + if (per_hdr) scratch_free(per_hdr); + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + grpmm_phase2_ctx_t p2 = { + .bufs = bufs, .n_workers = n_workers, + .part_hts = part_hts, .part_emit_rows = part_emit_rows, + }; + if (parallel) + ray_pool_dispatch_n(pool, grpmm_phase2_fn, &p2, RADIX_P); + else + grpmm_phase2_fn(&p2, 0, 0, RADIX_P); + + for (uint32_t p = 0; p < RADIX_P; p++) { + if (part_hts[p].oom) { + for (uint32_t i = 0; i < RADIX_P; i++) grpmm_ht_free(&part_hts[i]); + scratch_free(phts_hdr); scratch_free(per_hdr); + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + } + + int64_t total_rows = 0; + for (uint32_t p = 0; p < RADIX_P; p++) total_rows += part_emit_rows[p]; + + ray_t* k_out = (kt == RAY_SYM) + ? 
ray_sym_vec_new(k_vec->attrs & RAY_SYM_W_MASK, total_rows) + : ray_vec_new(kt, total_rows); + ray_t* x_out = ray_vec_new(xt, total_rows); + ray_t* y_out = ray_vec_new(yt, total_rows); + if (!k_out || !x_out || !y_out) { + if (k_out) ray_release(k_out); + if (x_out) ray_release(x_out); + if (y_out) ray_release(y_out); + for (uint32_t i = 0; i < RADIX_P; i++) grpmm_ht_free(&part_hts[i]); + scratch_free(phts_hdr); scratch_free(per_hdr); + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + k_out->len = total_rows; + x_out->len = total_rows; + y_out->len = total_rows; + + void* k_dst = ray_data(k_out); + void* x_dst = ray_data(x_out); + void* y_dst = ray_data(y_out); + + int64_t row = 0; + for (uint32_t p = 0; p < RADIX_P; p++) { + grpmm_ht_t* ph = &part_hts[p]; + for (uint32_t i = 0; i < ph->count; i++) { + grpmm_entry_t* e = &ph->entries[i]; + write_col_i64(k_dst, row, e->key, kt, k_vec->attrs); + write_col_i64(x_dst, row, e->max_x, xt, 0); + write_col_i64(y_dst, row, e->min_y, yt, 0); + row++; + } + } + + ray_t* result = ray_table_new(3); + result = ray_table_add_col(result, kext->sym, k_out); + result = ray_table_add_col(result, xext->sym, x_out); + result = ray_table_add_col(result, yext->sym, y_out); + ray_release(k_out); ray_release(x_out); ray_release(y_out); + + for (uint32_t i = 0; i < RADIX_P; i++) grpmm_ht_free(&part_hts[i]); + scratch_free(phts_hdr); scratch_free(per_hdr); + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + + return result; +} + +/* ════════════════════════════════════════════════════════════════════════ + * exec_group_median_stddev_rowform — dedicated per-group median(v) + std(v) + * with row-form emission. 
Same morsel-scatter + per-partition HT pattern + * as exec_group_pearson_rowform / exec_group_maxmin_rowform, extended with + * a per-partition append-only F64 value buffer for the holistic quantile + * kernel. Closes q6 canonical (median(v3), std(v3) by id4, id5). + * + * Bypasses the shared OP_GROUP path's two-stage holistic fill (reprobe + + * histogram + scatter) by computing both aggregates from a single radix + * pipeline: + * Phase 1 (parallel): scatter rows into per-(worker,partition) bufs + * as (hash, key0, key1, v3) fat entries. + * Phase 2 (parallel per partition): + * Pass 1 — probe HT, accumulate {cnt, sum, sumsq} per group. + * Cumsum cnt → per-group offsets into the partition's v_buf. + * Pass 2 — re-walk entries, scatter v3 into v_buf at the + * bucketed position for each group. + * Result: per-partition v_buf is group-contiguous, ready for + * a per-group quickselect (no cross-partition scatter). + * Phase 3 (parallel per partition): + * For each group, run ray_median_dbl_inplace on its slice and + * emit median + std(sample) into the output columns. 
+ * ════════════════════════════════════════════════════════════════════════ */ + +#define GRPMS_SCATTER_STRIDE 32u + +typedef struct { + char* data; + uint32_t count; + uint32_t cap; + bool oom; + ray_t* _hdr; +} grpms_scat_buf_t; + +typedef struct { + int64_t key0; + int64_t key1; + int64_t cnt; /* count of non-null v3 added */ + double sum; + double sumsq; + uint32_t val_off; /* offset into ph->v_buf for this group's slice */ + uint32_t val_pos; /* cursor during Phase 2 Pass 2 (scatter v3) */ +} grpms_entry_t; + +typedef struct { + uint32_t* slots; + grpms_entry_t* entries; + uint32_t count; + uint32_t cap; /* slot count, power of 2 */ + uint32_t entry_cap; + double* v_buf; /* sized to total_cnt over the partition */ + uint32_t v_buf_cap; + bool oom; + ray_t* _slots_hdr; + ray_t* _entries_hdr; + ray_t* _v_buf_hdr; +} grpms_ht_t; + +#define GRPMS_EMPTY UINT32_MAX +#define GRPMS_PACK(salt, idx) (((uint32_t)(uint8_t)(salt) << 24) | ((idx) & 0xFFFFFF)) +#define GRPMS_IDX(s) ((s) & 0xFFFFFF) +#define GRPMS_SALT(s) ((uint8_t)((s) >> 24)) +#define GRPMS_HASH_SALT(h) ((uint8_t)((h) >> 56)) + +static bool grpms_ht_init(grpms_ht_t* ht, uint32_t init_cap) { + memset(ht, 0, sizeof(*ht)); + if (init_cap < 32) init_cap = 32; + uint32_t cap = 1; + while (cap < init_cap) cap <<= 1; + ht->cap = cap; + ht->entry_cap = cap / 2; + if (ht->entry_cap < 16) ht->entry_cap = 16; + ht->slots = (uint32_t*)scratch_alloc(&ht->_slots_hdr, (size_t)cap * 4); + if (!ht->slots) { ht->oom = true; return false; } + memset(ht->slots, 0xFF, (size_t)cap * 4); + ht->entries = (grpms_entry_t*)scratch_alloc(&ht->_entries_hdr, + (size_t)ht->entry_cap * sizeof(grpms_entry_t)); + if (!ht->entries) { ht->oom = true; return false; } + return true; +} + +static void grpms_ht_free(grpms_ht_t* ht) { + if (ht->_slots_hdr) scratch_free(ht->_slots_hdr); + if (ht->_entries_hdr) scratch_free(ht->_entries_hdr); + if (ht->_v_buf_hdr) scratch_free(ht->_v_buf_hdr); + memset(ht, 0, sizeof(*ht)); +} + +static bool 
grpms_ht_grow_slots(grpms_ht_t* ht) { + uint32_t old_cap = ht->cap, new_cap = old_cap * 2; + ray_t* new_hdr = NULL; + uint32_t* new_slots = (uint32_t*)scratch_alloc(&new_hdr, (size_t)new_cap * 4); + if (!new_slots) { ht->oom = true; return false; } + memset(new_slots, 0xFF, (size_t)new_cap * 4); + uint32_t mask = new_cap - 1; + for (uint32_t i = 0; i < ht->count; i++) { + grpms_entry_t* e = &ht->entries[i]; + uint64_t h = ray_hash_combine(ray_hash_i64(e->key0), ray_hash_i64(e->key1)); + uint32_t pp = (uint32_t)(h & mask); + uint8_t salt = GRPMS_HASH_SALT(h); + for (;;) { + if (new_slots[pp] == GRPMS_EMPTY) { + new_slots[pp] = GRPMS_PACK(salt, i); break; + } + pp = (pp + 1) & mask; + } + } + scratch_free(ht->_slots_hdr); + ht->_slots_hdr = new_hdr; ht->slots = new_slots; ht->cap = new_cap; + return true; +} + +static bool grpms_ht_grow_entries(grpms_ht_t* ht) { + uint32_t new_ecap = ht->entry_cap * 2; + grpms_entry_t* new_e = (grpms_entry_t*)scratch_realloc(&ht->_entries_hdr, + (size_t)ht->entry_cap * sizeof(grpms_entry_t), + (size_t)new_ecap * sizeof(grpms_entry_t)); + if (!new_e) { ht->oom = true; return false; } + ht->entries = new_e; ht->entry_cap = new_ecap; + return true; +} + +static inline grpms_entry_t* +grpms_ht_get(grpms_ht_t* ht, uint64_t hash, int64_t k0, int64_t k1) { + if (ht->cap == 0 || (ht->count + 1) * 2 > ht->cap) { + if (!grpms_ht_grow_slots(ht)) return NULL; + } + if (ht->count >= ht->entry_cap) { + if (!grpms_ht_grow_entries(ht)) return NULL; + } + uint32_t mask = ht->cap - 1; + uint32_t pp = (uint32_t)(hash & mask); + uint8_t salt = GRPMS_HASH_SALT(hash); + for (;;) { + uint32_t s = ht->slots[pp]; + if (s == GRPMS_EMPTY) { + uint32_t idx = ht->count++; + ht->slots[pp] = GRPMS_PACK(salt, idx); + grpms_entry_t* e = &ht->entries[idx]; + e->key0 = k0; e->key1 = k1; + e->cnt = 0; e->sum = 0.0; e->sumsq = 0.0; + e->val_off = 0; e->val_pos = 0; + return e; + } + if (GRPMS_SALT(s) == salt) { + grpms_entry_t* e = &ht->entries[GRPMS_IDX(s)]; + if 
(e->key0 == k0 && e->key1 == k1) return e; + } + pp = (pp + 1) & mask; + } +} + +/* Lookup-only variant for Pass 2 (HT fully built; never inserts). */ +static inline grpms_entry_t* +grpms_ht_lookup(grpms_ht_t* ht, uint64_t hash, int64_t k0, int64_t k1) { + uint32_t mask = ht->cap - 1; + uint32_t pp = (uint32_t)(hash & mask); + uint8_t salt = GRPMS_HASH_SALT(hash); + for (;;) { + uint32_t s = ht->slots[pp]; + if (s == GRPMS_EMPTY) return NULL; + if (GRPMS_SALT(s) == salt) { + grpms_entry_t* e = &ht->entries[GRPMS_IDX(s)]; + if (e->key0 == k0 && e->key1 == k1) return e; + } + pp = (pp + 1) & mask; + } +} + +typedef struct { + const void* k0_data; + const void* k1_data; + const void* v_data; + int8_t k0_type; + int8_t k1_type; + int8_t v_type; + uint8_t k0_attrs; + uint8_t k1_attrs; + grpms_scat_buf_t* bufs; + uint32_t n_workers; +} grpms_phase1_ctx_t; + +static inline void grpms_scat_push(grpms_scat_buf_t* buf, uint64_t hash, + int64_t k0, int64_t k1, int64_t v_bits) { + if (__builtin_expect(buf->count >= buf->cap, 0)) { + uint32_t old_cap = buf->cap ? 
buf->cap : 64; + uint32_t new_cap = old_cap * 2; + char* new_data = (char*)scratch_realloc(&buf->_hdr, + (size_t)buf->cap * GRPMS_SCATTER_STRIDE, + (size_t)new_cap * GRPMS_SCATTER_STRIDE); + if (!new_data) { buf->oom = true; return; } + buf->data = new_data; buf->cap = new_cap; + } + char* dst = buf->data + (size_t)buf->count * GRPMS_SCATTER_STRIDE; + memcpy(dst, &hash, 8); + memcpy(dst + 8, &k0, 8); + memcpy(dst + 16, &k1, 8); + memcpy(dst + 24, &v_bits, 8); + buf->count++; +} + +static void grpms_phase1_fn(void* ctx_v, uint32_t worker_id, + int64_t start, int64_t end) { + grpms_phase1_ctx_t* c = (grpms_phase1_ctx_t*)ctx_v; + grpms_scat_buf_t* my_bufs = &c->bufs[(size_t)worker_id * RADIX_P]; + bool v_is_f64 = (c->v_type == RAY_F64); + + for (int64_t r = start; r < end; r++) { + int64_t k0 = read_col_i64(c->k0_data, r, c->k0_type, c->k0_attrs); + int64_t k1 = read_col_i64(c->k1_data, r, c->k1_type, c->k1_attrs); + int64_t v_bits; + if (v_is_f64) { + memcpy(&v_bits, &((const double*)c->v_data)[r], 8); + } else { + int64_t vi = read_col_i64(c->v_data, r, c->v_type, 0); + double vd = (double)vi; + memcpy(&v_bits, &vd, 8); + } + uint64_t h = ray_hash_combine(ray_hash_i64(k0), ray_hash_i64(k1)); + uint32_t part = RADIX_PART(h); + grpms_scat_push(&my_bufs[part], h, k0, k1, v_bits); + } +} + +typedef struct { + grpms_scat_buf_t* bufs; + uint32_t n_workers; + grpms_ht_t* part_hts; + int64_t* part_emit_rows; +} grpms_phase2_ctx_t; + +static void grpms_phase2_fn(void* ctx_v, uint32_t worker_id, + int64_t start, int64_t end) { + (void)worker_id; + grpms_phase2_ctx_t* c = (grpms_phase2_ctx_t*)ctx_v; + for (int64_t pi = start; pi < end; pi++) { + uint32_t p = (uint32_t)pi; + grpms_ht_t* ph = &c->part_hts[p]; + + /* Estimate initial HT cap from total entries for this partition. 
*/ + uint32_t total_entries = 0; + for (uint32_t w = 0; w < c->n_workers; w++) + total_entries += c->bufs[(size_t)w * RADIX_P + p].count; + if (total_entries == 0) { c->part_emit_rows[p] = 0; continue; } + uint32_t init_ht = 64; + while (init_ht < total_entries / 4 && init_ht < (1u << 20)) init_ht <<= 1; + if (!grpms_ht_init(ph, init_ht)) return; + + /* Pass 1: probe, accumulate {cnt, sum, sumsq}. */ + for (uint32_t w = 0; w < c->n_workers; w++) { + grpms_scat_buf_t* buf = &c->bufs[(size_t)w * RADIX_P + p]; + if (!buf->data || buf->oom) continue; + const char* base = buf->data; + uint32_t nbuf = buf->count; + enum { PF_DIST = 8 }; + uint32_t pf_end = (nbuf < PF_DIST) ? nbuf : PF_DIST; + uint32_t mask = ph->cap - 1; + for (uint32_t j = 0; j < pf_end; j++) { + uint64_t h; memcpy(&h, base + (size_t)j * GRPMS_SCATTER_STRIDE, 8); + __builtin_prefetch(&ph->slots[(uint32_t)(h & mask)], 0, 1); + } + for (uint32_t i = 0; i < nbuf; i++) { + if (i + PF_DIST < nbuf) { + uint64_t hpf; + memcpy(&hpf, base + (size_t)(i + PF_DIST) * GRPMS_SCATTER_STRIDE, 8); + __builtin_prefetch(&ph->slots[(uint32_t)(hpf & (ph->cap - 1))], 0, 1); + } + uint64_t h; + int64_t k0, k1, v_bits; + const char* e = base + (size_t)i * GRPMS_SCATTER_STRIDE; + memcpy(&h, e, 8); + memcpy(&k0, e + 8, 8); + memcpy(&k1, e + 16, 8); + memcpy(&v_bits, e + 24, 8); + grpms_entry_t* me = grpms_ht_get(ph, h, k0, k1); + if (!me) return; + double v; memcpy(&v, &v_bits, 8); + me->cnt++; + me->sum += v; + me->sumsq += v * v; + } + } + + /* Cumsum cnt → val_off; allocate v_buf. */ + uint32_t total_v = 0; + for (uint32_t g = 0; g < ph->count; g++) { + ph->entries[g].val_off = total_v; + ph->entries[g].val_pos = total_v; + total_v += (uint32_t)ph->entries[g].cnt; + } + ph->v_buf_cap = total_v; + ph->v_buf = (double*)scratch_alloc(&ph->_v_buf_hdr, + (size_t)(total_v > 0 ? total_v : 1) * sizeof(double)); + if (!ph->v_buf) { ph->oom = true; return; } + + /* Pass 2: probe (lookup only — HT is full), scatter v into v_buf. 
*/ + for (uint32_t w = 0; w < c->n_workers; w++) { + grpms_scat_buf_t* buf = &c->bufs[(size_t)w * RADIX_P + p]; + if (!buf->data || buf->oom) continue; + const char* base = buf->data; + uint32_t nbuf = buf->count; + enum { PF_DIST = 8 }; + uint32_t pf_end = (nbuf < PF_DIST) ? nbuf : PF_DIST; + uint32_t mask = ph->cap - 1; + for (uint32_t j = 0; j < pf_end; j++) { + uint64_t h; memcpy(&h, base + (size_t)j * GRPMS_SCATTER_STRIDE, 8); + __builtin_prefetch(&ph->slots[(uint32_t)(h & mask)], 0, 1); + } + for (uint32_t i = 0; i < nbuf; i++) { + if (i + PF_DIST < nbuf) { + uint64_t hpf; + memcpy(&hpf, base + (size_t)(i + PF_DIST) * GRPMS_SCATTER_STRIDE, 8); + __builtin_prefetch(&ph->slots[(uint32_t)(hpf & (ph->cap - 1))], 0, 1); + } + uint64_t h; + int64_t k0, k1, v_bits; + const char* e = base + (size_t)i * GRPMS_SCATTER_STRIDE; + memcpy(&h, e, 8); + memcpy(&k0, e + 8, 8); + memcpy(&k1, e + 16, 8); + memcpy(&v_bits, e + 24, 8); + grpms_entry_t* me = grpms_ht_lookup(ph, h, k0, k1); + if (!me) continue; /* shouldn't happen after Pass 1 */ + double v; memcpy(&v, &v_bits, 8); + ph->v_buf[me->val_pos++] = v; + } + } + + c->part_emit_rows[p] = (int64_t)ph->count; + } +} + +typedef struct { + grpms_ht_t* part_hts; + const int64_t* part_offsets; /* [RADIX_P+1] */ + int8_t k0_type; + int8_t k1_type; + uint8_t k0_attrs; + uint8_t k1_attrs; + void* k0_out; + void* k1_out; + double* med_out; + double* std_out; + int64_t* cnt_out; /* NULL when caller doesn't want a count col */ + ray_t* std_vec; /* for null bit writes when cnt<=1 */ +} grpms_phase3_ctx_t; + +static void grpms_phase3_fn(void* ctx_v, uint32_t worker_id, + int64_t start, int64_t end) { + (void)worker_id; + grpms_phase3_ctx_t* c = (grpms_phase3_ctx_t*)ctx_v; + for (int64_t pi = start; pi < end; pi++) { + uint32_t p = (uint32_t)pi; + grpms_ht_t* ph = &c->part_hts[p]; + int64_t base_row = c->part_offsets[p]; + for (uint32_t g = 0; g < ph->count; g++) { + grpms_entry_t* e = &ph->entries[g]; + int64_t out_row = base_row + 
(int64_t)g; + write_col_i64(c->k0_out, out_row, e->key0, c->k0_type, c->k0_attrs); + write_col_i64(c->k1_out, out_row, e->key1, c->k1_type, c->k1_attrs); + + /* Median: quickselect on the group's slice (in-place). A group with no values gets 0.0. */ + int64_t n = e->cnt; + if (n <= 0) { + c->med_out[out_row] = 0.0; + } else { + double* slice = ph->v_buf + e->val_off; + c->med_out[out_row] = ray_median_dbl_inplace(slice, n); + } + + /* Stddev (sample): sqrt((sumsq - sum²/n) / (n-1)); when n<2 the cell is written as 0.0 and marked null. */ + if (n < 2) { + c->std_out[out_row] = 0.0; + /* NOTE(review): Phase 3 is dispatched through ray_pool_dispatch_n when the parallel path is taken, and neighbouring partitions write adjacent out_row ranges of the same std vector; confirm ray_vec_set_null is safe to call concurrently here. */ ray_vec_set_null(c->std_vec, out_row, true); + } else { + double nd = (double)n; + double var = (e->sumsq - e->sum * e->sum / nd) / (nd - 1.0); + if (var < 0.0) var = 0.0; /* fp noise guard: the subtraction can dip slightly below zero */ + c->std_out[out_row] = sqrt(var); + } + /* The count column is optional; cnt_out is NULL when it was not requested. */ if (c->cnt_out) c->cnt_out[out_row] = n; + } + } +} + +/* Executor for the per-group median + stddev row-form op. Requires exactly 2 keys and either 2 or 3 aggs (the 3-agg shape additionally emits a count column); any other shape yields a "domain" error. Returns g->table unchanged when it is NULL or an error. */ ray_t* exec_group_median_stddev_rowform(ray_graph_t* g, ray_op_t* op) { + ray_op_ext_t* ext = find_ext(g, op->id); + if (!ext || ext->n_keys != 2 || + (ext->n_aggs != 2 && ext->n_aggs != 3) || !ext->agg_ins) + return ray_error("domain", "group_median_stddev_rowform: bad shape"); + bool with_count = (ext->n_aggs == 3); + + ray_t* tbl = g->table; + if (!tbl || RAY_IS_ERR(tbl)) return tbl; + + /* Resolve keys.
*/ + ray_op_ext_t* k0ext = find_ext(g, ext->keys[0]->id); + ray_op_ext_t* k1ext = find_ext(g, ext->keys[1]->id); + ray_op_ext_t* vext = find_ext(g, ext->agg_ins[0]->id); + if (!k0ext || !k1ext || !vext + || k0ext->base.opcode != OP_SCAN + || k1ext->base.opcode != OP_SCAN + || vext->base.opcode != OP_SCAN) + return ray_error("domain", "group_median_stddev_rowform: non-scan child"); + + ray_t* k0_vec = ray_table_get_col(tbl, k0ext->sym); + ray_t* k1_vec = ray_table_get_col(tbl, k1ext->sym); + ray_t* v_vec = ray_table_get_col(tbl, vext->sym); + if (!k0_vec || !k1_vec || !v_vec) + return ray_error("domain", "group_median_stddev_rowform: column missing"); + + int8_t k0t = k0_vec->type, k1t = k1_vec->type, vt = v_vec->type; + int kt_ok = 1; + int8_t kts[2] = { k0t, k1t }; + for (int i = 0; i < 2; i++) { + int8_t kt = kts[i]; + if (kt != RAY_I64 && kt != RAY_I32 && kt != RAY_I16 && kt != RAY_U8 && + kt != RAY_BOOL && kt != RAY_DATE && kt != RAY_TIME && + kt != RAY_TIMESTAMP && kt != RAY_SYM) + kt_ok = 0; + } + int vt_ok = (vt == RAY_I64 || vt == RAY_I32 || vt == RAY_I16 || + vt == RAY_U8 || vt == RAY_BOOL || vt == RAY_F64); + if (!kt_ok || !vt_ok) + return ray_error("nyi", "group_median_stddev_rowform: type"); + + /* Planner gates non-nullable; defensive guard here too. */ + if ((k0_vec->attrs & RAY_ATTR_HAS_NULLS) || + (k1_vec->attrs & RAY_ATTR_HAS_NULLS) || + (v_vec->attrs & RAY_ATTR_HAS_NULLS)) + return ray_error("nyi", "group_median_stddev_rowform: nullable"); + + int64_t nrows = k0_vec->len; + int64_t med_sym = ray_sym_intern("v_median", 8); + int64_t std_sym = ray_sym_intern("v_std", 5); + int64_t cnt_sym = ray_sym_intern("v_count", 7); + int64_t ncols = with_count ? 5 : 4; + if (nrows == 0) { + ray_t* out = ray_table_new(ncols); + ray_t* k0e = (k0t == RAY_SYM) ? ray_sym_vec_new(k0_vec->attrs & RAY_SYM_W_MASK, 0) + : ray_vec_new(k0t, 0); + ray_t* k1e = (k1t == RAY_SYM) ? 
ray_sym_vec_new(k1_vec->attrs & RAY_SYM_W_MASK, 0) + : ray_vec_new(k1t, 0); + ray_t* mev = ray_vec_new(RAY_F64, 0); + ray_t* sdv = ray_vec_new(RAY_F64, 0); + out = ray_table_add_col(out, k0ext->sym, k0e); + out = ray_table_add_col(out, k1ext->sym, k1e); + out = ray_table_add_col(out, med_sym, mev); + out = ray_table_add_col(out, std_sym, sdv); + if (with_count) { + ray_t* cnv = ray_vec_new(RAY_I64, 0); + out = ray_table_add_col(out, cnt_sym, cnv); + ray_release(cnv); + } + ray_release(k0e); ray_release(k1e); ray_release(mev); ray_release(sdv); + return out; + } + + ray_pool_t* pool = ray_pool_get(); + uint32_t n_workers = pool ? ray_pool_total_workers(pool) : 1; + bool parallel = pool && nrows >= 16384; + if (!parallel) n_workers = 1; + + size_t n_bufs = (size_t)n_workers * RADIX_P; + ray_t* bufs_hdr = NULL; + grpms_scat_buf_t* bufs = (grpms_scat_buf_t*)scratch_calloc(&bufs_hdr, + n_bufs * sizeof(grpms_scat_buf_t)); + if (!bufs) return ray_error("oom", NULL); + + uint32_t init_cap = 256; + for (size_t i = 0; i < n_bufs; i++) { + bufs[i].data = (char*)scratch_alloc(&bufs[i]._hdr, + (size_t)init_cap * GRPMS_SCATTER_STRIDE); + if (!bufs[i].data) { + for (size_t j = 0; j <= i; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + bufs[i].cap = init_cap; + } + + grpms_phase1_ctx_t p1 = { + .k0_data = ray_data(k0_vec), + .k1_data = ray_data(k1_vec), + .v_data = ray_data(v_vec), + .k0_type = k0t, + .k1_type = k1t, + .v_type = vt, + .k0_attrs = k0_vec->attrs, + .k1_attrs = k1_vec->attrs, + .bufs = bufs, + .n_workers = n_workers, + }; + if (parallel) { + ray_pool_dispatch(pool, grpms_phase1_fn, &p1, nrows); + } else { + grpms_phase1_fn(&p1, 0, 0, nrows); + } + + for (size_t i = 0; i < n_bufs; i++) { + if (bufs[i].oom) { + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + } + + /* Phase 2. 
*/ + ray_t* phts_hdr = NULL; + grpms_ht_t* part_hts = (grpms_ht_t*)scratch_calloc(&phts_hdr, + (size_t)RADIX_P * sizeof(grpms_ht_t)); + ray_t* per_hdr = NULL; + int64_t* part_emit_rows = (int64_t*)scratch_calloc(&per_hdr, + (size_t)RADIX_P * sizeof(int64_t)); + if (!part_hts || !part_emit_rows) { + if (phts_hdr) scratch_free(phts_hdr); + if (per_hdr) scratch_free(per_hdr); + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + grpms_phase2_ctx_t p2 = { + .bufs = bufs, + .n_workers = n_workers, + .part_hts = part_hts, + .part_emit_rows = part_emit_rows, + }; + if (parallel) { + ray_pool_dispatch_n(pool, grpms_phase2_fn, &p2, RADIX_P); + } else { + grpms_phase2_fn(&p2, 0, 0, RADIX_P); + } + + for (uint32_t p = 0; p < RADIX_P; p++) { + if (part_hts[p].oom) { + for (uint32_t i = 0; i < RADIX_P; i++) grpms_ht_free(&part_hts[i]); + scratch_free(phts_hdr); scratch_free(per_hdr); + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + } + + /* Scatter bufs no longer needed — release before Phase 3 to lower peak RSS. */ + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) { scratch_free(bufs[j]._hdr); bufs[j]._hdr = NULL; } + scratch_free(bufs_hdr); bufs_hdr = NULL; bufs = NULL; + + /* Prefix sum → part_offsets. */ + ray_t* po_hdr = NULL; + int64_t* part_offsets = (int64_t*)scratch_alloc(&po_hdr, + (size_t)(RADIX_P + 1) * sizeof(int64_t)); + if (!part_offsets) { + for (uint32_t i = 0; i < RADIX_P; i++) grpms_ht_free(&part_hts[i]); + scratch_free(phts_hdr); scratch_free(per_hdr); + return ray_error("oom", NULL); + } + int64_t total_rows = 0; + for (uint32_t p = 0; p < RADIX_P; p++) { + part_offsets[p] = total_rows; + total_rows += part_emit_rows[p]; + } + part_offsets[RADIX_P] = total_rows; + + /* Allocate output columns. */ + ray_t* k0_out = (k0t == RAY_SYM) + ? 
ray_sym_vec_new(k0_vec->attrs & RAY_SYM_W_MASK, total_rows) + : ray_vec_new(k0t, total_rows); + ray_t* k1_out = (k1t == RAY_SYM) + ? ray_sym_vec_new(k1_vec->attrs & RAY_SYM_W_MASK, total_rows) + : ray_vec_new(k1t, total_rows); + ray_t* med_out = ray_vec_new(RAY_F64, total_rows); + ray_t* std_out = ray_vec_new(RAY_F64, total_rows); + ray_t* cnt_out = with_count ? ray_vec_new(RAY_I64, total_rows) : NULL; + if (!k0_out || !k1_out || !med_out || !std_out || + (with_count && !cnt_out) || + RAY_IS_ERR(k0_out) || RAY_IS_ERR(k1_out) || + RAY_IS_ERR(med_out) || RAY_IS_ERR(std_out) || + (with_count && RAY_IS_ERR(cnt_out))) { + if (k0_out) ray_release(k0_out); + if (k1_out) ray_release(k1_out); + if (med_out) ray_release(med_out); + if (std_out) ray_release(std_out); + if (cnt_out) ray_release(cnt_out); + for (uint32_t i = 0; i < RADIX_P; i++) grpms_ht_free(&part_hts[i]); + scratch_free(po_hdr); + scratch_free(phts_hdr); scratch_free(per_hdr); + return ray_error("oom", NULL); + } + k0_out->len = total_rows; + k1_out->len = total_rows; + med_out->len = total_rows; + std_out->len = total_rows; + if (cnt_out) cnt_out->len = total_rows; + + /* Phase 3: per partition, emit keys + median + stddev. */ + grpms_phase3_ctx_t p3 = { + .part_hts = part_hts, + .part_offsets = part_offsets, + .k0_type = k0t, + .k1_type = k1t, + .k0_attrs = k0_vec->attrs, + .k1_attrs = k1_vec->attrs, + .k0_out = ray_data(k0_out), + .k1_out = ray_data(k1_out), + .med_out = (double*)ray_data(med_out), + .std_out = (double*)ray_data(std_out), + .cnt_out = cnt_out ? (int64_t*)ray_data(cnt_out) : NULL, + .std_vec = std_out, + }; + if (parallel) { + ray_pool_dispatch_n(pool, grpms_phase3_fn, &p3, RADIX_P); + } else { + grpms_phase3_fn(&p3, 0, 0, RADIX_P); + } + + /* Build output table. 
*/ + ray_t* result = ray_table_new(ncols); + if (!result || RAY_IS_ERR(result)) { + ray_release(k0_out); ray_release(k1_out); + ray_release(med_out); ray_release(std_out); + if (cnt_out) ray_release(cnt_out); + for (uint32_t i = 0; i < RADIX_P; i++) grpms_ht_free(&part_hts[i]); + scratch_free(po_hdr); + scratch_free(phts_hdr); scratch_free(per_hdr); + return result ? result : ray_error("oom", NULL); + } + result = ray_table_add_col(result, k0ext->sym, k0_out); + result = ray_table_add_col(result, k1ext->sym, k1_out); + result = ray_table_add_col(result, med_sym, med_out); + result = ray_table_add_col(result, std_sym, std_out); + if (cnt_out) result = ray_table_add_col(result, cnt_sym, cnt_out); + ray_release(k0_out); ray_release(k1_out); + ray_release(med_out); ray_release(std_out); + if (cnt_out) ray_release(cnt_out); + + for (uint32_t i = 0; i < RADIX_P; i++) grpms_ht_free(&part_hts[i]); + scratch_free(po_hdr); + scratch_free(phts_hdr); scratch_free(per_hdr); + + return result; +} + +/* ════════════════════════════════════════════════════════════════════════ + * exec_group_sum_count_rowform — dedicated multi-key (N=3..8) group-by + * with SUM(v) + COUNT(v). Specialized for canonical H2O q10 shape + * `(select (sum v) (count v) from t by k1 k2 .. kN)`. Bypasses Anton- + * merge slowdown on the shared OP_GROUP path: skips agg_strlen probes, + * direct-array eligibility scans (which always fail on q10's 6-key + * composite), rowsel branches, nullable defensive checks, and the + * holistic-fill scaffolding. Inputs gated non-nullable; integer/SYM + * keys; integer or F64 v. Variadic key count via VLA-style entries + * (stride = (N + 2) × 8). + * ════════════════════════════════════════════════════════════════════════ */ + +/* Entry layout: hash (8) + keys[N] (N*8) + v (8). Stride = 16 + 8N. */ +typedef struct { + char* data; + uint32_t count; + uint32_t cap; + bool oom; + ray_t* _hdr; +} grpsc_scat_buf_t; + +/* HT entry: keys[N] (N*8) + cnt (8) + sum (8). 
Stride = 16 + 8N. */ +typedef struct { + uint32_t* slots; + char* entries; /* variable-size rows */ + uint32_t count; + uint32_t cap; /* slot count, pow2 */ + uint32_t entry_cap; + uint16_t entry_stride; + uint8_t n_keys; + bool oom; + ray_t* _slots_hdr; + ray_t* _entries_hdr; +} grpsc_ht_t; + +#define GRPSC_EMPTY UINT32_MAX +#define GRPSC_PACK(salt, idx) (((uint32_t)(uint8_t)(salt) << 24) | ((idx) & 0xFFFFFF)) +#define GRPSC_IDX(s) ((s) & 0xFFFFFF) +#define GRPSC_SALT(s) ((uint8_t)((s) >> 24)) +#define GRPSC_HASH_SALT(h) ((uint8_t)((h) >> 56)) + +static inline int64_t* grpsc_entry_keys(grpsc_ht_t* ht, uint32_t idx) { + return (int64_t*)(ht->entries + (size_t)idx * ht->entry_stride); +} +static inline int64_t* grpsc_entry_cnt(grpsc_ht_t* ht, uint32_t idx) { + return (int64_t*)(ht->entries + (size_t)idx * ht->entry_stride + + (size_t)ht->n_keys * 8); +} +static inline double* grpsc_entry_sum(grpsc_ht_t* ht, uint32_t idx) { + return (double*)(ht->entries + (size_t)idx * ht->entry_stride + + (size_t)ht->n_keys * 8 + 8); +} + +static bool grpsc_ht_init(grpsc_ht_t* ht, uint32_t init_cap, uint8_t n_keys) { + memset(ht, 0, sizeof(*ht)); + if (init_cap < 32) init_cap = 32; + uint32_t cap = 1; + while (cap < init_cap) cap <<= 1; + ht->cap = cap; + ht->n_keys = n_keys; + ht->entry_stride = (uint16_t)((size_t)n_keys * 8 + 16); + ht->entry_cap = cap / 2; + if (ht->entry_cap < 16) ht->entry_cap = 16; + ht->slots = (uint32_t*)scratch_alloc(&ht->_slots_hdr, (size_t)cap * 4); + if (!ht->slots) { ht->oom = true; return false; } + memset(ht->slots, 0xFF, (size_t)cap * 4); + ht->entries = (char*)scratch_alloc(&ht->_entries_hdr, + (size_t)ht->entry_cap * ht->entry_stride); + if (!ht->entries) { ht->oom = true; return false; } + return true; +} + +static void grpsc_ht_free(grpsc_ht_t* ht) { + if (ht->_slots_hdr) scratch_free(ht->_slots_hdr); + if (ht->_entries_hdr) scratch_free(ht->_entries_hdr); + memset(ht, 0, sizeof(*ht)); +} + +/* Hash the keys[N] tuple — combines per-key i64 
hashes. */ +static inline uint64_t grpsc_hash_keys(const int64_t* keys, uint8_t n) { + uint64_t h = ray_hash_i64(keys[0]); + for (uint8_t i = 1; i < n; i++) + h = ray_hash_combine(h, ray_hash_i64(keys[i])); + return h; +} + +static bool grpsc_ht_grow_slots(grpsc_ht_t* ht) { + uint32_t old_cap = ht->cap, new_cap = old_cap * 2; + ray_t* new_hdr = NULL; + uint32_t* new_slots = (uint32_t*)scratch_alloc(&new_hdr, (size_t)new_cap * 4); + if (!new_slots) { ht->oom = true; return false; } + memset(new_slots, 0xFF, (size_t)new_cap * 4); + uint32_t mask = new_cap - 1; + for (uint32_t i = 0; i < ht->count; i++) { + const int64_t* k = grpsc_entry_keys(ht, i); + uint64_t h = grpsc_hash_keys(k, ht->n_keys); + uint32_t pp = (uint32_t)(h & mask); + uint8_t salt = GRPSC_HASH_SALT(h); + for (;;) { + if (new_slots[pp] == GRPSC_EMPTY) { + new_slots[pp] = GRPSC_PACK(salt, i); break; + } + pp = (pp + 1) & mask; + } + } + scratch_free(ht->_slots_hdr); + ht->_slots_hdr = new_hdr; ht->slots = new_slots; ht->cap = new_cap; + return true; +} + +static bool grpsc_ht_grow_entries(grpsc_ht_t* ht) { + uint32_t new_ecap = ht->entry_cap * 2; + char* new_e = (char*)scratch_realloc(&ht->_entries_hdr, + (size_t)ht->entry_cap * ht->entry_stride, + (size_t)new_ecap * ht->entry_stride); + if (!new_e) { ht->oom = true; return false; } + ht->entries = new_e; ht->entry_cap = new_ecap; + return true; +} + +/* Probe-or-insert. `keys_in` points to N consecutive int64s. Returns + * the entry index (always valid) or UINT32_MAX on OOM. 
*/ +static inline uint32_t +grpsc_ht_get(grpsc_ht_t* ht, uint64_t hash, const int64_t* keys_in) { + if (ht->cap == 0 || (ht->count + 1) * 2 > ht->cap) { + if (!grpsc_ht_grow_slots(ht)) return UINT32_MAX; + } + if (ht->count >= ht->entry_cap) { + if (!grpsc_ht_grow_entries(ht)) return UINT32_MAX; + } + uint8_t n = ht->n_keys; + size_t key_bytes = (size_t)n * 8; + uint32_t mask = ht->cap - 1; + uint32_t pp = (uint32_t)(hash & mask); + uint8_t salt = GRPSC_HASH_SALT(hash); + for (;;) { + uint32_t s = ht->slots[pp]; + if (s == GRPSC_EMPTY) { + uint32_t idx = ht->count++; + ht->slots[pp] = GRPSC_PACK(salt, idx); + int64_t* dst_keys = grpsc_entry_keys(ht, idx); + memcpy(dst_keys, keys_in, key_bytes); + *grpsc_entry_cnt(ht, idx) = 0; + *grpsc_entry_sum(ht, idx) = 0.0; + return idx; + } + if (GRPSC_SALT(s) == salt) { + uint32_t idx = GRPSC_IDX(s); + const int64_t* eks = grpsc_entry_keys(ht, idx); + if (memcmp(eks, keys_in, key_bytes) == 0) return idx; + } + pp = (pp + 1) & mask; + } +} + +typedef struct { + const void** key_data; /* [n_keys] base pointers */ + const int8_t* key_types; /* [n_keys] */ + const uint8_t* key_attrs; /* [n_keys] */ + const void* v_data; + int8_t v_type; + uint8_t n_keys; + grpsc_scat_buf_t* bufs; + uint32_t n_workers; + uint16_t entry_stride; +} grpsc_phase1_ctx_t; + +static inline void grpsc_scat_push(grpsc_scat_buf_t* buf, uint16_t stride, + uint64_t hash, const int64_t* keys, + uint8_t n, int64_t v_bits) { + if (__builtin_expect(buf->count >= buf->cap, 0)) { + uint32_t old_cap = buf->cap ? 
buf->cap : 64; + uint32_t new_cap = old_cap * 2; + char* new_data = (char*)scratch_realloc(&buf->_hdr, + (size_t)buf->cap * stride, + (size_t)new_cap * stride); + if (!new_data) { buf->oom = true; return; } + buf->data = new_data; buf->cap = new_cap; + } + char* dst = buf->data + (size_t)buf->count * stride; + memcpy(dst, &hash, 8); + memcpy(dst + 8, keys, (size_t)n * 8); + memcpy(dst + 8 + (size_t)n * 8, &v_bits, 8); + buf->count++; +} + +static void grpsc_phase1_fn(void* ctx_v, uint32_t worker_id, + int64_t start, int64_t end) { + grpsc_phase1_ctx_t* c = (grpsc_phase1_ctx_t*)ctx_v; + grpsc_scat_buf_t* my_bufs = &c->bufs[(size_t)worker_id * RADIX_P]; + uint8_t n = c->n_keys; + uint16_t stride = c->entry_stride; + bool v_is_f64 = (c->v_type == RAY_F64); + int64_t keys[8]; + + for (int64_t r = start; r < end; r++) { + for (uint8_t k = 0; k < n; k++) + keys[k] = read_col_i64(c->key_data[k], r, + c->key_types[k], c->key_attrs[k]); + int64_t v_bits; + if (v_is_f64) { + memcpy(&v_bits, &((const double*)c->v_data)[r], 8); + } else { + int64_t vi = read_col_i64(c->v_data, r, c->v_type, 0); + double vd = (double)vi; + memcpy(&v_bits, &vd, 8); + } + uint64_t h = grpsc_hash_keys(keys, n); + uint32_t part = RADIX_PART(h); + grpsc_scat_push(&my_bufs[part], stride, h, keys, n, v_bits); + } +} + +typedef struct { + grpsc_scat_buf_t* bufs; + uint32_t n_workers; + grpsc_ht_t* part_hts; + int64_t* part_emit_rows; + uint8_t n_keys; + uint16_t entry_stride; +} grpsc_phase2_ctx_t; + +static void grpsc_phase2_fn(void* ctx_v, uint32_t worker_id, + int64_t start, int64_t end) { + (void)worker_id; + grpsc_phase2_ctx_t* c = (grpsc_phase2_ctx_t*)ctx_v; + uint8_t n = c->n_keys; + uint16_t stride = c->entry_stride; + size_t key_bytes = (size_t)n * 8; + for (int64_t pi = start; pi < end; pi++) { + uint32_t p = (uint32_t)pi; + grpsc_ht_t* ph = &c->part_hts[p]; + + /* Estimate HT size from partition's total entry count. 
*/ + uint32_t total_entries = 0; + for (uint32_t w = 0; w < c->n_workers; w++) + total_entries += c->bufs[(size_t)w * RADIX_P + p].count; + if (total_entries == 0) { c->part_emit_rows[p] = 0; continue; } + + /* q10 worst case: nearly all entries are distinct, so load + * factor 0.5 ≈ 2× total_entries. Cap at 16M (24-bit). */ + uint32_t init_ht = 256; + while (init_ht < total_entries * 2 && init_ht < (1u << 24)) + init_ht <<= 1; + if (!grpsc_ht_init(ph, init_ht, n)) return; + + for (uint32_t w = 0; w < c->n_workers; w++) { + grpsc_scat_buf_t* buf = &c->bufs[(size_t)w * RADIX_P + p]; + if (!buf->data || buf->oom) continue; + const char* base = buf->data; + uint32_t nbuf = buf->count; + enum { PF_DIST = 8 }; + uint32_t pf_end = (nbuf < PF_DIST) ? nbuf : PF_DIST; + uint32_t mask = ph->cap - 1; + for (uint32_t j = 0; j < pf_end; j++) { + uint64_t h; memcpy(&h, base + (size_t)j * stride, 8); + __builtin_prefetch(&ph->slots[(uint32_t)(h & mask)], 0, 1); + } + for (uint32_t i = 0; i < nbuf; i++) { + if (i + PF_DIST < nbuf) { + uint64_t hpf; + memcpy(&hpf, base + (size_t)(i + PF_DIST) * stride, 8); + __builtin_prefetch(&ph->slots[(uint32_t)(hpf & (ph->cap - 1))], 0, 1); + } + uint64_t h; + const char* e = base + (size_t)i * stride; + memcpy(&h, e, 8); + const int64_t* keys_in = (const int64_t*)(const void*)(e + 8); + int64_t v_bits; + memcpy(&v_bits, e + 8 + key_bytes, 8); + uint32_t idx = grpsc_ht_get(ph, h, keys_in); + if (idx == UINT32_MAX) return; + double v; memcpy(&v, &v_bits, 8); + (*grpsc_entry_cnt(ph, idx))++; + *grpsc_entry_sum(ph, idx) += v; + } + } + c->part_emit_rows[p] = (int64_t)ph->count; + } +} + +typedef struct { + grpsc_ht_t* part_hts; + const int64_t* part_offsets; /* [RADIX_P+1] */ + const int8_t* key_types; /* [n_keys] */ + const uint8_t* key_attrs; /* [n_keys] */ + void** key_outs; /* [n_keys] data pointers */ + int64_t* cnt_out; + double* sum_out; + uint8_t n_keys; +} grpsc_phase3_ctx_t; + +static void grpsc_phase3_fn(void* ctx_v, uint32_t worker_id, 
+ int64_t start, int64_t end) { + (void)worker_id; + grpsc_phase3_ctx_t* c = (grpsc_phase3_ctx_t*)ctx_v; + uint8_t n = c->n_keys; + for (int64_t pi = start; pi < end; pi++) { + uint32_t p = (uint32_t)pi; + grpsc_ht_t* ph = &c->part_hts[p]; + int64_t base_row = c->part_offsets[p]; + for (uint32_t g = 0; g < ph->count; g++) { + int64_t out_row = base_row + (int64_t)g; + const int64_t* eks = grpsc_entry_keys(ph, g); + for (uint8_t k = 0; k < n; k++) { + write_col_i64(c->key_outs[k], out_row, eks[k], + c->key_types[k], c->key_attrs[k]); + } + c->sum_out[out_row] = *grpsc_entry_sum(ph, g); + c->cnt_out[out_row] = *grpsc_entry_cnt(ph, g); + } + } +} + +ray_t* exec_group_sum_count_rowform(ray_graph_t* g, ray_op_t* op) { + ray_op_ext_t* ext = find_ext(g, op->id); + if (!ext || ext->n_keys < 3 || ext->n_keys > 8 || + ext->n_aggs != 2 || !ext->agg_ins) + return ray_error("domain", "group_sum_count_rowform: bad shape"); + + ray_t* tbl = g->table; + if (!tbl || RAY_IS_ERR(tbl)) return tbl; + + uint8_t n_keys = ext->n_keys; + + /* Resolve keys + v. 
*/ + ray_op_ext_t* kexts[8]; + ray_t* k_vecs[8]; + int64_t k_syms[8]; + int8_t k_types[8]; + uint8_t k_attrs[8]; + for (uint8_t k = 0; k < n_keys; k++) { + kexts[k] = find_ext(g, ext->keys[k]->id); + if (!kexts[k] || kexts[k]->base.opcode != OP_SCAN) + return ray_error("domain", "group_sum_count_rowform: non-scan key"); + k_vecs[k] = ray_table_get_col(tbl, kexts[k]->sym); + if (!k_vecs[k]) + return ray_error("domain", "group_sum_count_rowform: key column missing"); + k_syms[k] = kexts[k]->sym; + k_types[k] = k_vecs[k]->type; + k_attrs[k] = k_vecs[k]->attrs; + int8_t kt = k_types[k]; + if (kt != RAY_I64 && kt != RAY_I32 && kt != RAY_I16 && kt != RAY_U8 && + kt != RAY_BOOL && kt != RAY_DATE && kt != RAY_TIME && + kt != RAY_TIMESTAMP && kt != RAY_SYM) + return ray_error("nyi", "group_sum_count_rowform: key type"); + if (k_vecs[k]->attrs & RAY_ATTR_HAS_NULLS) + return ray_error("nyi", "group_sum_count_rowform: nullable key"); + } + + /* agg[0] = SUM(v), agg[1] = COUNT(any). COUNT input is ignored + * since gate guarantees non-null v (count of rows in group). 
*/ + ray_op_ext_t* vext = find_ext(g, ext->agg_ins[0]->id); + if (!vext || vext->base.opcode != OP_SCAN) + return ray_error("domain", "group_sum_count_rowform: non-scan val"); + ray_t* v_vec = ray_table_get_col(tbl, vext->sym); + if (!v_vec) + return ray_error("domain", "group_sum_count_rowform: val column missing"); + int8_t vt = v_vec->type; + int vt_ok = (vt == RAY_I64 || vt == RAY_I32 || vt == RAY_I16 || + vt == RAY_U8 || vt == RAY_BOOL || vt == RAY_F64); + if (!vt_ok) + return ray_error("nyi", "group_sum_count_rowform: val type"); + if (v_vec->attrs & RAY_ATTR_HAS_NULLS) + return ray_error("nyi", "group_sum_count_rowform: nullable val"); + + int64_t nrows = k_vecs[0]->len; + int64_t sum_sym = ray_sym_intern("v_sum", 5); + int64_t cnt_sym = ray_sym_intern("v_count", 7); + int64_t ncols = (int64_t)n_keys + 2; + if (nrows == 0) { + ray_t* out = ray_table_new(ncols); + for (uint8_t k = 0; k < n_keys; k++) { + ray_t* ev = (k_types[k] == RAY_SYM) + ? ray_sym_vec_new(k_attrs[k] & RAY_SYM_W_MASK, 0) + : ray_vec_new(k_types[k], 0); + out = ray_table_add_col(out, k_syms[k], ev); + ray_release(ev); + } + ray_t* sv = ray_vec_new(RAY_F64, 0); + ray_t* cv = ray_vec_new(RAY_I64, 0); + out = ray_table_add_col(out, sum_sym, sv); + out = ray_table_add_col(out, cnt_sym, cv); + ray_release(sv); ray_release(cv); + return out; + } + + ray_pool_t* pool = ray_pool_get(); + uint32_t n_workers = pool ? 
ray_pool_total_workers(pool) : 1; + bool parallel = pool && nrows >= 16384; + if (!parallel) n_workers = 1; + + uint16_t entry_stride = (uint16_t)((size_t)n_keys * 8 + 16); + + size_t n_bufs = (size_t)n_workers * RADIX_P; + ray_t* bufs_hdr = NULL; + grpsc_scat_buf_t* bufs = (grpsc_scat_buf_t*)scratch_calloc(&bufs_hdr, + n_bufs * sizeof(grpsc_scat_buf_t)); + if (!bufs) return ray_error("oom", NULL); + + uint32_t init_cap = 256; + for (size_t i = 0; i < n_bufs; i++) { + bufs[i].data = (char*)scratch_alloc(&bufs[i]._hdr, + (size_t)init_cap * entry_stride); + if (!bufs[i].data) { + for (size_t j = 0; j <= i; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + bufs[i].cap = init_cap; + } + + const void* key_data[8]; + for (uint8_t k = 0; k < n_keys; k++) key_data[k] = ray_data(k_vecs[k]); + + grpsc_phase1_ctx_t p1 = { + .key_data = key_data, + .key_types = k_types, + .key_attrs = k_attrs, + .v_data = ray_data(v_vec), + .v_type = vt, + .n_keys = n_keys, + .bufs = bufs, + .n_workers = n_workers, + .entry_stride = entry_stride, + }; + if (parallel) ray_pool_dispatch(pool, grpsc_phase1_fn, &p1, nrows); + else grpsc_phase1_fn(&p1, 0, 0, nrows); + + for (size_t i = 0; i < n_bufs; i++) { + if (bufs[i].oom) { + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + } + + ray_t* phts_hdr = NULL; + grpsc_ht_t* part_hts = (grpsc_ht_t*)scratch_calloc(&phts_hdr, + (size_t)RADIX_P * sizeof(grpsc_ht_t)); + ray_t* per_hdr = NULL; + int64_t* part_emit_rows = (int64_t*)scratch_calloc(&per_hdr, + (size_t)RADIX_P * sizeof(int64_t)); + if (!part_hts || !part_emit_rows) { + if (phts_hdr) scratch_free(phts_hdr); + if (per_hdr) scratch_free(per_hdr); + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + grpsc_phase2_ctx_t p2 = { + .bufs = 
bufs, + .n_workers = n_workers, + .part_hts = part_hts, + .part_emit_rows = part_emit_rows, + .n_keys = n_keys, + .entry_stride = entry_stride, + }; + if (parallel) ray_pool_dispatch_n(pool, grpsc_phase2_fn, &p2, RADIX_P); + else grpsc_phase2_fn(&p2, 0, 0, RADIX_P); + + for (uint32_t p = 0; p < RADIX_P; p++) { + if (part_hts[p].oom) { + for (uint32_t i = 0; i < RADIX_P; i++) grpsc_ht_free(&part_hts[i]); + scratch_free(phts_hdr); scratch_free(per_hdr); + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) scratch_free(bufs[j]._hdr); + scratch_free(bufs_hdr); + return ray_error("oom", NULL); + } + } + + for (size_t j = 0; j < n_bufs; j++) + if (bufs[j]._hdr) { scratch_free(bufs[j]._hdr); bufs[j]._hdr = NULL; } + scratch_free(bufs_hdr); bufs_hdr = NULL; bufs = NULL; + + ray_t* po_hdr = NULL; + int64_t* part_offsets = (int64_t*)scratch_alloc(&po_hdr, + (size_t)(RADIX_P + 1) * sizeof(int64_t)); + if (!part_offsets) { + for (uint32_t i = 0; i < RADIX_P; i++) grpsc_ht_free(&part_hts[i]); + scratch_free(phts_hdr); scratch_free(per_hdr); + return ray_error("oom", NULL); + } + int64_t total_rows = 0; + for (uint32_t p = 0; p < RADIX_P; p++) { + part_offsets[p] = total_rows; + total_rows += part_emit_rows[p]; + } + part_offsets[RADIX_P] = total_rows; + + /* Allocate output columns. */ + ray_t* key_outs[8] = {0}; + for (uint8_t k = 0; k < n_keys; k++) { + key_outs[k] = (k_types[k] == RAY_SYM) + ? 
ray_sym_vec_new(k_attrs[k] & RAY_SYM_W_MASK, total_rows) + : ray_vec_new(k_types[k], total_rows); + if (!key_outs[k] || RAY_IS_ERR(key_outs[k])) { + for (uint8_t j = 0; j <= k; j++) + if (key_outs[j]) ray_release(key_outs[j]); + for (uint32_t i = 0; i < RADIX_P; i++) grpsc_ht_free(&part_hts[i]); + scratch_free(po_hdr); + scratch_free(phts_hdr); scratch_free(per_hdr); + return ray_error("oom", NULL); + } + key_outs[k]->len = total_rows; + } + ray_t* sum_out = ray_vec_new(RAY_F64, total_rows); + ray_t* cnt_out = ray_vec_new(RAY_I64, total_rows); + if (!sum_out || !cnt_out || RAY_IS_ERR(sum_out) || RAY_IS_ERR(cnt_out)) { + for (uint8_t k = 0; k < n_keys; k++) + if (key_outs[k]) ray_release(key_outs[k]); + if (sum_out) ray_release(sum_out); + if (cnt_out) ray_release(cnt_out); + for (uint32_t i = 0; i < RADIX_P; i++) grpsc_ht_free(&part_hts[i]); + scratch_free(po_hdr); + scratch_free(phts_hdr); scratch_free(per_hdr); + return ray_error("oom", NULL); + } + sum_out->len = total_rows; + cnt_out->len = total_rows; + + void* key_out_data[8]; + for (uint8_t k = 0; k < n_keys; k++) key_out_data[k] = ray_data(key_outs[k]); + + grpsc_phase3_ctx_t p3 = { + .part_hts = part_hts, + .part_offsets = part_offsets, + .key_types = k_types, + .key_attrs = k_attrs, + .key_outs = key_out_data, + .cnt_out = (int64_t*)ray_data(cnt_out), + .sum_out = (double*)ray_data(sum_out), + .n_keys = n_keys, + }; + if (parallel) ray_pool_dispatch_n(pool, grpsc_phase3_fn, &p3, RADIX_P); + else grpsc_phase3_fn(&p3, 0, 0, RADIX_P); + + ray_t* result = ray_table_new(ncols); + if (!result || RAY_IS_ERR(result)) { + for (uint8_t k = 0; k < n_keys; k++) + if (key_outs[k]) ray_release(key_outs[k]); + ray_release(sum_out); ray_release(cnt_out); + for (uint32_t i = 0; i < RADIX_P; i++) grpsc_ht_free(&part_hts[i]); + scratch_free(po_hdr); + scratch_free(phts_hdr); scratch_free(per_hdr); + return result ? 
result : ray_error("oom", NULL); + } + for (uint8_t k = 0; k < n_keys; k++) { + result = ray_table_add_col(result, k_syms[k], key_outs[k]); + } + result = ray_table_add_col(result, sum_sym, sum_out); + result = ray_table_add_col(result, cnt_sym, cnt_out); + for (uint8_t k = 0; k < n_keys; k++) ray_release(key_outs[k]); + ray_release(sum_out); ray_release(cnt_out); + + for (uint32_t i = 0; i < RADIX_P; i++) grpsc_ht_free(&part_hts[i]); + scratch_free(po_hdr); + scratch_free(phts_hdr); scratch_free(per_hdr); + + return result; +} + diff --git a/src/ops/internal.h b/src/ops/internal.h index cf4e7517..6badf146 100644 --- a/src/ops/internal.h +++ b/src/ops/internal.h @@ -837,6 +837,10 @@ ray_t* ray_topk_per_group_buf(ray_t* src, ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, int64_t group_limit); ray_t* exec_group_topk_rowform(ray_graph_t* g, ray_op_t* op); +ray_t* exec_group_pearson_rowform(ray_graph_t* g, ray_op_t* op); +ray_t* exec_group_maxmin_rowform(ray_graph_t* g, ray_op_t* op); +ray_t* exec_group_median_stddev_rowform(ray_graph_t* g, ray_op_t* op); +ray_t* exec_group_sum_count_rowform(ray_graph_t* g, ray_op_t* op); /* ── collection.c ── */ ray_t* distinct_vec_eager(ray_t* x); diff --git a/src/ops/ops.h b/src/ops/ops.h index 97e59689..86a4aba9 100644 --- a/src/ops/ops.h +++ b/src/ops/ops.h @@ -206,6 +206,36 @@ void ray_cancel(void); * partition in phase 2; emit a 2-column table (key, value) in row form. */ #define OP_GROUP_TOPK_ROWFORM 91 #define OP_GROUP_BOTK_ROWFORM 110 +/* Dedicated single-pass per-group Pearson² with row-form emission for + * the canonical shape `(select (pearson_corr x y) from t by [k0 k1])`. + * Two-phase parallel: per-worker scatter into RADIX_P partitions in + * phase 1; per-partition open-addressing HT with fixed Pearson state + * (Σx, Σy, Σx², Σy², Σxy, cnt) in phase 2; emit a 3-column table + * (key0, key1, r²) directly. Bypasses Anton-merge slowdown that + * affects OP_PEARSON_CORR via the shared radix HT path. 
1 or 2 keys. */ +#define OP_GROUP_PEARSON_ROWFORM 111 +/* Dedicated single-pass per-group MAX(x)+MIN(y) with row-form emission + * for the canonical shape `(select (max x) (min y) from t by k)`. + * Bypasses Anton-merge slowdown on the shared radix HT path. Closes + * the first stage of H2O canonical q7 (max(v1)-min(v2) per id3); the + * second stage is element-wise arithmetic on the small result. 1 key, + * 2 fixed-state aggs (MAX, MIN), integer x/y. */ +#define OP_GROUP_MAXMIN_ROWFORM 112 +/* Dedicated single-pass per-group MEDIAN(v)+STDDEV(v) with row-form + * emission for shape `(select (median v) (std v) [(count v)] from t by + * k0 k1)`; the optional COUNT adds a count column. Phase 2 builds + * per-partition HT + group-contiguous F64 v_buf in two passes; Phase 3 + * runs ray_median_dbl_inplace per group. Bypasses the shared OP_GROUP + * path's reprobe-and-histogram holistic fill. Closes H2O canonical q6. + * 2 keys, all aggs on the same column, non-nullable inputs. */ +#define OP_GROUP_MEDIAN_STDDEV_ROWFORM 113 +/* Dedicated multi-key (N=3..8) per-group sum(v)+count(v) with row-form + * emission for canonical shape `(select (sum v) (count v) from t by + * k1 k2 .. kN)`. Bypasses the shared OP_GROUP path's direct-array + * eligibility scans, rowsel + nullable defensive checks, and Anton- + * merge regressions. Closes H2O canonical q10 (6-key composite with + * ~10M unique groups, essentially a row-dedup workload). */ +#define OP_GROUP_SUM_COUNT_ROWFORM 114 /* Opcodes — Graph */ #define OP_EXPAND 80 /* 1-hop CSR neighbor expansion */ @@ -609,6 +639,27 @@ ray_op_t* ray_group3(ray_graph_t* g, ray_op_t** keys, uint8_t n_keys, * (type-matched to `val`), both flat — one row per (group, kept-value). */ ray_op_t* ray_group_topk_rowform(ray_graph_t* g, ray_op_t* key, ray_op_t* val, int64_t k, uint8_t desc); +/* Dedicated per-group Pearson² with row-form emission. See + * OP_GROUP_PEARSON_ROWFORM comment above. keys[0..n_keys) are the + * group-by columns (1 or 2); x and y are the two value columns. 
Output: + * (key0, [key1,] r2) table where r² = corr(x, y)² per group. */ +ray_op_t* ray_group_pearson_rowform(ray_graph_t* g, ray_op_t** keys, + uint8_t n_keys, ray_op_t* x, ray_op_t* y); +/* Dedicated per-group max(x) + min(y) with row-form emission. See + * OP_GROUP_MAXMIN_ROWFORM comment. Output: (key, max_x, min_y). */ +ray_op_t* ray_group_maxmin_rowform(ray_graph_t* g, ray_op_t* key, + ray_op_t* x, ray_op_t* y); +/* Dedicated per-group median(v) + std(v) with row-form emission. See + * OP_GROUP_MEDIAN_STDDEV_ROWFORM comment. keys[0..1] are two group + * columns; v feeds both aggregates. Output: (key0, key1, v_median, + * v_std), plus a trailing I64 count column when with_count is set. */ +ray_op_t* ray_group_median_stddev_rowform(ray_graph_t* g, ray_op_t** keys, + ray_op_t* v, int with_count); +/* Dedicated multi-key per-group sum(v)+count(v) with row-form emission. + * See OP_GROUP_SUM_COUNT_ROWFORM comment. N keys (3..8); v is the + * sum column. Nullable v is rejected, so count is rows per group. */ +ray_op_t* ray_group_sum_count_rowform(ray_graph_t* g, ray_op_t** keys, + uint8_t n_keys, ray_op_t* v); ray_op_t* ray_distinct(ray_graph_t* g, ray_op_t** keys, uint8_t n_keys); ray_op_t* ray_pivot_op(ray_graph_t* g, ray_op_t** index_cols, uint8_t n_index, diff --git a/src/ops/query.c b/src/ops/query.c index 3e9669f4..8492523d 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -5923,10 +5923,217 @@ ray_t* ray_select(ray_t** args, int64_t n) { agg_k, n_aggs); } } else if (has_binary_agg) { - root = ray_group2(g, key_ops, n_keys, agg_ops, - agg_ins, agg_ins2, n_aggs); + /* Fast path: dedicated row-form per-group Pearson² for + * the exact shape `(select (pearson_corr x y) from T + * by [k0] or [k0 k1])` with no other aggs / non-aggs / + * where. Bypasses Anton-merge slowdown that affects + * OP_PEARSON_CORR via the shared radix HT path. q9 + * canonical (id2 SYM + id4 I64; v1 I64, v2 I64) hits + * this. 
*/ + int prf_ok = 0; + if (n_aggs == 1 && n_nonaggs == 0 + && !where_expr + && agg_ops[0] == OP_PEARSON_CORR + && (n_keys == 1 || n_keys == 2) + && key_ops[0] && key_ops[0]->opcode == OP_SCAN + && agg_ins[0] && agg_ins[0]->opcode == OP_SCAN + && agg_ins2[0] && agg_ins2[0]->opcode == OP_SCAN + && (n_keys == 1 || (key_ops[1] && key_ops[1]->opcode == OP_SCAN))) + { + prf_ok = 1; + for (uint8_t k = 0; k < n_keys && prf_ok; k++) { + ray_op_ext_t* kext = find_ext(g, key_ops[k]->id); + ray_t* kc = (kext && tbl) ? ray_table_get_col(tbl, kext->sym) : NULL; + if (!kc) { prf_ok = 0; break; } + int8_t kt = kc->type; + int kt_ok = (kt == RAY_I64 || kt == RAY_I32 || + kt == RAY_I16 || kt == RAY_U8 || + kt == RAY_BOOL || kt == RAY_DATE || + kt == RAY_TIME || kt == RAY_TIMESTAMP || + kt == RAY_SYM); + if (!kt_ok) prf_ok = 0; + } + if (prf_ok) { + ray_op_ext_t* xext = find_ext(g, agg_ins[0]->id); + ray_op_ext_t* yext = find_ext(g, agg_ins2[0]->id); + ray_t* xc = (xext && tbl) ? ray_table_get_col(tbl, xext->sym) : NULL; + ray_t* yc = (yext && tbl) ? ray_table_get_col(tbl, yext->sym) : NULL; + if (!xc || !yc) prf_ok = 0; + else { + int8_t xt = xc->type, yt = yc->type; + int xt_ok = (xt == RAY_I64 || xt == RAY_I32 || + xt == RAY_I16 || xt == RAY_U8 || + xt == RAY_BOOL || xt == RAY_F64); + int yt_ok = (yt == RAY_I64 || yt == RAY_I32 || + yt == RAY_I16 || yt == RAY_U8 || + yt == RAY_BOOL || yt == RAY_F64); + if (!xt_ok || !yt_ok) prf_ok = 0; + } + } + } + if (prf_ok) { + root = ray_group_pearson_rowform(g, key_ops, n_keys, + agg_ins[0], agg_ins2[0]); + } else { + root = ray_group2(g, key_ops, n_keys, agg_ops, + agg_ins, agg_ins2, n_aggs); + } } else { - root = ray_group(g, key_ops, n_keys, agg_ops, agg_ins, n_aggs); + /* Fast path: dedicated row-form per-group max(x)+min(y) + * for shape `(select (max x) (min y) from T by k)`. + * Bypasses radix HT slowdown; closes q7 first stage. 
*/ + int mm_ok = 0; + if (n_aggs == 2 && n_keys == 1 && n_nonaggs == 0 + && !where_expr + && agg_ops[0] == OP_MAX && agg_ops[1] == OP_MIN + && key_ops[0] && key_ops[0]->opcode == OP_SCAN + && agg_ins[0] && agg_ins[0]->opcode == OP_SCAN + && agg_ins[1] && agg_ins[1]->opcode == OP_SCAN) + { + ray_op_ext_t* kext = find_ext(g, key_ops[0]->id); + ray_op_ext_t* xext = find_ext(g, agg_ins[0]->id); + ray_op_ext_t* yext = find_ext(g, agg_ins[1]->id); + ray_t* kc = (kext && tbl) ? ray_table_get_col(tbl, kext->sym) : NULL; + ray_t* xc = (xext && tbl) ? ray_table_get_col(tbl, xext->sym) : NULL; + ray_t* yc = (yext && tbl) ? ray_table_get_col(tbl, yext->sym) : NULL; + if (kc && xc && yc) { + int8_t kt = kc->type, xt = xc->type, yt = yc->type; + int kt_ok = (kt == RAY_I64 || kt == RAY_I32 || + kt == RAY_I16 || kt == RAY_U8 || + kt == RAY_BOOL || kt == RAY_DATE || + kt == RAY_TIME || kt == RAY_TIMESTAMP || + kt == RAY_SYM); + int xt_int = (xt == RAY_I64 || xt == RAY_I32 || + xt == RAY_I16 || xt == RAY_U8 || + xt == RAY_BOOL); + int yt_int = (yt == RAY_I64 || yt == RAY_I32 || + yt == RAY_I16 || yt == RAY_U8 || + yt == RAY_BOOL); + if (kt_ok && xt_int && yt_int) mm_ok = 1; + } + } + /* Fast path: dedicated row-form per-group median(v)+std(v) + * for shape `(select (median v) (std v) [(count v)] + * from T by k0 k1)`. Optional 3rd COUNT agg matches + * the canonical Python adapter wrapper (null surrogate + * for std(n<=1)). Bypasses radix HT slowdown + holistic + * reprobe; closes canonical H2O q6. */ + int ms_ok = 0; + int ms_with_count = 0; + /* All aggs must reference the same source column — + * compare by SYM, not node id, because each Column + * builder creates a fresh OP_SCAN node even when + * they alias the same column name. */ + int ms_aggs_same_col = 0; + if (n_aggs == 2 || n_aggs == 3) { + ray_op_ext_t* v0e = agg_ins[0] ? find_ext(g, agg_ins[0]->id) : NULL; + ray_op_ext_t* v1e = agg_ins[1] ? 
find_ext(g, agg_ins[1]->id) : NULL; + ray_op_ext_t* v2e = (n_aggs == 3 && agg_ins[2]) + ? find_ext(g, agg_ins[2]->id) : v0e; + ms_aggs_same_col = (v0e && v1e && v2e + && v0e->sym == v1e->sym + && v0e->sym == v2e->sym) ? 1 : 0; + } + if (!mm_ok && n_keys == 2 && n_nonaggs == 0 + && !where_expr + && (n_aggs == 2 || n_aggs == 3) + && agg_ops[0] == OP_MEDIAN && agg_ops[1] == OP_STDDEV + && (n_aggs == 2 || agg_ops[2] == OP_COUNT) + && key_ops[0] && key_ops[0]->opcode == OP_SCAN + && key_ops[1] && key_ops[1]->opcode == OP_SCAN + && agg_ins[0] && agg_ins[0]->opcode == OP_SCAN + && agg_ins[1] && agg_ins[1]->opcode == OP_SCAN + && (n_aggs == 2 || + (agg_ins[2] && agg_ins[2]->opcode == OP_SCAN)) + && ms_aggs_same_col) + { + ms_with_count = (n_aggs == 3) ? 1 : 0; + ray_op_ext_t* k0ext = find_ext(g, key_ops[0]->id); + ray_op_ext_t* k1ext = find_ext(g, key_ops[1]->id); + ray_op_ext_t* vxt = find_ext(g, agg_ins[0]->id); + ray_t* k0c = (k0ext && tbl) ? ray_table_get_col(tbl, k0ext->sym) : NULL; + ray_t* k1c = (k1ext && tbl) ? ray_table_get_col(tbl, k1ext->sym) : NULL; + ray_t* vc = (vxt && tbl) ? 
ray_table_get_col(tbl, vxt->sym) : NULL; + if (k0c && k1c && vc + && !(k0c->attrs & RAY_ATTR_HAS_NULLS) + && !(k1c->attrs & RAY_ATTR_HAS_NULLS) + && !(vc->attrs & RAY_ATTR_HAS_NULLS)) + { + int8_t k0t = k0c->type, k1t = k1c->type, vt = vc->type; + int k0_ok = (k0t == RAY_I64 || k0t == RAY_I32 || + k0t == RAY_I16 || k0t == RAY_U8 || + k0t == RAY_BOOL || k0t == RAY_DATE || + k0t == RAY_TIME || k0t == RAY_TIMESTAMP || + k0t == RAY_SYM); + int k1_ok = (k1t == RAY_I64 || k1t == RAY_I32 || + k1t == RAY_I16 || k1t == RAY_U8 || + k1t == RAY_BOOL || k1t == RAY_DATE || + k1t == RAY_TIME || k1t == RAY_TIMESTAMP || + k1t == RAY_SYM); + int vt_ok = (vt == RAY_I64 || vt == RAY_I32 || + vt == RAY_I16 || vt == RAY_U8 || + vt == RAY_BOOL || vt == RAY_F64); + if (k0_ok && k1_ok && vt_ok) ms_ok = 1; + } + } + /* Fast path: dedicated multi-key sum(v)+count(v) for + * shape `(select (sum v) (count v) from T by k1..kN)` + * where N ∈ {3..8}. Closes canonical H2O q10. */ + int sc_ok = 0; + if (!mm_ok && !ms_ok && n_keys >= 3 && n_keys <= 8 + && n_aggs == 2 && n_nonaggs == 0 && !where_expr + && agg_ops[0] == OP_SUM && agg_ops[1] == OP_COUNT + && agg_ins[0] && agg_ins[0]->opcode == OP_SCAN + && agg_ins[1] && agg_ins[1]->opcode == OP_SCAN) + { + int all_scan_keys = 1; + for (uint8_t k = 0; k < n_keys && all_scan_keys; k++) + if (!key_ops[k] || key_ops[k]->opcode != OP_SCAN) + all_scan_keys = 0; + if (all_scan_keys) { + ray_op_ext_t* vext = find_ext(g, agg_ins[0]->id); + ray_t* vc = (vext && tbl) ? ray_table_get_col(tbl, vext->sym) : NULL; + int all_keys_typed = 1; + int all_keys_nonnull = 1; + for (uint8_t k = 0; k < n_keys; k++) { + ray_op_ext_t* kxt = find_ext(g, key_ops[k]->id); + ray_t* kc = (kxt && tbl) ? 
ray_table_get_col(tbl, kxt->sym) : NULL; + if (!kc) { all_keys_typed = 0; break; } + int8_t kt = kc->type; + int kt_ok = (kt == RAY_I64 || kt == RAY_I32 || + kt == RAY_I16 || kt == RAY_U8 || + kt == RAY_BOOL || kt == RAY_DATE || + kt == RAY_TIME || kt == RAY_TIMESTAMP || + kt == RAY_SYM); + if (!kt_ok) { all_keys_typed = 0; break; } + if (kc->attrs & RAY_ATTR_HAS_NULLS) { + all_keys_nonnull = 0; + break; + } + } + if (vc && all_keys_typed && all_keys_nonnull + && !(vc->attrs & RAY_ATTR_HAS_NULLS)) { + int8_t vt = vc->type; + int vt_ok = (vt == RAY_I64 || vt == RAY_I32 || + vt == RAY_I16 || vt == RAY_U8 || + vt == RAY_BOOL || vt == RAY_F64); + if (vt_ok) sc_ok = 1; + } + } + } + if (mm_ok) { + root = ray_group_maxmin_rowform(g, key_ops[0], + agg_ins[0], agg_ins[1]); + } else if (ms_ok) { + root = ray_group_median_stddev_rowform(g, key_ops, + agg_ins[0], + ms_with_count); + } else if (sc_ok) { + root = ray_group_sum_count_rowform(g, key_ops, + n_keys, agg_ins[0]); + } else { + root = ray_group(g, key_ops, n_keys, agg_ops, agg_ins, n_aggs); + } } } else { /* No aggs but non-agg expressions exist — still need group