Skip to content

Commit 6d89478

Browse files
authored
Merge pull request #1 from gsakkis/sync_submit
Sync submit
2 parents 237378b + 333e36d commit 6d89478

7 files changed

Lines changed: 75 additions & 90 deletions

File tree

README.md

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async def main():
3333
# Create an executor with a pool of 3 workers
3434
async with TaskPoolExecutor(max_workers=3) as executor:
3535
# Submit a single task
36-
future = await executor.submit(fetch_data, "https://example.com")
36+
future = executor.submit(fetch_data, "https://example.com")
3737
result = await future
3838
print(result) # Data from https://example.com
3939

@@ -50,7 +50,7 @@ Creates a new task pool executor.
5050
- `max_workers`: Maximum number of workers (defaults to `os.cpu_count()`)
5151
- `task_name_prefix`: Optional prefix for worker task names
5252

53-
### `async submit(fn, /, *args, **kwargs) -> asyncio.Future`
53+
### `submit(fn, /, *args, **kwargs) -> asyncio.Future`
5454

5555
Submits a callable to be executed. Returns an `asyncio.Future`.
5656

@@ -61,7 +61,7 @@ async def multiply(x: int, y: int) -> int:
6161

6262

6363
async with TaskPoolExecutor() as executor:
64-
future = await executor.submit(multiply, 6, 7)
64+
future = executor.submit(multiply, 6, 7)
6565
result = await future
6666
print(result) # 42
6767
```
@@ -71,7 +71,7 @@ You can also submit an awaitable directly:
7171
```python
7272
async with TaskPoolExecutor() as executor:
7373
coro = multiply(6, 7)
74-
future = await executor.submit(coro)
74+
future = executor.submit(coro)
7575
result = await future
7676
print(result) # 42
7777
```
@@ -136,9 +136,9 @@ async def task(name: str, delay: float) -> str:
136136

137137
async with TaskPoolExecutor(max_workers=3) as executor:
138138
futures = [
139-
await executor.submit(task, "fast", 0.1),
140-
await executor.submit(task, "medium", 0.2),
141-
await executor.submit(task, "slow", 0.3),
139+
executor.submit(task, "fast", 0.1),
140+
executor.submit(task, "medium", 0.2),
141+
executor.submit(task, "slow", 0.3),
142142
]
143143

144144
# Wait for the first task to complete
@@ -161,9 +161,9 @@ async with TaskPoolExecutor(max_workers=3) as executor:
161161
```python
162162
async with TaskPoolExecutor(max_workers=3) as executor:
163163
futures = [
164-
await executor.submit(task, "task1", 0.3),
165-
await executor.submit(task, "task2", 0.1),
166-
await executor.submit(task, "task3", 0.2),
164+
executor.submit(task, "task1", 0.3),
165+
executor.submit(task, "task2", 0.1),
166+
executor.submit(task, "task3", 0.2),
167167
]
168168

