Skip to content

Commit c2ca804

Browse files
authored
feat: Connections v0.1 proxy /v2/connections to Composio (#190)
1 parent 1362f70 commit c2ca804

13 files changed

Lines changed: 959 additions & 4 deletions

File tree

.env.example

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ AGENT_POOL_URL=
3737
AGENT_POOL_API_KEY=
3838
AGENT_ASSETS_API_KEY=
3939

40+
# Composio (optional — /v2/connections/* returns 503 if unset)
41+
# Auth configs are looked up dynamically from Composio by toolkit slug.
42+
COMPOSIO_API_KEY=
43+
COMPOSIO_CONNECTION_CALLBACK_URL=convos://connections/callback
44+
4045
# S3 Lifecycle Test Configuration
4146
# Bucket for lifecycle testing (24h expiry policy)
4247
LIFECYCLE_TEST_BUCKET=

bun.lock

Lines changed: 22 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"@aws-sdk/client-s3": "^3.750.0",
3030
"@aws-sdk/s3-request-presigner": "^3.750.0",
3131
"@bufbuild/protobuf": "^2.2.3",
32+
"@composio/core": "^0.6.10",
3233
"@connectrpc/connect": "^2.0.1",
3334
"@connectrpc/connect-node": "^2.0.1",
3435
"@opentelemetry/exporter-trace-otlp-grpc": "^0.200.0",
@@ -58,7 +59,7 @@
5859
"uint8array-extras": "^1.4.0",
5960
"uuid": "^11.0.5",
6061
"viem": "^2.23.2",
61-
"zod": "^3.24.1"
62+
"zod": "^3.25.76"
6263
},
6364
"devDependencies": {
6465
"@bufbuild/buf": "^1.50.0",
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import {
2+
Composio,
3+
type ConnectedAccountListResponseItem,
4+
} from "@composio/core";
5+
import { COMPOSIO_API_KEY, COMPOSIO_CONNECTION_CALLBACK_URL } from "@/config";
6+
import logger from "@/utils/logger";
7+
8+
// Auth configs rarely change in Composio; cache the toolkit→authConfigId
9+
// resolution in-process so most initiate calls skip the extra round trip.
10+
const AUTH_CONFIG_CACHE_TTL_MS = 5 * 60 * 1000;
11+
12+
type AuthConfigCacheEntry = { authConfigId: string | null; expiresAt: number };
13+
14+
export class ComposioService {
15+
private composio: Composio;
16+
private authConfigCache = new Map<string, AuthConfigCacheEntry>();
17+
18+
constructor(args: { composio: Composio }) {
19+
this.composio = args.composio;
20+
}
21+
22+
/**
23+
* Resolve a toolkit slug (e.g. "googlecalendar") to its Composio authConfigId
24+
* by querying Composio. Picks the first ENABLED auth config for that toolkit.
25+
* Returns null if no enabled config exists.
26+
*
27+
* Clients must send Composio's canonical slug (case-insensitive).
28+
*/
29+
async resolveAuthConfigId(toolkit: string): Promise<string | null> {
30+
const now = Date.now();
31+
const normalized = toolkit.toLowerCase();
32+
const hit = this.authConfigCache.get(normalized);
33+
if (hit && hit.expiresAt > now) {
34+
return hit.authConfigId;
35+
}
36+
37+
const list = await this.composio.authConfigs.list({ toolkit: normalized });
38+
const enabled = list.items.find(
39+
(item) =>
40+
item.toolkit.slug.toLowerCase() === normalized &&
41+
item.status === "ENABLED",
42+
);
43+
const authConfigId = enabled?.id ?? null;
44+
45+
if (!authConfigId) {
46+
logger.warn(
47+
{
48+
toolkit: normalized,
49+
returnedItems: list.items.map((item) => ({
50+
id: item.id,
51+
slug: item.toolkit.slug,
52+
status: item.status,
53+
isComposioManaged: item.isComposioManaged,
54+
})),
55+
totalPages: list.totalPages,
56+
},
57+
"[Composio] resolveAuthConfigId: no ENABLED config matched",
58+
);
59+
} else {
60+
logger.info(
61+
{ toolkit: normalized, authConfigId },
62+
"[Composio] resolveAuthConfigId: resolved",
63+
);
64+
}
65+
66+
this.authConfigCache.set(normalized, {
67+
authConfigId,
68+
expiresAt: now + AUTH_CONFIG_CACHE_TTL_MS,
69+
});
70+
return authConfigId;
71+
}
72+
73+
async initiate(args: {
74+
userId: string;
75+
authConfigId: string;
76+
callbackUrl?: string;
77+
}) {
78+
return this.composio.connectedAccounts.initiate(
79+
args.userId,
80+
args.authConfigId,
81+
{
82+
callbackUrl: args.callbackUrl ?? COMPOSIO_CONNECTION_CALLBACK_URL,
83+
},
84+
);
85+
}
86+
87+
/**
88+
* Fetch a connected account only if it belongs to the given userId.
89+
* Returns null if it doesn't exist or isn't owned by the caller.
90+
*
91+
* Composio's retrieve endpoint no longer returns userId on the response,
92+
* so ownership is verified by listing the caller's accounts.
93+
*/
94+
async getIfOwned(args: { connectionId: string; userId: string }) {
95+
const list = await this.composio.connectedAccounts.list({
96+
userIds: [args.userId],
97+
});
98+
const items: ConnectedAccountListResponseItem[] = list.items;
99+
return items.find((item) => item.id === args.connectionId) ?? null;
100+
}
101+
102+
async listForUser(userId: string) {
103+
return this.composio.connectedAccounts.list({ userIds: [userId] });
104+
}
105+
106+
async delete(connectionId: string) {
107+
return this.composio.connectedAccounts.delete(connectionId);
108+
}
109+
}
110+
111+
let cached: ComposioService | null = null;
112+
let initialized = false;
113+
114+
export function createComposioService(): ComposioService | null {
115+
if (initialized) {
116+
return cached;
117+
}
118+
initialized = true;
119+
120+
if (!COMPOSIO_API_KEY) {
121+
logger.warn("[Composio] COMPOSIO_API_KEY not set, connections disabled");
122+
return null;
123+
}
124+
125+
cached = new ComposioService({
126+
composio: new Composio({
127+
apiKey: COMPOSIO_API_KEY,
128+
allowTracking: false,
129+
}),
130+
});
131+
logger.info("[Composio] Initialised service");
132+
return cached;
133+
}
134+
135+
// Exposed for tests — resets the singleton.
136+
export function __resetComposioServiceForTests(
137+
override: ComposioService | null = null,
138+
) {
139+
cached = override;
140+
initialized = override !== null;
141+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { Router } from "express";
2+
import { completeHandler } from "./handlers/complete";
3+
import { deleteHandler } from "./handlers/delete";
4+
import { initiateHandler } from "./handlers/initiate";
5+
import { listHandler } from "./handlers/list";
6+
7+
export const connectionsRouter = Router();
8+
9+
connectionsRouter.post("/initiate", initiateHandler);
10+
connectionsRouter.post("/complete", completeHandler);
11+
connectionsRouter.get("/", listHandler);
12+
connectionsRouter.delete("/:id", deleteHandler);
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import type { Request, Response } from "express";
2+
import { z } from "zod";
3+
import { createComposioService } from "../composio.service";
4+
import { mapComposioToResponse } from "../types";
5+
6+
const bodySchema = z.object({
7+
connectionRequestId: z.string().min(1).max(256),
8+
});
9+
10+
export async function completeHandler(req: Request, res: Response) {
11+
const deviceId = res.locals.deviceId;
12+
if (!deviceId) {
13+
res.status(401).json({ error: "Unauthorized" });
14+
return;
15+
}
16+
17+
const parsed = bodySchema.safeParse(req.body);
18+
if (!parsed.success) {
19+
req.log.warn(
20+
{
21+
deviceId,
22+
issues: parsed.error.issues,
23+
receivedBody: req.body as unknown,
24+
contentType: req.header("content-type"),
25+
},
26+
"[Composio] complete body validation failed",
27+
);
28+
res.status(400).json({
29+
error: "Invalid request body",
30+
details: parsed.error.issues,
31+
});
32+
return;
33+
}
34+
35+
const service = createComposioService();
36+
if (!service) {
37+
res.status(503).json({ error: "Connections not configured" });
38+
return;
39+
}
40+
41+
try {
42+
const owned = await service.getIfOwned({
43+
connectionId: parsed.data.connectionRequestId,
44+
userId: deviceId,
45+
});
46+
if (!owned) {
47+
res.status(403).json({ error: "Connection not owned by this device" });
48+
return;
49+
}
50+
res.status(200).json(mapComposioToResponse(owned, deviceId));
51+
return;
52+
} catch (error) {
53+
req.log.error({ error, deviceId }, "[Composio] complete failed");
54+
res.status(502).json({ error: "Failed to complete connection" });
55+
return;
56+
}
57+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import type { Request, Response } from "express";
2+
import { createComposioService } from "../composio.service";
3+
4+
export async function deleteHandler(req: Request, res: Response) {
5+
const deviceId = res.locals.deviceId;
6+
if (!deviceId) {
7+
res.status(401).json({ error: "Unauthorized" });
8+
return;
9+
}
10+
11+
const connectionId = req.params.id;
12+
if (!connectionId) {
13+
res.status(400).json({ error: "Missing connection id" });
14+
return;
15+
}
16+
17+
const service = createComposioService();
18+
if (!service) {
19+
res.status(503).json({ error: "Connections not configured" });
20+
return;
21+
}
22+
23+
try {
24+
const owned = await service.getIfOwned({
25+
connectionId,
26+
userId: deviceId,
27+
});
28+
if (!owned) {
29+
res.status(403).json({ error: "Connection not owned by this device" });
30+
return;
31+
}
32+
await service.delete(connectionId);
33+
res.status(204).send();
34+
return;
35+
} catch (error) {
36+
req.log.error(
37+
{ error, deviceId, connectionId },
38+
"[Composio] delete failed",
39+
);
40+
res.status(502).json({ error: "Failed to delete connection" });
41+
return;
42+
}
43+
}

0 commit comments

Comments
 (0)