Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chatapi/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"start": "ts-node src/index.ts",
"build": "tsc",
"dev": "nodemon --exec ts-node src/index.ts",
"test:integration": "COUCHDB_HOST=http://localhost:5984 ts-node --transpile-only src/tests/assistant.integration.test.ts",
"lint": "eslint . --ext .ts",
"lint-fix": "eslint . --ext .ts --fix"
},
Expand Down
1 change: 1 addition & 0 deletions chatapi/src/config/ai-providers.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const initialize = async () => {
};

assistant = {
'enabled': doc?.assistant?.enabled ?? true,
'name': doc?.assistant?.name || '',
'instructions': doc?.assistant?.instructions || '',
};
Expand Down
2 changes: 1 addition & 1 deletion chatapi/src/models/chat.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ interface Providers {
}

// Assistant persona settings loaded from the configurations doc
// (see ai-providers.config: `doc?.assistant?...`).
interface Assistant {
// Feature toggle for the Assistants API path; optional — the config
// loader applies `?? true`, so a missing value means enabled.
enabled?: boolean;
name: string;
instructions: string;
}
Expand All @@ -33,4 +34,3 @@ export interface ChatItem {
query: string;
response: string;
}

144 changes: 144 additions & 0 deletions chatapi/src/tests/assistant.integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/* eslint-disable quote-props, @typescript-eslint/naming-convention */
import assert from 'assert';

import { aiChat } from '../utils/chat.utils';
import { assistant as assistantConfig, models } from '../config/ai-providers.config';
import * as assistantUtils from '../utils/chat-assistant.utils';

/**
 * Builds a minimal stand-in for a provider entry in `models`: it exposes the
 * same `ai.chat.completions.create` surface as the real client and always
 * resolves to a canned single-choice completion.
 */
function createFakeProvider() {
  const create = async () => {
    const message = { content: 'provider response' };
    return { choices: [ { message } ] };
  };
  return {
    ai: { chat: { completions: { create } } },
    defaultModel: 'gpt-4o-mini'
  };
}

/**
 * Covers the non-streaming assistant path of aiChat: with assistant mode
 * enabled and every chat-assistant helper stubbed, aiChat must drive a run
 * to completion and return the stubbed final response verbatim.
 */
async function testAssistantNonStreamingPath() {
  // Snapshot everything we monkey-patch so `finally` can restore it all.
  const saved = {
    model: models.openai,
    enabled: assistantConfig.enabled,
    createAssistant: assistantUtils.createAssistant,
    createThread: assistantUtils.createThread,
    addToThread: assistantUtils.addToThread,
    createRun: assistantUtils.createRun,
    waitForRunCompletion: assistantUtils.waitForRunCompletion,
    retrieveResponse: assistantUtils.retrieveResponse
  };

  let runCreated = false;

  try {
    assistantConfig.enabled = true;
    models.openai = createFakeProvider();

    // Stub the whole assistant pipeline; only createRun records that it ran.
    (assistantUtils as any).createAssistant = (async () => ({ id: 'asst_1' })) as typeof assistantUtils.createAssistant;
    (assistantUtils as any).createThread = (async () => ({ id: 'thread_1' })) as typeof assistantUtils.createThread;
    (assistantUtils as any).addToThread = (async () => ({})) as typeof assistantUtils.addToThread;
    (assistantUtils as any).createRun = (async () => {
      runCreated = true;
      return { id: 'run_1' };
    }) as typeof assistantUtils.createRun;
    (assistantUtils as any).waitForRunCompletion = (async () => ({ status: 'completed' })) as typeof assistantUtils.waitForRunCompletion;
    (assistantUtils as any).retrieveResponse = (async () => 'assistant non-stream response') as typeof assistantUtils.retrieveResponse;

    // Assistant mode on, streaming off — presumably 3rd/5th positional args;
    // matches the streaming variant below where the 5th arg is true.
    const response = await aiChat(
      [ { role: 'user', content: 'hello' } ],
      { name: 'openai', model: 'gpt-4o-mini' },
      true,
      { data: 'context' },
      false
    );

    assert.strictEqual(response, 'assistant non-stream response');
    assert.strictEqual(runCreated, true);
  } finally {
    // Restore in reverse order of patching.
    (assistantUtils as any).retrieveResponse = saved.retrieveResponse;
    (assistantUtils as any).waitForRunCompletion = saved.waitForRunCompletion;
    (assistantUtils as any).createRun = saved.createRun;
    (assistantUtils as any).addToThread = saved.addToThread;
    (assistantUtils as any).createThread = saved.createThread;
    (assistantUtils as any).createAssistant = saved.createAssistant;
    assistantConfig.enabled = saved.enabled;
    models.openai = saved.model;
  }
}

