Skip to content

Commit 864de55

Browse files
authored
Make ValkeyModule_Call compatible with calling commands from scripting engines (valkey-io#2782)
Note: these changes are another step towards being able to run Lua engine as an external scripting engine module. In this commit we improve the `ValkeyModule_Call` API function code to match the validations and behavior of the `scriptCall` function, currently used by the Lua engine to run commands using `server.call` Lua Valkey API. The changes made are backward compatible. The new behavior/validations are only enabled when calling `ValkeyModule_Call` while running a script using `EVAL` or `FCALL`. To test these changes, we improved the `HELLO` dummy scripting engine module to support calling commands, and compare the behavior with calling the same command from a Lua script. Signed-off-by: Ricardo Dias <ricardo.dias@percona.com>
1 parent ea103da commit 864de55

5 files changed

Lines changed: 559 additions & 117 deletions

File tree

src/module.c

Lines changed: 129 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -6382,6 +6382,7 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
63826382
int argc = 0, flags = 0;
63836383
va_list ap;
63846384
ValkeyModuleCallReply *reply = NULL;
6385+
sds reply_error_msg = NULL;
63856386
int replicate = 0; /* Replicate this command? */
63866387
int error_as_call_replies = 0; /* return errors as ValkeyModuleCallReply object */
63876388
uint64_t cmd_flags;
@@ -6419,8 +6420,7 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
64196420
if (!user) {
64206421
errno = ENOTSUP;
64216422
if (error_as_call_replies) {
6422-
sds msg = sdsnew("cannot run as user, no user directly attached to context or context's client");
6423-
reply = callReplyCreateError(msg, ctx);
6423+
reply_error_msg = sdsnew("cannot run as user, no user directly attached to context or context's client");
64246424
}
64256425
goto cleanup;
64266426
}
@@ -6445,30 +6445,40 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
64456445
* if necessary.
64466446
*/
64476447
c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv, c->argc);
6448-
sds err;
6449-
if (!commandCheckExistence(c, error_as_call_replies ? &err : NULL)) {
6448+
if (!commandCheckExistence(c, error_as_call_replies ? &reply_error_msg : NULL)) {
64506449
errno = ENOENT;
6451-
if (error_as_call_replies) reply = callReplyCreateError(err, ctx);
64526450
goto cleanup;
64536451
}
6454-
if (!commandCheckArity(c->cmd, c->argc, error_as_call_replies ? &err : NULL)) {
6452+
if (!commandCheckArity(c->cmd, c->argc, error_as_call_replies ? &reply_error_msg : NULL)) {
64556453
errno = EINVAL;
6456-
if (error_as_call_replies) reply = callReplyCreateError(err, ctx);
64576454
goto cleanup;
64586455
}
64596456

64606457
cmd_flags = getCommandFlags(c);
64616458

64626459
if (flags & VALKEYMODULE_ARGV_SCRIPT_MODE) {
6463-
/* Basically on script mode we want to only allow commands that can
6464-
* be executed on scripts (CMD_NOSCRIPT is not set on the command flags) */
6465-
if (cmd_flags & CMD_NOSCRIPT) {
6466-
errno = ESPIPE;
6467-
if (error_as_call_replies) {
6468-
sds msg = sdscatfmt(sdsempty(), "command '%S' is not allowed on script mode", c->cmd->fullname);
6469-
reply = callReplyCreateError(msg, ctx);
6460+
if (scriptIsRunning()) {
6461+
c->flag.module = 0;
6462+
c->flag.script = 1;
6463+
}
6464+
6465+
/* In script mode, commands with CMD_NOSCRIPT flag are normally forbidden.
6466+
* However, we allow them if both conditions are met:
6467+
* 1. We're running in the context of a scripting engine (scriptIsRunning())
6468+
* 2. The configuration option server.script_disable_deny_script is enabled
6469+
* If either condition is false, we block the command. */
6470+
if ((cmd_flags & CMD_NOSCRIPT)) {
6471+
if (!scriptIsRunning() || !server.script_disable_deny_script) {
6472+
errno = ESPIPE;
6473+
if (error_as_call_replies) {
6474+
reply_error_msg = sdscatfmt(sdsempty(), "command '%S' is not allowed on script mode", c->cmd->fullname);
6475+
}
6476+
goto cleanup;
64706477
}
6471-
goto cleanup;
6478+
}
6479+
6480+
if (scriptIsRunning() && scriptAllowsOOM()) {
6481+
flags &= ~VALKEYMODULE_ARGV_RESPECT_DENY_OOM;
64726482
}
64736483
}
64746484

@@ -6486,8 +6496,7 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
64866496
if (oom_state) {
64876497
errno = ENOSPC;
64886498
if (error_as_call_replies) {
6489-
sds msg = sdsdup(shared.oomerr->ptr);
6490-
reply = callReplyCreateError(msg, ctx);
6499+
reply_error_msg = sdsdup(shared.oomerr->ptr);
64916500
}
64926501
goto cleanup;
64936502
}
@@ -6501,60 +6510,10 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
65016510
if (cmd_flags & CMD_WRITE) {
65026511
errno = ENOSPC;
65036512
if (error_as_call_replies) {
6504-
sds msg = sdscatfmt(sdsempty(),
6505-
"Write command '%S' was "
6506-
"called while write is not allowed.",
6507-
c->cmd->fullname);
6508-
reply = callReplyCreateError(msg, ctx);
6509-
}
6510-
goto cleanup;
6511-
}
6512-
}
6513-
6514-
/* Script mode tests */
6515-
if (flags & VALKEYMODULE_ARGV_SCRIPT_MODE) {
6516-
if (cmd_flags & CMD_WRITE) {
6517-
/* on script mode, if a command is a write command,
6518-
* We will not run it if we encounter disk error
6519-
* or we do not have enough replicas */
6520-
6521-
if (!checkGoodReplicasStatus()) {
6522-
errno = ESPIPE;
6523-
if (error_as_call_replies) {
6524-
sds msg = sdsdup(shared.noreplicaserr->ptr);
6525-
reply = callReplyCreateError(msg, ctx);
6526-
}
6527-
goto cleanup;
6528-
}
6529-
6530-
int deny_write_type = writeCommandsDeniedByDiskError();
6531-
int obey_client = (server.current_client && mustObeyClient(server.current_client));
6532-
6533-
if (deny_write_type != DISK_ERROR_TYPE_NONE && !obey_client) {
6534-
errno = ESPIPE;
6535-
if (error_as_call_replies) {
6536-
sds msg = writeCommandsGetDiskErrorMessage(deny_write_type);
6537-
reply = callReplyCreateError(msg, ctx);
6538-
}
6539-
goto cleanup;
6540-
}
6541-
6542-
if (server.primary_host && server.repl_replica_ro && !obey_client) {
6543-
errno = ESPIPE;
6544-
if (error_as_call_replies) {
6545-
sds msg = sdsdup(shared.roreplicaerr->ptr);
6546-
reply = callReplyCreateError(msg, ctx);
6547-
}
6548-
goto cleanup;
6549-
}
6550-
}
6551-
6552-
if (server.primary_host && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0 &&
6553-
!(cmd_flags & CMD_STALE)) {
6554-
errno = ESPIPE;
6555-
if (error_as_call_replies) {
6556-
sds msg = sdsdup(shared.primarydownerr->ptr);
6557-
reply = callReplyCreateError(msg, ctx);
6513+
reply_error_msg = sdscatfmt(sdsempty(),
6514+
"Write command '%S' was "
6515+
"called while write is not allowed.",
6516+
c->cmd->fullname);
65586517
}
65596518
goto cleanup;
65606519
}
@@ -6577,9 +6536,8 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
65776536
if (error_as_call_replies) {
65786537
/* verbosity should be same as processCommand() in server.c */
65796538
sds acl_msg = getAclErrorMessage(acl_retval, c->user, c->cmd, c->argv[acl_errpos]->ptr, 0);
6580-
sds msg = sdscatfmt(sdsempty(), "-NOPERM %S\r\n", acl_msg);
6539+
reply_error_msg = sdscatfmt(sdsempty(), "-NOPERM %S\r\n", acl_msg);
65816540
sdsfree(acl_msg);
6582-
reply = callReplyCreateError(msg, ctx);
65836541
}
65846542
errno = EACCES;
65856543
goto cleanup;
@@ -6596,31 +6554,109 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
65966554
c->flag.asking = ctx->client->flag.asking;
65976555
c->slot = clusterSlotByCommand(c->cmd, c->argv, c->argc, &c->read_flags);
65986556
if (getNodeByQuery(c, &error_code) != getMyClusterNode()) {
6599-
sds msg = NULL;
6557+
serverAssert(reply_error_msg == NULL);
66006558
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
66016559
if (error_as_call_replies) {
6602-
msg = sdscatfmt(sdsempty(),
6603-
"Can not execute a write command '%S' while the cluster is down and readonly",
6604-
c->cmd->fullname);
6560+
reply_error_msg = sdscatfmt(sdsempty(),
6561+
"Can not execute a write command '%S' while the cluster is down and readonly",
6562+
c->cmd->fullname);
66056563
}
66066564
errno = EROFS;
66076565
} else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
66086566
if (error_as_call_replies) {
6609-
msg = sdscatfmt(sdsempty(), "Can not execute a command '%S' while the cluster is down",
6610-
c->cmd->fullname);
6567+
reply_error_msg = sdscatfmt(sdsempty(), "Can not execute a command '%S' while the cluster is down",
6568+
c->cmd->fullname);
66116569
}
66126570
errno = ENETDOWN;
66136571
} else {
66146572
if (error_as_call_replies) {
6615-
msg = sdsnew("Attempted to access a non local key in a cluster node");
6573+
reply_error_msg = sdsnew("Attempted to access a non local key in a cluster node");
66166574
}
66176575
errno = EPERM;
66186576
}
6619-
if (msg) {
6620-
reply = callReplyCreateError(msg, ctx);
6577+
goto cleanup;
6578+
}
6579+
}
6580+
6581+
/* Script mode tests */
6582+
if (flags & VALKEYMODULE_ARGV_SCRIPT_MODE) {
6583+
/* A write command, on an RO command or an RO script is rejected ASAP.
6584+
* Note: For scripts, we consider may-replicate commands as write commands.
6585+
* This also makes it possible to allow read-only scripts to be run during
6586+
* CLIENT PAUSE WRITE. */
6587+
if (scriptIsRunning() && scriptIsReadOnly() && (cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE))) {
6588+
errno = ENOSPC;
6589+
reply_error_msg = sdsnew("Write commands are not allowed from read-only scripts");
6590+
goto cleanup;
6591+
}
6592+
6593+
/* If the script already made a modification to the dataset, we can't
6594+
* fail it on unpredictable error state. */
6595+
if ((scriptIsRunning() && !scriptIsWriteDirty() && cmd_flags & CMD_WRITE) || (!scriptIsRunning() && cmd_flags & CMD_WRITE)) {
6596+
/* on script mode, if a command is a write command,
6597+
* We will not run it if we encounter disk error
6598+
* or we do not have enough replicas */
6599+
6600+
if (!checkGoodReplicasStatus()) {
6601+
errno = ESPIPE;
6602+
if (error_as_call_replies) {
6603+
reply_error_msg = sdsdup(shared.noreplicaserr->ptr);
6604+
}
6605+
goto cleanup;
6606+
}
6607+
6608+
int deny_write_type = writeCommandsDeniedByDiskError();
6609+
int obey_client = (server.current_client && mustObeyClient(server.current_client));
6610+
6611+
if (deny_write_type != DISK_ERROR_TYPE_NONE && !obey_client) {
6612+
errno = ESPIPE;
6613+
if (error_as_call_replies) {
6614+
reply_error_msg = writeCommandsGetDiskErrorMessage(deny_write_type);
6615+
}
6616+
goto cleanup;
6617+
}
6618+
6619+
if (server.primary_host && server.repl_replica_ro && !obey_client) {
6620+
errno = ESPIPE;
6621+
if (error_as_call_replies) {
6622+
reply_error_msg = sdsdup(shared.roreplicaerr->ptr);
6623+
}
6624+
goto cleanup;
6625+
}
6626+
6627+
if (scriptIsRunning()) {
6628+
scriptSetWriteDirtyFlag();
6629+
}
6630+
}
6631+
6632+
if (server.primary_host && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0 &&
6633+
!(cmd_flags & CMD_STALE)) {
6634+
errno = ESPIPE;
6635+
if (error_as_call_replies) {
6636+
if (scriptIsRunning()) {
6637+
reply_error_msg = sdsnew("Can not execute the command on a stale replica");
6638+
} else {
6639+
reply_error_msg = sdsdup(shared.primarydownerr->ptr);
6640+
}
66216641
}
66226642
goto cleanup;
66236643
}
6644+
6645+
if (scriptIsRunning() && server.cluster_enabled && !mustObeyClient(ctx->client)) {
6646+
if (c->slot != -1 && !scriptAllowsCrossSlot()) {
6647+
if (scriptGetSlot() == -1) {
6648+
scriptSetSlot(c->slot);
6649+
} else if (scriptGetSlot() != c->slot) {
6650+
errno = ESPIPE;
6651+
if (error_as_call_replies) {
6652+
reply_error_msg = sdsnew("Script attempted to access keys that do not hash to the same slot");
6653+
}
6654+
goto cleanup;
6655+
}
6656+
}
6657+
6658+
scriptSetOriginalClientSlot(c->slot);
6659+
}
66246660
}
66256661

66266662
if (flags & VALKEYMODULE_ARGV_DRY_RUN) {
@@ -6646,6 +6682,8 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
66466682
server.replication_allowed = prev_replication_allowed;
66476683

66486684
if (c->flag.blocked) {
6685+
/* Blocking commands are not allowed when calling commands in scripting engines. */
6686+
serverAssert(!scriptIsRunning());
66496687
serverAssert(flags & VALKEYMODULE_ARGV_ALLOW_BLOCK);
66506688
serverAssert(ctx->module);
66516689
ValkeyModuleAsyncRMCallPromise *promise = zmalloc(sizeof(ValkeyModuleAsyncRMCallPromise));
@@ -6675,6 +6713,15 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
66756713
}
66766714

66776715
cleanup:
6716+
if ((flags & VALKEYMODULE_ARGV_SCRIPT_MODE) && errno) {
6717+
afterErrorReply(c, reply_error_msg, sdslen(reply_error_msg), 0);
6718+
incrCommandStatsOnError(c->cmd, ERROR_COMMAND_REJECTED);
6719+
}
6720+
if (reply_error_msg != NULL) {
6721+
serverAssert(reply == NULL);
6722+
reply = callReplyCreateError(reply_error_msg, ctx);
6723+
}
6724+
66786725
if (reply) autoMemoryAdd(ctx, VALKEYMODULE_AM_REPLY, reply);
66796726
if (ctx->module) ctx->module->in_call--;
66806727
if (c) moduleReleaseTempClient(c);

src/script.c

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,3 +597,43 @@ long long scriptRunDuration(void) {
597597
serverAssert(scriptIsRunning());
598598
return elapsedMs(curr_run_ctx->start_time);
599599
}
600+
601+
int scriptAllowsOOM(void) {
602+
serverAssert(scriptIsRunning());
603+
return curr_run_ctx->flags & SCRIPT_ALLOW_OOM;
604+
}
605+
606+
int scriptIsReadOnly(void) {
607+
serverAssert(scriptIsRunning());
608+
return curr_run_ctx->flags & SCRIPT_READ_ONLY;
609+
}
610+
611+
int scriptIsWriteDirty(void) {
612+
serverAssert(scriptIsRunning());
613+
return curr_run_ctx->flags & SCRIPT_WRITE_DIRTY;
614+
}
615+
616+
void scriptSetWriteDirtyFlag(void) {
617+
serverAssert(scriptIsRunning());
618+
curr_run_ctx->flags |= SCRIPT_WRITE_DIRTY;
619+
}
620+
621+
int scriptAllowsCrossSlot(void) {
622+
serverAssert(scriptIsRunning());
623+
return curr_run_ctx->flags & SCRIPT_ALLOW_CROSS_SLOT;
624+
}
625+
626+
int scriptGetSlot(void) {
627+
serverAssert(scriptIsRunning());
628+
return curr_run_ctx->slot;
629+
}
630+
631+
void scriptSetSlot(int slot) {
632+
serverAssert(scriptIsRunning());
633+
curr_run_ctx->slot = slot;
634+
}
635+
636+
void scriptSetOriginalClientSlot(int slot) {
637+
serverAssert(scriptIsRunning());
638+
curr_run_ctx->original_client->slot = slot;
639+
}

src/script.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,13 @@ client *scriptGetClient(void);
116116
client *scriptGetCaller(void);
117117
long long scriptRunDuration(void);
118118

119+
int scriptAllowsOOM(void);
120+
int scriptIsReadOnly(void);
121+
int scriptIsWriteDirty(void);
122+
void scriptSetWriteDirtyFlag(void);
123+
int scriptAllowsCrossSlot(void);
124+
int scriptGetSlot(void);
125+
void scriptSetSlot(int slot);
126+
void scriptSetOriginalClientSlot(int slot);
127+
119128
#endif /* __SCRIPT_H_ */

0 commit comments

Comments
 (0)