Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/rayforce.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ static inline void* ray_data_fn(ray_t* v) {
#define ray_slice_data(v) ray_data_fn(v) /* alias — ray_data is always slice-safe */
#define ray_data(v) ray_data_fn(v)

/* ===== Introspection helpers (FFI-safe access for foreign consumers) ===== */

int8_t ray_obj_type(ray_t* v);
uint8_t ray_obj_attrs(ray_t* v);
int64_t ray_vec_get_i64(ray_t* vec, int64_t idx);
double ray_vec_get_f64(ray_t* vec, int64_t idx);
int64_t ray_vec_get_sym_id(ray_t* vec, int64_t idx);

/* ===== Memory Allocator API ===== */

ray_t* ray_alloc(size_t data_size);
Expand Down
77 changes: 62 additions & 15 deletions src/core/runtime.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "runtime.h"
#include "mem/heap.h"
#include "mem/sys.h"
#include "table/sym.h"
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
Expand All @@ -44,6 +45,15 @@ extern void ray_lang_destroy(void);
ray_runtime_t *__RUNTIME = NULL;
_Thread_local ray_vm_t *__VM = NULL;

/* Persistent error message buffer.
*
* `__VM->err.msg` lives inside the VM struct, which is freed at the end of
* every eval (eval.c sets __VM = NULL right after `ray_free(vm_block)`). By
* the time the FFI caller reaches `ray_error_msg()`, the VM is gone. Stash a
* copy in a thread-local buffer that outlives the VM so callers can still
* read what went wrong. */
static _Thread_local char ray_last_err_msg[256] = {0};

/* Static null singleton — type RAY_NULL, ARENA flag makes retain/release no-ops */
ray_t __ray_null = { .type = RAY_NULL, .attrs = RAY_ATTR_ARENA, .rc = 0, .len = 0 };

Expand Down Expand Up @@ -118,18 +128,21 @@ ray_err_t ray_err_from_obj(ray_t* err) {
/* ===== Error API ===== */

static ray_t* ray_verror(const char* code, const char* fmt, va_list ap) {
/* Populate / clear the per-VM message buffer FIRST. On the deep-OOM
* path below we return the static __ray_oom sentinel, but that path
* still has to leave __VM->err.msg consistent with this call —
* otherwise ray_error_msg() returns text from whatever earlier error
* happened to land in the buffer last, which a user would naturally
* read as the message for THIS error. The vsnprintf target is a
* fixed-size member of __VM (allocated at runtime-init), so this
* step does not depend on the heap and stays valid even when
* ray_alloc below fails. */
if (__VM) {
if (fmt) vsnprintf(__VM->err.msg, sizeof(__VM->err.msg), fmt, ap);
else __VM->err.msg[0] = '\0';
/* Populate / clear the persistent message buffer FIRST. On the
* deep-OOM path below we return the static __ray_oom sentinel, but
* that path still has to leave ray_error_msg() consistent with this
* call. The buffer is thread-local storage, so this does not depend
* on the heap and stays valid even when ray_alloc below fails. */
if (fmt) {
va_list copy;
va_copy(copy, ap);
vsnprintf(ray_last_err_msg, sizeof(ray_last_err_msg), fmt, copy);
va_end(copy);
if (__VM)
memcpy(__VM->err.msg, ray_last_err_msg, sizeof(__VM->err.msg));
} else {
ray_last_err_msg[0] = '\0';
if (__VM) __VM->err.msg[0] = '\0';
}

ray_t* err = ray_alloc(0);
Expand All @@ -155,8 +168,9 @@ ray_t* ray_error(const char* code, const char* fmt, ...) {
return err;
}
/* No format string — skip va_list entirely for portability. Clear
* the per-VM message buffer FIRST so the deep-OOM sentinel path
* the persistent message buffer FIRST so the deep-OOM sentinel path
* doesn't leave stale text from an earlier error visible. */
ray_last_err_msg[0] = '\0';
if (__VM) __VM->err.msg[0] = '\0';
ray_t* err = ray_alloc(0);
if (!err) return &__ray_oom; /* sentinel — see __ray_oom comment */
Expand Down Expand Up @@ -200,11 +214,12 @@ const char* ray_err_code(ray_t* err) {
}

const char* ray_error_msg(void) {
if (!__VM || !__VM->err.msg[0]) return NULL;
return __VM->err.msg;
if (!ray_last_err_msg[0]) return NULL;
return ray_last_err_msg;
}

void ray_error_clear(void) {
ray_last_err_msg[0] = '\0';
if (__VM) __VM->err.msg[0] = '\0';
}

Expand Down Expand Up @@ -343,6 +358,38 @@ bool ray_mem_pressure(void) {
return (int64_t)(st.bytes_allocated + st.direct_bytes) > __RUNTIME->mem_budget;
}

int8_t ray_obj_type(ray_t* v) {
return v ? v->type : 0;
}

uint8_t ray_obj_attrs(ray_t* v) {
return v ? v->attrs : 0;
}

int64_t ray_vec_get_i64(ray_t* vec, int64_t idx) {
if (!vec || idx < 0 || idx >= vec->len) return 0;
if (vec->type == RAY_I64 || vec->type == RAY_DATE || vec->type == RAY_TIME || vec->type == RAY_TIMESTAMP) {
return ((const int64_t*)ray_data(vec))[idx];
}
if (vec->type == RAY_I32) return ((const int32_t*)ray_data(vec))[idx];
if (vec->type == RAY_I16) return ((const int16_t*)ray_data(vec))[idx];
if (vec->type == RAY_U8 || vec->type == RAY_BOOL) return ((const uint8_t*)ray_data(vec))[idx];
return 0;
}

double ray_vec_get_f64(ray_t* vec, int64_t idx) {
if (!vec || idx < 0 || idx >= vec->len) return 0.0;
if (vec->type == RAY_F64) return ((const double*)ray_data(vec))[idx];
if (vec->type == RAY_F32) return ((const float*)ray_data(vec))[idx];
return 0.0;
}

int64_t ray_vec_get_sym_id(ray_t* vec, int64_t idx) {
if (!vec || idx < 0 || idx >= vec->len) return 0;
if (vec->type != RAY_SYM) return 0;
return ray_read_sym(ray_data(vec), idx, vec->type, vec->attrs);
}

void ray_runtime_destroy(ray_runtime_t* rt) {
if (!rt) return;

Expand Down
7 changes: 6 additions & 1 deletion src/lang/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -1710,7 +1710,12 @@ ray_t* call_lambda(ray_t* lambda, ray_t** call_args, int64_t argc) {
* Stack-based VM executor (computed goto, frame-based)
* ══════════════════════════════════════════ */

static _Thread_local ray_vm_t *__VM = NULL;
/* Shared thread-local with runtime.c (declared extern in core/runtime.h).
* Defining it locally here would shadow runtime.c's symbol, leaving
* ray_error_msg() reading a NULL pointer on any thread that ran an eval
* without going through ray_runtime_create — which is every tokio worker
* thread in ray-exomem. */
extern _Thread_local ray_vm_t *__VM;

static ray_t* vm_exec(ray_t* lambda, ray_t** call_args, int64_t argc) {
/* Computed goto dispatch table */
Expand Down
95 changes: 78 additions & 17 deletions src/ops/datalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
#include <string.h>
#include <stdio.h>

/* From core/runtime.h — avoiding the include because it pulls in a ray_vm_t
* typedef that conflicts with the one in lang/eval.h. */
extern const char* ray_error_msg(void);

/* ========================================================================
* Program lifecycle
* ======================================================================== */
Expand Down Expand Up @@ -319,11 +323,20 @@ void dl_body_set_var(dl_rule_t* rule, int body_idx, int pos, int var_idx) {
if (var_idx + 1 > rule->n_vars) rule->n_vars = var_idx + 1;
}

void dl_body_set_const(dl_rule_t* rule, int body_idx, int pos, int64_t val) {
void dl_body_set_const_typed(dl_rule_t* rule, int body_idx, int pos,
int64_t val, int8_t ray_type) {
if (body_idx < 0 || body_idx >= rule->n_body) return;
if (pos < 0 || pos >= rule->body[body_idx].arity) return;
rule->body[body_idx].vars[pos] = DL_CONST;
rule->body[body_idx].const_vals[pos] = val;
rule->body[body_idx].const_types[pos] = ray_type;
}

void dl_body_set_const(dl_rule_t* rule, int body_idx, int pos, int64_t val) {
/* Default the type tag to RAY_I64 — callers that care about
* RAY_SYM / RAY_STR literals (and thus DATOM-tag-aware compares
* against I64 columns) should call dl_body_set_const_typed. */
dl_body_set_const_typed(rule, body_idx, pos, val, RAY_I64);
}

int dl_rule_add_neg(dl_rule_t* rule, const char* pred, int arity) {
Expand Down Expand Up @@ -952,19 +965,51 @@ static ray_t* dl_antijoin_tables(ray_t* left, ray_t* right,
return result;
}

/* Helper: filter a table to rows where column col_idx == value */
/* Helper: filter a table to rows where column col_idx == value.
* `const_type` is the source ray type of the body literal (RAY_STR /
* RAY_SYM / RAY_I64 / RAY_F64) so the row-equality helper can dispatch
* a tag-aware compare for DATOM-encoded I64 columns — see
* dl_col_eq_row below. */
/* Row-at-index read helper: read an I64 from either a RAY_I64 column
* or from a RAY_SYM column (of any adaptive width) as a sym ID. Other
* types aren't supported by the constant-filter path and cause the
* caller to pass through the input table unchanged. */
static bool dl_col_eq_row(ray_t* col, int64_t row, int64_t value) {
if (col->type == RAY_I64) return ((int64_t*)ray_data(col))[row] == value;
* caller to pass through the input table unchanged.
*
* RAY_I64 columns may hold either plain integers or DATOM-tagged sym
* IDs (`(0x4000... | sym_id)` for STR, `(0x2000... | sym_id)` for SYM)
* deposited by an EAV frontend. The body literal's source ray type
* (`const_type`) is the only signal we have for which encoding the
* caller intended: a `"foo"` literal interns as a plain sym ID but
* targets a STR-tagged cell; a `'foo` literal interns plain and
* targets a SYM-tagged cell. We always try the direct compare first
* (plain-int columns and same-tag columns hit this path) and fall
* back to a payload compare when the cell carries the matching tag —
* so a body literal can pin both an untagged RAY_SYM column built
* from a rule head and a DATOM-tagged RAY_I64 column built from EAV
* storage without the frontend having to know which is which. */
static bool dl_col_eq_row(ray_t* col, int64_t row, int64_t value,
int8_t const_type) {
if (col->type == RAY_I64) {
int64_t cell = ((int64_t*)ray_data(col))[row];
if (cell == value) return true;
int64_t cell_tag = cell & (int64_t)0x6000000000000000;
if (cell_tag == 0) return false; /* plain int column */
int64_t cell_payload = cell & (int64_t)0x1FFFFFFFFFFFFFFF;
if (const_type == RAY_STR &&
cell_tag == (int64_t)0x4000000000000000)
return cell_payload == value;
if (const_type == RAY_SYM &&
cell_tag == (int64_t)0x2000000000000000)
return cell_payload == value;
return false;
}
if (col->type == RAY_SYM)
return ray_read_sym(ray_data(col), row, col->type, col->attrs) == value;
return false;
}

static ray_t* dl_filter_eq(ray_t* tbl, int col_idx, int64_t value) {
static ray_t* dl_filter_eq(ray_t* tbl, int col_idx, int64_t value,
int8_t const_type) {
/* Contract: always return an owned reference (rc bumped) so the
* caller can release uniformly. Every pass-through must therefore
* retain — else the caller's `ray_release(body_tbl); body_tbl =
Expand All @@ -988,7 +1033,7 @@ static ray_t* dl_filter_eq(ray_t* tbl, int col_idx, int64_t value) {
/* Count matching rows — type-aware read for RAY_SYM adaptive width. */
int64_t count = 0;
for (int64_t r = 0; r < nrows; r++)
if (dl_col_eq_row(col, r, value)) count++;
if (dl_col_eq_row(col, r, value, const_type)) count++;

if (count == nrows) { ray_retain(tbl); return tbl; }

Expand All @@ -1015,7 +1060,7 @@ static ray_t* dl_filter_eq(ray_t* tbl, int col_idx, int64_t value) {
uint8_t* dst_b = (uint8_t*)ray_data(dst);
int64_t j = 0;
for (int64_t r = 0; r < nrows; r++) {
if (dl_col_eq_row(col, r, value)) {
if (dl_col_eq_row(col, r, value, const_type)) {
memcpy(dst_b + (size_t)j * esz,
src_b + (size_t)r * esz,
(size_t)esz);
Expand Down Expand Up @@ -1257,7 +1302,8 @@ ray_op_t* dl_compile_rule(dl_program_t* prog, dl_rule_t* rule,
/* Apply constant filters */
for (int c = 0; c < body->arity; c++) {
if (body->vars[c] == DL_CONST) {
ray_t* filtered = dl_filter_eq(body_tbl, c, body->const_vals[c]);
ray_t* filtered = dl_filter_eq(body_tbl, c, body->const_vals[c],
body->const_types[c]);
ray_release(body_tbl);
if (!filtered) {
/* Treat as genuine failure — dl_filter_eq returns an
Expand Down Expand Up @@ -1418,7 +1464,8 @@ ray_op_t* dl_compile_rule(dl_program_t* prog, dl_rule_t* rule,
ray_retain(neg_tbl);
for (int c = 0; c < body->arity; c++) {
if (body->vars[c] == DL_CONST) {
ray_t* filtered = dl_filter_eq(neg_tbl, c, body->const_vals[c]);
ray_t* filtered = dl_filter_eq(neg_tbl, c, body->const_vals[c],
body->const_types[c]);
ray_release(neg_tbl);
if (!filtered) {
ray_release(accum);
Expand Down Expand Up @@ -3517,7 +3564,7 @@ static ray_t* dl_set_body_pos(dl_rule_t* rule, int bidx, int pos,
return NULL;
}
if (node->type == -RAY_I64) {
dl_body_set_const(rule, bidx, pos, node->i64);
dl_body_set_const_typed(rule, bidx, pos, node->i64, RAY_I64);
return NULL;
}
if (node->type == -RAY_SYM) {
Expand All @@ -3528,26 +3575,27 @@ static ray_t* dl_set_body_pos(dl_rule_t* rule, int bidx, int pos,
vars->syms[vi] = -1 - vi;
dl_body_set_var(rule, bidx, pos, vi);
} else {
dl_body_set_const(rule, bidx, pos, node->i64);
dl_body_set_const_typed(rule, bidx, pos, node->i64, RAY_SYM);
}
return NULL;
}
if (node->type == -RAY_STR) {
/* Quoted string literal in body: intern as sym so it compares
* equal to other sym-interned constants. Mirrors the head
* parser convention. */
* equal to other sym-interned constants. Record the source type
* as RAY_STR so the row-equality helper can also try a tagged-
* payload compare against DATOM-encoded I64 columns. */
int64_t sym = ray_sym_intern(ray_str_ptr(node), ray_str_len(node));
dl_body_set_const(rule, bidx, pos, sym);
dl_body_set_const_typed(rule, bidx, pos, sym, RAY_STR);
return NULL;
}
/* For other forms (e.g., (quote x)), evaluate to get constant */
ray_t* val = ray_eval(node);
if (!val || RAY_IS_ERR(val))
return val ? val : ray_error("type", "rule: cannot evaluate constant in body");
if (val->type == -RAY_I64) {
dl_body_set_const(rule, bidx, pos, val->i64);
dl_body_set_const_typed(rule, bidx, pos, val->i64, RAY_I64);
} else if (val->type == -RAY_SYM) {
dl_body_set_const(rule, bidx, pos, val->i64);
dl_body_set_const_typed(rule, bidx, pos, val->i64, RAY_SYM);
} else {
ray_release(val);
return ray_error("type", "rule: unsupported constant type in body");
Expand Down Expand Up @@ -4174,8 +4222,21 @@ ray_t* ray_query_fn(ray_t** args, int64_t n) {
}

if (dl_eval(prog) != 0) {
/* Preserve any inner-error detail that ray_error() stashed in the
* thread-local error buffer before dl_eval freed the offending
* object. Without this, callers see a generic "evaluation failed". */
char prev[256];
const char* p = ray_error_msg();
prev[0] = '\0';
if (p && *p) {
strncpy(prev, p, sizeof(prev) - 1);
prev[sizeof(prev) - 1] = '\0';
}
dl_program_free(prog);
ray_release(db);
if (prev[0]) {
return ray_error("domain", "query: evaluation failed: %s", prev);
}
return ray_error("domain", "query: evaluation failed");
}

Expand Down
19 changes: 18 additions & 1 deletion src/ops/datalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ typedef struct {
int arity; /* number of argument positions */
int vars[DL_MAX_ARITY]; /* variable indices (DL_CONST for constants) */
int64_t const_vals[DL_MAX_ARITY]; /* constant values (I64/SYM) */
int8_t const_types[DL_MAX_ARITY]; /* ray type tag per body slot when
* vars[i]==DL_CONST: RAY_I64 / RAY_SYM /
* RAY_STR / RAY_F64. 0 when vars[i] is
* a variable. Used by the row-equality
* helper to dispatch tag-aware compares
* against DATOM-encoded I64 columns
* (see dl_col_eq_row). */
int cmp_op; /* comparison operator (for DL_CMP) */
int cmp_lhs; /* left variable index (for DL_CMP) */
int cmp_rhs; /* right variable index or DL_CONST */
Expand Down Expand Up @@ -265,9 +272,19 @@ int dl_rule_add_atom(dl_rule_t* rule, const char* pred, int arity);
/* Set a body atom argument to a variable */
void dl_body_set_var(dl_rule_t* rule, int body_idx, int pos, int var_idx);

/* Set a body atom argument to a constant */
/* Set a body atom argument to a constant. Defaults the type tag to RAY_I64;
* use dl_body_set_const_typed when the literal's source ray type matters
* (RAY_STR / RAY_SYM body literals must record their type so the row-
* equality helper can compare them against DATOM-encoded I64 columns). */
void dl_body_set_const(dl_rule_t* rule, int body_idx, int pos, int64_t val);

/* Set a body atom argument to a typed constant. `ray_type` is the source
* ray type of the literal (RAY_I64 / RAY_SYM / RAY_STR / RAY_F64), used at
* filter time to decide whether a tagged-payload compare is needed when
* the column is RAY_I64 with DATOM-encoded entries. */
void dl_body_set_const_typed(dl_rule_t* rule, int body_idx, int pos,
int64_t val, int8_t ray_type);

/* Add a negated body atom. Returns body literal index. */
int dl_rule_add_neg(dl_rule_t* rule, const char* pred, int arity);

Expand Down
Loading