169169
# Process results as they complete
@@ -177,9 +177,9 @@ async with TaskPoolExecutor(max_workers=3) as executor:
177177
```python
178178
async with TaskPoolExecutor(max_workers=3) as executor:
179179
futures = [
180-
await executor.submit(task, "task1", 0.3),
181-
await executor.submit(task, "task2", 0.1),
182-
await executor.submit(task, "task3", 0.2),
180+
executor.submit(task, "task1", 0.3),
181+
executor.submit(task, "task2", 0.1),
182+
executor.submit(task, "task3", 0.2),
183183
]
184184

185185
# Wait for all and collect results
@@ -198,7 +198,7 @@ async def failing_task():
198198

199199

200200
async with TaskPoolExecutor() as executor:
201-
future = await executor.submit(failing_task)
201+
future = executor.submit(failing_task)
202202

203203
try:
204204
await future
@@ -218,7 +218,7 @@ async def long_running_task():
218218

219219

220220
async with TaskPoolExecutor() as executor:
221-
future = await executor.submit(long_running_task)
221+
future = executor.submit(long_running_task)
222222

223223
# Cancel the task
224224
future.cancel()

cf_taskpool.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import contextlib
32
import inspect
43
import itertools as it
54
import os
@@ -38,14 +37,14 @@ async def __aexit__(self, *args: object) -> None:
3837
await self.shutdown()
3938

4039
@overload
41-
async def submit(
40+
def submit(
4241
self, fn: Callable[P, Awaitable[T]], /, *args: P.args, **kwargs: P.kwargs
4342
) -> asyncio.Future[T]: ...
4443

4544
@overload
46-
async def submit(self, aw: Awaitable[T], /) -> asyncio.Future[T]: ...
45+
def submit(self, aw: Awaitable[T], /) -> asyncio.Future[T]: ...
4746

48-
async def submit(
47+
def submit(
4948
self,
5049
aw_or_fn: Callable[P, Awaitable[T]] | Awaitable[T],
5150
/,
@@ -64,13 +63,12 @@ async def submit(
6463
# When the future gets garbage collected, ensure the coroutine is closed
6564
weakref.finalize(future, _close_unawaited_coro, awaitable)
6665

67-
async with self._shutdown_lock:
68-
if self._shutdown:
69-
raise RuntimeError("cannot schedule new futures after shutdown")
66+
if self._shutdown:
67+
raise RuntimeError("cannot schedule new futures after shutdown")
7068

71-
await self._work_queue.put((future, awaitable))
72-
await self._adjust_task_count()
73-
return future
69+
self._work_queue.put_nowait((future, awaitable))
70+
self._adjust_task_count()
71+
return future
7472

7573
async def map(
7674
self,
@@ -88,9 +86,9 @@ async def map(
8886
submissions = (self.submit(fn, *args) for args in zipped_iterables)
8987
fs: list[asyncio.Future[T]] | deque[asyncio.Future[T]]
9088
if buffersize is None:
91-
fs = await asyncio.gather(*submissions)
89+
fs = list(submissions)
9290
else:
93-
fs = fsd = deque(await asyncio.gather(*it.islice(submissions, buffersize)))
91+
fs = fsd = deque(it.islice(submissions, buffersize))
9492

9593
# Use a weak reference to ensure that the executor can be garbage
9694
# collected independently of the result_iterator closure.
@@ -108,7 +106,7 @@ async def result_iterator() -> AsyncGenerator[T]:
108106
and (executor := executor_weakref())
109107
and (args := next(zipped_iterables, None))
110108
):
111-
fsd.appendleft(await executor.submit(fn, *args))
109+
fsd.appendleft(executor.submit(fn, *args))
112110

113111
# Careful not to keep a reference to the popped future
114112
yield await fs.pop()
@@ -139,12 +137,11 @@ async def shutdown(
139137
if wait and self._tasks:
140138
await asyncio.wait(self._tasks)
141139

142-
async def _adjust_task_count(self) -> None:
140+
def _adjust_task_count(self) -> None:
143141
# If idle workers are available, don't spin new ones
144-
with contextlib.suppress(TimeoutError):
145-
async with asyncio.timeout(0):
146-
if await self._idle_semaphore.acquire():
147-
return
142+
if not self._idle_semaphore.locked():
143+
self._idle_semaphore._value -= 1 # noqa: SLF001
144+
return
148145

149146
num_tasks = len(self._tasks)
150147
if num_tasks < self._max_workers:

tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def submit(
3333
func: Callable[P, Awaitable[T]],
3434
*args: P.args,
3535
**kwargs: P.kwargs,
36-
) -> Awaitable[asyncio.Future[T]]:
36+
) -> asyncio.Future[T]:
3737
if as_awaitable:
3838
return executor.submit(func(*args, **kwargs))
3939
return executor.submit(func, *args, **kwargs)

tests/test_as_completed.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ async def test_no_timeout(self, executor):
1212
future_c = cancelled_future()
1313
future_e = exception_future()
1414
future_s = successful_future()
15-
future1 = await executor.submit(amul, 2, 21)
16-
future2 = await executor.submit(amul, 7, 6)
15+
future1 = executor.submit(amul, 2, 21)
16+
future2 = executor.submit(amul, 7, 6)
1717

1818
coros = list(
1919
asyncio.as_completed([future_c, future_e, future_s, future1, future2])
@@ -34,7 +34,7 @@ async def test_future_times_out(self, executor, timeout): # noqa: ASYNC109
3434
successful_future(),
3535
}
3636
# Windows clock resolution is around 15.6 ms
37-
future = await executor.submit(asyncio.sleep, 1.0)
37+
future = executor.submit(asyncio.sleep, 1.0)
3838
results = []
3939
exception_types = set()
4040
for coro in asyncio.as_completed(already_completed | {future}, timeout=timeout):
@@ -52,7 +52,7 @@ async def test_duplicate_futures(self, executor):
5252
# Issue 20367. Duplicate futures should not raise exceptions or give duplicate
5353
# responses.
5454
# Issue #31641: accept arbitrary iterables.
55-
future1 = await executor.submit(asyncio.sleep, 0.1)
55+
future1 = executor.submit(asyncio.sleep, 0.1)
5656
results = [
5757
await coro for coro in asyncio.as_completed(itertools.repeat(future1, 3))
5858
]

tests/test_shutdown.py

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ async def sleep_and_print(t, msg):
2020
print(msg)
2121
2222
async def main(executor):
23-
await executor.submit(sleep_and_print, 0.1, "apple")
23+
executor.submit(sleep_and_print, 0.1, "apple")
2424
if {shutdown} is not None:
2525
await executor.shutdown(**{shutdown})
2626
@@ -41,30 +41,20 @@ class TestTaskPoolShutdown:
4141
async def test_run_after_shutdown(self, executor, as_awaitable):
4242
await executor.shutdown()
4343
with pytest.raises(RuntimeError):
44-
await submit(executor, as_awaitable, amul, 2, 5)
44+
submit(executor, as_awaitable, amul, 2, 5)
4545

4646
@pytest.mark.parametrize("as_awaitable", [False, True])
4747
@pytest.mark.parametrize("cancel_futures", [False, True])
4848
async def test_shutdown(self, executor, as_awaitable, cancel_futures):
49-
fs = [
50-
await submit(executor, as_awaitable, asyncio.sleep, 0.1) for _ in range(50)
51-
]
49+
fs = [submit(executor, as_awaitable, asyncio.sleep, 0.1) for _ in range(50)]
5250
await executor.shutdown(cancel_futures=cancel_futures)
53-
54-
cancelled = [fut for fut in fs if fut.cancelled()]
55-
others = [fut for fut in fs if not fut.cancelled()]
5651
if cancel_futures:
57-
# 5 tasks were picked by the workers before the shutdown, 45 were cancelled
58-
assert len(cancelled) == 45
59-
assert len(others) == 5
52+
# All tasks were cancelled
53+
assert all(fut.cancelled() for fut in fs)
6054
else:
61-
# No tasks were cancelled
62-
assert len(cancelled) == 0
63-
assert len(others) == 50
64-
65-
for fut in others:
66-
assert fut.done()
67-
assert fut.exception() is None
55+
# All tasks were completed
56+
assert all(fut.done() for fut in fs)
57+
assert all(fut.result() is None for fut in fs)
6858

6959
@pytest.mark.skipif(
7060
not hasattr(signal, "alarm"), reason="signal.alarm not available"
@@ -81,12 +71,12 @@ def timeout(_signum, _frame):
8171
raise RuntimeError("timed out waiting for shutdown") # pragma: no cover
8272

8373
executor = TaskPoolExecutor(max_workers=1)
84-
future = await submit(executor, as_awaitable, amul, 2, 5)
74+
future = submit(executor, as_awaitable, amul, 2, 5)
8575
await future
8676
old_handler = signal.signal(signal.SIGALRM, timeout)
8777
try:
8878
signal.alarm(5)
89-
future = await submit(executor, as_awaitable, amul, 2, 5)
79+
future = submit(executor, as_awaitable, amul, 2, 5)
9080
future.cancel()
9181
await executor.shutdown(wait=True)
9282
finally:
@@ -100,7 +90,7 @@ async def acquire_lock(lock):
10090

10191
sem = asyncio.Semaphore(0)
10292
for _ in range(3):
103-
await submit(executor, as_awaitable, acquire_lock, sem)
93+
submit(executor, as_awaitable, acquire_lock, sem)
10494
assert len(executor._tasks) == 3
10595
for _ in range(3):
10696
sem.release()
@@ -117,7 +107,7 @@ async def test_context_manager_shutdown(self):
117107
@pytest.mark.parametrize("explicit_shutdown", [False, True])
118108
async def test_shutdown_no_wait(self, as_awaitable, explicit_shutdown):
119109
executor = TaskPoolExecutor(max_workers=5)
120-
future = await submit(executor, as_awaitable, amul, 2, 5)
110+
future = submit(executor, as_awaitable, amul, 2, 5)
121111
res = await executor.map(aabs, range(-5, 5))
122112
tasks = executor._tasks
123113
if explicit_shutdown:

tests/test_task_pool.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def __len__(self):
3939
class TestTaskPoolExecutor:
4040
@pytest.mark.parametrize("as_awaitable", [False, True])
4141
async def test_submit(self, executor, as_awaitable):
42-
future = await submit(executor, as_awaitable, amul, 2, 8)
42+
future = submit(executor, as_awaitable, amul, 2, 8)
4343
assert await future == 16
4444
assert future.result() == 16
4545

@@ -49,11 +49,11 @@ async def acapture(*args, **kwargs):
4949
await asyncio.sleep(0.01)
5050
return args, kwargs
5151

52-
future = await submit(executor, as_awaitable, amul, 2, y=8)
52+
future = submit(executor, as_awaitable, amul, 2, y=8)
5353
assert await future == 16
5454
assert future.result() == 16
5555

56-
future = await submit(executor, as_awaitable, acapture, 1, self=2, fn=3)
56+
future = submit(executor, as_awaitable, acapture, 1, self=2, fn=3)
5757
assert await future == ((1,), {"self": 2, "fn": 3})
5858
assert future.result() == ((1,), {"self": 2, "fn": 3})
5959

@@ -85,22 +85,20 @@ async def test_submit_awaitable_error(self, executor, args, kwargs):
8585
coro = amul(2, 8)
8686
try:
8787
with pytest.raises(TypeError, match=error):
88-
await executor.submit(coro, *args, **kwargs)
88+
executor.submit(coro, *args, **kwargs)
8989
finally:
9090
coro.close()
9191

9292
@pytest.mark.parametrize("as_awaitable", [False, True])
9393
async def test_exception(self, executor, as_awaitable):
94-
future = await submit(executor, as_awaitable, adivmod, 2, 0)
94+
future = submit(executor, as_awaitable, adivmod, 2, 0)
9595
with pytest.raises(ZeroDivisionError) as exc_info:
9696
await future
9797
assert future.exception() is exc_info.value
9898

9999
@pytest.mark.parametrize("as_awaitable", [False, True])
100100
async def test_cancellation(self, executor, as_awaitable):
101-
future = await submit(
102-
executor, as_awaitable, adivmod, 2, 0, cancel_if_zero=True
103-
)
101+
future = submit(executor, as_awaitable, adivmod, 2, 0, cancel_if_zero=True)
104102
with pytest.raises(asyncio.CancelledError):
105103
await future
106104
assert future.cancelled()
@@ -180,7 +178,7 @@ async def test_no_stale_references(self, executor, as_awaitable):
180178
my_object_collected = asyncio.Event()
181179
my_object_callback = weakref.ref(my_object, lambda _: my_object_collected.set())
182180
# Deliberately discarding the future.
183-
await submit(executor, as_awaitable, my_object.my_method)
181+
submit(executor, as_awaitable, my_object.my_method)
184182
del my_object
185183
try:
186184
await asyncio.wait_for(my_object_collected.wait(), timeout=1.0)
@@ -195,7 +193,7 @@ def test_max_workers_negative(self):
195193

196194
@pytest.mark.parametrize("as_awaitable", [False, True])
197195
async def test_free_future_reference(self, executor, as_awaitable):
198-
future = await submit(executor, as_awaitable, MyObject.create, 1)
196+
future = submit(executor, as_awaitable, MyObject.create, 1)
199197
await future
200198

201199
wr = weakref.ref(future)
@@ -228,7 +226,7 @@ async def araise(exception):
228226
raise exception
229227

230228
msg = "falsy"
231-
future = await submit(executor, as_awaitable, araise, exc_type(msg))
229+
future = submit(executor, as_awaitable, araise, exc_type(msg))
232230
with pytest.raises(exc_type, match=msg):
233231
await future
234232

@@ -251,16 +249,16 @@ def test_default_workers(self):
251249
async def test_saturation(self, executor, as_awaitable):
252250
sem = asyncio.Semaphore(0)
253251
for _ in range(15 * executor._max_workers):
254-
await submit(executor, as_awaitable, sem.acquire)
252+
submit(executor, as_awaitable, sem.acquire)
255253
assert len(executor._tasks) == executor._max_workers
256254
for _ in range(15 * executor._max_workers):
257255
sem.release()
258256

259257
@pytest.mark.parametrize("as_awaitable", [False, True])
260258
async def test_idle_worker_reuse(self, executor, as_awaitable):
261-
assert await (await submit(executor, as_awaitable, amul, 21, 2)) == 42
262-
assert await (await submit(executor, as_awaitable, amul, 6, 7)) == 42
263-
assert await (await submit(executor, as_awaitable, amul, 3, 14)) == 42
259+
assert await submit(executor, as_awaitable, amul, 21, 2) == 42
260+
assert await submit(executor, as_awaitable, amul, 6, 7) == 42
261+
assert await submit(executor, as_awaitable, amul, 3, 14) == 42
264262
assert len(executor._tasks) == 1
265263

266264
@pytest.mark.parametrize("as_awaitable", [False, True])
@@ -277,7 +275,7 @@ async def log_n_wait(ident):
277275

278276
async with TaskPoolExecutor(max_workers=1) as executor:
279277
# submit work to saturate the pool
280-
fut = await submit(executor, as_awaitable, log_n_wait, ident="first")
278+
fut = submit(executor, as_awaitable, log_n_wait, ident="first")
281279
try:
282280
agen = await executor.map(log_n_wait, ["second", "third"])
283281
with pytest.raises(TimeoutError):

0 commit comments

Comments
 (0)