Skip to content

Conversation

@toubatbrian
Copy link
Contributor

@toubatbrian toubatbrian commented Feb 7, 2026

This class could solves the issue where user want the room to stay alive event after participant disconnected and being able to pipe the audio stream from new participants.

Related PR: #1018

Summary by CodeRabbit

  • New Features

    • MultiInputStream primitive for managing multiple input streams dynamically.
    • Audio input now exposes a close operation to shut down underlying inputs.
  • Bug Fixes / Reliability

    • Improved handling of audio recorder and subscribed track lifecycle for cleaner shutdown and interception behavior.
  • Tests

    • Comprehensive tests covering MultiInputStream behavior, concurrency, edge cases, and error handling.

Open with Devin

@changeset-bot
Copy link

changeset-bot bot commented Feb 7, 2026

🦋 Changeset detected

Latest commit: adfecf4

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 19 packages
Name Type
@livekit/agents Patch
@livekit/agents-plugin-anam Patch
@livekit/agents-plugin-baseten Patch
@livekit/agents-plugin-bey Patch
@livekit/agents-plugin-cartesia Patch
@livekit/agents-plugin-deepgram Patch
@livekit/agents-plugin-elevenlabs Patch
@livekit/agents-plugin-google Patch
@livekit/agents-plugin-hedra Patch
@livekit/agents-plugin-inworld Patch
@livekit/agents-plugin-lemonslice Patch
@livekit/agents-plugin-livekit Patch
@livekit/agents-plugin-neuphonic Patch
@livekit/agents-plugin-openai Patch
@livekit/agents-plugin-resemble Patch
@livekit/agents-plugin-rime Patch
@livekit/agents-plugin-silero Patch
@livekit/agents-plugins-test Patch
@livekit/agents-plugin-xai Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@coderabbitai
Copy link

coderabbitai bot commented Feb 7, 2026

📝 Walkthrough

Walkthrough

Adds a new MultiInputStream primitive that merges multiple ReadableStream inputs into a single output stream with dynamic add/remove and close semantics. Replaces previous DeferredReadableStream usage in AudioInput, RecorderIO, and ParticipantAudioInputStream to enable runtime input management.

Changes

Cohort / File(s) Summary
Changeset & Export
\.changeset/neat-parents-develop.md, agents/src/stream/index.ts
Adds a changeset entry and re-exports the new MultiInputStream from the stream module.
Core Implementation
agents/src/stream/multi_input_stream.ts
New public MultiInputStream<T> class: manages concurrent pumps from multiple ReadableStream inputs, provides addInputStream/removeInputStream/close, tracks input count and closed state, and handles error/cleanup semantics.
Tests
agents/src/stream/multi_input_stream.test.ts
Comprehensive tests validating creation, concurrent reads, dynamic add/remove, error handling, backpressure, teeing, close semantics, id uniqueness, and edge cases.
Audio Integration
agents/src/voice/io.ts, agents/src/voice/recorder_io/recorder_io.ts, agents/src/voice/room_io/_input.ts
Replaces DeferredReadableStream with MultiInputStream<AudioFrame>, updates stream getters to return multiStream.stream, wires interception via addInputStream, adds AudioInput.close() and updates participant input lifecycle (currentInputId tracking and removal on close).

Sequence Diagram

sequenceDiagram
    participant Client
    participant AudioInput
    participant MultiInputStream
    participant InputStreams as Input<br/>Streams
    participant OutputStream

    Client->>AudioInput: create AudioInput
    AudioInput->>MultiInputStream: new MultiInputStream()

    rect rgba(100,150,200,0.5)
    Client->>AudioInput: attach input source
    AudioInput->>MultiInputStream: addInputStream(source)
    MultiInputStream-->>AudioInput: input ID
    end

    rect rgba(150,100,200,0.5)
    MultiInputStream->>InputStreams: read() (concurrent pumps)
    InputStreams-->>MultiInputStream: data chunk
    MultiInputStream->>OutputStream: write(chunk)
    end

    rect rgba(100,200,150,0.5)
    Client->>AudioInput: remove/swap input
    AudioInput->>MultiInputStream: removeInputStream(id)
    MultiInputStream->>MultiInputStream: cancel pump / release reader
    end

    rect rgba(200,150,100,0.5)
    Client->>AudioInput: close
    AudioInput->>MultiInputStream: close()
    MultiInputStream->>MultiInputStream: detach inputs / await pumps
    MultiInputStream->>OutputStream: close writer
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • rektdeckard
  • lukasIO
  • 1egoman