/**
 * Covers the streaming assistant path of aiChat: the stubbed streaming run
 * pushes two chunks through the callback, and aiChat must both forward each
 * chunk to the caller and resolve with the concatenated text.
 */
async function testAssistantStreamingPath() {
  // Snapshot everything we monkey-patch so `finally` can restore it all.
  const saved = {
    model: models.openai,
    enabled: assistantConfig.enabled,
    createAssistant: assistantUtils.createAssistant,
    createThread: assistantUtils.createThread,
    addToThread: assistantUtils.addToThread,
    createAndHandleRunWithStreaming: assistantUtils.createAndHandleRunWithStreaming
  };

  const streamedChunks: string[] = [];

  try {
    assistantConfig.enabled = true;
    models.openai = createFakeProvider();

    (assistantUtils as any).createAssistant = (async () => ({ id: 'asst_2' })) as typeof assistantUtils.createAssistant;
    (assistantUtils as any).createThread = (async () => ({ id: 'thread_2' })) as typeof assistantUtils.createThread;
    (assistantUtils as any).addToThread = (async () => ({})) as typeof assistantUtils.addToThread;
    // Fake streaming run: emits two chunks via the callback, then resolves
    // with the full concatenated text.
    (assistantUtils as any).createAndHandleRunWithStreaming = (
      (async (_threadID: string, _assistantID: string, _instructions: string, callback?: (response: string) => void) => {
        callback?.('hello');
        callback?.(' world');
        return 'hello world';
      }) as typeof assistantUtils.createAndHandleRunWithStreaming
    );

    const response = await aiChat(
      [ { role: 'user', content: 'hello' } ],
      { name: 'openai', model: 'gpt-4o-mini' },
      true,
      { data: 'context' },
      true,
      (chunk: string) => streamedChunks.push(chunk)
    );

    assert.strictEqual(response, 'hello world');
    assert.deepStrictEqual(streamedChunks, [ 'hello', ' world' ]);
  } finally {
    // Restore in reverse order of patching.
    (assistantUtils as any).createAndHandleRunWithStreaming = saved.createAndHandleRunWithStreaming;
    (assistantUtils as any).addToThread = saved.addToThread;
    (assistantUtils as any).createThread = saved.createThread;
    (assistantUtils as any).createAssistant = saved.createAssistant;
    assistantConfig.enabled = saved.enabled;
    models.openai = saved.model;
  }
}

/**
 * Verifies provider validation: enabling assistant mode with a non-OpenAI
 * provider must make aiChat reject with the "only compatible with openai
 * provider" error instead of silently falling back.
 */
async function testAssistantProviderValidation() {
  const savedProvider = models.perplexity;
  const savedEnabled = assistantConfig.enabled;
  try {
    assistantConfig.enabled = true;
    models.perplexity = createFakeProvider();

    const call = aiChat(
      [ { role: 'user', content: 'hello' } ],
      { name: 'perplexity', model: 'sonar' },
      true,
      { data: 'context' },
      false
    );
    await assert.rejects(call, /only compatible with openai provider/);
  } finally {
    assistantConfig.enabled = savedEnabled;
    models.perplexity = savedProvider;
  }
}

