Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,6 @@ GOOGLE_CLIENT_SECRET=
# Cron schedule for cleaning up expired device authorizations (UTC)
# DEVICE_FLOW_CLEANUP_CRON=*/15 * * * *

# -----------------------------------------------------------------------------
# Optional — Embedding
# -----------------------------------------------------------------------------

# Base URL for the embedding API (omit for OpenAI default)
# EMBEDDING_BASE_URL=

# Timeout per embedding API call (ms)
# EMBEDDING_TIMEOUT_MS=

# Max retries on transient failures (default: 2, from Vercel AI SDK)
# EMBEDDING_MAX_RETRIES=

# Max concurrent chunk requests for batch embedding (default: Infinity)
# EMBEDDING_MAX_PARALLEL_CALLS=

# -----------------------------------------------------------------------------
# Optional — Telemetry
# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -144,6 +128,22 @@ GOOGLE_CLIENT_SECRET=
# PostgreSQL idle-in-transaction timeout for engine DB transactions
# ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT=30s

# -----------------------------------------------------------------------------
# Optional — Embedding
# -----------------------------------------------------------------------------

# Base URL for the embedding API (omit for OpenAI default)
# EMBEDDING_BASE_URL=

# Timeout per embedding API call (ms)
# EMBEDDING_TIMEOUT_MS=

# Max retries on transient failures (default: 2, from Vercel AI SDK)
# EMBEDDING_MAX_RETRIES=

# Max concurrent chunk requests for batch embedding (default: Infinity)
# EMBEDDING_MAX_PARALLEL_CALLS=

# -----------------------------------------------------------------------------
# Optional — Embedding Worker
# -----------------------------------------------------------------------------
Expand All @@ -166,6 +166,30 @@ GOOGLE_CLIENT_SECRET=
# Engine re-discovery interval (ms)
# WORKER_REFRESH_INTERVAL_MS=60000

# -----------------------------------------------------------------------------
# Optional — Embedding Worker Engine Database
# -----------------------------------------------------------------------------

# PostgreSQL connection string for embedding worker engine traffic.
# Defaults to ENGINE_DATABASE_URL when unset.
# WORKER_ENGINE_DATABASE_URL=postgres://postgres:postgres@localhost:5432/me

# Maximum connections in the dedicated embedding worker engine pool.
# Defaults to max(WORKER_COUNT, 1) when unset.
# WORKER_ENGINE_POOL_MAX=2

# Close idle embedding worker engine connections after N seconds.
# Defaults to ENGINE_POOL_IDLE_REAP_SECONDS when unset.
# WORKER_ENGINE_POOL_IDLE_REAP_SECONDS=300

# Max embedding worker engine connection lifetime in seconds (0 = forever).
# Defaults to ENGINE_POOL_MAX_LIFETIME when unset.
# WORKER_ENGINE_POOL_MAX_LIFETIME=0

# Timeout for establishing embedding worker engine connections (seconds).
# Defaults to ENGINE_POOL_CONNECTION_TIMEOUT when unset.
# WORKER_ENGINE_POOL_CONNECTION_TIMEOUT=30

