diff --git a/.gitignore b/.gitignore index b277ec7a16..753c0f8883 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,4 @@ test_short_report.html # Test failures will dump the cluster state in here test_cluster_dump/ +.omx/ diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 79e8986b4d..73aa2b51a5 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -116,7 +116,7 @@ properties: worker-saturation: oneOf: - type: number - exclusiveMinimum: 0 + minimum: 0 # String "inf", not to be confused with .inf which in YAML means float # infinity. This is necessary because there's no way to parse a float # infinity from a DASK_* environment variable. @@ -125,7 +125,13 @@ properties: Controls how many root tasks are sent to workers (like a `readahead`). Up to worker-saturation * nthreads root tasks are sent to a - worker at a time. If `.inf`, all runnable tasks are immediately sent to workers. + worker at a time. + + Special values: + - 0: Only send tasks to workers with an open thread (no queuing). Useful for + long-running tasks to avoid head-of-line blocking. + - .inf: All runnable tasks are immediately sent to workers. + The target number is rounded up, so any `worker-saturation` value > 1.0 guarantees at least one extra task will be sent to workers. 
diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 00d8e525b5..b06e0e911c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1838,10 +1838,10 @@ def __init__( self.WORKER_SATURATION = math.inf if ( not isinstance(self.WORKER_SATURATION, (int, float)) - or self.WORKER_SATURATION <= 0 + or self.WORKER_SATURATION < 0 ): raise ValueError( # pragma: nocover - "`distributed.scheduler.worker-saturation` must be a float > 0; got " + "`distributed.scheduler.worker-saturation` must be a float >= 0; got " + repr(self.WORKER_SATURATION) ) @@ -9273,8 +9273,19 @@ def heartbeat_interval(n: int) -> float: def _task_slots_available(ws: WorkerState, saturation_factor: float) -> int: - """Number of tasks that can be sent to this worker without oversaturating it""" + """Number of tasks that can be sent to this worker without oversaturating it + + When saturation_factor is 0, tasks are only sent up to the worker's current + execution capacity (no scheduler-side queuing). This is useful for + long-running tasks where you want to avoid head-of-line blocking. + """ assert not math.isinf(saturation_factor) + + # Special case: saturation_factor == 0 means no queuing + # Only send tasks to workers that still have an open execution slot. 
+ if saturation_factor == 0: + return ws.nthreads - (len(ws.processing) - len(ws.long_running)) + return max(math.ceil(saturation_factor * ws.nthreads), 1) - ( len(ws.processing) - len(ws.long_running) ) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 4b9d917df9..5510b3d673 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -618,6 +618,44 @@ def func(first, second): await c.gather(fs) +@gen_cluster( + client=True, + nthreads=[("", 2)], + config={"distributed.scheduler.worker-saturation": 0.0}, +) +async def test_secede_opens_all_slots_when_queuing_disabled(c, s, a): + started = Event() + release_long_running = Event() + release_ordinary = Event() + + def long_running(started, release): + started.set() + secede() + release.wait() + + def ordinary(release): + release.wait() + + long_future = c.submit(long_running, started, release_long_running, key="long") + await started.wait() + await async_poll_for(lambda: len(a.state.long_running) == 1, timeout=5) + + ordinary_futures = c.map( + ordinary, + [release_ordinary, release_ordinary], + key=["ordinary-1", "ordinary-2"], + ) + await async_poll_for(lambda: a.state.executing_count == a.state.nthreads, timeout=5) + + ws = s.workers[a.address] + assert len(ws.long_running) == 1 + assert len(ws.processing) == 1 + a.state.nthreads + + await release_ordinary.set() + await release_long_running.set() + await c.gather([long_future, *ordinary_futures]) + + @pytest.mark.parametrize( "saturation_config, expected_task_counts", [ @@ -626,6 +664,7 @@ def func(first, second): (1.1, (3, 2)), (1.0, (2, 1)), (0.1, (1, 1)), + (0.0, (2, 1)), # No queuing: only executing tasks, no queued tasks # This is necessary because there's no way to parse a float infinite from # a DASK_* environment variable ("inf", (6, 4)), @@ -674,6 +713,12 @@ async def test_bad_saturation_factor(): async with Scheduler(dashboard_address=":0"): pass + # Negative values should be 
rejected + with pytest.raises(ValueError, match=">= 0"): + with dask.config.set({"distributed.scheduler.worker-saturation": -1.0}): + async with Scheduler(dashboard_address=":0"): + pass + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) async def test_move_data_over_break_restrictions(client, s, a, b, c):