Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion livekit-agents/livekit/agents/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import traceback
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from time import time as _wall_time
from types import FrameType
from typing import TYPE_CHECKING, Annotated, Any, Literal

Expand Down Expand Up @@ -141,6 +142,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self._pushed_duration: float = 0.0
self._capture_start: float = 0.0
self._flush_task: asyncio.Task[None] | None = None
self._playback_started_fired: bool = False

self._output_buf = bytearray()
self._audio_lock = threading.Lock()
Expand All @@ -151,6 +153,9 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self._paused_at: float | None = None
self._paused_duration: float = 0.0

# track the segment id to avoid stale async operations
self._segment_id = 0

@property
def audio_lock(self) -> threading.Lock:
    """Return the lock stored in ``_audio_lock``.

    This is the same lock the object takes (``with self._audio_lock``) when
    it touches its output buffer, exposed read-only so code outside the
    class can acquire it as well.
    """
    lock = self._audio_lock
    return lock
Expand All @@ -175,7 +180,6 @@ async def capture_frame(self, frame: rtc.AudioFrame) -> None:

if not self._pushed_duration:
self._capture_start = time.monotonic()
self.on_playback_started(created_at=time.time())

self._pushed_duration += frame.duration
with self._audio_lock:
Expand All @@ -195,6 +199,9 @@ def clear_buffer(self) -> None:
with self._audio_lock:
self._output_buf.clear()
self._output_buf_empty.set()
# redundant (_wait_for_playout does the same, albeit async) but defensive
self._segment_id += 1
self._playback_started_fired = False

if self._pushed_duration:
self._interrupted_ev.set()
Expand Down Expand Up @@ -248,6 +255,24 @@ async def _wait_buffered_audio() -> None:
self._interrupted_ev.clear()
with self._audio_lock:
self._output_buf_empty.set()
self._playback_started_fired = False
self._segment_id += 1

def _maybe_mark_playback_started(self) -> None:
    """Schedule the playback-started notification if it has not fired yet.

    Must be called under ``audio_lock``.

    On the first call after ``_playback_started_fired`` was cleared, the flag
    is set, the current wall-clock time and ``_segment_id`` are captured, and
    a callback is handed to ``self._loop.call_soon_threadsafe`` that forwards
    both values to ``_on_playback_started``. Every later call is a no-op
    until the flag is cleared again.
    """
    if not self._playback_started_fired:
        self._playback_started_fired = True

        # Capture both values now; the scheduled callback runs later and
        # must report the state as it was at this moment.
        started_at = _wall_time()
        current_segment = self._segment_id

        def _notify() -> None:
            self._on_playback_started(created_at=started_at, segment_id=current_segment)

        self._loop.call_soon_threadsafe(_notify)

def _on_playback_started(self, *, created_at: float, segment_id: int) -> None:
    """Forward a scheduled playback-started notification unless it is stale.

    Args:
        created_at: Timestamp captured when the notification was scheduled;
            passed through unchanged to ``on_playback_started``.
        segment_id: Value of ``_segment_id`` at scheduling time. If it no
            longer matches the current ``_segment_id``, the notification is
            dropped.
    """
    is_current = segment_id == self._segment_id
    if is_current:
        self.on_playback_started(created_at=created_at)


class AgentsConsole:
Expand Down Expand Up @@ -698,6 +723,8 @@ def _sd_output_callback(self, outdata: np.ndarray, frames: int, time: Any, *_: A
bytes_needed = frames * 2
if len(self._io_audio_output.audio_buffer) < bytes_needed:
available_bytes = len(self._io_audio_output.audio_buffer)
if available_bytes > 0:
self._io_audio_output._maybe_mark_playback_started()
outdata[: available_bytes // 2, 0] = np.frombuffer(
self._io_audio_output.audio_buffer,
dtype=np.int16,
Expand All @@ -707,6 +734,7 @@ def _sd_output_callback(self, outdata: np.ndarray, frames: int, time: Any, *_: A
del self._io_audio_output.audio_buffer[:available_bytes] # TODO: optimize
self.io_loop.call_soon_threadsafe(self._io_audio_output.mark_output_empty)
else:
self._io_audio_output._maybe_mark_playback_started()
chunk = self._io_audio_output.audio_buffer[:bytes_needed]
outdata[:, 0] = np.frombuffer(chunk, dtype=np.int16, count=frames)
del self._io_audio_output.audio_buffer[:bytes_needed]
Expand Down
Loading