From 4c1f517ac1eb5a9f4293e304a40cff8f2043fea5 Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Mon, 6 Apr 2026 19:10:07 -0400 Subject: [PATCH] Add a ResourceBudget mechanism which keeps disk usage in check Artifacts stay on disk between the ArtifactDownloader stage and the ArtifactSaver stage. If too many large files build up, it can exceed the allotted filesystem space of the working directory. Previously we used unecessarily small batch sizes by default in order to ensure the worst case was avoided. This approach dynamically controls how much disk space is being used by the task and provides backpressure when the limit is exceeded, flushing batches and preventing new artifacts from being downloaded. closes #7559 Assisted-By: claude-opus-4.6 --- CHANGES/7559.feature | 1 + docs/admin/reference/settings.md | 37 ++- pulpcore/app/settings.py | 12 +- pulpcore/plugin/stages/__init__.py | 1 + pulpcore/plugin/stages/api.py | 29 ++- pulpcore/plugin/stages/artifact_stages.py | 166 +++++++++++++- pulpcore/plugin/stages/declarative_version.py | 7 +- pulpcore/plugin/stages/models.py | 8 +- .../tests/unit/stages/test_resource_budget.py | 216 ++++++++++++++++++ pulpcore/tests/unit/stages/test_stages.py | 90 ++++++++ 10 files changed, 548 insertions(+), 19 deletions(-) create mode 100644 CHANGES/7559.feature create mode 100644 pulpcore/tests/unit/stages/test_resource_budget.py diff --git a/CHANGES/7559.feature b/CHANGES/7559.feature new file mode 100644 index 0000000000..5ddc9b47b2 --- /dev/null +++ b/CHANGES/7559.feature @@ -0,0 +1 @@ +Add a configurable ResourceBudget for preventing over-subscription of the disk "properly". Adds a backpressure mechanism + flushing mechanism in order to ensure that batches get fully processed even if minsize hasn't yet been reached. Allows previous performance-reducing mitigations to be removed. diff --git a/docs/admin/reference/settings.md b/docs/admin/reference/settings.md index 20cbf5b03f..2d9bf57f91 100644 --- a/docs/admin/reference/settings.md +++ b/docs/admin/reference/settings.md @@ -474,10 +474,41 @@ Defaults to `/var/lib/pulp/tmp/`. ### MAX\_CONCURRENT\_CONTENT -The size of the batch of content processed in one go when syncing content from -a remote. +The maximum number of concurrent downloads during sync. Controls how many HTTP +download tasks can run in parallel within the the sync pipeline. -Defaults to 25. +Defaults to 200. + +!!! warning "Deprecated" + This setting is deprecated and may be removed in a future release. + Use `SYNC_MAX_IN_FLIGHT_ITEMS` instead, which provides similar + functionality. If `MAX_CONCURRENT_CONTENT` is set by the user and + `SYNC_MAX_IN_FLIGHT_ITEMS` is not, its value will be used as + `SYNC_MAX_IN_FLIGHT_ITEMS` automatically. + + +### SYNC\_MAX\_IN\_FLIGHT\_MB + +The maximum total size (in megabytes) of downloaded artifacts that are waiting to be +saved. This limits the temporary disk space consumed by artifacts that have been +downloaded but not yet persisted. + +When set, small artifacts will download with high concurrency while large artifacts +will automatically throttle to avoid exhausting disk space. This does not prevent +individual artifacts larger than the threshold from being synced, only limits +concurrency in such situations. + +Defaults to 5120 (5gb) + +### SYNC\_MAX\_IN\_FLIGHT\_ITEMS + +The maximum number of downloaded artifacts that are waiting to be saved. Like +`SYNC_MAX_IN_FLIGHT_MB`, this limits unpersisted artifacts, but counts items +rather than bytes. + +This is useful as a fallback when artifact sizes are not known ahead of time. + +Defaults to `None` (no limit). ## Redis Settings diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index a4411f50ea..e79a2f26ad 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -403,7 +403,17 @@ DOMAIN_ENABLED = False -MAX_CONCURRENT_CONTENT = 25 +MAX_CONCURRENT_CONTENT = 200 + +# Resource budget for sync pipeline: limits total in-flight artifact data between +# the ArtifactDownloader and ArtifactSaver stages. When set, these allow higher download +# concurrency for small artifacts while preventing disk exhaustion for large ones. +# None means no limit for that dimension. + +# Maximum megabytes of in-flight downloaded artifacts +SYNC_MAX_IN_FLIGHT_MB = 5120 +# Maximum number of items (content) able to be buffered between downloading and saving +SYNC_MAX_IN_FLIGHT_ITEMS = None SHELL_PLUS_IMPORTS = [ "from pulpcore.app.util import get_domain, get_domain_pk, set_domain, get_url, extract_pk", diff --git a/pulpcore/plugin/stages/__init__.py b/pulpcore/plugin/stages/__init__.py index e526b52d8f..bba5fc41b5 100644 --- a/pulpcore/plugin/stages/__init__.py +++ b/pulpcore/plugin/stages/__init__.py @@ -4,6 +4,7 @@ from .artifact_stages import ( ACSArtifactHandler, ArtifactDownloader, + ArtifactResourceBudget, ArtifactSaver, GenericDownloader, QueryExistingArtifacts, diff --git a/pulpcore/plugin/stages/api.py b/pulpcore/plugin/stages/api.py index 1d8939fd66..930672c137 100644 --- a/pulpcore/plugin/stages/api.py +++ b/pulpcore/plugin/stages/api.py @@ -77,7 +77,7 @@ async def run(self): break yield content - async def batches(self, minsize=500): + async def batches(self, minsize=500, flush_event=None): """ Asynchronous iterator yielding batches of [DeclarativeContent][] from `self._in_q`. @@ -85,8 +85,17 @@ async def batches(self, minsize=500): [DeclarativeContent][] as possible without blocking, but at least `minsize` instances. + A batch may be yielded early (before reaching `minsize`) in two cases: + + - A queued item's `resolution()` is called while it sits in the pending batch, + signaling that another task is waiting on it. This prevents deadlock in the + content-futures pattern where related items depend on each other being saved. + - A `flush_event` is set by an external signal (e.g. resource budget pressure). + Args: minsize (int): The minimum batch size to yield (unless it is the final batch) + flush_event (asyncio.Event): Optional event that triggers an early batch yield + when set. Cleared after yielding. Used for pressure-based flushing. Yields: A list of [DeclarativeContent][] instances @@ -124,13 +133,20 @@ def add_to_batch(content): get_listener = asyncio.ensure_future(self._in_q.get()) thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait()) + flush_event_listener = asyncio.ensure_future(flush_event.wait()) if flush_event else None while not shutdown: - done, pending = await asyncio.wait( - [thaw_event_listener, get_listener], return_when=asyncio.FIRST_COMPLETED - ) + waitables = [thaw_event_listener, get_listener] + if flush_event_listener: + waitables.append(flush_event_listener) + done, pending = await asyncio.wait(waitables, return_when=asyncio.FIRST_COMPLETED) if thaw_event_listener in done: thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait()) no_block = True + if flush_event_listener and flush_event_listener in done: + # Don't re-arm until after we yield a batch, to avoid a spin loop + # when the event stays set but the batch is empty. + flush_event_listener = None + no_block = True if get_listener in done: content = await get_listener add_to_batch(content) @@ -153,8 +169,13 @@ def add_to_batch(content): yield batch batch = [] no_block = False + # Re-arm the flush listener after yielding + if flush_event and flush_event_listener is None: + flush_event_listener = asyncio.ensure_future(flush_event.wait()) thaw_event_listener.cancel() get_listener.cancel() + if flush_event_listener: + flush_event_listener.cancel() async def put(self, item): """ diff --git a/pulpcore/plugin/stages/artifact_stages.py b/pulpcore/plugin/stages/artifact_stages.py index eeff04c241..bfda6d1206 100644 --- a/pulpcore/plugin/stages/artifact_stages.py +++ b/pulpcore/plugin/stages/artifact_stages.py @@ -23,6 +23,107 @@ log = logging.getLogger(__name__) +class ArtifactResourceBudget: + """Tracks aggregate resource consumption of in-flight artifacts. + + Coordinates between `ArtifactDownloader` (acquires budget) and + `ArtifactSaver` (releases budget) to limit total temporary disk usage + from downloaded-but-not-yet-saved artifacts. + + This allows higher download concurrency for small artifacts while still + protecting against disk exhaustion when syncing large artifacts. + + Args: + max_bytes (int): Maximum total bytes of in-flight downloaded artifacts. + `None` means no byte limit (only item limit applies). + max_items (int): Maximum number of in-flight downloaded artifacts. + `None` means no item limit (only byte limit applies). + """ + + def __init__(self, max_bytes=None, max_items=None): + self.max_bytes = max_bytes + self.max_items = max_items + self._current_bytes = 0 # total bytes currently acquired and not yet released + self._current_items = 0 # total items currently acquired and not yet released + self._available = asyncio.Event() # cleared when budget is exhausted; set on release + self._available.set() + self._lock = asyncio.Lock() # serializes acquire checks to prevent races + # set when acquire is blocked; signals the stage to flush its batch early + self.flush_event = asyncio.Event() + + @classmethod + def from_settings(cls): + """Create an `ArtifactResourceBudget` from Django settings, or return `None`. + + Reads `SYNC_MAX_IN_FLIGHT_MB` and `SYNC_MAX_IN_FLIGHT_ITEMS` from settings. + Falls back to the deprecated `MAX_CONCURRENT_CONTENT` for `max_items` if the + user set it and `SYNC_MAX_IN_FLIGHT_ITEMS` is not configured. + Returns `None` if no settings are configured. + """ + max_mb = settings.SYNC_MAX_IN_FLIGHT_MB + max_items = settings.SYNC_MAX_IN_FLIGHT_ITEMS + + # Backward compatibility: honor deprecated MAX_CONCURRENT_CONTENT + if max_items is None: + max_items = settings.MAX_CONCURRENT_CONTENT + + if max_mb is None and max_items is None: + return None + return cls( + max_bytes=max_mb * 1024 * 1024 if max_mb is not None else None, + max_items=max_items, + ) + + async def acquire(self, nbytes=0): + """Block until resource budget is available. + + Always allows at least one item through (even if over budget) when nothing + is currently in flight, to prevent deadlock. + + When the budget is exhausted and `acquire` must wait, the `flush_event` + is set to signal downstream stages (e.g. `ArtifactSaver`) to flush their + batches early and free up budget. + + Args: + nbytes (int): Size in bytes of the artifact(s) to be downloaded. + When 0 (e.g. because the remote did not advertise a size), the + artifact is invisible to the byte budget but still counts toward + the item limit ()`SYNC_MAX_IN_FLIGHT_ITEMS`). + """ + while True: + async with self._lock: + # Always allow if nothing is in flight (prevents deadlock) + if self._current_items == 0: + self._current_bytes += nbytes + self._current_items += 1 + return + + bytes_ok = self.max_bytes is None or ( + self._current_bytes + nbytes <= self.max_bytes + ) + items_ok = self.max_items is None or self._current_items < self.max_items + + if bytes_ok and items_ok: + self._current_bytes += nbytes + self._current_items += 1 + return + + self._available.clear() + self.flush_event.set() + await self._available.wait() + + def release(self, nbytes=0): + """Release resources after an artifact is saved and its temp file deleted. + + Args: + nbytes (int): Size in bytes of the artifact that was saved. + """ + self._current_bytes = max(0, self._current_bytes - nbytes) + self._current_items = max(0, self._current_items - 1) + self.flush_event.clear() + self._available.set() + + def _check_for_forbidden_checksum_type(artifact): """Check if content doesn't have forbidden checksum type. @@ -122,7 +223,7 @@ class GenericDownloader(Stage): downloads completed. Since it's a stream the total count isn't known until it's finished. This stage drains all available items from `self._in_q` and starts as many concurrent - downloading tasks as possible, up to the limit defined by ``self.max_concurrent_content``. + downloading tasks as possible, up to the limit defined by `self.max_concurrent_content`. Each [pulpcore.plugin.stages.DeclarativeContent][] is sent to `_handle_content_unit`, which must be implemented by the subclass, to handle processing the content unit and starting @@ -220,28 +321,55 @@ class ArtifactDownloader(GenericDownloader): Each [pulpcore.plugin.stages.DeclarativeContent][] is sent to `self._out_q` after all of its [pulpcore.plugin.stages.DeclarativeArtifact][] objects have been handled. + + Args: + resource_budget (ArtifactResourceBudget): Optional shared resource budget that + limits total in-flight artifact bytes/items between download and save. + args: unused positional arguments passed along to + [pulpcore.plugin.stages.GenericDownloader][]. + kwargs: unused keyword arguments passed along to + [pulpcore.plugin.stages.GenericDownloader][]. """ PROGRESS_REPORTING_MESSAGE = "Downloading Artifacts" PROGRESS_REPORTING_CODE = "sync.downloading.artifacts" + def __init__(self, resource_budget=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self.resource_budget = resource_budget + async def _handle_content_unit(self, d_content): """Handle one content unit. Returns: The number of downloads """ - downloaders_for_content = [ - d_artifact.download() + d_artifacts_to_download = [ + d_artifact for d_artifact in d_content.d_artifacts if d_artifact.artifact._state.adding and not d_artifact.deferred_download and not d_artifact.artifact.file ] - if downloaders_for_content: - await asyncio.gather(*downloaders_for_content) - await self.put(d_content) - return len(downloaders_for_content) + + budget_bytes = 0 + if d_artifacts_to_download and self.resource_budget: + budget_bytes = sum( + d_artifact.artifact.size or 0 for d_artifact in d_artifacts_to_download + ) + await self.resource_budget.acquire(budget_bytes) + + try: + if d_artifacts_to_download: + await asyncio.gather(*(da.download() for da in d_artifacts_to_download)) + + await self.put(d_content) + except BaseException: + if budget_bytes and self.resource_budget: + self.resource_budget.release(budget_bytes) + raise + + return len(d_artifacts_to_download) class ArtifactSaver(Stage): @@ -259,8 +387,18 @@ class ArtifactSaver(Stage): This stage drains all available items from `self._in_q` and batches everything into one large call to the db for efficiency. + + Args: + resource_budget (ArtifactResourceBudget): Optional shared resource budget. + When provided, budget is released after artifacts are saved and temp files deleted. + args: unused positional arguments passed along to [pulpcore.plugin.stages.Stage][]. + kwargs: unused keyword arguments passed along to [pulpcore.plugin.stages.Stage][]. """ + def __init__(self, resource_budget=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self.resource_budget = resource_budget + async def run(self): """ The coroutine for this stage. @@ -268,7 +406,9 @@ async def run(self): Returns: The coroutine for this stage. """ - async for batch in self.batches(minsize=settings.MAX_CONCURRENT_CONTENT): + flush_event = self.resource_budget.flush_event if self.resource_budget else None + minsize = self.resource_budget.max_items if self.resource_budget else 200 + async for batch in self.batches(minsize=minsize, flush_event=flush_event): da_to_save = [] for d_content in batch: for d_artifact in d_content.d_artifacts: @@ -291,6 +431,16 @@ async def run(self): if await aos.path.exists(tmp_file_path): await aos.remove(tmp_file_path) + # Release budget after temp files are cleaned up so the downloader can proceed + if self.resource_budget: + for d_content in batch: + budget_bytes = sum( + d_artifact.artifact.size or 0 + for d_artifact in d_content.d_artifacts + if not d_artifact.deferred_download + ) + self.resource_budget.release(budget_bytes) + for d_content in batch: await self.put(d_content) diff --git a/pulpcore/plugin/stages/declarative_version.py b/pulpcore/plugin/stages/declarative_version.py index 18022e9ab3..22cd54173c 100644 --- a/pulpcore/plugin/stages/declarative_version.py +++ b/pulpcore/plugin/stages/declarative_version.py @@ -6,6 +6,7 @@ from pulpcore.plugin.stages.artifact_stages import ( ACSArtifactHandler, ArtifactDownloader, + ArtifactResourceBudget, ArtifactSaver, QueryExistingArtifacts, RemoteArtifactSaver, @@ -129,6 +130,8 @@ def pipeline_stages(self, new_version): list: List of [pulpcore.plugin.stages.Stage][] instances """ + resource_budget = ArtifactResourceBudget.from_settings() + pipeline = [ self.first_stage, QueryExistingArtifacts(), @@ -137,8 +140,8 @@ def pipeline_stages(self, new_version): pipeline.append(ACSArtifactHandler()) pipeline.extend( [ - ArtifactDownloader(), - ArtifactSaver(), + ArtifactDownloader(resource_budget=resource_budget), + ArtifactSaver(resource_budget=resource_budget), QueryExistingContents(), ContentSaver(), RemoteArtifactSaver(), diff --git a/pulpcore/plugin/stages/models.py b/pulpcore/plugin/stages/models.py index b34f7b0fef..d9ef2e52e8 100644 --- a/pulpcore/plugin/stages/models.py +++ b/pulpcore/plugin/stages/models.py @@ -171,7 +171,13 @@ def does_batch(self): async def resolution(self): """Coroutine that waits for the content to be saved to database. - Returns the content unit.""" + Returns the content unit. + + If this item is currently sitting in a `batches()` queue waiting + for `minsize` to be reached, calling this signals the queue to + flush early so the item can proceed through the pipeline. This + prevents deadlock when two related items depend on each other. + """ if self._resolved: # Already resolved ~> shortcut return self.content diff --git a/pulpcore/tests/unit/stages/test_resource_budget.py b/pulpcore/tests/unit/stages/test_resource_budget.py new file mode 100644 index 0000000000..2664bbb600 --- /dev/null +++ b/pulpcore/tests/unit/stages/test_resource_budget.py @@ -0,0 +1,216 @@ +import asyncio + +import pytest + +from pulpcore.plugin.stages import ArtifactResourceBudget + + +class TestAcquireRelease: + """Basic acquire/release semantics.""" + + @pytest.mark.asyncio + async def test_acquire_and_release_items(self): + budget = ArtifactResourceBudget(max_items=3) + await budget.acquire(0) + await budget.acquire(0) + await budget.acquire(0) + assert budget._current_items == 3 + budget.release(0) + budget.release(0) + budget.release(0) + assert budget._current_items == 0 + + @pytest.mark.asyncio + async def test_acquire_and_release_bytes(self): + budget = ArtifactResourceBudget(max_bytes=1000) + await budget.acquire(400) + await budget.acquire(400) + assert budget._current_bytes == 800 + budget.release(400) + assert budget._current_bytes == 400 + budget.release(400) + assert budget._current_bytes == 0 + + @pytest.mark.asyncio + async def test_release_does_not_go_negative(self): + budget = ArtifactResourceBudget(max_bytes=100, max_items=2) + budget.release(500) + assert budget._current_bytes == 0 + assert budget._current_items == 0 + + +class TestBlocking: + """Acquire blocks when budget is exhausted.""" + + @pytest.mark.asyncio + async def test_blocks_on_item_limit(self): + budget = ArtifactResourceBudget(max_items=1) + await budget.acquire(0) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(0) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set(), "acquire should block when at item limit" + + budget.release(0) + await asyncio.sleep(0.05) + assert acquired.is_set(), "acquire should unblock after release" + task.cancel() + + @pytest.mark.asyncio + async def test_blocks_on_byte_limit(self): + budget = ArtifactResourceBudget(max_bytes=100) + await budget.acquire(80) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(50) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set(), "acquire should block when bytes would exceed limit" + + budget.release(80) + await asyncio.sleep(0.05) + assert acquired.is_set(), "acquire should unblock after release" + task.cancel() + + @pytest.mark.asyncio + async def test_blocks_on_both_limits(self): + """When both limits are set, both must be satisfied.""" + budget = ArtifactResourceBudget(max_bytes=1000, max_items=1) + await budget.acquire(100) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(100) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set() + + budget.release(100) + await asyncio.sleep(0.05) + assert acquired.is_set() + task.cancel() + + +class TestDeadlockPrevention: + """The _current_items == 0 guard prevents deadlock.""" + + @pytest.mark.asyncio + async def test_allows_oversized_item_when_empty(self): + """A single item exceeding max_bytes is allowed when nothing is in flight.""" + budget = ArtifactResourceBudget(max_bytes=100) + await budget.acquire(500) # Should not block + assert budget._current_bytes == 500 + assert budget._current_items == 1 + + @pytest.mark.asyncio + async def test_allows_item_over_item_limit_when_empty(self): + """Even max_items=0 (if someone set it) doesn't block when nothing is in flight.""" + budget = ArtifactResourceBudget(max_items=0) + # This would deadlock without the guard -- it should return immediately + await budget.acquire(0) + assert budget._current_items == 1 + + @pytest.mark.asyncio + async def test_second_oversized_item_blocks(self): + """After allowing one oversized item through, the next must wait.""" + budget = ArtifactResourceBudget(max_bytes=100) + await budget.acquire(500) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(50) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set(), "second item should block while oversized item is in flight" + + budget.release(500) + await asyncio.sleep(0.05) + assert acquired.is_set() + task.cancel() + + +class TestFlushEvent: + """The flush event signals downstream stages to flush.""" + + @pytest.mark.asyncio + async def test_flush_event_set_when_blocked(self): + budget = ArtifactResourceBudget(max_items=1) + assert not budget.flush_event.is_set() + await budget.acquire(0) + + async def try_acquire(): + await budget.acquire(0) + + task = asyncio.ensure_future(try_acquire()) + await asyncio.sleep(0.05) + assert budget.flush_event.is_set(), "flush_event should be set when acquire blocks" + + budget.release(0) + await asyncio.sleep(0.05) + assert not budget.flush_event.is_set(), "flush_event should clear after release" + task.cancel() + + @pytest.mark.asyncio + async def test_flush_event_not_set_when_budget_available(self): + budget = ArtifactResourceBudget(max_items=5) + await budget.acquire(0) + assert not budget.flush_event.is_set() + await budget.acquire(0) + assert not budget.flush_event.is_set() + + +class TestNoLimits: + """When max_bytes and max_items are both None, acquire never blocks.""" + + @pytest.mark.asyncio + async def test_unlimited_acquires(self): + budget = ArtifactResourceBudget(max_bytes=None, max_items=None) + for i in range(100): + await budget.acquire(1_000_000) + assert budget._current_items == 100 + assert budget._current_bytes == 100_000_000 + + +class TestConcurrentAcquireRelease: + """Multiple concurrent acquires and releases behave correctly.""" + + @pytest.mark.asyncio + async def test_concurrent_producers_and_consumer(self): + """Simulate multiple downloaders acquiring and a saver releasing.""" + budget = ArtifactResourceBudget(max_bytes=500, max_items=5) + completed = [] + + async def producer(item_id, size): + await budget.acquire(size) + await asyncio.sleep(0.01) # simulate download + completed.append(item_id) + return size + + async def consumer(): + """Release budget periodically, simulating ArtifactSaver.""" + while len(completed) < 10: + await asyncio.sleep(0.02) + if budget._current_items > 0: + budget.release(100) + + consumer_task = asyncio.ensure_future(consumer()) + producer_tasks = [asyncio.ensure_future(producer(i, 100)) for i in range(10)] + + await asyncio.gather(*producer_tasks, consumer_task) + assert len(completed) == 10 diff --git a/pulpcore/tests/unit/stages/test_stages.py b/pulpcore/tests/unit/stages/test_stages.py index e96fbbbfb5..230972eee8 100644 --- a/pulpcore/tests/unit/stages/test_stages.py +++ b/pulpcore/tests/unit/stages/test_stages.py @@ -154,3 +154,93 @@ async def test_batch_queue_and_min_sizes(): last_stage._connect(queues[1], queues[2]) end_stage._connect(queues[2], None) await asyncio.gather(last_stage(), middle_stage(), first_stage(), end_stage()) + + +@pytest.mark.asyncio +async def test_flush_event_yields_early(stage, in_q): + """A flush_event causes batches() to yield before minsize is reached.""" + flush = asyncio.Event() + c1 = mock.Mock() + in_q.put_nowait(c1) + + batch_it = stage.batches(minsize=100, flush_event=flush) + + # The single item is not enough to reach minsize=100, so the batch won't yield yet. + # Set the flush event to force an early yield. + flush.set() + + batch = await batch_it.__anext__() + assert batch == [c1] + + in_q.put_nowait(None) + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__() + + +@pytest.mark.asyncio +async def test_flush_event_no_spin_on_empty_batch(stage, in_q): + """When flush_event is set but the batch is empty, batches() must not spin.""" + flush = asyncio.Event() + flush.set() # Set before any items arrive + + batch_it = stage.batches(minsize=100, flush_event=flush) + + # Put an item after a short delay -- if there were a spin loop, + # the test would hang or burn CPU before we get here. + async def put_later(): + await asyncio.sleep(0.05) + in_q.put_nowait(mock.Mock()) + in_q.put_nowait(None) + + asyncio.ensure_future(put_later()) + + batch = await batch_it.__anext__() + assert len(batch) == 1 + + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__() + + +@pytest.mark.asyncio +async def test_flush_event_rearms_after_yield(stage, in_q): + """After yielding a flush-triggered batch, the flush_event is re-armed.""" + flush = asyncio.Event() + c1 = mock.Mock() + c2 = mock.Mock() + + in_q.put_nowait(c1) + + batch_it = stage.batches(minsize=100, flush_event=flush) + + # First flush-triggered yield + flush.set() + batch = await batch_it.__anext__() + assert batch == [c1] + + # Clear and re-set to trigger another early yield + flush.clear() + in_q.put_nowait(c2) + flush.set() + batch = await batch_it.__anext__() + assert batch == [c2] + + in_q.put_nowait(None) + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__() + + +@pytest.mark.asyncio +async def test_flush_event_not_needed_when_minsize_met(stage, in_q): + """Batches still yield normally when minsize is met, even without flush.""" + flush = asyncio.Event() # Never set + + for _ in range(5): + in_q.put_nowait(mock.Mock()) + in_q.put_nowait(None) + + batch_it = stage.batches(minsize=3, flush_event=flush) + batch = await batch_it.__anext__() + assert len(batch) >= 3 + + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__()