Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions livekit-agents/livekit/agents/voice/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,15 @@ async def wait_for_playout(self) -> None:
Unlike `SpeechHandle.wait_for_playout`, which waits for the full
assistant turn to complete (including all function tools),
this method only waits for the assistant's spoken response prior to running
this tool to finish playing."""
await self.speech_handle._wait_for_generation(step_idx=self._initial_step_idx)
this tool to finish playing.

Returns as soon as the speech is interrupted (``speech_handle.interrupted``
is ``True``) instead of blocking until the generation is torn down.
"""
sh = self.speech_handle
if not sh._generations:
raise RuntimeError("cannot use wait_for_playout: no active generation is running.")
await sh._race_against_interrupt(sh._generations[self._initial_step_idx])


EventTypes = Literal[
Expand Down
29 changes: 27 additions & 2 deletions livekit-agents/livekit/agents/voice/speech_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,13 @@ async def wait_for_playout(self) -> None:
This method waits until the assistant has fully finished speaking,
including any finalization steps beyond initial response generation.
This is appropriate to call when you want to ensure the speech output
has entirely played out, including any tool calls and response follow-ups."""
has entirely played out, including any tool calls and response follow-ups.

Returns as soon as the speech is interrupted (``self.interrupted`` is
``True``) instead of blocking until the underlying generation is torn
down. Callers that want to ignore interrupts can check ``self.interrupted``
on return and decide whether to proceed.
"""

# raise an error to avoid developer mistakes
from .agent import _get_activity_task_info
Expand All @@ -179,7 +185,7 @@ async def wait_for_playout(self) -> None:
"To wait for the assistant’s spoken response prior to running this tool, use `RunContext.wait_for_playout()` instead."
)

await asyncio.shield(self._done_fut)
await self._race_against_interrupt(self._done_fut)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot resolve the done_fut on interrupt because the audio output may not have stopped when the interrupt is set, especially in the avatar use case. There is already a timeout in _cancel that resolves the done future after a timeout and cancels the tasks arbitrarily.


def __await__(self) -> Generator[None, None, SpeechHandle]:
async def _await_impl() -> SpeechHandle:
Expand Down Expand Up @@ -208,6 +214,25 @@ async def wait_if_not_interrupted(self, aw: list[asyncio.futures.Future[Any]]) -
gather_fut.cancel()
await gather_fut

async def _race_against_interrupt(self, playout_fut: asyncio.Future[None]) -> None:
    """Wait for ``playout_fut`` or interruption, whichever happens first.

    Returns as soon as either ``playout_fut`` or ``self._interrupt_fut``
    completes; callers can read ``self.interrupted`` to distinguish between
    completion and interruption. Mirrors ``wait_if_not_interrupted`` but
    accepts a single playout future instead of an arbitrary list.

    Neither future is cancelled by this method, including when the calling
    task itself is cancelled while waiting.

    Args:
        playout_fut: Future that completes once the awaited playout is over.

    Raises:
        Exception: Whatever exception ``playout_fut`` completed with, if any.
            A cancelled ``playout_fut`` does not raise; the method just returns.
    """
    # asyncio.wait() only attaches done-callbacks to the futures it receives:
    # it never cancels them, neither on return nor when the caller is
    # cancelled. Shield wrappers and a cleanup pass are therefore unnecessary.
    await asyncio.wait(
        {playout_fut, self._interrupt_fut},
        return_when=asyncio.FIRST_COMPLETED,
    )

    # asyncio.wait() never raises on behalf of the futures it waits on, so a
    # failed playout would otherwise be swallowed silently. Surface it the
    # same way a direct ``await playout_fut`` would.
    if playout_fut.done() and not playout_fut.cancelled():
        playout_fut.result()

def _cancel(self) -> SpeechHandle:
if self.done():
return self
Expand Down
89 changes: 89 additions & 0 deletions tests/test_speech_handle_interruption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Regression tests for SpeechHandle / RunContext playout-wait interruption.

Guards against the deadlock described in livekit/agents#5359, where
``SpeechHandle.wait_for_playout()`` and ``RunContext.wait_for_playout()``
awaited only on the playout-completion future and ignored the interrupt
future. When ``interrupt()`` fired, callers blocked until the
``INTERRUPTION_TIMEOUT`` (~5s) hard-killed the surrounding tasks.

Both methods now race the playout future against ``_interrupt_fut`` using
``asyncio.wait(FIRST_COMPLETED)`` (the same primitive already used by
``SpeechHandle.wait_if_not_interrupted``). The wait returns promptly on
interrupt; callers can inspect ``speech_handle.interrupted`` to decide how
to proceed.
"""

from __future__ import annotations

import asyncio
from unittest.mock import MagicMock

from livekit.agents.llm import FunctionCall
from livekit.agents.voice.events import RunContext
from livekit.agents.voice.speech_handle import SpeechHandle


async def test_speech_handle_wait_for_playout_returns_on_interrupt() -> None:
    """An interrupt must release ``SpeechHandle.wait_for_playout``.

    Regression test for https://github.com/livekit/agents/issues/5359.
    Before the fix the call stayed parked on ``_done_fut`` until
    INTERRUPTION_TIMEOUT (~5s) elapsed, so the 1.0s ``wait_for`` deadline
    used here would expire and surface as a TimeoutError.
    """
    handle = SpeechHandle.create()

    async def _cancel_shortly() -> None:
        # Give wait_for_playout() a moment to start waiting before interrupting.
        await asyncio.sleep(0.05)
        handle._cancel()

    canceller = asyncio.create_task(_cancel_shortly())
    try:
        await asyncio.wait_for(handle.wait_for_playout(), timeout=1.0)
    finally:
        # Always reap the background task, even if the wait timed out.
        await canceller

    # The wait returned because of the interrupt, not because playout finished.
    assert handle.interrupted
    assert not handle.done()


async def test_run_context_wait_for_playout_returns_on_interrupt() -> None:
    """An interrupt must release ``RunContext.wait_for_playout``.

    Regression test for https://github.com/livekit/agents/issues/5359.
    """
    handle = SpeechHandle.create()
    # NOTE(review): presumably registers the generation that
    # RunContext.wait_for_playout() requires — confirm against SpeechHandle.
    handle._authorize_generation()

    run_ctx = RunContext(
        session=MagicMock(),
        speech_handle=handle,
        function_call=FunctionCall(call_id="call_test", arguments="{}", name="noop"),
    )

    async def _cancel_shortly() -> None:
        # Give wait_for_playout() a moment to start waiting before interrupting.
        await asyncio.sleep(0.05)
        handle._cancel()

    canceller = asyncio.create_task(_cancel_shortly())
    try:
        await asyncio.wait_for(run_ctx.wait_for_playout(), timeout=1.0)
    finally:
        # Reap the background task, then mark the handle done on every path.
        await canceller
        handle._mark_done()

    assert handle.interrupted


async def test_speech_handle_wait_for_playout_returns_normally_on_completion() -> None:
    """Sanity check: without an interrupt, the wait still ends once the handle is done."""
    handle = SpeechHandle.create()

    async def _finish_shortly() -> None:
        # Give wait_for_playout() a moment to start waiting before completing.
        await asyncio.sleep(0.05)
        handle._mark_done()

    finisher = asyncio.create_task(_finish_shortly())
    try:
        await asyncio.wait_for(handle.wait_for_playout(), timeout=1.0)
    finally:
        # Always reap the background task, even if the wait timed out.
        await finisher

    # Completion path: the handle is done and was never interrupted.
    assert not handle.interrupted
    assert handle.done()
Loading