Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,62 @@ import { isEmptyObj } from './internal/utils/values';

const WORKLOAD_IDENTITY_API_KEY_PLACEHOLDER = 'workload-identity-auth';

/**
* Wrap a `Response` so that the given `clearRequestTimeout` callback runs
* exactly once, when the body is either fully read, cancelled, or errors.
* Responses without a body have the timeout cleared synchronously.
*
* This lets the request-timeout timer stay armed across the body-read phase
* (e.g. `await response.json()`), which native `await fetch()` does not cover.
*/
function wrapResponseForRequestTimeout(response: Response, clearRequestTimeout: () => void): Response {
if (!response.body) {
clearRequestTimeout();
return response;
}

let cleared = false;
const clearOnce = () => {
if (cleared) return;
cleared = true;
clearRequestTimeout();
};

const originalBody = response.body;
const wrappedBody = new ReadableStream({
async start(controller) {
const reader = originalBody.getReader();
try {
for (;;) {
const { done, value } = await reader.read();
if (done) break;
controller.enqueue(value);
Comment on lines +272 to +275
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Respect backpressure in wrapped response stream

The wrapper reads the upstream response.body to completion inside start(), which runs eagerly when new ReadableStream(...) is constructed, not when the caller pulls. Because this loop never checks downstream demand, slow consumers (especially stream: true SSE users that process events incrementally) can accumulate an unbounded in-memory queue and lose the original backpressure behavior of fetch bodies, leading to large memory growth or OOM on long streams.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PROGRAMA ENFERMEDADES TRANSMISIBLES II.pdf

}
controller.close();
} catch (err) {
controller.error(err);
} finally {
clearOnce();
try {
reader.releaseLock();
} catch {
// reader may already be released
}
}
},
cancel(reason) {
clearOnce();
return originalBody.cancel(reason);
Comment on lines +289 to +291
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Cancel the active reader instead of locked stream

In cancel(), calling originalBody.cancel(reason) is invalid while start() holds a reader lock (originalBody.getReader()), so cancellation can reject with TypeError: ReadableStream is locked and fail to stop the upstream body read. This affects callers that cancel early (e.g., response.body.cancel() / early termination paths), leaving the network read running despite cancellation.

Useful? React with 👍 / 👎.

},
});

return new Response(wrappedBody, {
status: response.status,
statusText: response.statusText,
headers: response.headers,
});
}

export type ApiKeySetter = () => Promise<string>;

export interface ClientOptions {
Expand Down Expand Up @@ -887,12 +943,22 @@ export class OpenAI {
fetchOptions.method = method.toUpperCase();
}

let response: Response;
try {
// use undefined this binding; fetch errors if bound to something else in browser/cloudflare
return await this.fetch.call(undefined, url, fetchOptions);
} finally {
response = await this.fetch.call(undefined, url, fetchOptions);
} catch (err) {
clearTimeout(timeout);
throw err;
}

// Keep the timer armed until the body is fully read, cancelled, or errored.
// The `await fetch()` above resolves as soon as response *headers* arrive, so
// clearing the timeout here would leave subsequent body readers — e.g.
// `await response.json()` in `internal/parse.ts` — without any timeout.
// Servers that flush 200 headers fast and then stall mid-body would cause
// the SDK to hang indefinitely. See openai/openai-node#1825.
return wrapResponseForRequestTimeout(response, () => clearTimeout(timeout));
}

private async shouldRetry(response: Response): Promise<boolean> {
Expand Down
50 changes: 50 additions & 0 deletions tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,56 @@ describe('default encoder', () => {
});
});

describe('timeout covers body read phase', () => {
  // Regression test for https://github.com/openai/openai-node/issues/1825:
  // the request timer must keep running while the body is being read, not
  // just until headers arrive. If it were cleared as soon as `await fetch()`
  // resolved, a server that stalls mid-body would leave `response.json()`
  // hanging forever.
  test('rejects when the body read stalls past the configured timeout', async () => {
    const timeoutMs = 50;
    const stallMs = 2000;

    // Simulates a server that returns headers immediately but never emits a
    // single body chunk.
    const stubFetch = async (_url: string | URL | Request, init: RequestInit = {}): Promise<Response> => {
      const { signal } = init;
      const stalledBody = new ReadableStream<Uint8Array>({
        start(controller) {
          // Honour the SDK's AbortSignal so its timeout/retry path can still
          // tear the stream down.
          signal?.addEventListener('abort', () => controller.error(new Error('aborted')), { once: true });
          // Upper bound on the stall so a regression cannot hang the suite
          // indefinitely.
          setTimeout(() => {
            controller.close();
          }, stallMs).unref?.();
        },
      });
      return new Response(stalledBody, {
        status: 200,
        headers: { 'Content-Type': 'application/json' },
      });
    };

    const client = new OpenAI({
      apiKey: 'My API Key',
      timeout: timeoutMs,
      maxRetries: 0,
      fetch: stubFetch,
    });

    const startedAt = Date.now();
    await expect(client.request({ path: '/foo', method: 'get' })).rejects.toThrow();

    // The rejection must come from the timeout, well before the stall's
    // natural end.
    expect(Date.now() - startedAt).toBeLessThan(stallMs);
  });
});

describe('retries', () => {
test('retry on timeout', async () => {
let count = 0;
Expand Down