Skip to content

Commit be0477d

Browse files
authored
feat(openai): implement close() for openai TTS (#883)
1 parent 6b94d1a commit be0477d

2 files changed

Lines changed: 47 additions & 25 deletions

File tree

.changeset/vast-colts-burn.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@livekit/agents-plugin-openai': patch
3+
---
4+
5+
Implemented close() in openai TTS

plugins/openai/src/tts.ts

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export class TTS extends tts.TTS {
3030
#opts: TTSOptions;
3131
#client: OpenAI;
3232
label = 'openai.TTS';
33+
private abortController = new AbortController();
3334

3435
/**
3536
* Create a new instance of OpenAI TTS.
@@ -62,20 +63,27 @@ export class TTS extends tts.TTS {
6263
return new ChunkedStream(
6364
this,
6465
text,
65-
this.#client.audio.speech.create({
66-
input: text,
67-
model: this.#opts.model,
68-
voice: this.#opts.voice,
69-
instructions: this.#opts.instructions,
70-
response_format: 'pcm',
71-
speed: this.#opts.speed,
72-
}),
66+
this.#client.audio.speech.create(
67+
{
68+
input: text,
69+
model: this.#opts.model,
70+
voice: this.#opts.voice,
71+
instructions: this.#opts.instructions,
72+
response_format: 'pcm',
73+
speed: this.#opts.speed,
74+
},
75+
{ signal: this.abortController.signal },
76+
),
7377
);
7478
}
7579

7680
stream(): tts.SynthesizeStream {
7781
throw new Error('Streaming is not supported on OpenAI TTS');
7882
}
83+
84+
async close(): Promise<void> {
85+
this.abortController.abort();
86+
}
7987
}
8088

8189
export class ChunkedStream extends tts.ChunkedStream {
@@ -89,25 +97,34 @@ export class ChunkedStream extends tts.ChunkedStream {
8997
}
9098

9199
protected async run() {
92-
const buffer = await this.stream.then((r) => r.arrayBuffer());
93-
const requestId = shortuuid();
94-
const audioByteStream = new AudioByteStream(OPENAI_TTS_SAMPLE_RATE, OPENAI_TTS_CHANNELS);
95-
const frames = audioByteStream.write(buffer);
96-
97-
let lastFrame: AudioFrame | undefined;
98-
const sendLastFrame = (segmentId: string, final: boolean) => {
99-
if (lastFrame) {
100-
this.queue.put({ requestId, segmentId, frame: lastFrame, final });
101-
lastFrame = undefined;
100+
try {
101+
const buffer = await this.stream.then((r) => r.arrayBuffer());
102+
const requestId = shortuuid();
103+
const audioByteStream = new AudioByteStream(OPENAI_TTS_SAMPLE_RATE, OPENAI_TTS_CHANNELS);
104+
const frames = audioByteStream.write(buffer);
105+
106+
let lastFrame: AudioFrame | undefined;
107+
const sendLastFrame = (segmentId: string, final: boolean) => {
108+
if (lastFrame) {
109+
this.queue.put({ requestId, segmentId, frame: lastFrame, final });
110+
lastFrame = undefined;
111+
}
112+
};
113+
114+
for (const frame of frames) {
115+
sendLastFrame(requestId, false);
116+
lastFrame = frame;
102117
}
103-
};
118+
sendLastFrame(requestId, true);
104119

105-
for (const frame of frames) {
106-
sendLastFrame(requestId, false);
107-
lastFrame = frame;
120+
this.queue.close();
121+
} catch (error) {
122+
if (error instanceof Error && error.name === 'AbortError') {
123+
return;
124+
}
125+
throw error;
126+
} finally {
127+
this.queue.close();
108128
}
109-
sendLastFrame(requestId, true);
110-
111-
this.queue.close();
112129
}
113130
}

0 commit comments

Comments
 (0)