Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/ops/dump.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ const char* ray_opcode_name(uint16_t op) {
case OP_GROUP: return "GROUP";
case OP_GROUP_TOPK_ROWFORM: return "GROUP_TOPK_ROWFORM";
case OP_GROUP_BOTK_ROWFORM: return "GROUP_BOTK_ROWFORM";
case OP_GROUP_PEARSON_ROWFORM: return "GROUP_PEARSON_ROWFORM";
case OP_GROUP_MAXMIN_ROWFORM: return "GROUP_MAXMIN_ROWFORM";
case OP_GROUP_MEDIAN_STDDEV_ROWFORM: return "GROUP_MEDIAN_STDDEV_ROWFORM";
case OP_GROUP_SUM_COUNT_ROWFORM: return "GROUP_SUM_COUNT_ROWFORM";
case OP_FILTERED_GROUP:return "FILTERED_GROUP";
case OP_PIVOT: return "PIVOT";
case OP_ANTIJOIN: return "ANTIJOIN";
Expand Down
12 changes: 12 additions & 0 deletions src/ops/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,18 @@ static ray_t* exec_node_inner(ray_graph_t* g, ray_op_t* op) {
case OP_GROUP_BOTK_ROWFORM:
return exec_group_topk_rowform(g, op);

case OP_GROUP_PEARSON_ROWFORM:
return exec_group_pearson_rowform(g, op);

case OP_GROUP_MAXMIN_ROWFORM:
return exec_group_maxmin_rowform(g, op);

case OP_GROUP_MEDIAN_STDDEV_ROWFORM:
return exec_group_median_stddev_rowform(g, op);

case OP_GROUP_SUM_COUNT_ROWFORM:
return exec_group_sum_count_rowform(g, op);

case OP_PIVOT: {
ray_t* tbl = g->table;
ray_t* owned_tbl = NULL;
Expand Down
181 changes: 181 additions & 0 deletions src/ops/graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,187 @@ ray_op_t* ray_group_topk_rowform(ray_graph_t* g, ray_op_t* key,
return &g->nodes[ext->base.id];
}

/* Dedicated per-group Pearson² with row-form emission. Mirrors the
* OP_GROUP ext-node layout (1-2 keys + 2 agg inputs x, y) so downstream
* optimiser passes can introspect ext->keys / ext->agg_ins the same
* way they do for OP_GROUP, but with a distinct opcode that exec.c
* routes to exec_group_pearson_rowform. */
ray_op_t* ray_group_pearson_rowform(ray_graph_t* g, ray_op_t** keys,
uint8_t n_keys, ray_op_t* x, ray_op_t* y) {
if (!g || !keys || n_keys < 1 || n_keys > 2 || !x || !y) return NULL;
for (uint8_t k = 0; k < n_keys; k++)
if (!keys[k]) return NULL;

size_t keys_sz = (size_t)n_keys * sizeof(ray_op_t*);
size_t ops_sz = sizeof(uint16_t);
size_t ins_sz = sizeof(ray_op_t*);
size_t ins2_sz = sizeof(ray_op_t*);
size_t ops_off = keys_sz;
/* align */
ops_off = (ops_off + sizeof(uint16_t) - 1) & ~(sizeof(uint16_t) - 1);
size_t ins_off = ops_off + ops_sz;
ins_off = (ins_off + sizeof(ray_op_t*) - 1) & ~(sizeof(ray_op_t*) - 1);
size_t ins2_off = ins_off + ins_sz;
ins2_off = (ins2_off + sizeof(ray_op_t*) - 1) & ~(sizeof(ray_op_t*) - 1);

ray_op_ext_t* ext = graph_alloc_ext_node_ex(g, ins2_off + ins2_sz);
if (!ext) return NULL;

ext->base.opcode = OP_GROUP_PEARSON_ROWFORM;
ext->base.arity = 0;
ext->base.out_type = RAY_TABLE;
ext->base.est_rows = keys[0]->est_rows;
ext->base.inputs[0] = keys[0];

char* trail = EXT_TRAIL(ext);
ext->keys = (ray_op_t**)trail;
for (uint8_t k = 0; k < n_keys; k++) ext->keys[k] = keys[k];
ext->agg_ops = (uint16_t*)(trail + ops_off);
ext->agg_ops[0] = OP_PEARSON_CORR;
ext->agg_ins = (ray_op_t**)(trail + ins_off);
ext->agg_ins[0] = x;
ext->agg_ins2 = (ray_op_t**)(trail + ins2_off);
ext->agg_ins2[0] = y;
ext->agg_k = NULL;
ext->n_keys = n_keys;
ext->n_aggs = 1;

g->nodes[ext->base.id] = ext->base;
return &g->nodes[ext->base.id];
}

/* Dedicated per-group max(x) + min(y) with row-form emission. Mirror
* of ray_group_pearson_rowform with 1 key and 2 fixed-state aggs
* (OP_MAX and OP_MIN). */
ray_op_t* ray_group_maxmin_rowform(ray_graph_t* g, ray_op_t* key,
ray_op_t* x, ray_op_t* y) {
if (!g || !key || !x || !y) return NULL;

size_t keys_sz = sizeof(ray_op_t*);
size_t ops_sz = 2 * sizeof(uint16_t);
size_t ins_sz = 2 * sizeof(ray_op_t*);
size_t ops_off = keys_sz;
ops_off = (ops_off + sizeof(uint16_t) - 1) & ~(sizeof(uint16_t) - 1);
size_t ins_off = ops_off + ops_sz;
ins_off = (ins_off + sizeof(ray_op_t*) - 1) & ~(sizeof(ray_op_t*) - 1);

ray_op_ext_t* ext = graph_alloc_ext_node_ex(g, ins_off + ins_sz);
if (!ext) return NULL;

ext->base.opcode = OP_GROUP_MAXMIN_ROWFORM;
ext->base.arity = 0;
ext->base.out_type = RAY_TABLE;
ext->base.est_rows = key->est_rows;
ext->base.inputs[0] = key;

char* trail = EXT_TRAIL(ext);
ext->keys = (ray_op_t**)trail;
ext->keys[0] = key;
ext->agg_ops = (uint16_t*)(trail + ops_off);
ext->agg_ops[0] = OP_MAX;
ext->agg_ops[1] = OP_MIN;
ext->agg_ins = (ray_op_t**)(trail + ins_off);
ext->agg_ins[0] = x;
ext->agg_ins[1] = y;
ext->agg_ins2 = NULL;
ext->agg_k = NULL;
ext->n_keys = 1;
ext->n_aggs = 2;

g->nodes[ext->base.id] = ext->base;
return &g->nodes[ext->base.id];
}

/* Dedicated per-group median(v) + std(v) with row-form emission. Mirror
* of ray_group_maxmin_rowform with 2 keys and 2 aggs on the same
* value column (OP_MEDIAN and OP_STDDEV). */
ray_op_t* ray_group_median_stddev_rowform(ray_graph_t* g, ray_op_t** keys,
ray_op_t* v, int with_count) {
if (!g || !keys || !keys[0] || !keys[1] || !v) return NULL;

uint8_t n_aggs = with_count ? 3 : 2;
size_t keys_sz = 2 * sizeof(ray_op_t*);
size_t ops_sz = (size_t)n_aggs * sizeof(uint16_t);
size_t ins_sz = (size_t)n_aggs * sizeof(ray_op_t*);
size_t ops_off = keys_sz;
ops_off = (ops_off + sizeof(uint16_t) - 1) & ~(sizeof(uint16_t) - 1);
size_t ins_off = ops_off + ops_sz;
ins_off = (ins_off + sizeof(ray_op_t*) - 1) & ~(sizeof(ray_op_t*) - 1);

ray_op_ext_t* ext = graph_alloc_ext_node_ex(g, ins_off + ins_sz);
if (!ext) return NULL;

ext->base.opcode = OP_GROUP_MEDIAN_STDDEV_ROWFORM;
ext->base.arity = 0;
ext->base.out_type = RAY_TABLE;
ext->base.est_rows = keys[0]->est_rows;
ext->base.inputs[0] = keys[0];

char* trail = EXT_TRAIL(ext);
ext->keys = (ray_op_t**)trail;
ext->keys[0] = keys[0];
ext->keys[1] = keys[1];
ext->agg_ops = (uint16_t*)(trail + ops_off);
ext->agg_ops[0] = OP_MEDIAN;
ext->agg_ops[1] = OP_STDDEV;
if (with_count) ext->agg_ops[2] = OP_COUNT;
ext->agg_ins = (ray_op_t**)(trail + ins_off);
ext->agg_ins[0] = v;
ext->agg_ins[1] = v;
if (with_count) ext->agg_ins[2] = v;
ext->agg_ins2 = NULL;
ext->agg_k = NULL;
ext->n_keys = 2;
ext->n_aggs = n_aggs;

g->nodes[ext->base.id] = ext->base;
return &g->nodes[ext->base.id];
}

/* Dedicated multi-key per-group sum(v) + count(v) with row-form
* emission. N keys (3..8) packed inline in the ext node trail. Mirror
* of ray_group_median_stddev_rowform extended for variadic keys. */
ray_op_t* ray_group_sum_count_rowform(ray_graph_t* g, ray_op_t** keys,
uint8_t n_keys, ray_op_t* v) {
if (!g || !keys || !v || n_keys < 3 || n_keys > 8) return NULL;
for (uint8_t k = 0; k < n_keys; k++)
if (!keys[k]) return NULL;

size_t keys_sz = (size_t)n_keys * sizeof(ray_op_t*);
size_t ops_sz = 2 * sizeof(uint16_t);
size_t ins_sz = 2 * sizeof(ray_op_t*);
size_t ops_off = keys_sz;
ops_off = (ops_off + sizeof(uint16_t) - 1) & ~(sizeof(uint16_t) - 1);
size_t ins_off = ops_off + ops_sz;
ins_off = (ins_off + sizeof(ray_op_t*) - 1) & ~(sizeof(ray_op_t*) - 1);

ray_op_ext_t* ext = graph_alloc_ext_node_ex(g, ins_off + ins_sz);
if (!ext) return NULL;

ext->base.opcode = OP_GROUP_SUM_COUNT_ROWFORM;
ext->base.arity = 0;
ext->base.out_type = RAY_TABLE;
ext->base.est_rows = keys[0]->est_rows;
ext->base.inputs[0] = keys[0];

char* trail = EXT_TRAIL(ext);
ext->keys = (ray_op_t**)trail;
for (uint8_t k = 0; k < n_keys; k++) ext->keys[k] = keys[k];
ext->agg_ops = (uint16_t*)(trail + ops_off);
ext->agg_ops[0] = OP_SUM;
ext->agg_ops[1] = OP_COUNT;
ext->agg_ins = (ray_op_t**)(trail + ins_off);
ext->agg_ins[0] = v;
ext->agg_ins[1] = v;
ext->agg_ins2 = NULL;
ext->agg_k = NULL;
ext->n_keys = n_keys;
ext->n_aggs = 2;

g->nodes[ext->base.id] = ext->base;
return &g->nodes[ext->base.id];
}

ray_op_t* ray_pivot_op(ray_graph_t* g,
ray_op_t** index_cols, uint8_t n_index,
ray_op_t* pivot_col,
Expand Down
Loading
Loading