Skip to content

Commit d2db0c2

Browse files
Fix module commandresult event cleanup during unsubscribe and module unload (valkey-io#3545)
This follows up on the commandresult API work and fixes cleanup around unsubscribe and module unload. The main issue was that command-result event listeners could leave stale state behind. On unload, we removed the listeners themselves but didn’t fully update the fast-path listener counters. Separately, unsubscribing with a NULL callback could behave badly if the listener wasn’t present anymore. In practice, that meant later commands could still walk into command-result event handling after the module was supposed to be cleaned up. Failed in Daily as well yesterday: https://github.com/valkey-io/valkey/actions/runs/24753491944/job/72421581610#step:10:852 Related Failures: valkey-io#2936 (comment) --------- Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
1 parent cb2cfdd commit d2db0c2

3 files changed

Lines changed: 39 additions & 21 deletions

File tree

src/module.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12718,6 +12718,8 @@ int VM_SubscribeToServerEvent(ValkeyModuleCtx *ctx, ValkeyModuleEvent event, Val
1271812718
return VALKEYMODULE_OK;
1271912719
}
1272012720

12721+
if (callback == NULL) return VALKEYMODULE_OK;
12722+
1272112723
/* No event found, we need to add a new one. */
1272212724
el = zmalloc(sizeof(*el));
1272312725
el->module = ctx->module;
@@ -12877,11 +12879,19 @@ void moduleUnsubscribeAllServerEvents(ValkeyModule *module) {
1287712879
ValkeyModuleEventListener *el;
1287812880
listIter li;
1287912881
listNode *ln;
12880-
listRewind(ValkeyModule_EventListeners, &li);
1288112882

12883+
listRewind(ValkeyModule_EventListeners, &li);
1288212884
while ((ln = listNext(&li))) {
1288312885
el = ln->value;
1288412886
if (el->module == module) {
12887+
if (el->event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_SUCCESS)
12888+
commandResultSuccessListeners--;
12889+
else if (el->event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE)
12890+
commandResultFailureListeners--;
12891+
else if (el->event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED)
12892+
commandResultRejectedListeners--;
12893+
else if (el->event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED)
12894+
commandResultACLRejectedListeners--;
1288512895
listDelNode(ValkeyModule_EventListeners, ln);
1288612896
zfree(el);
1288712897
}

tests/modules/commandresult.c

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ static int log_count = 0;
6262
#define MODE_ACL_REJECTED 0x8
6363
static int subscription_mode = 0;
6464

65+
static void ResetState(void) {
66+
memset(&stats, 0, sizeof(stats));
67+
memset(result_log, 0, sizeof(result_log));
68+
log_head = 0;
69+
log_count = 0;
70+
subscription_mode = 0;
71+
}
72+
6573
/* Add entry to circular log */
6674
void LogResult(const char *cmd_name, int status, uint64_t subevent,
6775
long long duration, long long dirty,
@@ -243,30 +251,23 @@ int CmdResultRegister_ValkeyCommand(ValkeyModuleCtx *ctx,
243251
int CmdResultUnsubscribe_ValkeyCommand(ValkeyModuleCtx *ctx,
244252
ValkeyModuleString **argv, int argc) {
245253
VALKEYMODULE_NOT_USED(argv);
254+
int had_subscription = (subscription_mode != 0);
246255

247256
if (argc != 1) {
248257
return ValkeyModule_WrongArity(ctx);
249258
}
250259

251-
if (subscription_mode == 0) {
260+
ValkeyModule_SubscribeToServerEvent(ctx, ValkeyModuleEvent_CommandResultSuccess, NULL);
261+
ValkeyModule_SubscribeToServerEvent(ctx, ValkeyModuleEvent_CommandResultFailure, NULL);
262+
ValkeyModule_SubscribeToServerEvent(ctx, ValkeyModuleEvent_CommandResultRejected, NULL);
263+
ValkeyModule_SubscribeToServerEvent(ctx, ValkeyModuleEvent_CommandResultACLRejected, NULL);
264+
subscription_mode = 0;
265+
266+
if (!had_subscription) {
252267
return ValkeyModule_ReplyWithError(
253268
ctx, "ERR not subscribed to command result events");
254269
}
255270

256-
if (subscription_mode & MODE_SUCCESS)
257-
ValkeyModule_SubscribeToServerEvent(
258-
ctx, ValkeyModuleEvent_CommandResultSuccess, NULL);
259-
if (subscription_mode & MODE_FAILURE)
260-
ValkeyModule_SubscribeToServerEvent(
261-
ctx, ValkeyModuleEvent_CommandResultFailure, NULL);
262-
if (subscription_mode & MODE_REJECTED)
263-
ValkeyModule_SubscribeToServerEvent(
264-
ctx, ValkeyModuleEvent_CommandResultRejected, NULL);
265-
if (subscription_mode & MODE_ACL_REJECTED)
266-
ValkeyModule_SubscribeToServerEvent(
267-
ctx, ValkeyModuleEvent_CommandResultACLRejected, NULL);
268-
269-
subscription_mode = 0;
270271
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
271272
}
272273

@@ -440,6 +441,8 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
440441
VALKEYMODULE_NOT_USED(argv);
441442
VALKEYMODULE_NOT_USED(argc);
442443

444+
ResetState();
445+
443446
if (ValkeyModule_Init(ctx, "commandresult", 1, VALKEYMODULE_APIVER_1) ==
444447
VALKEYMODULE_ERR) {
445448
return VALKEYMODULE_ERR;

tests/unit/moduleapi/commandresult.tcl

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,9 @@ start_server {tags {"modules"}} {
7171
cleanup_callback
7272
r cmdresult.register all
7373

74-
r cmdresult.success
75-
r ping
74+
r eval {local sum = 0; for i = 1, 100000 do sum = sum + i end; return sum} 0
7675

7776
set stats [r cmdresult.stats]
78-
# Duration should be > 0 microseconds
7977
assert {[dict get $stats total_duration_us] > 0}
8078

8179
r cmdresult.unsubscribe
@@ -267,11 +265,15 @@ start_server {tags {"modules"}} {
267265
set stats [r cmdresult.stats]
268266
assert {[dict get $stats total_callbacks] >= 2}
269267

270-
# Unload module while subscription is still active
268+
# Unload module while subscription is still active.
271269
assert_equal {OK} [r module unload commandresult]
272270

273-
# Reload module for remaining tests
271+
# Unsubscribing after reload has no matching listener. It must not add
272+
# NULL callbacks that would be invoked by later command result events.
274273
r module load $testmodule
274+
catch {r cmdresult.unsubscribe} err
275+
assert_match {*not subscribed*} $err
276+
assert_equal {PONG} [r ping]
275277
}
276278

277279
test {Module commandresult - Multiple callbacks from different operations} {
@@ -627,6 +629,9 @@ start_server {tags {"modules"}} {
627629

628630
test {Module commandresult - rejected: command not allowed in Pub/Sub context (PUBSUB)} {
629631
cleanup_callback
632+
if {$::force_resp3} {
633+
skip "RESP3 Pub/Sub clients may issue arbitrary commands"
634+
}
630635
r cmdresult.register rejected
631636

632637
set rd [valkey_deferring_client]

0 commit comments

Comments
 (0)