# PostgreSQL statement timeout for embedding worker engine DB transactions
# Defaults to ENGINE_STATEMENT_TIMEOUT when unset.
# WORKER_ENGINE_STATEMENT_TIMEOUT=25s
Expand Down
80 changes: 69 additions & 11 deletions packages/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import {
} from "@memory.build/engine/migrate";
import { bootstrap as bootstrapEngine } from "@memory.build/engine/migrate/bootstrap";
import { migrateAll as migrateEngines } from "@memory.build/engine/migrate/runner";
import {
DEFAULT_ENGINE_TIMEOUTS,
type EngineTimeouts,
} from "@memory.build/engine/ops/_tx";
import { WorkerPool } from "@memory.build/worker";
import { configure, info, reportError, span } from "@pydantic/logfire-node";
import { MIN_CLIENT_VERSION, SERVER_VERSION } from "../../version";
Expand Down Expand Up @@ -90,6 +94,17 @@ configure({
// ENGINE_TRANSACTION_TIMEOUT - Per-engine-transaction timeout (default: 30s)
// ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT - Idle-in-transaction timeout (default: 30s)
//
// Embedding Worker Engine Database:
// WORKER_ENGINE_DATABASE_URL - PostgreSQL connection string for worker engine traffic (default: ENGINE_DATABASE_URL)
// WORKER_ENGINE_POOL_MAX - Max worker engine connections (default: WORKER_COUNT)
// WORKER_ENGINE_POOL_IDLE_REAP_SECONDS - Close idle pooled connections after N seconds (default: ENGINE_POOL_IDLE_REAP_SECONDS)
// WORKER_ENGINE_POOL_MAX_LIFETIME - Max lifetime in seconds, 0=forever (default: ENGINE_POOL_MAX_LIFETIME)
// WORKER_ENGINE_POOL_CONNECTION_TIMEOUT - Connection timeout in seconds (default: ENGINE_POOL_CONNECTION_TIMEOUT)
// WORKER_ENGINE_STATEMENT_TIMEOUT - Worker engine query timeout (default: ENGINE_STATEMENT_TIMEOUT)
// WORKER_ENGINE_LOCK_TIMEOUT - Worker engine lock wait timeout (default: ENGINE_LOCK_TIMEOUT)
// WORKER_ENGINE_TRANSACTION_TIMEOUT - Worker engine transaction timeout (default: ENGINE_TRANSACTION_TIMEOUT)
// WORKER_ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT - Worker engine idle-in-transaction timeout (default: ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
//
// Cleanup:
// DEVICE_FLOW_CLEANUP_CRON - Cron schedule for cleaning up expired device auths
// (default: "*/15 * * * *" = every 15 minutes, UTC)
Expand All @@ -101,10 +116,6 @@ configure({
// WORKER_IDLE_DELAY_MS - Poll interval when idle in ms (default: 10000)
// WORKER_MAX_BACKOFF_MS - Max error backoff in ms (default: 60000)
// WORKER_REFRESH_INTERVAL_MS - Engine re-discovery interval in ms (default: 60000)
// WORKER_ENGINE_STATEMENT_TIMEOUT - Worker engine query timeout (default: ENGINE_STATEMENT_TIMEOUT)
// WORKER_ENGINE_LOCK_TIMEOUT - Worker engine lock wait timeout (default: ENGINE_LOCK_TIMEOUT)
// WORKER_ENGINE_TRANSACTION_TIMEOUT - Worker engine transaction timeout (default: ENGINE_TRANSACTION_TIMEOUT)
// WORKER_ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT - Worker engine idle-in-transaction timeout (default: ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
//
// =============================================================================

Expand Down Expand Up @@ -153,6 +164,12 @@ const deviceFlowCleanupCron =

const accountsSchema = process.env.ACCOUNTS_SCHEMA || "accounts";

const workerCount = parseIntEnv(
"WORKER_COUNT",
process.env.WORKER_COUNT || "",
"2",
);

// Connection pool settings - Accounts database
const accountsPoolMax = parseIntEnv(
"ACCOUNTS_POOL_MAX",
Expand Down Expand Up @@ -197,6 +214,44 @@ const enginePoolConnectionTimeout = parseIntEnv(
"30",
);

// Connection pool settings - Embedding worker engine database
const workerEngineDatabaseUrl =
process.env.WORKER_ENGINE_DATABASE_URL || engineDatabaseUrl;
const workerEnginePoolMax = parseIntEnv(
"WORKER_ENGINE_POOL_MAX",
process.env.WORKER_ENGINE_POOL_MAX || "",
String(Math.max(workerCount, 1)),
);
const workerEnginePoolIdleReapSeconds = parseIntEnv(
"WORKER_ENGINE_POOL_IDLE_REAP_SECONDS",
process.env.WORKER_ENGINE_POOL_IDLE_REAP_SECONDS || "",
String(enginePoolIdleReapSeconds),
);
const workerEnginePoolMaxLifetime = parseIntEnv(
"WORKER_ENGINE_POOL_MAX_LIFETIME",
process.env.WORKER_ENGINE_POOL_MAX_LIFETIME || "",
String(enginePoolMaxLifetime),
);
const workerEnginePoolConnectionTimeout = parseIntEnv(
"WORKER_ENGINE_POOL_CONNECTION_TIMEOUT",
process.env.WORKER_ENGINE_POOL_CONNECTION_TIMEOUT || "",
String(enginePoolConnectionTimeout),
);
const workerEngineTimeouts: EngineTimeouts = {
statementTimeout:
process.env.WORKER_ENGINE_STATEMENT_TIMEOUT ??
DEFAULT_ENGINE_TIMEOUTS.statementTimeout,
lockTimeout:
process.env.WORKER_ENGINE_LOCK_TIMEOUT ??
DEFAULT_ENGINE_TIMEOUTS.lockTimeout,
transactionTimeout:
process.env.WORKER_ENGINE_TRANSACTION_TIMEOUT ??
DEFAULT_ENGINE_TIMEOUTS.transactionTimeout,
idleInTransactionSessionTimeout:
process.env.WORKER_ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT ??
DEFAULT_ENGINE_TIMEOUTS.idleInTransactionSessionTimeout,
};

// =============================================================================
// Embedding Config
// =============================================================================
Expand Down Expand Up @@ -305,6 +360,13 @@ const engineSql = new Bun.SQL(engineDatabaseUrl, {
connectionTimeout: enginePoolConnectionTimeout,
});

const workerEngineSql = new Bun.SQL(workerEngineDatabaseUrl, {
max: workerEnginePoolMax,
idleTimeout: workerEnginePoolIdleReapSeconds,
maxLifetime: workerEnginePoolMaxLifetime,
connectionTimeout: workerEnginePoolConnectionTimeout,
});

// Create accounts DB with operations layer
const accountsDb = createAccountsDB(accountsSql, accountsSchema, {
masterKey: masterKeyBuffer,
Expand Down Expand Up @@ -410,13 +472,7 @@ const router = createRouter(serverContext);
// Embedding Worker Pool
// =============================================================================

const workerCount = parseIntEnv(
"WORKER_COUNT",
process.env.WORKER_COUNT || "",
"2",
);

const workerPool = new WorkerPool(engineSql, {
const workerPool = new WorkerPool(workerEngineSql, {
embedding: embeddingConfig,
discover: async () => {
const engines = await accountsDb.listActiveEngines();
Expand Down Expand Up @@ -446,6 +502,7 @@ const workerPool = new WorkerPool(engineSql, {
process.env.WORKER_REFRESH_INTERVAL_MS || "",
"60000",
),
workerEngineTimeouts,
});

await workerPool.start(workerCount);
Expand Down Expand Up @@ -535,6 +592,7 @@ async function shutdown() {
try {
await accountsSql.close();
await engineSql.close();
await workerEngineSql.close();
} catch (error) {
reportError("Error closing database connections", error as Error);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/worker/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { WorkerConfig, WorkerStats } from "./types";
import { Worker } from "./worker";

/**
* Pool of N embedding workers sharing a single SQL connection pool.
* Pool of N embedding workers using the provided SQL connection pool.
* Each worker independently discovers engines, shuffles its target list,
* and polls queues. FOR UPDATE SKIP LOCKED prevents double-processing.
*/
Expand Down
53 changes: 24 additions & 29 deletions packages/worker/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,19 @@ import { info, reportError, span, warning } from "@pydantic/logfire-node";
import type { SQL } from "bun";
import type { EngineTarget, ProcessResult, WorkerConfig } from "./types";

const WORKER_ENGINE_TIMEOUTS: EngineTimeouts = {
statementTimeout:
process.env.WORKER_ENGINE_STATEMENT_TIMEOUT ??
DEFAULT_ENGINE_TIMEOUTS.statementTimeout,
lockTimeout:
process.env.WORKER_ENGINE_LOCK_TIMEOUT ??
DEFAULT_ENGINE_TIMEOUTS.lockTimeout,
transactionTimeout:
process.env.WORKER_ENGINE_TRANSACTION_TIMEOUT ??
DEFAULT_ENGINE_TIMEOUTS.transactionTimeout,
idleInTransactionSessionTimeout:
process.env.WORKER_ENGINE_IDLE_IN_TRANSACTION_SESSION_TIMEOUT ??
DEFAULT_ENGINE_TIMEOUTS.idleInTransactionSessionTimeout,
};
function workerEngineTimeouts(config?: WorkerConfig): EngineTimeouts {
return config?.workerEngineTimeouts ?? DEFAULT_ENGINE_TIMEOUTS;
}

const WORKER_ENGINE_TIMEOUT_ATTRIBUTES = {
"db.statement_timeout": WORKER_ENGINE_TIMEOUTS.statementTimeout,
"db.lock_timeout": WORKER_ENGINE_TIMEOUTS.lockTimeout,
"db.transaction_timeout": WORKER_ENGINE_TIMEOUTS.transactionTimeout,
"db.idle_in_transaction_session_timeout":
WORKER_ENGINE_TIMEOUTS.idleInTransactionSessionTimeout,
};
function workerEngineTimeoutAttributes(timeouts: EngineTimeouts) {
return {
"db.statement_timeout": timeouts.statementTimeout,
"db.lock_timeout": timeouts.lockTimeout,
"db.transaction_timeout": timeouts.transactionTimeout,
"db.idle_in_transaction_session_timeout":
timeouts.idleInTransactionSessionTimeout,
};
}

function asError(error: unknown): Error {
return error instanceof Error ? error : new Error(String(error));
Expand All @@ -49,11 +40,13 @@ export async function pruneQueue(
sql: SQL,
target: EngineTarget,
retention: string,
config?: WorkerConfig,
): Promise<number> {
const { schema, shard } = target;
const timeouts = workerEngineTimeouts(config);
return sql.begin(async (tx) => {
await tx.unsafe(`SET LOCAL pgdog.shard TO ${shard}`);
await setLocalEngineTimeouts(tx, WORKER_ENGINE_TIMEOUTS);
await setLocalEngineTimeouts(tx, timeouts);
await tx.unsafe(`SET LOCAL search_path TO ${schema}, public`);
await tx.unsafe("SET LOCAL ROLE me_embed");
const rows = (await tx.unsafe(
Expand Down Expand Up @@ -85,12 +78,14 @@ export async function processBatch(
const { schema, shard } = target;
const batchSize = config.batchSize ?? 10;
const lockDuration = config.lockDuration ?? "5 minutes";
const timeouts = workerEngineTimeouts(config);
const timeoutAttributes = workerEngineTimeoutAttributes(timeouts);

// --- Claim ---
const claimStart = performance.now();
const claimed = await sql.begin(async (tx) => {
await tx.unsafe(`SET LOCAL pgdog.shard TO ${shard}`);
await setLocalEngineTimeouts(tx, WORKER_ENGINE_TIMEOUTS);
await setLocalEngineTimeouts(tx, timeouts);
await tx.unsafe(`SET LOCAL search_path TO ${schema}, public`);
await tx.unsafe("SET LOCAL ROLE me_embed");
return tx.unsafe(
Expand All @@ -113,7 +108,7 @@ export async function processBatch(
"batch.claim_duration_ms": claimDurationMs,
"batch.memoryIds": claimed.map((r) => r.memory_id),
"batch.queueIds": claimed.map((r) => r.queue_id),
...WORKER_ENGINE_TIMEOUT_ATTRIBUTES,
...timeoutAttributes,
});

// Process claimed items with telemetry
Expand All @@ -127,7 +122,7 @@ export async function processBatch(
"batch.claim_duration_ms": claimDurationMs,
"batch.memoryIds": claimed.map((r) => r.memory_id),
"batch.queueIds": claimed.map((r) => r.queue_id),
...WORKER_ENGINE_TIMEOUT_ATTRIBUTES,
...timeoutAttributes,
},
callback: async () => {
// --- Embed ---
Expand All @@ -145,7 +140,7 @@ export async function processBatch(
// and should not consume max_attempts
await sql.begin(async (tx) => {
await tx.unsafe(`SET LOCAL pgdog.shard TO ${shard}`);
await setLocalEngineTimeouts(tx, WORKER_ENGINE_TIMEOUTS);
await setLocalEngineTimeouts(tx, timeouts);
await tx.unsafe(`SET LOCAL search_path TO ${schema}, public`);
await tx.unsafe("SET LOCAL ROLE me_embed");
for (const row of claimed) {
Expand Down Expand Up @@ -184,14 +179,14 @@ export async function processBatch(
"batch.size": claimed.length,
"batch.embed_successes": embedResults.filter((r) => !r.error).length,
"batch.embed_errors": embedResults.filter((r) => r.error).length,
...WORKER_ENGINE_TIMEOUT_ATTRIBUTES,
...timeoutAttributes,
},
callback: async () => {
for (const row of claimed) {
try {
await sql.begin(async (tx) => {
await tx.unsafe(`SET LOCAL pgdog.shard TO ${shard}`);
await setLocalEngineTimeouts(tx, WORKER_ENGINE_TIMEOUTS);
await setLocalEngineTimeouts(tx, timeouts);
await tx.unsafe(`SET LOCAL search_path TO ${schema}, public`);
await tx.unsafe("SET LOCAL ROLE me_embed");

Expand Down Expand Up @@ -258,7 +253,7 @@ export async function processBatch(
try {
await sql.begin(async (tx) => {
await tx.unsafe(`SET LOCAL pgdog.shard TO ${shard}`);
await setLocalEngineTimeouts(tx, WORKER_ENGINE_TIMEOUTS);
await setLocalEngineTimeouts(tx, timeouts);
await tx.unsafe(`SET LOCAL search_path TO ${schema}, public`);
await tx.unsafe("SET LOCAL ROLE me_embed");
await tx.unsafe(
Expand Down
3 changes: 3 additions & 0 deletions packages/worker/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { EmbeddingConfig } from "@memory.build/embedding";
import type { EngineTimeouts } from "@memory.build/engine/ops/_tx";

export interface EngineTarget {
schema: string;
Expand All @@ -19,6 +20,8 @@ export interface WorkerConfig {
maxBackoffMs?: number;
/** How often to re-discover engines (default: 60_000ms) */
refreshIntervalMs?: number;
/** PostgreSQL transaction/session timeouts for worker engine DB work */
workerEngineTimeouts?: EngineTimeouts;
/** Exit gracefully after this much idle time (optional) */
drainTimeoutMs?: number;
/**
Expand Down
7 changes: 6 additions & 1 deletion packages/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,12 @@ async function run(
// terminal queue rows. Best-effort: failures are logged but do
// not trigger the worker error backoff path.
try {
const pruned = await pruneQueue(sql, target, pruneRetention);
const pruned = await pruneQueue(
sql,
target,
pruneRetention,
config,
);
stats.totalPruned += pruned;
} catch (pruneError) {
if (isMissingSchemaError(pruneError)) {
Expand Down