Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions livekit-agents/livekit/agents/llm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
LLMStream,
)
from .realtime import (
DualChatContextSyncSession,
GenerationCreatedEvent,
InputSpeechStartedEvent,
InputSpeechStoppedEvent,
Expand Down Expand Up @@ -98,6 +99,7 @@
"RealtimeModelError",
"RealtimeCapabilities",
"RealtimeSession",
"DualChatContextSyncSession",
"InputTranscriptionCompleted",
"InputSpeechStartedEvent",
"InputSpeechStoppedEvent",
Expand Down
17 changes: 15 additions & 2 deletions livekit-agents/livekit/agents/llm/realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import asyncio
from abc import ABC, abstractmethod
from collections.abc import AsyncIterable, Awaitable
from collections.abc import AsyncIterable, Awaitable, Sequence
from dataclasses import dataclass
from types import TracebackType
from typing import Generic, Literal, TypeVar
from typing import Generic, Literal, Protocol, TypeVar, runtime_checkable

from pydantic import BaseModel, ConfigDict, Field

Expand Down Expand Up @@ -207,3 +207,16 @@ async def aclose(self) -> None: ...
def start_user_activity(self) -> None:
"""notifies the model that user activity has started"""
pass


@runtime_checkable
class DualChatContextSyncSession(Protocol):
    """Optional capability for realtime sessions that handle synchronization between a local and a
    remote chat context, where certain items may arrive in the remote context before the local
    context.

    This capability allows external entities (e.g. the agent activity driving the session) to
    notify the session that certain items have arrived in the local context, so the session can
    stop treating those items as in-flight.

    Being a ``runtime_checkable`` :class:`~typing.Protocol`, callers may feature-detect support
    with ``isinstance(session, DualChatContextSyncSession)`` rather than a nominal subclass check.
    """

    # Notify that function calls have been started/completed, and are therefore in local context
    def notify_fc_processed(self, item_ids: Sequence[str]) -> None:
        """Mark the function-call items identified by ``item_ids`` as present in the local
        chat context. Ids that the session is not tracking should be ignored."""
        ...
9 changes: 8 additions & 1 deletion livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from opentelemetry import context as otel_context, trace

from livekit import rtc
from livekit.agents.llm.realtime import MessageGeneration
from livekit.agents.llm.realtime import DualChatContextSyncSession, MessageGeneration
from livekit.agents.metrics.base import Metadata

from .. import llm, stt, tts, utils, vad
Expand Down Expand Up @@ -2372,6 +2372,10 @@ async def _realtime_generation_task_impl(
assert self._rt_session is not None, "rt_session is not available"
assert isinstance(self.llm, llm.RealtimeModel), "llm is not a realtime model"

def _notify_fc_processed(fnc_calls: list[llm.FunctionCall]) -> None:
    # Release in-flight function-call ids on the realtime session once the calls are
    # reflected in the local chat context. No-op when there are no function calls or
    # when the session does not implement the dual chat-context sync capability
    # (feature-detected via the runtime-checkable DualChatContextSyncSession protocol).
    if fnc_calls and isinstance(self._rt_session, DualChatContextSyncSession):
        self._rt_session.notify_fc_processed([fc.id for fc in fnc_calls])

current_span.set_attributes(
{
trace_types.ATTR_GEN_AI_OPERATION_NAME: "chat",
Expand Down Expand Up @@ -2564,6 +2568,7 @@ def _tool_execution_started_cb(fnc_call: llm.FunctionCall) -> None:
speech_handle._item_added([fnc_call])
self._agent._chat_ctx.items.append(fnc_call)
self._session._tool_items_added([fnc_call])
_notify_fc_processed([fnc_call])

def _tool_execution_completed_cb(out: ToolExecutionOutput) -> None:
if out.fnc_call_out:
Expand Down Expand Up @@ -2675,6 +2680,7 @@ def _create_assistant_message(

if speech_handle.interrupted:
await utils.aio.cancel_and_wait(exe_task)
_notify_fc_processed(function_calls)
Comment on lines 2682 to +2683
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Incomplete cleanup of _inflight_fc_ids on interruption leaks stale entries that permanently block deletion

When a speech is interrupted, _notify_fc_processed(function_calls) is called at line 2683 to release all in-flight function call IDs. However, function_calls is populated by the _read_fnc_stream task, which was already cancelled at line 2632 (await utils.aio.cancel_and_wait(*tasks)). This means function_calls may be incomplete — it might not include function calls whose IDs were already added to _inflight_fc_ids by the server's conversation.item.added event.

Root Cause: timing gap between server event and stream reader

The _inflight_fc_ids set is populated in _handle_conversion_item_added (realtime_model.py:1510-1511) when the server sends conversation.item.added (step 3 in the event sequence). However, a function call only enters function_calls after response.output_item.done (step 8) pushes it to function_ch, and then _read_fnc_stream reads it from the tee.

Two scenarios cause a leak:

  1. Response cancelled before response.output_item.done: The server creates the function_call item (step 3) but cancels the response before step 8. The function call never reaches function_ch, so it's never in function_calls.

  2. _read_fnc_stream cancelled before reading buffered items: Even if step 8 fires and the item is in the tee buffer, _read_fnc_stream is cancelled at agent_activity.py:2632 before reading it.

In both cases, the stale ID remains in _inflight_fc_ids indefinitely. Every future update_chat_ctx call sees the function_call in _remote_chat_ctx but not in local context, yet the _inflight_fc_ids guard (realtime_model.py:1189-1190) prevents deletion. The item permanently occupies space in the server's conversation context window.

Impact: After an interruption during function call generation, orphaned function_call items can accumulate in the remote context and can never be cleaned up by update_chat_ctx (e.g., during summarization), causing gradual context window waste.

Prompt for agents
In livekit-agents/livekit/agents/voice/agent_activity.py, at lines 2681-2684 (the interrupted return path), the _notify_fc_processed(function_calls) call uses an incomplete list because _read_fnc_stream was already cancelled. To fix this, instead of relying on function_calls (which comes from the cancelled stream reader), the cleanup should directly clear all inflight IDs for this generation. One approach: add a method to the DualChatContextSyncSession protocol (and its implementations in realtime_model.py and realtime_model_beta.py) like clear_all_inflight_fcs() that does self._inflight_fc_ids.clear(), and call it in the interrupted path. Alternatively, track which response's function calls are inflight (keyed by response_id) so only the relevant IDs are cleared. The simplest safe fix is to call a method that clears all inflight IDs when the generation is interrupted, since no other generation should be active concurrently.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, function_calls can be incomplete if _read_fnc_stream was cancelled before consuming all items from the stream. However, this is expected and covered by a secondary clean-up path: _handle_conversion_item_deleted in realtime_model.py calls self._inflight_fc_ids.discard(event.item_id) whenever the server deletes an item. During interruption, the server typically deletes/truncates these items, which triggers the clean-up.

In the worst case (server retains the item), the ID lingers in _inflight_fc_ids, which prevents update_chat_ctx from deleting it, but I believe that is the correct behaviour, since the item legitimately exists on the server and deleting it would cause the cascade corruption this fix prevents.

return

# wait for the tool execution to complete
Expand All @@ -2687,6 +2693,7 @@ def _create_assistant_message(
await exe_task
finally:
self._background_speeches.discard(speech_handle)
_notify_fc_processed(function_calls)

# important: no agent output should be used after this point

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import time
import weakref
from collections.abc import Iterator
from collections.abc import Iterator, Sequence
from dataclasses import dataclass
from typing import Any, Literal, overload
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
Expand Down Expand Up @@ -690,6 +690,7 @@ def __init__(self, realtime_model: RealtimeModel) -> None:

self._current_generation: _ResponseGeneration | None = None
self._remote_chat_ctx = llm.remote_chat_context.RemoteChatContext()
self._inflight_fc_ids: set[str] = set()

self._update_chat_ctx_lock = asyncio.Lock()
self._update_fnc_ctx_lock = asyncio.Lock()
Expand Down Expand Up @@ -1185,6 +1186,8 @@ def _is_content_empty(msg_id: str) -> bool:
# so in those cases, we do not want to remove them from the server context
if _is_content_empty(msg_id):
continue
if msg_id in self._inflight_fc_ids:
continue
_delete_item(msg_id)

for previous_msg_id, msg_id in diff_ops.to_create:
Expand All @@ -1196,6 +1199,8 @@ def _is_content_empty(msg_id: str) -> bool:
# we don't want to recreate these items there
if _is_content_empty(msg_id):
continue
if msg_id in self._inflight_fc_ids:
continue
_delete_item(msg_id)
_create_item(previous_msg_id, msg_id)

Expand Down Expand Up @@ -1486,6 +1491,10 @@ def _handle_response_content_part_added(self, event: ResponseContentPartAddedEve
["text"] if item_type == "text" else ["audio", "text"]
)

def notify_fc_processed(self, item_ids: Sequence[str]) -> None:
    """Mark the given function-call item ids as processed into the local chat context.

    Removes each id from ``_inflight_fc_ids`` so ``update_chat_ctx`` is no longer
    prevented from deleting/recreating the corresponding remote items. Ids that are
    not currently tracked are ignored (``set.discard`` never raises).
    """
    for item_id in item_ids:
        self._inflight_fc_ids.discard(item_id)

def _handle_conversion_item_added(self, event: ConversationItemAdded) -> None:
assert event.item.id is not None, "item.id is None"

Expand All @@ -1498,6 +1507,9 @@ def _handle_conversion_item_added(self, event: ConversationItemAdded) -> None:
f"failed to insert item `{event.item.id}`: {str(e)}",
)

if event.item.type == "function_call":
self._inflight_fc_ids.add(event.item.id)

if fut := self._item_create_future.pop(event.item.id, None):
fut.set_result(None)

Expand All @@ -1511,6 +1523,8 @@ def _handle_conversion_item_deleted(self, event: ConversationItemDeletedEvent) -
f"failed to delete item `{event.item_id}`: {str(e)}",
)

self._inflight_fc_ids.discard(event.item_id)

if fut := self._item_delete_future.pop(event.item_id, None):
fut.set_result(None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import time
import weakref
from collections.abc import Iterator
from collections.abc import Iterator, Sequence
from dataclasses import dataclass
from typing import Any, Literal, overload
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
Expand Down Expand Up @@ -537,6 +537,7 @@ def __init__(self, realtime_model: RealtimeModelBeta) -> None:

self._current_generation: _ResponseGeneration | None = None
self._remote_chat_ctx = llm.remote_chat_context.RemoteChatContext()
self._inflight_fc_ids: set[str] = set()

self._update_chat_ctx_lock = asyncio.Lock()
self._update_fnc_ctx_lock = asyncio.Lock()
Expand Down Expand Up @@ -1019,13 +1020,17 @@ def _create_item(previous_msg_id: str | None, msg_id: str) -> None:
)

for msg_id in diff_ops.to_remove:
if msg_id in self._inflight_fc_ids:
continue
_delete_item(msg_id)

for previous_msg_id, msg_id in diff_ops.to_create:
_create_item(previous_msg_id, msg_id)

# update the items with the same id but different content
for previous_msg_id, msg_id in diff_ops.to_update:
if msg_id in self._inflight_fc_ids:
continue
_delete_item(msg_id)
_create_item(previous_msg_id, msg_id)

Expand Down Expand Up @@ -1299,6 +1304,10 @@ def _handle_response_content_part_added(self, event: ResponseContentPartAddedEve
["text"] if item_type == "text" else ["audio", "text"]
)

def notify_fc_processed(self, item_ids: Sequence[str]) -> None:
    """Drop the given function-call item ids from the in-flight tracking set.

    Once dropped, ``update_chat_ctx`` may again delete/recreate the corresponding
    remote items. Ids that are not currently tracked are silently ignored.
    """
    # difference_update discards every id in one call; unknown ids are a no-op,
    # matching per-id set.discard semantics.
    self._inflight_fc_ids.difference_update(item_ids)

def _handle_conversion_item_created(self, event: ConversationItemCreatedEvent) -> None:
assert event.item.id is not None, "item.id is None"

Expand All @@ -1311,6 +1320,9 @@ def _handle_conversion_item_created(self, event: ConversationItemCreatedEvent) -
f"failed to insert item `{event.item.id}`: {str(e)}",
)

if event.item.type == "function_call":
self._inflight_fc_ids.add(event.item.id)

if fut := self._item_create_future.pop(event.item.id, None):
fut.set_result(None)

Expand All @@ -1324,6 +1336,8 @@ def _handle_conversion_item_deleted(self, event: ConversationItemDeletedEvent) -
f"failed to delete item `{event.item_id}`: {str(e)}",
)

self._inflight_fc_ids.discard(event.item_id)

if fut := self._item_delete_future.pop(event.item_id, None):
fut.set_result(None)

Expand Down