Skip to content

Commit f214286

Browse files
authored
Add concurrent/multi-threaded tests for thread-safety verification (#59)
Closes #50 Adds four new tests exercising simultaneous get/set/delete/clear operations from multiple threads on shared Storage and Cache instances, including scenarios with an active TTL expiration thread. Made-with: Cursor
1 parent cc68793 commit f214286

2 files changed

Lines changed: 123 additions & 0 deletions

File tree

tests/test_cache.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import time
2+
from concurrent.futures import ThreadPoolExecutor, as_completed
23
from typing import Any, cast
34

45
import pytest
@@ -398,3 +399,65 @@ def test_cache_default_ttl_sentinel():
398399
time.sleep(0.2)
399400
assert cache.get("key2") == "value2" # Should still be there
400401
cache.close()
402+
403+
404+
def test_cache_concurrent_get_set_delete_clear():
405+
"""Multiple threads performing mixed operations on a shared Cache must not raise exceptions."""
406+
num_threads = 10
407+
num_operations = 200
408+
cache = Cache(
409+
max_items=50,
410+
size_limit_in_bytes=None,
411+
default_ttl=None,
412+
expiration_thread_max_checks_per_iteration=0,
413+
)
414+
415+
def worker(thread_id: int) -> None:
416+
for i in range(num_operations):
417+
key = f"key{i % 20}"
418+
op = (thread_id * num_operations + i) % 4
419+
if op == 0:
420+
cache.set(key, f"value-{thread_id}-{i}")
421+
elif op == 1:
422+
result = cache.get(key)
423+
assert result is CACHE_MISS or isinstance(result, str)
424+
elif op == 2:
425+
cache.delete(key)
426+
else:
427+
cache.clear()
428+
429+
with ThreadPoolExecutor(max_workers=num_threads) as executor:
430+
futures = [executor.submit(worker, t) for t in range(num_threads)]
431+
for future in as_completed(futures):
432+
future.result()
433+
434+
assert cache.number_of_items <= 50
435+
cache.close()
436+
437+
438+
def test_cache_concurrent_with_ttl():
439+
"""Concurrent Cache access with TTL expiration thread active must not corrupt state."""
440+
num_threads = 8
441+
num_operations = 100
442+
cache = Cache(
443+
max_items=100,
444+
default_ttl=0.05,
445+
expiration_thread_delay=0.01,
446+
expiration_thread_max_checks_per_iteration=50,
447+
)
448+
449+
def worker(thread_id: int) -> None:
450+
for i in range(num_operations):
451+
key = f"key{i % 10}"
452+
if i % 2 == 0:
453+
cache.set(key, f"v{thread_id}-{i}")
454+
else:
455+
result = cache.get(key)
456+
assert result is CACHE_MISS or isinstance(result, str)
457+
458+
with ThreadPoolExecutor(max_workers=num_threads) as executor:
459+
futures = [executor.submit(worker, t) for t in range(num_threads)]
460+
for future in as_completed(futures):
461+
future.result()
462+
463+
cache.close(wait=True)

tests/test_storage.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import random
22
import time
3+
from concurrent.futures import ThreadPoolExecutor, as_completed
34

45
import pytest
56

@@ -314,6 +315,65 @@ def test_overwrite_existing_key_size_tracking():
314315
storage.close()
315316

316317

318+
def test_storage_concurrent_get_set_delete():
319+
"""Multiple threads performing get/set/delete on a shared Storage must not raise exceptions."""
320+
num_threads = 10
321+
num_operations = 200
322+
storage = Storage[bytes](
323+
size_limit_in_bytes=None,
324+
max_items=50,
325+
expiration_thread_max_checks_per_iteration=0,
326+
)
327+
328+
def worker(thread_id: int) -> None:
329+
for i in range(num_operations):
330+
key = f"key{i % 20}"
331+
op = (thread_id * num_operations + i) % 3
332+
if op == 0:
333+
storage.set(key, f"value-{thread_id}-{i}".encode())
334+
elif op == 1:
335+
storage.get(key)
336+
else:
337+
storage.delete(key)
338+
339+
with ThreadPoolExecutor(max_workers=num_threads) as executor:
340+
futures = [executor.submit(worker, t) for t in range(num_threads)]
341+
for future in as_completed(futures):
342+
future.result()
343+
344+
assert storage.number_of_items <= 50
345+
storage.close()
346+
347+
348+
def test_storage_concurrent_with_ttl():
349+
"""Concurrent get/set with a live expiration thread must not corrupt state."""
350+
num_threads = 8
351+
num_operations = 100
352+
storage = Storage[bytes](
353+
size_limit_in_bytes=None,
354+
max_items=100,
355+
default_ttl=0.05,
356+
expiration_thread_delay=0.01,
357+
expiration_thread_max_checks_per_iteration=50,
358+
)
359+
360+
def worker(thread_id: int) -> None:
361+
for i in range(num_operations):
362+
key = f"key{i % 10}"
363+
if i % 2 == 0:
364+
storage.set(key, f"v{thread_id}-{i}".encode())
365+
else:
366+
result = storage.get(key)
367+
assert result is CACHE_MISS or isinstance(result, bytes)
368+
369+
with ThreadPoolExecutor(max_workers=num_threads) as executor:
370+
futures = [executor.submit(worker, t) for t in range(num_threads)]
371+
for future in as_completed(futures):
372+
future.result()
373+
374+
storage.close(wait=True)
375+
376+
317377
def test_clear():
318378
"""Test that clear() removes all items and resets size tracking."""
319379
storage = Storage[bytes](

0 commit comments

Comments
 (0)