// Entry point: runs each integration scenario in sequence. A `.catch` is
// attached so a failing test is reported explicitly and the process exits
// non-zero, instead of surfacing as an unhandled promise rejection.
(async () => {
  await testAssistantNonStreamingPath();
  await testAssistantStreamingPath();
  await testAssistantProviderValidation();
  // eslint-disable-next-line no-console
  console.log('assistant integration tests passed');
})().catch((error: unknown) => {
  // eslint-disable-next-line no-console
  console.error('assistant integration tests failed:', error);
  process.exitCode = 1;
});
180 changes: 125 additions & 55 deletions chatapi/src/utils/chat-assistant.utils.ts
Original file line number Diff line number Diff line change
@@ -1,55 +1,121 @@
import { keys } from '../config/ai-providers.config';
import { assistant } from '../config/ai-providers.config';

// Run statuses after which polling must stop (only 'completed' is success).
const TERMINAL_RUN_STATUSES = new Set([ 'completed', 'failed', 'cancelled', 'expired' ]);

/**
 * Wraps an unknown thrown value in an Error whose message names the failed
 * Assistant API operation, carrying over any status/code/message details
 * found on non-Error throwables.
 */
export function normalizeAssistantError(error: unknown, operation: string): Error {
  const prefix = `Assistant API ${operation} failed`;

  if (error instanceof Error) {
    return new Error(`${prefix}: ${error.message}`);
  }

  if (typeof error === 'object' && error !== null) {
    const { status, code, message } = error as { message?: string; status?: number; code?: string };
    // Collect only the fields that are present and truthy.
    const parts: string[] = [];
    if (status) {
      parts.push(`status=${status}`);
    }
    if (code) {
      parts.push(`code=${code}`);
    }
    if (message) {
      parts.push(`message=${message}`);
    }
    const details = parts.join(' ');
    return new Error(details ? `${prefix} (${details})` : prefix);
  }

  return new Error(prefix);
}

/**
* Creates an assistant with the specified model
* @param model - Model to use for assistant
* @returns Assistant object
*/
export async function createAssistant(model: string) {
return await keys.openai.beta.assistants.create({
'name': assistant?.name,
'instructions': assistant?.instructions,
'tools': [ { 'type': 'code_interpreter' } ],
model,
});
try {
return await keys.openai.beta.assistants.create({
'name': assistant?.name,
'instructions': assistant?.instructions,
'tools': [ { 'type': 'code_interpreter' } ],
model,
});
} catch (error) {
throw normalizeAssistantError(error, 'createAssistant');
}
}

export async function createThread() {
return await keys.openai.beta.threads.create();
try {
return await keys.openai.beta.threads.create();
} catch (error) {
throw normalizeAssistantError(error, 'createThread');
}
}

export async function addToThread(threadId: any, message: string) {
return await keys.openai.beta.threads.messages.create(
threadId,
{
'role': 'user',
'content': message
}
);
try {
return await keys.openai.beta.threads.messages.create(
threadId,
{
'role': 'user',
'content': message
}
);
} catch (error) {
throw normalizeAssistantError(error, 'addToThread');
}
}

export async function createRun(threadID: any, assistantID: any, instructions?: string) {
return await keys.openai.beta.threads.runs.create(
threadID,
{
'assistant_id': assistantID,
instructions
}
);
try {
return await keys.openai.beta.threads.runs.create(
threadID,
{
'assistant_id': assistantID,
instructions
}
);
} catch (error) {
throw normalizeAssistantError(error, 'createRun');
}
}

