forked from valkey-io/valkey
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path cluster.tcl
More file actions
290 lines (242 loc) · 10.1 KB
/
cluster.tcl
File metadata and controls
290 lines (242 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# Primitive tests on cluster-enabled server with modules
source tests/support/cli.tcl
# cluster creation is complicated with TLS, and the current tests don't really need that coverage
tags {tls:skip external:skip cluster modules} {
# Module cluster-messaging API checks on a 3-primary cluster running cluster.so.
set testmodule [file normalize tests/modules/cluster.so]
set modules [list loadmodule $testmodule]
start_cluster 3 0 [list config_lines $modules] {
    set node1 [srv 0 client]
    set node2 [srv -1 client]
    set node3 [srv -2 client]

    test "Cluster module send message API - VM_SendClusterMessage" {
        # node1 has two peers, so two module messages are expected to go out.
        assert_equal OK [$node1 test.pingall]
        assert_equal 2 [CI 0 cluster_stats_messages_module_sent]

        # Each peer should count exactly one received module message.
        wait_for_condition 50 100 {
            [CI 1 cluster_stats_messages_module_received] eq 1 &&
            [CI 2 cluster_stats_messages_module_received] eq 1
        } else {
            fail "node 2 or node 3 didn't receive cluster module message"
        }

        # Both receiving nodes must have logged the DING payload.
        foreach peer_idx {-1 -2} {
            verify_log_message $peer_idx "*DING (type 1) RECEIVED*Hey*" 0
        }
    }

    test "Cluster module receive message API - VM_RegisterClusterMessageReceiver" {
        # node1 expects two DONG replies back, presumably one per peer.
        wait_for_condition 50 100 {
            [CI 0 cluster_stats_messages_module_received] eq 2
        } else {
            fail "node 1 didn't receive DONG messages"
        }

        wait_for_condition 50 100 {
            [count_log_message 0 "* <cluster> DONG (type 2) RECEIVED*"] eq 2
        } else {
            fail "node 1 didn't log DONG message twice"
        }
    }
}
# Blocking-command behavior across a resharding and across a cluster failure.
# Two kinds of blocked clients are covered: one blocked on a key (fsl.bpop)
# and one blocked by a command that takes no keys (block.block).
set testmodule_nokey [file normalize tests/modules/blockonbackground.so]
set testmodule_blockedclient [file normalize tests/modules/blockedclient.so]
set testmodule [file normalize tests/modules/blockonkeys.so]
set testmodule_auth [file normalize tests/modules/auth.so]
set modules [list loadmodule $testmodule loadmodule $testmodule_nokey loadmodule $testmodule_blockedclient loadmodule $testmodule_auth]
start_cluster 3 0 [list config_lines $modules] {
    set node1 [srv 0 client]
    set node2 [srv -1 client]
    set node3 [srv -2 client]
    set node3_pid [srv -2 pid]

    test "Run blocking command (blocked on key) on cluster node3" {
        # key9184688 is mapped to slot 10923 (first slot of node 3)
        set node3_rd [valkey_deferring_client -2]
        $node3_rd fsl.bpop key9184688 0
        $node3_rd flush
        wait_for_condition 50 100 {
            [s -2 blocked_clients] eq {1}
        } else {
            fail "Client executing blocking command (blocked on key) not blocked"
        }
    }

    test "Run blocking command (no keys) on cluster node2" {
        set node2_rd [valkey_deferring_client -1]
        $node2_rd block.block 0
        $node2_rd flush
        wait_for_condition 50 100 {
            [s -1 blocked_clients] eq {1}
        } else {
            fail "Client executing blocking command (no keys) not blocked"
        }
    }

    test "Perform a Resharding" {
        # Move a single slot from node3 to node1. The later tests rely on this
        # being slot 10923, the slot that key9184688 hashes to.
        exec src/valkey-cli --cluster-yes --cluster reshard 127.0.0.1:[srv -2 port] \
            --cluster-to [$node1 cluster myid] \
            --cluster-from [$node3 cluster myid] \
            --cluster-slots 1
    }

    test "Verify command (no keys) is unaffected after resharding" {
        # verify there are blocked clients on node2
        assert_equal [s -1 blocked_clients] {1}
        # release client
        $node2 block.release 0
    }

    test "Verify command (blocked on key) got unblocked after resharding" {
        # this (read) will wait for the node3 to realize the new topology
        assert_error {*MOVED*} {$node3_rd read}
        # verify there are no blocked clients
        assert_equal [s 0 blocked_clients] {0}
        assert_equal [s -1 blocked_clients] {0}
        assert_equal [s -2 blocked_clients] {0}
    }

    test "Wait for cluster to be stable" {
        wait_for_condition 1000 50 {
            [catch {exec src/valkey-cli --cluster check 127.0.0.1:[srv 0 port]}] == 0 &&
            [catch {exec src/valkey-cli --cluster check 127.0.0.1:[srv -1 port]}] == 0 &&
            [catch {exec src/valkey-cli --cluster check 127.0.0.1:[srv -2 port]}] == 0 &&
            [CI 0 cluster_state] eq {ok} &&
            [CI 1 cluster_state] eq {ok} &&
            [CI 2 cluster_state] eq {ok}
        } else {
            fail "Cluster doesn't stabilize"
        }
    }

    test "Sanity test push cmd after resharding" {
        # node3 no longer owns the slot, so writes there are redirected.
        assert_error {*MOVED*} {$node3 fsl.push key9184688 1}

        set node1_rd [valkey_deferring_client 0]
        $node1_rd fsl.bpop key9184688 0
        $node1_rd flush

        wait_for_condition 50 100 {
            [s 0 blocked_clients] eq {1}
        } else {
            puts "Client not blocked"
            puts "read from blocked client: [$node1_rd read]"
            fail "Client not blocked"
        }

        $node1 fsl.push key9184688 2
        assert_equal {2} [$node1_rd read]
    }

    $node1_rd close
    $node2_rd close
    $node3_rd close

    test "Run blocking command (blocked on key) again on cluster node1" {
        $node1 del key9184688
        # key9184688 is mapped to slot 10923 which has been moved to node1
        set node1_rd [valkey_deferring_client 0]
        $node1_rd fsl.bpop key9184688 0
        $node1_rd flush
        wait_for_condition 50 100 {
            [s 0 blocked_clients] eq {1}
        } else {
            fail "Client executing blocking command (blocked on key) again not blocked"
        }
    }

    test "Run blocking command (no keys) again on cluster node2" {
        set node2_rd [valkey_deferring_client -1]
        $node2_rd block.block 0
        $node2_rd flush
        wait_for_condition 50 100 {
            [s -1 blocked_clients] eq {1}
        } else {
            fail "Client executing blocking command (no keys) again not blocked"
        }
    }

    test "Kill a cluster node and wait for fail state" {
        # Simulate a node3 failure by pausing (not killing) its process.
        # It is resumed at the end of this section.
        pause_process $node3_pid
        wait_for_condition 1000 50 {
            [CI 0 cluster_state] eq {fail} &&
            [CI 1 cluster_state] eq {fail}
        } else {
            fail "Cluster doesn't fail"
        }
    }

    test "Verify command (blocked on key) got unblocked after cluster failure" {
        assert_error {*CLUSTERDOWN*} {$node1_rd read}
    }

    test "Verify command (with no keys) is not unblocked after cluster failure" {
        # The no-keys client must still be blocked. Reading its reply here
        # would stall the test, because no reply is sent while it stays
        # blocked. So the check uses the server-side blocked_clients count.
        assert_equal [s -1 blocked_clients] {1}
    }

    test "Verify command RM_Call is rejected when cluster is down" {
        assert_error "ERR Can not execute a command 'set' while the cluster is down" {$node1 do_rm_call set x 1}
    }

    test "Verify Module Auth Succeeds when cluster is down" {
        r acl setuser foo >pwd on ~* &* +@all
        # Confirm the cluster really is down for key-based commands.
        assert_error "*CLUSTERDOWN*" {r set x 1}
        # Non Blocking Module Auth
        assert_equal {OK} [r testmoduleone.rm_register_auth_cb]
        assert_equal {OK} [r AUTH foo allow]
        # Blocking Module Auth
        assert_equal {OK} [r testmoduleone.rm_register_blocking_auth_cb]
        assert_equal {OK} [r AUTH foo block_allow]
    }

    # Undo the pause applied to node3, then close the remaining deferring clients.
    resume_process $node3_pid
    $node1_rd close
    $node2_rd close
}
# Key deletions caused by a slot-ownership change, and the module notification
# effects they trigger, must reach the replica wrapped in MULTI/EXEC.
set testmodule_keyspace_events [file normalize tests/modules/keyspace_events.so]
set testmodule_postnotifications "[file normalize tests/modules/postnotifications.so] with_key_events"
set modules [list loadmodule $testmodule_keyspace_events loadmodule $testmodule_postnotifications]
start_cluster 2 2 [list config_lines $modules] {
    set master1 [srv 0 client]
    set master2 [srv -1 client]
    set replica1 [srv -2 client]
    set replica2 [srv -3 client]

    test "Verify keys deletion and notification effects happened on cluster slots change are replicated inside multi exec" {
        # Baseline: an explicit DEL is counted once on master2 and on replica2.
        $master2 set count_dels_{4oi} 1
        $master2 del count_dels_{4oi}
        foreach conn [list $master2 $replica2] {
            assert_equal 1 [$conn keyspace.get_dels]
        }

        # Recreate the key, then start capturing replica2's replication stream.
        $master2 set count_dels_{4oi} 1
        set repl [attach_to_replication_stream_on_connection -3]

        # Hand slot 16382 to master1 so that master2 drops the key it held there.
        $master1 cluster bumpepoch
        $master1 cluster setslot 16382 node [$master1 cluster myid]
        wait_for_cluster_propagation

        # The del counter must reach 2 on the master first, then on the replica.
        foreach {conn errmsg} [list \
            $master2 "master did not delete the key" \
            $replica2 "replica did not increase del counter"] {
            wait_for_condition 50 100 {
                [$conn keyspace.get_dels] eq 2
            } else {
                fail $errmsg
            }
        }

        # the {lpush before_deleted count_dels_{4oi}} is a post notification job registered when 'count_dels_{4oi}' was removed
        assert_replication_stream $repl {
            {multi}
            {unlink count_dels_{4oi}}
            {keyspace.incr_dels}
            {lpush before_deleted count_dels_{4oi}}
            {exec}
        }
        close_replication_stream $repl
    }
}
# basics.so is loaded on every node at startup. Each node answering PING
# shows that module loading (including its RM_Call usage) succeeded in
# cluster mode.
set testmodule [file normalize tests/modules/basics.so]
set modules [list loadmodule $testmodule]
start_cluster 3 0 [list config_lines $modules] {
    set node1 [srv 0 client]
    set node2 [srv -1 client]
    set node3 [srv -2 client]

    test "Verify RM_Call inside module load function on cluster mode" {
        foreach node [list $node1 $node2 $node3] {
            assert_equal {PONG} [$node PING]
        }
    }
}
# CLUSTER SLOTS / CLUSTER SHARDS issued through the module (test.cluster_*)
# must match the native command output on every node. Both sides are sorted
# before comparing.
set testmodule [file normalize tests/modules/cluster.so]
set modules [list loadmodule $testmodule]
start_cluster 3 0 [list config_lines $modules] {
    set node1 [srv 0 client]
    set node2 [srv -1 client]
    set node3 [srv -2 client]

    test "VM_CALL with cluster slots" {
        foreach node [list $node1 $node2 $node3] {
            assert_equal [lsort [$node cluster slots]] [lsort [$node test.cluster_slots]]
        }
    }

    test "VM_CALL with cluster shards" {
        foreach node [list $node1 $node2 $node3] {
            assert_equal [lsort [$node cluster shards]] [lsort [$node test.cluster_shards]]
        }
    }
}
} ;# end tag