Commit 4cc4e38

Merge pull request #35 from tokk-nv/fix/asr-stream-resilience-and-riva-docs
fix(asr): auto-restart stream on unexpected death, prevent pipeline crash
2 parents aa55415 + e0d9463 commit 4cc4e38

4 files changed

Lines changed: 124 additions & 61 deletions

Lines changed: 58 additions & 48 deletions
@@ -1,80 +1,90 @@
1-
# ASR stream goes stale after mute/unmute or long silence
1+
# ASR stream goes stale / dies prematurely
22

3-
After muting then unmuting the mic (or after a prolonged period where no speech reaches Riva), the ASR stream silently stops producing results even though PCM audio is still flowing.
3+
The Riva ASR gRPC stream can die mid-session — silently stopping result production or terminating entirely. This happens in multiple scenarios: after mute/unmute, after long idle periods, or even during normal operation with 0 results.
44

55
## Observed behavior
66

7+
### Scenario 1: Stale after long LLM block + mute/unmute
78
- Session `c87be1b2` (2026-03-14): 9 turns completed successfully.
89
- Turn 8 triggered a degenerate LLM reasoning loop (10,101 chars, **91.89 s** wall-clock).
910
- During that wait, the user muted and later unmuted the mic.
10-
- After turn 9 completed, no further `asr_final` events appeared for ~1.5 min despite the green **user_amplitude** waveform being visible on the timeline (PCM capture was healthy).
11-
- Terminal showed no ASR errors; the stream ended normally at session close with `Stream task timeout, cancelling`.
11+
- After turn 9 completed, no further `asr_final` events appeared for ~1.5 min despite the green **user_amplitude** waveform being visible on the timeline.
12+
- Terminal showed no ASR errors; the stream ended normally at session close.
13+
14+
### Scenario 2: Stream dies after ~2 min (normal operation, no mute)
15+
- Session on jat-4cbb47141bb7 (2026-03-14): 3 turns completed in ~2 min.
16+
- After turn 3, Riva ASR stream ended with 16 results total.
17+
- `_feed_pcm_to_pipeline` continued sending PCM, but `send_audio()` raised `RuntimeError: Stream not started`, **crashing the entire pipeline**.
18+
19+
### Scenario 3: Stream dies with 0 results (~23s)
20+
- Session `f9748641` on same device: ASR stream started, 0 results received, stream ended after ~23s.
21+
- Same `RuntimeError` crash.
22+
23+
### Scenario 4: USB contention with Brio 4K camera
24+
- On Jetsons with Brio 4K (USB 3.0) + USB audio, severe bus contention causes:
25+
- Camera `VIDIOC_REQBUFS: errno=19 (No such device)` — camera disappears from bus.
26+
- `arecord: audio open error: Device or resource busy` — audio device locked by previous pipeline.
27+
- ASR stream dies with 0 results; pipeline crashes before user even speaks.
1228

1329
## Why amplitude shows but ASR does not
1430

15-
In `_feed_pcm_to_pipeline`, amplitude is always computed and sent to the client (lines 1005-1024) regardless of `mic_muted`. The ASR send is gated:
31+
In `_feed_pcm_to_pipeline`, amplitude is always computed and sent to the client regardless of `mic_muted`. The ASR send is gated:
1632

1733
```python
1834
if not mic_muted:
19-
await asr.send_audio(pcm_bytes)
35+
accepted = await asr.send_audio(pcm_bytes)
2036
```
2137

22-
So the timeline waveform looks alive, but if the Riva gRPC stream has internally timed out (or VAD state has gone stale after 90+ seconds of silence/mute), newly sent audio produces no results.
38+
So the timeline waveform looks alive, but if the Riva gRPC stream has internally timed out or died, newly sent audio is silently dropped (or previously, would crash).
2339

24-
## Probable root cause (needs confirmation)
40+
## Root causes
2541

2642
Riva Streaming ASR has internal session limits:
2743
- **gRPC keepalive / idle timeout**: if no audio is sent for an extended period the server may silently close the stream.
28-
- **VAD state**: after a long silence gap, the VAD model may reset or require a fresh trigger to start detecting speech again.
29-
- **Maximum session duration**: Riva may cap single-stream duration; after that, the stream yields no more results even though it stays open.
30-
31-
The exact Riva behavior here is unconfirmed — the stream appeared open (no error logged) but stopped producing finals.
44+
- **VAD state**: after a long silence gap, the VAD model may reset or require a fresh trigger.
45+
- **Maximum session duration**: Riva may cap single-stream duration (~2 min observed); after that, the stream yields no more results.
46+
- **USB bus contention**: on Jetson devices with multiple USB peripherals (especially high-bandwidth cameras like Brio 4K), the audio device can become temporarily unavailable, preventing `arecord` from opening.
3247

33-
## What is already in place
48+
## Implemented fixes (2026-03-14)
3449

35-
- `mic_muted` gates `asr.send_audio()` in the classic pipeline (line 1003).
36-
- On mute, 0.5 s of silence is injected (`b"\x00" * int(16000 * 2 * 0.5)`) to flush any pending VAD partial (line 1041-1044).
37-
- On unmute, `mic_muted = False` resumes sending PCM to ASR.
38-
- No stream-health monitoring or automatic restart exists today.
39-
40-
## Proposed solutions (pick one or combine)
41-
42-
### Option A: Keep-alive noise during mute
50+
### Fix 1: Graceful `send_audio` (riva.py)
51+
`send_audio()` now returns `bool` instead of raising `RuntimeError` when the stream is dead. Returns `False` if `_sync_audio_queue` is `None`, allowing the PCM feeder to continue without crashing.
4352

44-
While `mic_muted` is True, instead of sending nothing, send **very low amplitude white noise** (e.g., ±10 out of ±32768) at normal cadence. This keeps the gRPC stream active and the VAD model warm without triggering false speech detection.
53+
### Fix 2: Log-once warning (_feed_pcm_to_pipeline)
54+
When `send_audio` returns `False`, a warning is logged once per dead-stream episode: `[asr] send_audio dropped — ASR stream not active (waiting for auto-restart)`. The flag resets on stream restart.
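
The log-once behaviour can be sketched as a small latch. This is a hypothetical illustration only: the commit itself stores the flag as an attribute on `_feed_pcm_to_pipeline`, and the class and method names below (`DeadStreamWarning`, `note_dropped`, `reset`) are assumptions made for the example.

```python
import logging

logger = logging.getLogger("asr")


class DeadStreamWarning:
    """Hypothetical latch: warn once per dead-stream episode, re-arm on restart."""

    def __init__(self) -> None:
        self._warned = False

    def note_dropped(self) -> bool:
        """Record a dropped chunk; return True only when the warning is emitted."""
        if self._warned:
            return False
        self._warned = True
        logger.warning(
            "[asr] send_audio dropped; ASR stream not active (waiting for auto-restart)"
        )
        return True

    def reset(self) -> None:
        """Re-arm the latch; intended to be called after a successful stream restart."""
        self._warned = False
```

A latch object keeps the flag scoped to one pipeline instance, whereas a function attribute is shared by every caller of that function in the process.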
4555

46-
Pros: Simplest change; no stream lifecycle management. \
47-
Cons: Assumes the Riva stream itself is still healthy; does not help if the stream has a hard session-duration cap.
56+
### Fix 3: Auto-restart in asr_consumer (voice_pipeline.py)
57+
`asr_consumer` now wraps the `async for result in asr.receive_results()` loop in a `while not stopped.is_set()` loop. When the inner iterator ends (stream died) and the pipeline is still running:
58+
1. Increments restart counter (max 10).
59+
2. Logs a WARNING with result count and restart number.
60+
3. Emits `asr_stream_restart` timeline event.
61+
4. Calls `asr.stop_stream()` → sleep with linearly increasing backoff (2s, 4s, 6s, ..., capped at 10s) → `asr.start_stream()`.
62+
5. Resets the `send_audio` log-once flag and result counter.
63+
6. Re-enters the `async for` loop on the fresh stream.
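
The steps above can be sketched as a standalone loop. This is a simplified illustration, not the code from `voice_pipeline.py`: logging, timeline events, and the log-once reset are omitted, and `consume_with_restart`, `handle_result`, and the injectable `sleep` parameter are names assumed for the example. The delay follows the commit's `min(2.0 * n, 10.0)` formula, which grows by 2 s per restart up to the 10 s cap.

```python
import asyncio

MAX_RESTARTS = 10  # mirrors the "max 10" limit in step 1


def restart_backoff(restart_count: int, step_s: float = 2.0, cap_s: float = 10.0) -> float:
    """Delay before the Nth restart: 2 s, 4 s, 6 s, ... capped at 10 s."""
    return min(step_s * restart_count, cap_s)


async def consume_with_restart(asr, stopped: asyncio.Event, handle_result,
                               sleep=asyncio.sleep) -> int:
    """Consume results; restart the stream whenever the iterator ends early.

    `asr` is assumed to expose receive_results(), stop_stream() and
    start_stream(), as in the diff. Returns the number of restarts performed.
    """
    restarts = 0
    while not stopped.is_set():
        async for result in asr.receive_results():
            if stopped.is_set():
                break
            handle_result(result)

        # The inner iterator ended: either a shutdown or a dead stream.
        if stopped.is_set() or restarts >= MAX_RESTARTS:
            break

        restarts += 1
        await asr.stop_stream()
        await sleep(restart_backoff(restarts))
        if stopped.is_set():
            break
        await asr.start_stream()
    return restarts
```

Checking `stopped` again after the sleep avoids opening a fresh stream for a session that ended during the backoff window.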
4864

49-
### Option B: Restart ASR stream after stale timeout
65+
## Remaining work
5066

51-
Monitor elapsed time since the last `asr_final`. If no final arrives within a configurable window (e.g., 60 s while unmuted), tear down the current `RivaASRBackend` stream and create a fresh one.
52-
53-
1. Track `_last_asr_final_time` in the turn executor; update it on every `asr_final`.
54-
2. In `server_capture_consumer` (or a watchdog task), check `time.time() - _last_asr_final_time > ASR_STALE_TIMEOUT`.
55-
3. If stale and `not mic_muted`: call `asr.stop()`, then `asr.start()` to open a fresh streaming session.
56-
4. Log `[asr] Stream restarted after stale timeout` at WARNING level.
57-
58-
Pros: Covers all root causes (idle timeout, VAD reset, session-duration cap). \
59-
Cons: Slightly more complex; brief gap in ASR coverage during restart (~200 ms).
60-
61-
### Option C: Proactive stream rotation
62-
63-
After every turn (or every N turns), close and re-open the ASR stream. This preempts any session-duration limit and keeps the stream fresh.
64-
65-
Pros: Eliminates stale state entirely. \
66-
Cons: Adds latency at turn boundaries; may lose a partial if speech is ongoing during rotation.
67+
### Option A: Keep-alive noise during mute
68+
While `mic_muted` is True, send very low amplitude white noise (e.g., ±10 out of ±32768) at normal cadence. This keeps the gRPC stream active and the VAD model warm.
6769

68-
## Recommendation
70+
Pros: Prevents idle timeout during mute. \
71+
Cons: Does not help if stream has a hard session-duration cap (but auto-restart covers that now).
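
A minimal sketch of the keep-alive noise generator for Option A, assuming the 16 kHz, 16-bit mono PCM format documented for `send_audio`. The function name `keepalive_noise` and its parameters are hypothetical, and whether noise at this level actually avoids false VAD triggers in Riva has not been verified here.

```python
import random
import struct

SAMPLE_RATE = 16000  # 16 kHz, 16-bit mono PCM (the format send_audio expects)


def keepalive_noise(duration_s: float, peak: int = 10,
                    sample_rate: int = SAMPLE_RATE, rng=None) -> bytes:
    """Return little-endian int16 PCM of uniform noise within [-peak, +peak]."""
    rng = rng or random.Random()
    n = int(sample_rate * duration_s)
    samples = [rng.randint(-peak, peak) for _ in range(n)]
    return struct.pack(f"<{n}h", *samples)
```

While `mic_muted` is True, the feeder could send `keepalive_noise(chunk_duration_s)` in place of the captured chunk, so the stream keeps receiving audio at the normal cadence.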
6972

70-
**Option A + B combined**: send keep-alive noise during mute (A) to prevent idle timeout, and add a stale-timeout watchdog (B) as a safety net for unexpected stream failures. Option C is heavier and only needed if Riva has a hard session cap that A+B cannot address.
73+
### Device contention mitigations
74+
- Investigate separating camera and audio onto different USB host controllers.
75+
- Consider CSI camera instead of USB to free USB bandwidth entirely.
76+
- Current `arecord` retry logic (8 attempts with backoff) helps, but persistent `Device or resource busy` across all retries indicates the previous pipeline's `arecord` process was not killed before the new one started.
7177

72-
## Diagnosis checklist (before implementing)
78+
## Diagnosis checklist
7379

74-
- [ ] Confirm Riva Streaming ASR session limits: check `riva_asr` service config for `max_duration_seconds`, keepalive settings, or gRPC deadline.
75-
- [ ] Add a log line in `RivaASRBackend` when the gRPC response iterator ends (to distinguish "server closed stream" from "no results but stream open").
76-
- [ ] Reproduce by muting for 60+ s mid-session and verifying ASR stops producing results on unmute.
80+
- [x] Add auto-restart when gRPC stream ends unexpectedly.
81+
- [x] Make `send_audio` graceful (no crash on dead stream).
82+
- [x] Emit timeline events for stream restarts.
83+
- [ ] Confirm Riva session limits: check `riva_asr` config for `max_duration_seconds`, keepalive settings, or gRPC deadline.
84+
- [ ] Implement keep-alive noise during mute (Option A).
85+
- [ ] Investigate cleanup of old `arecord` processes on WebSocket reconnect.
7786

7887
## Effort
7988

80-
**Small–Medium**: Option A is ~30 min (noise generator in `_feed_pcm_to_pipeline`). Option B is ~1–2 hours (watchdog task + stream restart plumbing + tests).
89+
**Done**: Auto-restart (Fix 3) + graceful send_audio (Fix 1). \
90+
**Remaining**: Keep-alive noise ~30 min. Device cleanup investigation ~1–2 hours.

docs/setup_riva.md

Lines changed: 9 additions & 0 deletions
@@ -268,6 +268,7 @@ riva_model_loc="riva-model-repo" # Docker volume (default)
268268
# Language/model selection
269269
asr_acoustic_model="parakeet_1.1b" # Default for ARM64 v2.24.0
270270
asr_language_code="en-US" # ASR language
271+
asr_accessory_model="silero_diarizer" # Adds Silero VAD + speaker diarization
271272
use_asr_streaming_throughput_mode=false # false=low latency (recommended)
272273

273274
tts_language_code=("multi") # TTS language
@@ -280,9 +281,17 @@ tts_language_code=("multi") # TTS language
280281
- Language codes available: `en-US`, `multi` (multilingual)
281282
- Pre-optimized for Jetson GPUs (no build step required)
282283

284+
**ASR accessory model** (`asr_accessory_model`):
285+
- Set to `"silero_diarizer"` to deploy with **Silero VAD** and speaker diarization
286+
- This makes the `parakeet-1.1b-en-US-asr-streaming-silero-vad-sortformer` model available alongside the base `parakeet-1.1b-en-US-asr-streaming`
287+
- The Silero VAD variant provides better voice activity detection — without it, the base model often clips the beginning of utterances (e.g., "How many monitors do you see?" becomes "monitors do you see") because its default VAD reacts too late to speech onset
288+
- Only available when `asr_acoustic_model` is `"parakeet_1.1b"`
289+
- After changing this setting, re-run `riva_init.sh` and `riva_start.sh`
290+
283291
**For Multi-modal AI Studio and Live RIVA WebUI**, recommended settings:
284292
- Enable ASR + TTS only (NLP/NMT not needed)
285293
- Use default `parakeet_1.1b` for ASR (best quality/latency balance)
294+
- Set `asr_accessory_model="silero_diarizer"` for Silero VAD support
286295
- Keep `use_asr_streaming_throughput_mode=false` for real-time voice apps
287296
- SSL/TLS can be added later for production deployments
288297

src/multi_modal_ai_studio/backends/asr/riva.py

Lines changed: 6 additions & 4 deletions
@@ -174,19 +174,21 @@ async def start_stream(self) -> None:
174174

175175
self.logger.info("Riva ASR stream started")
176176

177-
async def send_audio(self, audio_chunk: bytes) -> None:
177+
async def send_audio(self, audio_chunk: bytes) -> bool:
178178
"""Send audio chunk for recognition.
179179
180180
Args:
181181
audio_chunk: Raw PCM audio bytes (16kHz, 16-bit, mono)
182182
183-
Raises:
184-
RuntimeError: If stream not started
183+
Returns:
184+
True if audio was queued, False if stream is not active (caller should
185+
not treat this as fatal — the stream may be restarting).
185186
"""
186187
if self._sync_audio_queue is None:
187-
raise RuntimeError("Stream not started. Call start_stream() first.")
188+
return False
188189

189190
self._sync_audio_queue.put(audio_chunk)
191+
return True
190192

191193
async def receive_results(self) -> AsyncIterator[ASRResult]:
192194
"""Yield recognition results as they become available.

src/multi_modal_ai_studio/webui/voice_pipeline.py

Lines changed: 51 additions & 9 deletions
@@ -1001,7 +1001,10 @@ async def _feed_pcm_to_pipeline(
10011001
return (last_amplitude_time, False, 0.0, 0.0)
10021002
now = time.time() - session.timeline.start_time
10031003
if not mic_muted:
1004-
await asr.send_audio(pcm_bytes)
1004+
accepted = await asr.send_audio(pcm_bytes)
1005+
if not accepted and not getattr(_feed_pcm_to_pipeline, "_warned_dead_stream", False):
1006+
_feed_pcm_to_pipeline._warned_dead_stream = True
1007+
logger.warning("[asr] send_audio dropped — ASR stream not active (waiting for auto-restart)")
10051008
amplitudes = _pcm_rms_slices(pcm_bytes, sample_rate=16000, window_s=_amplitude_window_s)
10061009
did_send = False
10071010
amp = 0.0
@@ -1151,13 +1154,19 @@ async def receive_loop() -> None:
11511154
async def asr_consumer() -> None:
11521155
"""Independent ASR task: forward every partial/final to client immediately; enqueue finals for turn_executor.
11531156
Enables barge-in (turn_executor can be cancelled when new final arrives) and avoids phantom partial at tts_complete.
1154-
On stream end, if we had a partial but no final (e.g. user stopped before VAD), enqueue a synthetic final so one turn runs."""
1157+
On stream end, if we had a partial but no final (e.g. user stopped before VAD), enqueue a synthetic final so one turn runs.
1158+
1159+
Auto-restart: if Riva's gRPC stream dies (idle timeout, server-side limit, or bus contention)
1160+
and the pipeline has not been stopped, the stream is restarted with linearly increasing backoff (capped at 10 s)."""
11551161
last_asr_final_text: Optional[str] = None
11561162
last_asr_final_ts: Optional[float] = None
11571163
last_partial_text: Optional[str] = None
11581164
last_partial_ts: Optional[float] = None
11591165
asr_received_count = 0
1166+
_MAX_ASR_RESTARTS = 10
1167+
_asr_restart_count = 0
11601168
try:
1169+
while not stopped.is_set():
11611170
async for result in asr.receive_results():
11621171
if stopped.is_set():
11631172
break
@@ -1242,16 +1251,51 @@ async def asr_consumer() -> None:
12421251
asr_consumer._finals_count = finals_count
12431252
logger.info("[asr] asr_final #%d enqueued for LLM/TTS: %r", finals_count, text[:80])
12441253
finals_queue.put_nowait(result)
1254+
1255+
# --- Inner async-for ended (stream died or returned None) ---
1256+
if stopped.is_set():
1257+
break
1258+
1259+
_asr_restart_count += 1
1260+
if _asr_restart_count > _MAX_ASR_RESTARTS:
1261+
logger.error("[asr] Exceeded max restarts (%d); giving up", _MAX_ASR_RESTARTS)
1262+
break
1263+
1264+
backoff = min(2.0 * _asr_restart_count, 10.0)
1265+
logger.warning(
1266+
"[asr] Stream died after %d result(s); restarting (%d/%d) in %.1fs",
1267+
asr_received_count, _asr_restart_count, _MAX_ASR_RESTARTS, backoff,
1268+
)
1269+
try:
1270+
now_ts = (time.time() - session.timeline.start_time) if session.timeline.start_time else 0
1271+
session.timeline.add_event(
1272+
"asr_stream_restart", Lane.SPEECH,
1273+
data={"restart": _asr_restart_count, "prev_results": asr_received_count},
1274+
)
1275+
await send_event({
1276+
"event_type": "asr_stream_restart",
1277+
"lane": "speech",
1278+
"data": {"restart": _asr_restart_count, "prev_results": asr_received_count},
1279+
"timestamp": now_ts,
1280+
})
1281+
except Exception:
1282+
pass
1283+
1284+
await asr.stop_stream()
1285+
await asyncio.sleep(backoff)
1286+
if stopped.is_set():
1287+
break
1288+
await asr.start_stream()
1289+
_feed_pcm_to_pipeline._warned_dead_stream = False
1290+
asr_received_count = 0
1291+
logger.info("[asr] Stream restarted successfully (%d/%d)", _asr_restart_count, _MAX_ASR_RESTARTS)
1292+
# --- end while ---
12451293
except asyncio.CancelledError:
12461294
pass
12471295
except Exception as e:
12481296
logger.exception("asr_consumer error: %s", e)
12491297
finally:
1250-
logger.info("[asr] Stream ended; received %d ASR result(s) total", asr_received_count)
1251-
# Only create a synthetic final when the stream had no final at all (e.g. user stopped before
1252-
# VAD sent a final). Do NOT create one when we already had a final and the last partial is
1253-
# different (e.g. Riva sent early final "How about computer?" then partials "joke") — that
1254-
# would create a phantom extra turn; the partial is the tail of the same utterance.
1298+
logger.info("[asr] Stream ended; received %d ASR result(s) total (restarts=%d)", asr_received_count, _asr_restart_count)
12551299
if last_partial_text and last_asr_final_text is None:
12561300
try:
12571301
now_ts = (time.time() - session.timeline.start_time) if session.timeline.start_time else 0
@@ -1264,9 +1308,7 @@ async def asr_consumer() -> None:
12641308
)
12651309
logger.info("[asr] Stream ended with only partial; enqueueing synthetic final for LLM/TTS: %r", last_partial_text[:80])
12661310
finals_queue.put_nowait(synthetic)
1267-
# Add to timeline so replay/saved session has this final (use partial's time, not stream-end)
12681311
session.timeline.add_event("asr_final", Lane.SPEECH, data={"text": last_partial_text, "confidence": 1.0})
1269-
# Send asr_final to client so UI shows final_transcript (otherwise only partials were sent)
12701312
await send_event({
12711313
"event_type": "asr_final",
12721314
"lane": "speech",