export async function waitForRunCompletion(threadId: any, runId: any) {
let runStatus = await keys.openai.beta.threads.runs.retrieve(threadId, runId);
while (runStatus.status !== 'completed') {
await new Promise((resolve) => setTimeout(resolve, 1000));
runStatus = await keys.openai.beta.threads.runs.retrieve(threadId, runId);
const maxAttempts = 60;
const pollDelayMs = 1000;
const timeoutMs = 90_000;
const startedAt = Date.now();

let attempts = 0;
let runStatus: any;
while (attempts < maxAttempts) {
attempts += 1;
try {
runStatus = await keys.openai.beta.threads.runs.retrieve(threadId, runId);
} catch (error) {
throw normalizeAssistantError(error, 'waitForRunCompletion');
}

if (TERMINAL_RUN_STATUSES.has(runStatus.status)) {
if (runStatus.status === 'completed') {
return runStatus;
}
throw new Error(`Assistant run finished with status "${runStatus.status}"`);
}

if (Date.now() - startedAt > timeoutMs) {
throw new Error(`Assistant run timed out after ${timeoutMs}ms`);
}

await new Promise((resolve) => setTimeout(resolve, pollDelayMs));
}
return runStatus;

throw new Error(`Assistant run exceeded retry limit (${maxAttempts})`);
}

export async function retrieveResponse(threadId: any): Promise<string> {
const messages = await keys.openai.beta.threads.messages.list(threadId);
let messages;
try {
messages = await keys.openai.beta.threads.messages.list(threadId);
} catch (error) {
throw normalizeAssistantError(error, 'retrieveResponse');
}
for (const msg of messages.data) {
if ('text' in msg.content[0] && msg.role === 'assistant') {
return msg.content[0].text.value;
Expand All @@ -65,39 +131,43 @@ export async function createAndHandleRunWithStreaming(
let completionText = '';

return new Promise((resolve, reject) => {
keys.openai.beta.threads.runs.stream(threadID, {
'assistant_id': assistantID,
instructions
})
.on('textDelta', (textDelta: { value: string }) => {
if (textDelta && textDelta.value) {
completionText += textDelta.value;
if (callback) {
callback(textDelta.value);
}
}
try {
keys.openai.beta.threads.runs.stream(threadID, {
'assistant_id': assistantID,
instructions
})
.on('toolCallDelta', (toolCallDelta: { type: string; code_interpreter: { input: string; outputs: any[] } }) => {
if (toolCallDelta.type === 'code_interpreter') {
if (toolCallDelta && toolCallDelta.code_interpreter && toolCallDelta.code_interpreter.input) {
completionText += toolCallDelta.code_interpreter.input;
.on('textDelta', (textDelta: { value: string }) => {
if (textDelta && textDelta.value) {
completionText += textDelta.value;
if (callback) {
callback(toolCallDelta.code_interpreter.input);
callback(textDelta.value);
}
}
if (toolCallDelta && toolCallDelta.code_interpreter && toolCallDelta.code_interpreter.outputs) {
toolCallDelta.code_interpreter.outputs.forEach((output) => {
if (output.type === 'logs' && output.logs) {
completionText += output.logs;
if (callback) {
callback(output.logs);
}
})
.on('toolCallDelta', (toolCallDelta: { type: string; code_interpreter: { input: string; outputs: any[] } }) => {
if (toolCallDelta.type === 'code_interpreter') {
if (toolCallDelta && toolCallDelta.code_interpreter && toolCallDelta.code_interpreter.input) {
completionText += toolCallDelta.code_interpreter.input;
if (callback) {
callback(toolCallDelta.code_interpreter.input);
}
});
}
if (toolCallDelta && toolCallDelta.code_interpreter && toolCallDelta.code_interpreter.outputs) {
toolCallDelta.code_interpreter.outputs.forEach((output) => {
if (output.type === 'logs' && output.logs) {
completionText += output.logs;
if (callback) {
callback(output.logs);
}
}
});
}
}
}
})
.on('end', () => resolve(completionText))
.on('error', reject);
})
.on('end', () => resolve(completionText))
.on('error', (error: unknown) => reject(normalizeAssistantError(error, 'streamRun')));
} catch (error) {
reject(normalizeAssistantError(error, 'streamRun'));
}
});
}
Loading
Loading