Skip to content

Commit 20dd3e2

Browse files
committed
Add concurrent/multi-threaded tests for thread-safety verification
Closes #50 Adds four new tests exercising simultaneous get/set/delete/clear operations from multiple threads on shared Storage and Cache instances, including scenarios with an active TTL expiration thread. Made-with: Cursor
1 parent 0a4fe42 commit 20dd3e2

2 files changed

Lines changed: 123 additions & 0 deletions

File tree

tests/test_cache.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import time
2+
from concurrent.futures import ThreadPoolExecutor, as_completed
23
from typing import Any, cast
34

45
import pytest
@@ -381,3 +382,65 @@ def test_cache_default_ttl_sentinel():
381382

382383
time.sleep(0.2)
383384
assert cache.get("key2") == "value2" # Should still be there
385+
386+
387+
def test_cache_concurrent_get_set_delete_clear():
388+
"""Multiple threads performing mixed operations on a shared Cache must not raise exceptions."""
389+
num_threads = 10
390+
num_operations = 200
391+
cache = Cache(
392+
max_items=50,
393+
size_limit_in_bytes=None,
394+
default_ttl=None,
395+
expiration_thread_max_checks_per_iteration=0,
396+
)
397+
398+
def worker(thread_id: int) -> None:
399+
for i in range(num_operations):
400+
key = f"key{i % 20}"
401+
op = (thread_id * num_operations + i) % 4
402+
if op == 0:
403+
cache.set(key, f"value-{thread_id}-{i}")
404+
elif op == 1:
405+
result = cache.get(key)
406+
assert result is CACHE_MISS or isinstance(result, str)
407+
elif op == 2:
408+
cache.delete(key)
409+
else:
410+
cache.clear()
411+
412+
with ThreadPoolExecutor(max_workers=num_threads) as executor:
413+
futures = [executor.submit(worker, t) for t in range(num_threads)]
414+
for future in as_completed(futures):
415+
future.result()
416+
417+
assert cache.number_of_items <= 50
418+
cache.close()
419+
420+
421+
def test_cache_concurrent_with_ttl():
422+
"""Concurrent Cache access with TTL expiration thread active must not corrupt state."""
423+
num_threads = 8
424+
num_operations = 100
425+
cache = Cache(
426+
max_items=100,
427+
default_ttl=0.05,
428+
expiration_thread_delay=0.01,
429+
expiration_thread_max_checks_per_iteration=50,
430+
)
431+
432+
def worker(thread_id: int) -> None:
433+
for i in range(num_operations):
434+
key = f"key{i % 10}"
435+
if i % 2 == 0:
436+
cache.set(key, f"v{thread_id}-{i}")
437+
else:
438+
result = cache.get(key)
439+
assert result is CACHE_MISS or isinstance(result, str)
440+
441+
with ThreadPoolExecutor(max_workers=num_threads) as executor:
442+
futures = [executor.submit(worker, t) for t in range(num_threads)]
443+
for future in as_completed(futures):
444+
future.result()
445+
446+
cache.close(wait=True)

tests/test_storage.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import random
22
import time
3+
from concurrent.futures import ThreadPoolExecutor, as_completed
34

45
import pytest
56

@@ -310,6 +311,65 @@ def test_overwrite_existing_key_size_tracking():
310311
storage.close()
311312

312313

314+
def test_storage_concurrent_get_set_delete():
315+
"""Multiple threads performing get/set/delete on a shared Storage must not raise exceptions."""
316+
num_threads = 10
317+
num_operations = 200
318+
storage = Storage[bytes](
319+
size_limit_in_bytes=None,
320+
max_items=50,
321+
expiration_thread_max_checks_per_iteration=0,
322+
)
323+
324+
def worker(thread_id: int) -> None:
325+
for i in range(num_operations):
326+
key = f"key{i % 20}"
327+
op = (thread_id * num_operations + i) % 3
328+
if op == 0:
329+
storage.set(key, f"value-{thread_id}-{i}".encode())
330+
elif op == 1:
331+
storage.get(key)
332+
else:
333+
storage.delete(key)
334+
335+
with ThreadPoolExecutor(max_workers=num_threads) as executor:
336+
futures = [executor.submit(worker, t) for t in range(num_threads)]
337+
for future in as_completed(futures):
338+
future.result()
339+
340+
assert storage.number_of_items <= 50
341+
storage.close()
342+
343+
344+
def test_storage_concurrent_with_ttl():
345+
"""Concurrent get/set with a live expiration thread must not corrupt state."""
346+
num_threads = 8
347+
num_operations = 100
348+
storage = Storage[bytes](
349+
size_limit_in_bytes=None,
350+
max_items=100,
351+
default_ttl=0.05,
352+
expiration_thread_delay=0.01,
353+
expiration_thread_max_checks_per_iteration=50,
354+
)
355+
356+
def worker(thread_id: int) -> None:
357+
for i in range(num_operations):
358+
key = f"key{i % 10}"
359+
if i % 2 == 0:
360+
storage.set(key, f"v{thread_id}-{i}".encode())
361+
else:
362+
result = storage.get(key)
363+
assert result is CACHE_MISS or isinstance(result, bytes)
364+
365+
with ThreadPoolExecutor(max_workers=num_threads) as executor:
366+
futures = [executor.submit(worker, t) for t in range(num_threads)]
367+
for future in as_completed(futures):
368+
future.result()
369+
370+
storage.close(wait=True)
371+
372+
313373
def test_clear():
314374
"""Test that clear() removes all items and resets size tracking."""
315375
storage = Storage[bytes](

0 commit comments

Comments
 (0)