feat: Create MultiInputStream API primitive #1036
🦋 Changeset detected
Latest commit: adfecf4
The changes in this PR will be included in the next version bump. This PR includes changesets to release 19 packages.
📝 Walkthrough
Adds a new MultiInputStream primitive that merges multiple ReadableStream inputs into a single output stream with dynamic add/remove and close semantics. Replaces previous DeferredReadableStream usage in AudioInput, RecorderIO, and ParticipantAudioInputStream to enable runtime input management.
Sequence Diagram
sequenceDiagram
participant Client
participant AudioInput
participant MultiInputStream
participant InputStreams as Input<br/>Streams
participant OutputStream
Client->>AudioInput: create AudioInput
AudioInput->>MultiInputStream: new MultiInputStream()
rect rgba(100,150,200,0.5)
Client->>AudioInput: attach input source
AudioInput->>MultiInputStream: addInputStream(source)
MultiInputStream-->>AudioInput: input ID
end
rect rgba(150,100,200,0.5)
MultiInputStream->>InputStreams: read() (concurrent pumps)
InputStreams-->>MultiInputStream: data chunk
MultiInputStream->>OutputStream: write(chunk)
end
rect rgba(100,200,150,0.5)
Client->>AudioInput: remove/swap input
AudioInput->>MultiInputStream: removeInputStream(id)
MultiInputStream->>MultiInputStream: cancel pump / release reader
end
rect rgba(200,150,100,0.5)
Client->>AudioInput: close
AudioInput->>MultiInputStream: close()
MultiInputStream->>MultiInputStream: detach inputs / await pumps
MultiInputStream->>OutputStream: close writer
end
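The lifecycle in the diagram (add an input, pump chunks into one shared output, remove an input, close) can be sketched roughly as follows. This is a simplified stand-in, not the PR's implementation: `MiniMultiInput` is a hypothetical name, and the sketch detaches inputs with `reader.cancel()` instead of the `releaseLock()` approach the PR uses, which also stops the underlying source. It assumes a consumer is reading the output, since writes into the identity `TransformStream` wait on backpressure.

```typescript
// Hypothetical, simplified stand-in for MultiInputStream (not the PR's code).
class MiniMultiInput<T> {
  private readonly transform = new TransformStream<T, T>();
  private readonly writer = this.transform.writable.getWriter();
  private readonly readers = new Map<string, ReadableStreamDefaultReader<T>>();
  private readonly pumps = new Map<string, Promise<void>>();
  private nextId = 0;
  private closed = false;

  /** Single merged output stream. */
  get stream(): ReadableStream<T> {
    return this.transform.readable;
  }

  /** Attach a source and start pumping it; returns an ID for later removal. */
  addInputStream(source: ReadableStream<T>): string {
    if (this.closed) throw new Error('MiniMultiInput is closed');
    const id = `input-${this.nextId++}`;
    const reader = source.getReader();
    this.readers.set(id, reader);
    this.pumps.set(id, this.pump(id, reader));
    return id;
  }

  /** Detach one source and wait until its pump has finished. */
  async removeInputStream(id: string): Promise<void> {
    const reader = this.readers.get(id);
    const pump = this.pumps.get(id);
    if (!reader) return;
    this.readers.delete(id);
    // Assumption: cancel() is used here; it resolves a pending read with done: true.
    await reader.cancel().catch(() => {});
    await pump;
  }

  /** Detach every source, then close the output. Idempotent. */
  async close(): Promise<void> {
    if (this.closed) return;
    this.closed = true;
    const ids = [...this.readers.keys()];
    await Promise.all(ids.map((id) => this.removeInputStream(id)));
    await this.writer.close();
  }

  private async pump(id: string, reader: ReadableStreamDefaultReader<T>): Promise<void> {
    try {
      for (;;) {
        const { done, value } = await reader.read();
        if (done) break;
        await this.writer.write(value);
      }
    } catch {
      // An errored input is dropped; the output stays alive.
    } finally {
      this.readers.delete(id);
      this.pumps.delete(id);
      reader.releaseLock();
    }
  }
}
```

Because all pumps share one writer, chunks from different inputs arrive in whatever order the pumps happen to write them; the sketch makes no ordering guarantee across inputs.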
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
…it/agents-js into brian/allow-user-reconnect
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@agents/src/stream/multi_input_stream.test.ts`:
- Around line 18-28: The helper function collectAll is unused and triggers a
linter error; either remove the collectAll function entirely from this test file
or rename it to _collectAll (prefix with underscore) to indicate it's
intentionally unused, updating any references if present; locate the async
function collectAll<T>(stream: ReadableStream<T>) in multi_input_stream.test.ts
and apply one of these two fixes to satisfy `@typescript-eslint/no-unused-vars`.
🧹 Nitpick comments (2)
agents/src/stream/multi_input_stream.ts (1)
137-162: Consider logging non-release errors before silently swallowing them.
Lines 152-157: when an input stream errors with something other than a release error, the error is silently ignored. This is fine for keeping the output alive, but silently swallowing unexpected errors makes debugging harder. A `console.warn` or structured log would help operators diagnose misbehaving input streams.
💡 Proposed improvement
  } catch (e) {
    // TypeErrors from releaseLock() during removeInputStream / close are expected.
    if (!isStreamReaderReleaseError(e)) {
      // For any other error we silently remove the input — the output stays alive.
-     // (Contrast with DeferredReadableStream which propagates errors to the output.)
+     // (Contrast with DeferredReadableStream which propagates errors to the output.)
+     // Log so operators can diagnose misbehaving input streams.
+     console.warn(`[MultiInputStream] input ${id} removed due to error:`, e);
    }
  } finally {
agents/src/voice/room_io/_input.ts (1)
124-128: Fire-and-forget `removeInputStream` could briefly interleave old and new audio frames.
`void this.multiStream.removeInputStream(...)` doesn't await the pump finishing. When `closeStream()` is immediately followed by `addInputStream` (e.g., in `onTrackSubscribed` at Line 150), the old pump may still be writing to the shared writer for a few microticks, causing brief interleaving of old-track and new-track frames.
If this is intentional (acceptable for a track switch), a brief comment would help. Otherwise, consider awaiting the removal:
💡 Option: await removal to guarantee clean handoff
- private closeStream() {
+ private async closeStream() {
    if (this.currentInputId) {
-     void this.multiStream.removeInputStream(this.currentInputId);
+     await this.multiStream.removeInputStream(this.currentInputId);
      this.currentInputId = null;
    }
Note: this would require `onTrackSubscribed` to be made async or to handle the returned promise, so weigh the tradeoff.
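The ordering concern can be illustrated with a toy model. Nothing here comes from the PR: `startPump` and `swapInput` are hypothetical names, a resolved promise stands in for `reader.read()`, and a log entry stands in for `writer.write(frame)`. The only point is that a pump with a frame in flight still writes it after a fire-and-forget stop, whereas an awaited stop drains it before the new input is attached.

```typescript
// Toy model only: hypothetical helpers, not code from the PR.
function startPump(label: string, log: string[]) {
  let running = true;
  const finished = (async () => {
    while (running) {
      await Promise.resolve(); // stands in for awaiting reader.read()
      log.push(`${label} frame`); // stands in for writer.write(frame)
    }
  })();
  return {
    // Resolves only once the pump loop has fully exited.
    async stop(): Promise<void> {
      running = false;
      await finished;
    },
  };
}

async function swapInput(awaitRemoval: boolean): Promise<string[]> {
  const log: string[] = [];
  const oldPump = startPump('old', log);
  if (awaitRemoval) {
    await oldPump.stop(); // clean handoff: the old pump drains first
  } else {
    void oldPump.stop(); // fire-and-forget: the old pump may still write
  }
  log.push('attach new');
  const newPump = startPump('new', log);
  await newPump.stop();
  return log;
}
```

In this model, `swapInput(false)` records the old frame after `attach new`, while `swapInput(true)` records it before. Whether a late frame is audible or harmful in the real audio path is a separate question that the toy does not answer.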
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- .changeset/neat-parents-develop.md
- agents/src/stream/index.ts
- agents/src/stream/multi_input_stream.test.ts
- agents/src/stream/multi_input_stream.ts
- agents/src/voice/io.ts
- agents/src/voice/recorder_io/recorder_io.ts
- agents/src/voice/room_io/_input.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
Add SPDX-FileCopyrightText and SPDX-License-Identifier headers to all newly added files with '// SPDX-FileCopyrightText: 2025 LiveKit, Inc.' and '// SPDX-License-Identifier: Apache-2.0'
Files:
- agents/src/voice/recorder_io/recorder_io.ts
- agents/src/stream/multi_input_stream.test.ts
- agents/src/stream/multi_input_stream.ts
- agents/src/stream/index.ts
- agents/src/voice/room_io/_input.ts
- agents/src/voice/io.ts
**/*.{ts,tsx}?(test|example|spec)
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
When testing inference LLM, always use full model names from `agents/src/inference/models.ts` (e.g., 'openai/gpt-4o-mini' instead of 'gpt-4o-mini')
Files:
- agents/src/voice/recorder_io/recorder_io.ts
- agents/src/stream/multi_input_stream.test.ts
- agents/src/stream/multi_input_stream.ts
- agents/src/stream/index.ts
- agents/src/voice/room_io/_input.ts
- agents/src/voice/io.ts
**/*.{ts,tsx}?(test|example)
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
Initialize logger before using any LLM functionality with `initializeLogger({ pretty: true })` from '@livekit/agents'
Files:
- agents/src/voice/recorder_io/recorder_io.ts
- agents/src/stream/multi_input_stream.test.ts
- agents/src/stream/multi_input_stream.ts
- agents/src/stream/index.ts
- agents/src/voice/room_io/_input.ts
- agents/src/voice/io.ts
🧠 Learnings (1)
📚 Learning: 2026-01-16T14:33:39.551Z
Learnt from: CR
Repo: livekit/agents-js PR: 0
File: .cursor/rules/agent-core.mdc:0-0
Timestamp: 2026-01-16T14:33:39.551Z
Learning: Applies to **/{examples,test}/**/*.test.ts : Include both basic streaming and tool calling tests to verify full LLM functionality
Applied to files:
- agents/src/stream/multi_input_stream.test.ts
- agents/src/voice/io.ts
🧬 Code graph analysis (2)
agents/src/stream/multi_input_stream.test.ts (1)
agents/src/stream/multi_input_stream.ts (2)
stream (40-42)
MultiInputStream (26-163)
agents/src/voice/io.ts (2)
agents/src/stream/index.ts (1)
MultiInputStream (7-7)
agents/src/stream/multi_input_stream.ts (1)
MultiInputStream (26-163)
🪛 GitHub Actions: Build
agents/src/stream/multi_input_stream.test.ts
[error] 18-18: 'collectAll' is defined but never used. Allowed unused vars must match /^_/u. [Error/@typescript-eslint/no-unused-vars]
🔇 Additional comments (15)
agents/src/stream/multi_input_stream.ts (5)
1-11: LGTM. SPDX header and imports are correct.
26-42: Clean design: single IdentityTransform with one shared writer is a good foundation.
60-73: `addInputStream` looks correct: guarded by `_closed`, returns unique IDs.
81-98: `removeInputStream` correctly sequences delete → releaseLock → await pump.
The ordering ensures the pump's `finally` block is a harmless no-op on the `inputs` map, and the pump promise is awaited before returning to the caller.
105-131: `close()` has a clean shutdown sequence with proper idempotency.
The release → allSettled → writer-close ordering is correct. Snapshotting `pumpPromises.values()` via spread before clearing avoids losing references to in-flight pumps.
.changeset/neat-parents-develop.md (1)
1-5: Changeset looks good.
agents/src/voice/recorder_io/recorder_io.ts (2)
108-108: Good addition: closing the input recorder during shutdown prevents resource leaks.
376-383: Correct wiring of the intercepting stream through `multiStream.addInputStream`.
The `RecorderAudioInput` now pipes the intercepting stream through the `MultiInputStream` inherited from `AudioInput`, which is consistent with the new design.
agents/src/stream/index.ts (1)
7-7: Export looks good: alphabetically ordered and consistent with existing exports.
agents/src/voice/io.ts (1)
86-95: Clean replacement of `DeferredReadableStream` with `MultiInputStream`.
The `protected` visibility allows subclasses to add/remove inputs, while the public `close()` method provides a clean lifecycle API. The `stream` getter correctly delegates to `multiStream.stream`.
agents/src/voice/room_io/_input.ts (1)
186-192: Clean override: event listeners removed before closing streams.
agents/src/stream/multi_input_stream.test.ts (4)
30-41: Good basic smoke test covering initial state assertions.
215-278: Solid error-handling tests: both single-errored-input and mixed-good-bad scenarios covered.
284-333: Close semantics are well tested: done signal, pending read resolution, idempotency, and post-close rejection all covered.
518-551: Good edge case: infinite source validates that `close()` gracefully terminates active pumps.
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: adfecf4736
// Release every input reader to unblock pending reads inside pumps.
for (const reader of this.inputs.values()) {
  reader.releaseLock();
}
Cancel input streams on close to avoid runaway producers
In close(), the code only calls reader.releaseLock() for each input. Releasing the lock does not stop a push-based ReadableStream source; it simply detaches the reader. If an input keeps enqueuing (e.g., an infinite audio source), it will continue running after close() and can buffer unbounded data or leak CPU/memory because nothing is consuming it. This regression is specific to MultiInputStream because close() is expected to stop all inputs. Consider calling await reader.cancel() (then release the lock) when shutting down, so producers are actually stopped.
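The difference between the two calls can be checked with a small sketch. `makeTickingSource` and `compareShutdown` are hypothetical helpers written for this illustration, not part of the PR; the source is a deliberately naive push-based stream that enqueues on a timer and ignores backpressure, and the 30 ms wait is an arbitrary choice. Each lock is released only when no read is pending, so the sketch does not depend on how `releaseLock()` treats in-flight reads.

```typescript
// Hypothetical helper: a push-based source that enqueues on a timer and ignores backpressure.
function makeTickingSource(intervalMs: number) {
  let produced = 0;
  let stopped = false;
  let timer: ReturnType<typeof setInterval> | undefined;
  const stream = new ReadableStream<number>({
    start(controller) {
      timer = setInterval(() => controller.enqueue(produced++), intervalMs);
    },
    // Runs only when the stream (or a reader holding its lock) is cancelled.
    cancel() {
      clearInterval(timer);
      stopped = true;
    },
  });
  return { stream, produced: () => produced, stopped: () => stopped };
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function compareShutdown() {
  // Case 1: releaseLock() only detaches the reader; the timer keeps enqueuing.
  const a = makeTickingSource(1);
  const readerA = a.stream.getReader();
  await readerA.read();
  readerA.releaseLock();
  const before = a.produced();
  await sleep(30);
  const keptProducing = a.produced() > before && !a.stopped();
  await a.stream.cancel(); // clean up the demo source so its timer stops

  // Case 2: cancel() invokes the source's cancel() hook; the lock is released afterwards.
  const b = makeTickingSource(1);
  const readerB = b.stream.getReader();
  await readerB.read();
  await readerB.cancel();
  readerB.releaseLock();
  const stoppedAfterCancel = b.stopped();

  return { keptProducing, stoppedAfterCancel };
}
```

Cancelling is a one-way operation on the source stream, so it fits shutdown paths where the input will not be reused; whether that holds for every caller of `close()` in this PR is for the author to confirm.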
This class solves the case where a user wants the room to stay alive even after a participant disconnects, and to be able to pipe in the audio stream from new participants.
Related PR: #1018
Summary by CodeRabbit
New Features
Bug Fixes / Reliability
Tests