Poem

🐰
Streams I tend and streams I sow,
Many inputs, one soft flow,
Add, remove—I'll let them play,
Pumps hum gently through the day,
Close with care, then hop away.

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ❓ Inconclusive The PR description is vague and lacks structured information required by the template, making it difficult to fully assess the scope and quality of changes. Fill out the complete PR template including explicit 'Changes Made', 'Pre-Review Checklist', 'Testing', and 'Additional Notes' sections with detailed information about modifications to AudioInput, RecorderIO, and ParticipantAudioInputStream.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat: Create MultiInputStream API primitive' directly and clearly summarizes the main change—introducing a new MultiInputStream class API.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch brian/allow-user-reconnect

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@agents/src/stream/multi_input_stream.test.ts`:
- Around line 18-28: The helper function collectAll is unused and triggers a
linter error; either remove the collectAll function entirely from this test file
or rename it to _collectAll (prefix with underscore) to indicate it's
intentionally unused, updating any references if present; locate the async
function collectAll<T>(stream: ReadableStream<T>) in multi_input_stream.test.ts
and apply one of these two fixes to satisfy `@typescript-eslint/no-unused-vars`.
🧹 Nitpick comments (2)
agents/src/stream/multi_input_stream.ts (1)

137-162: Consider logging non-release errors before silently swallowing them.

Lines 152-157: when an input stream errors with something other than a release error, the error is silently ignored. This is fine for keeping the output alive, but silently swallowing unexpected errors makes debugging harder. A console.warn or structured log would help operators diagnose misbehaving input streams.

💡 Proposed improvement
     } catch (e) {
       // TypeErrors from releaseLock() during removeInputStream / close are expected.
       if (!isStreamReaderReleaseError(e)) {
         // For any other error we silently remove the input — the output stays alive.
-        // (Contrast with DeferredReadableStream which propagates errors to the output.)
+        // (Contrast with DeferredReadableStream which propagates errors to the output.)
+        // Log so operators can diagnose misbehaving input streams.
+        console.warn(`[MultiInputStream] input ${id} removed due to error:`, e);
       }
     } finally {
agents/src/voice/room_io/_input.ts (1)

124-128: Fire-and-forget removeInputStream could briefly interleave old and new audio frames.

void this.multiStream.removeInputStream(...) doesn't await the pump finishing. When closeStream() is immediately followed by addInputStream (e.g., in onTrackSubscribed at Line 150), the old pump may still be writing to the shared writer for a few microticks, causing brief interleaving of old-track and new-track frames.

If this is intentional (acceptable for a track switch), a brief comment would help. Otherwise, consider awaiting the removal:

💡 Option: await removal to guarantee clean handoff
- private closeStream() {
+ private async closeStream() {
    if (this.currentInputId) {
-     void this.multiStream.removeInputStream(this.currentInputId);
+     await this.multiStream.removeInputStream(this.currentInputId);
      this.currentInputId = null;
    }

Note: this would require onTrackSubscribed to be made async or to handle the returned promise, so weigh the tradeoff.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a8310f8 and cc7bd25.

📒 Files selected for processing (7)
  • .changeset/neat-parents-develop.md
  • agents/src/stream/index.ts
  • agents/src/stream/multi_input_stream.test.ts
  • agents/src/stream/multi_input_stream.ts
  • agents/src/voice/io.ts
  • agents/src/voice/recorder_io/recorder_io.ts
  • agents/src/voice/room_io/_input.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)

Add SPDX-FileCopyrightText and SPDX-License-Identifier headers to all newly added files with '// SPDX-FileCopyrightText: 2025 LiveKit, Inc.' and '// SPDX-License-Identifier: Apache-2.0'

Files:

  • agents/src/voice/recorder_io/recorder_io.ts
  • agents/src/stream/multi_input_stream.test.ts
  • agents/src/stream/multi_input_stream.ts
  • agents/src/stream/index.ts
  • agents/src/voice/room_io/_input.ts
  • agents/src/voice/io.ts
**/*.{ts,tsx}?(test|example|spec)

📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)

When testing inference LLM, always use full model names from agents/src/inference/models.ts (e.g., 'openai/gpt-4o-mini' instead of 'gpt-4o-mini')

Files:

  • agents/src/voice/recorder_io/recorder_io.ts
  • agents/src/stream/multi_input_stream.test.ts
  • agents/src/stream/multi_input_stream.ts
  • agents/src/stream/index.ts
  • agents/src/voice/room_io/_input.ts
  • agents/src/voice/io.ts
**/*.{ts,tsx}?(test|example)

📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)

Initialize logger before using any LLM functionality with initializeLogger({ pretty: true }) from '@livekit/agents'

Files:

  • agents/src/voice/recorder_io/recorder_io.ts
  • agents/src/stream/multi_input_stream.test.ts
  • agents/src/stream/multi_input_stream.ts
  • agents/src/stream/index.ts
  • agents/src/voice/room_io/_input.ts
  • agents/src/voice/io.ts
🧠 Learnings (1)
📚 Learning: 2026-01-16T14:33:39.551Z
Learnt from: CR
Repo: livekit/agents-js PR: 0
File: .cursor/rules/agent-core.mdc:0-0
Timestamp: 2026-01-16T14:33:39.551Z
Learning: Applies to **/{examples,test}/**/*.test.ts : Include both basic streaming and tool calling tests to verify full LLM functionality

Applied to files:

  • agents/src/stream/multi_input_stream.test.ts
  • agents/src/voice/io.ts
🧬 Code graph analysis (2)
agents/src/stream/multi_input_stream.test.ts (1)
agents/src/stream/multi_input_stream.ts (2)
  • stream (40-42)
  • MultiInputStream (26-163)
agents/src/voice/io.ts (2)
agents/src/stream/index.ts (1)
  • MultiInputStream (7-7)
agents/src/stream/multi_input_stream.ts (1)
  • MultiInputStream (26-163)
🪛 GitHub Actions: Build
agents/src/stream/multi_input_stream.test.ts

[error] 18-18: 'collectAll' is defined but never used. Allowed unused vars must match /^_/u. [Error/@typescript-eslint/no-unused-vars]

🔇 Additional comments (15)
agents/src/stream/multi_input_stream.ts (5)

1-11: LGTM — SPDX header and imports are correct.


26-42: Clean design: single IdentityTransform with one shared writer is a good foundation.


60-73: addInputStream looks correct — guarded by _closed, returns unique IDs.


81-98: removeInputStream correctly sequences delete → releaseLock → await pump.

The ordering ensures the pump's finally block is a harmless no-op on the inputs map, and the pump promise is awaited before returning to the caller.


105-131: close() — clean shutdown sequence with proper idempotency.

The release → allSettled → writer-close ordering is correct. Snapshotting pumpPromises.values() via spread before clearing avoids losing references to in-flight pumps.

.changeset/neat-parents-develop.md (1)

1-5: Changeset looks good.

agents/src/voice/recorder_io/recorder_io.ts (2)

108-108: Good addition — closing the input recorder during shutdown prevents resource leaks.


376-383: Correct wiring of the intercepting stream through multiStream.addInputStream.

The RecorderAudioInput now pipes the intercepting stream through the MultiInputStream inherited from AudioInput, which is consistent with the new design.

agents/src/stream/index.ts (1)

7-7: Export looks good — alphabetically ordered and consistent with existing exports.

agents/src/voice/io.ts (1)

86-95: Clean replacement of DeferredReadableStream with MultiInputStream.

The protected visibility allows subclasses to add/remove inputs, while the public close() method provides a clean lifecycle API. The stream getter correctly delegates to multiStream.stream.

agents/src/voice/room_io/_input.ts (1)

186-192: Clean override — event listeners removed before closing streams.

agents/src/stream/multi_input_stream.test.ts (4)

30-41: Good basic smoke test covering initial state assertions.


215-278: Solid error-handling tests — both single-errored-input and mixed-good-bad scenarios covered.


284-333: Close semantics are well tested — done signal, pending read resolution, idempotency, and post-close rejection all covered.


518-551: Good edge case: infinite source validates that close() gracefully terminates active pumps.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

@toubatbrian
Copy link
Contributor Author

@codex review

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: adfecf4736

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +109 to +112
// Release every input reader to unblock pending reads inside pumps.
for (const reader of this.inputs.values()) {
reader.releaseLock();
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Cancel input streams on close to avoid runaway producers

In close(), the code only calls reader.releaseLock() for each input. Releasing the lock does not stop a push-based ReadableStream source; it simply detaches the reader. If an input keeps enqueuing (e.g., an infinite audio source), it will continue running after close() and can buffer unbounded data or leak CPU/memory because nothing is consuming it. This regression is specific to MultiInputStream because close() is expected to stop all inputs. Consider calling await reader.cancel() (then release the lock) when shutting down, so producers are actually stopped.

Useful? React with 👍 / 👎.

Copy link
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no bugs or issues to report.

Open in Devin Review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant