Skip to content

Commit 8332cca

Browse files
1egomanlukasIO
andauthored
Datastream - TextStreamReader/ByteStreamReader read timeouts and abort signals (#1611)
Co-authored-by: lukasIO <[email protected]>
1 parent 2b9c237 commit 8332cca

5 files changed

Lines changed: 174 additions & 30 deletions

File tree

.changeset/famous-suns-count.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'livekit-client': patch
3+
---
4+
5+
feat: add ability to include an AbortSignal when reading from a datastream

examples/demo/demo.ts

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ let currentRoom: Room | undefined;
5757

5858
let startTime: number;
5959

60+
let streamReaderAbortController: AbortController | undefined;
61+
6062
const searchParams = new URLSearchParams(window.location.search);
6163
const storedUrl = searchParams.get('url') ?? 'ws://localhost:7880';
6264
const storedToken = searchParams.get('token') ?? '';
@@ -259,30 +261,44 @@ const appActions = {
259261
});
260262

261263
room.registerTextStreamHandler('lk.chat', async (reader, participant) => {
264+
streamReaderAbortController = new AbortController();
265+
(<HTMLButtonElement>$('cancel-chat-receive-button')).style.display = 'block';
266+
262267
const info = reader.info;
263-
if (info.size) {
264-
handleChatMessage(
265-
{
266-
id: info.id,
267-
timestamp: info.timestamp,
268-
message: await reader.readAll(),
269-
},
270-
room.getParticipantByIdentity(participant?.identity),
271-
);
272-
} else {
268+
269+
let message = '';
270+
try {
271+
for await (const chunk of reader.withAbortSignal(streamReaderAbortController.signal)) {
272+
message += chunk;
273+
handleChatMessage(
274+
{
275+
id: info.id,
276+
timestamp: info.timestamp,
277+
message,
278+
},
279+
room.getParticipantByIdentity(participant?.identity),
280+
);
281+
}
282+
} catch (err) {
283+
message += 'ERROR';
273284
handleChatMessage(
274285
{
275286
id: info.id,
276287
timestamp: info.timestamp,
277-
message: await reader.readAll(),
288+
message,
278289
},
279-
280290
room.getParticipantByIdentity(participant?.identity),
281291
);
292+
throw err;
293+
}
282294

295+
if (!info.size) {
283296
appendLog('text stream finished');
284297
}
285298
console.log('final info including close extensions', reader.info);
299+
300+
streamReaderAbortController = undefined;
301+
(<HTMLButtonElement>$('cancel-chat-receive-button')).style.display = 'none';
286302
});
287303

288304
room.registerByteStreamHandler('files', async (reader, participant) => {
@@ -305,6 +321,9 @@ const appActions = {
305321

306322
appendLog(`Started receiving file "${info.name}" from ${participant?.identity}`);
307323

324+
streamReaderAbortController = new AbortController();
325+
(<HTMLButtonElement>$('cancel-chat-receive-button')).style.display = 'block';
326+
308327
reader.onProgress = (progress) => {
309328
console.log(`"progress ${progress ? (progress * 100).toFixed(0) : 'undefined'}%`);
310329

@@ -314,9 +333,21 @@ const appActions = {
314333
}
315334
};
316335

317-
const result = new Blob(await reader.readAll(), { type: info.mimeType });
336+
let byteContents;
337+
try {
338+
byteContents = await reader.readAll({
339+
signal: streamReaderAbortController.signal,
340+
});
341+
} catch (err) {
342+
progressLabel.innerText = `Receiving "${info.name}" - readAll aborted!`;
343+
throw err;
344+
}
345+
const result = new Blob(byteContents, { type: info.mimeType });
318346
appendLog(`Completely received file "${info.name}" from ${participant?.identity}`);
319347

348+
streamReaderAbortController = undefined;
349+
(<HTMLButtonElement>$('cancel-chat-receive-button')).style.display = 'none';
350+
320351
progressContainer.remove();
321352

322353
if (info.mimeType.startsWith('image/')) {
@@ -537,6 +568,15 @@ const appActions = {
537568
}
538569
},
539570

571+
cancelChatReceive: () => {
572+
if (!streamReaderAbortController) {
573+
return;
574+
}
575+
streamReaderAbortController.abort();
576+
577+
(<HTMLButtonElement>$('cancel-chat-receive-button')).style.display = 'none';
578+
},
579+
540580
disconnectRoom: () => {
541581
if (currentRoom) {
542582
currentRoom.disconnect();
@@ -642,7 +682,7 @@ async function sendGreetingTo(participant: Participant) {
642682

643683
for (const char of greeting) {
644684
await streamWriter.write(char);
645-
await sleep(20);
685+
await sleep(50);
646686
}
647687
await streamWriter.close();
648688
}

examples/demo/index.html

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,15 @@ <h3>Chat</h3>
289289
</button>
290290
</div>
291291
</div>
292+
<button
293+
id="cancel-chat-receive-button"
294+
class="btn btn-outline-secondary btn-sm"
295+
type="button"
296+
onclick="appActions.cancelChatReceive()"
297+
style="display: none;"
298+
>
299+
Cancel current receive
300+
</button>
292301
</div>
293302
</div>
294303
</div>

examples/demo/styles.css

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
#chat-input-area {
3535
margin-top: 1.2rem;
36+
margin-bottom: 0.5rem;
3637
display: grid;
3738
grid-template-columns: auto min-content;
3839
gap: 1.25rem;

src/room/StreamReader.ts

Lines changed: 105 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import type { DataStream_Chunk } from '@livekit/protocol';
22
import type { BaseStreamInfo, ByteStreamInfo, TextStreamInfo } from './types';
3-
import { bigIntToNumber } from './utils';
3+
import { Future, bigIntToNumber } from './utils';
4+
5+
export type BaseStreamReaderReadAllOpts = {
6+
/** An AbortSignal can be used to terminate reads early. */
7+
signal?: AbortSignal;
8+
};
49

510
abstract class BaseStreamReader<T extends BaseStreamInfo> {
611
protected reader: ReadableStream<DataStream_Chunk>;
@@ -26,7 +31,7 @@ abstract class BaseStreamReader<T extends BaseStreamInfo> {
2631

2732
onProgress?: (progress: number | undefined) => void;
2833

29-
abstract readAll(): Promise<string | Array<Uint8Array>>;
34+
abstract readAll(opts?: BaseStreamReaderReadAllOpts): Promise<string | Array<Uint8Array>>;
3035
}
3136

3237
export class ByteStreamReader extends BaseStreamReader<ByteStreamInfo> {
@@ -40,35 +45,77 @@ export class ByteStreamReader extends BaseStreamReader<ByteStreamInfo> {
4045

4146
onProgress?: (progress: number | undefined) => void;
4247

48+
signal?: AbortSignal;
49+
4350
[Symbol.asyncIterator]() {
4451
const reader = this.reader.getReader();
4552

53+
let rejectingSignalFuture = new Future<never>();
54+
let activeSignal: AbortSignal | null = null;
55+
let onAbort: (() => void) | null = null;
56+
if (this.signal) {
57+
const signal = this.signal;
58+
onAbort = () => {
59+
rejectingSignalFuture.reject?.(signal.reason);
60+
};
61+
signal.addEventListener('abort', onAbort);
62+
activeSignal = signal;
63+
}
64+
65+
const cleanup = () => {
66+
reader.releaseLock();
67+
68+
if (activeSignal && onAbort) {
69+
activeSignal.removeEventListener('abort', onAbort);
70+
}
71+
72+
this.signal = undefined;
73+
};
74+
4675
return {
4776
next: async (): Promise<IteratorResult<Uint8Array>> => {
4877
try {
49-
const { done, value } = await reader.read();
78+
const { done, value } = await Promise.race([
79+
reader.read(),
80+
rejectingSignalFuture.promise,
81+
]);
5082
if (done) {
5183
return { done: true, value: undefined as any };
5284
} else {
5385
this.handleChunkReceived(value);
5486
return { done: false, value: value.content };
5587
}
56-
} catch (error) {
57-
// TODO handle errors
58-
return { done: true, value: undefined };
88+
} catch (err) {
89+
cleanup();
90+
throw err;
5991
}
6092
},
6193

94+
// note: `return` runs only for premature exits, see:
95+
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#errors_during_iteration
6296
async return(): Promise<IteratorResult<Uint8Array>> {
63-
reader.releaseLock();
97+
cleanup();
6498
return { done: true, value: undefined };
6599
},
66100
};
67101
}
68102

69-
async readAll(): Promise<Array<Uint8Array>> {
103+
/**
104+
* Injects an AbortSignal, which if aborted, will terminate the currently active
105+
* stream iteration operation.
106+
*
107+
* Note that when using AbortSignal.timeout(...), the timeout applies across
108+
* the whole iteration operation, not just one individual chunk read.
109+
*/
110+
withAbortSignal(signal: AbortSignal) {
111+
this.signal = signal;
112+
return this;
113+
}
114+
115+
async readAll(opts: BaseStreamReaderReadAllOpts = {}): Promise<Array<Uint8Array>> {
70116
let chunks: Set<Uint8Array> = new Set();
71-
for await (const chunk of this) {
117+
const iterator = opts.signal ? this.withAbortSignal(opts.signal) : this;
118+
for await (const chunk of iterator) {
72119
chunks.add(chunk);
73120
}
74121
return Array.from(chunks);
@@ -81,6 +128,8 @@ export class ByteStreamReader extends BaseStreamReader<ByteStreamInfo> {
81128
export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
82129
private receivedChunks: Map<number, DataStream_Chunk>;
83130

131+
signal?: AbortSignal;
132+
84133
/**
85134
* A TextStreamReader instance can be used as an AsyncIterator that returns the entire string
86135
* that has been received up to the current point in time.
@@ -123,10 +172,35 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
123172
const reader = this.reader.getReader();
124173
const decoder = new TextDecoder();
125174

175+
let rejectingSignalFuture = new Future<never>();
176+
let activeSignal: AbortSignal | null = null;
177+
let onAbort: (() => void) | null = null;
178+
if (this.signal) {
179+
const signal = this.signal;
180+
onAbort = () => {
181+
rejectingSignalFuture.reject?.(signal.reason);
182+
};
183+
signal.addEventListener('abort', onAbort);
184+
activeSignal = signal;
185+
}
186+
187+
const cleanup = () => {
188+
reader.releaseLock();
189+
190+
if (activeSignal && onAbort) {
191+
activeSignal.removeEventListener('abort', onAbort);
192+
}
193+
194+
this.signal = undefined;
195+
};
196+
126197
return {
127198
next: async (): Promise<IteratorResult<string>> => {
128199
try {
129-
const { done, value } = await reader.read();
200+
const { done, value } = await Promise.race([
201+
reader.read(),
202+
rejectingSignalFuture.promise,
203+
]);
130204
if (done) {
131205
return { done: true, value: undefined };
132206
} else {
@@ -137,22 +211,37 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
137211
value: decoder.decode(value.content),
138212
};
139213
}
140-
} catch (error) {
141-
// TODO handle errors
142-
return { done: true, value: undefined };
214+
} catch (err) {
215+
cleanup();
216+
throw err;
143217
}
144218
},
145219

220+
// note: `return` runs only for premature exits, see:
221+
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#errors_during_iteration
146222
async return(): Promise<IteratorResult<string>> {
147-
reader.releaseLock();
223+
cleanup();
148224
return { done: true, value: undefined };
149225
},
150226
};
151227
}
152228

153-
async readAll(): Promise<string> {
229+
/**
230+
* Injects an AbortSignal, which if aborted, will terminate the currently active
231+
* stream iteration operation.
232+
*
233+
* Note that when using AbortSignal.timeout(...), the timeout applies across
234+
* the whole iteration operation, not just one individual chunk read.
235+
*/
236+
withAbortSignal(signal: AbortSignal) {
237+
this.signal = signal;
238+
return this;
239+
}
240+
241+
async readAll(opts: BaseStreamReaderReadAllOpts = {}): Promise<string> {
154242
let finalString: string = '';
155-
for await (const chunk of this) {
243+
const iterator = opts.signal ? this.withAbortSignal(opts.signal) : this;
244+
for await (const chunk of iterator) {
156245
finalString += chunk;
157246
}
158247
return finalString;

0 commit comments

Comments
 (0)