Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions sdks/python/apache_beam/transforms/async_dofn.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,14 @@ def reset_state():
if AsyncWrapper._loop_started is not None:
AsyncWrapper._loop_started.clear()

for pool in AsyncWrapper._pool.values():
pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
wait=True, cancel_futures=True)
pools_to_shutdown = [
pool.acquire(AsyncWrapper.initialize_pool(1))
for pool in AsyncWrapper._pool.values()
]
Comment thread
shunping marked this conversation as resolved.
Outdated

for pool in pools_to_shutdown:
pool.shutdown(wait=True, cancel_futures=True)

with AsyncWrapper._lock:
AsyncWrapper._pool = {}
AsyncWrapper._processing_elements = {}
Expand Down Expand Up @@ -268,7 +273,8 @@ async def _collect(result):

def decrement_items_in_buffer(self, future):
  """Decrements the buffered-item counter kept for this wrapper's uuid.

  The update happens while holding AsyncWrapper._lock. If no counter is
  currently tracked for this uuid the call is a no-op instead of raising
  KeyError.

  Args:
    future: Unused by this method. NOTE(review): presumably the completed
      future passed to a done-callback -- confirm against the call site.
  """
  with AsyncWrapper._lock:
    # NOTE(review): the entry can presumably disappear when the shared class
    # state is reset while work is still in flight -- confirm. Decrement
    # exactly once, and only if the uuid is still tracked.
    if self._uuid in AsyncWrapper._items_in_buffer:
      AsyncWrapper._items_in_buffer[self._uuid] -= 1

def schedule_if_room(self, element, ignore_buffer=False, *args, **kwargs):
"""Schedules an item to be processed asynchronously if there is room.
Expand Down
41 changes: 41 additions & 0 deletions sdks/python/apache_beam/transforms/async_dofn_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

import logging
import multiprocessing
import random
import time
import unittest
Expand Down Expand Up @@ -487,6 +488,46 @@ def add_item(i):
self.check_output(results[i], expected_outputs['key' + str(i)])
self.assertEqual(bag_states['key' + str(i)].items, [])

@staticmethod
def _run_reset_state_deadlock_scenario(use_asyncio):
  """Calls AsyncWrapper.reset_state() while one element is still in flight.

  Intended to be run as the target of a separate process so that a hang can
  be detected and terminated by the caller.

  Args:
    use_asyncio: When true the scenario is not exercised and the function
      returns without doing anything.
  """
  if not use_asyncio:
    wrapper = async_lib.AsyncWrapper(
        BasicDofn(sleep_time=0.5), use_asyncio=False)
    wrapper.setup()
    bag_state = FakeBagState([])
    timer = FakeTimer(0)

    # Submit a single element; its worker thread sleeps for 0.5s, so it is
    # still running after the short pause below.
    wrapper.process(('key1', 1), to_process=bag_state, timer=timer)
    time.sleep(0.05)

    # Without the fix this call never returns: reset_state() holds the lock
    # while calling shutdown(wait=True), and the future's done callback
    # blocks trying to acquire that same lock.
    async_lib.AsyncWrapper.reset_state()

def test_reset_state_hang_reproduction(self):
  """Checks that reset_state() returns while an element is still in flight.

  The scenario runs in a child process with a 3 second budget. A child that
  is still alive after that is terminated and the test fails; otherwise the
  child must have exited with code 0.
  """
  if self.use_asyncio:
    # The scenario helper returns immediately for asyncio, so nothing would
    # be verified; report a skip rather than a misleading pass.
    self.skipTest('reset_state() deadlock scenario only applies to threads')

  # Run the deadlock scenario in a separate process so that if it hangs,
  # we can terminate it without causing the main pytest process to hang at
  # exit.
  p = multiprocessing.Process(
      target=AsyncTest._run_reset_state_deadlock_scenario,
      args=(self.use_asyncio, ))
  p.start()
  p.join(timeout=3.0)

  if p.is_alive():
    p.terminate()
    p.join()
    # self.fail() raises, so the exit-code check below is skipped on a hang.
    self.fail("reset_state() deadlocked/hung waiting for active threads to finish")

  self.assertEqual(p.exitcode, 0)


if __name__ == '__main__':
unittest.main()

Loading