Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/smart-carpets-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@voltagent/core": patch
---

Fix workflow state persistence for userId/conversationId and make resume robust when input is missing by falling back to workflow-start event input.
38 changes: 38 additions & 0 deletions packages/core/src/workflow/core.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,44 @@ describe.sequential("workflow.run", () => {
plan: "pro",
});
});

it("should persist userId and conversationId in workflow state", async () => {
const memory = new Memory({ storage: new InMemoryStorageAdapter() });

const workflow = createWorkflow(
{
id: "workflow-user-context",
name: "Workflow User Context",
input: z.object({
value: z.string(),
}),
result: z.object({
value: z.string(),
}),
memory,
},
andThen({
id: "echo",
execute: async ({ data }) => data,
}),
);

const registry = WorkflowRegistry.getInstance();
registry.registerWorkflow(workflow);

const result = await workflow.run(
{ value: "ok" },
{
userId: "user-test-1",
conversationId: "conv-test-1",
},
);

const persistedState = await memory.getWorkflowState(result.executionId);

expect(persistedState?.userId).toBe("user-test-1");
expect(persistedState?.conversationId).toBe("conv-test-1");
});
});

describe.sequential("workflow streaming", () => {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/workflow/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,8 @@ export function createWorkflow<
input,
context: options?.context ? Array.from(options.context.entries()) : undefined,
workflowState: workflowStateStore,
userId: options?.userId,
conversationId: options?.conversationId,
metadata: {
traceId: rootSpan.spanContext().traceId,
spanId: rootSpan.spanContext().spanId,
Expand Down
12 changes: 8 additions & 4 deletions packages/core/src/workflow/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ export class WorkflowRegistry extends SimpleEventEmitter {
// Run the workflow with resume options
const resumeOptions: any = {
executionId,
userId: workflowState.metadata?.userId,
conversationId: workflowState.metadata?.conversationId,
userId: workflowState.userId ?? workflowState.metadata?.userId,
conversationId: workflowState.conversationId ?? workflowState.metadata?.conversationId,
suspendController: suspendController,
resumeFrom: {
executionId,
Expand Down Expand Up @@ -219,8 +219,12 @@ export class WorkflowRegistry extends SimpleEventEmitter {
this.logger.debug(`Resuming workflow from step ${resumeOptions.resumeFrom.resumeStepIndex}`);

try {
// Always use original workflow input - resumeData is passed through resumeOptions
const inputToUse = workflowState.input;
// Prefer persisted workflow input; fall back to the workflow-start event payload.
// This keeps resume compatible with adapters that don't store input in a dedicated column.
const workflowStartEventInput = workflowState.events?.find(
(event) => event.type === "workflow-start",
)?.input;
const inputToUse = workflowState.input ?? workflowStartEventInput;

// Add resumeData to resumeOptions if provided
if (resumeData !== undefined) {
Expand Down
207 changes: 207 additions & 0 deletions packages/core/src/workflow/suspend-resume.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -849,4 +849,211 @@ describe.sequential("workflow suspend/resume functionality", () => {
expect(resumed.status).toBe("completed");
expect(resumed.result).toBe("resumed successfully");
});

it("should resume using workflow-start event input when persisted input is missing", async () => {
const { memory } = createTestStores();
let firstRun = true;

const workflow = createWorkflow(
{
id: "test-resume-input-from-events",
name: "Test Resume Input From Events",
result: z.object({ value: z.string() }),
memory,
},
andThen({
id: "step",
name: "Step",
execute: async ({ data, suspend }) => {
if (firstRun) {
firstRun = false;
await suspend("Pause for event-input fallback test");
}
return { value: data as string };
},
}),
);

registry.registerWorkflow(workflow);

const controller = createSuspendController();
const suspended = await workflow.run("event-fallback-input", {
suspendController: controller,
});

expect(suspended.status).toBe("suspended");

const persistedState = await memory.getWorkflowState(suspended.executionId);
expect(persistedState?.events?.some((event) => event.type === "workflow-start")).toBe(true);

await memory.updateWorkflowState(suspended.executionId, {
input: undefined,
});

const resumed = await registry.resumeSuspendedWorkflow(workflow.id, suspended.executionId);

expect(resumed?.status).toBe("completed");
expect(resumed?.result).toEqual({ value: "event-fallback-input" });
});

it("should prefer top-level resume user context and fall back to metadata when missing", async () => {
const { memory } = createTestStores();

let firstRunA = true;
const workflowA = createWorkflow(
{
id: "test-resume-user-context-precedence",
name: "Test Resume User Context Precedence",
result: z.object({ userId: z.string(), conversationId: z.string() }),
memory,
},
andThen({
id: "step",
execute: async ({ state, suspend }) => {
if (firstRunA) {
firstRunA = false;
await suspend("Pause for precedence test");
}
return {
userId: state.userId as string,
conversationId: state.conversationId as string,
};
},
}),
);

registry.registerWorkflow(workflowA);

const suspendedA = await workflowA.run("test", {
userId: "original-user",
conversationId: "original-conversation",
suspendController: createSuspendController(),
});

expect(suspendedA.status).toBe("suspended");

await memory.updateWorkflowState(suspendedA.executionId, {
userId: "top-user",
conversationId: "top-conversation",
metadata: {
userId: "meta-user",
conversationId: "meta-conversation",
},
});

const resumedA = await registry.resumeSuspendedWorkflow(workflowA.id, suspendedA.executionId);
expect(resumedA?.status).toBe("completed");
expect(resumedA?.result).toEqual({
userId: "top-user",
conversationId: "top-conversation",
});

let firstRunB = true;
const workflowB = createWorkflow(
{
id: "test-resume-user-context-metadata-fallback",
name: "Test Resume User Context Metadata Fallback",
result: z.object({ userId: z.string(), conversationId: z.string() }),
memory,
},
andThen({
id: "step",
execute: async ({ state, suspend }) => {
if (firstRunB) {
firstRunB = false;
await suspend("Pause for metadata fallback test");
}
return {
userId: state.userId as string,
conversationId: state.conversationId as string,
};
},
}),
);

registry.registerWorkflow(workflowB);

const suspendedB = await workflowB.run("test", {
userId: "original-user-b",
conversationId: "original-conversation-b",
suspendController: createSuspendController(),
});

expect(suspendedB.status).toBe("suspended");

await memory.updateWorkflowState(suspendedB.executionId, {
userId: undefined,
conversationId: undefined,
metadata: {
userId: "meta-user-b",
conversationId: "meta-conversation-b",
},
});

const resumedB = await registry.resumeSuspendedWorkflow(workflowB.id, suspendedB.executionId);
expect(resumedB?.status).toBe("completed");
expect(resumedB?.result).toEqual({
userId: "meta-user-b",
conversationId: "meta-conversation-b",
});
});

it("should fail resume when both persisted input and workflow-start event input are missing", async () => {
const { memory } = createTestStores();
let firstRun = true;

const workflow = createWorkflow(
{
id: "test-resume-missing-input-sources",
name: "Test Resume Missing Input Sources",
result: z.object({ value: z.string() }),
memory,
},
andThen({
id: "step",
name: "Step",
execute: async ({ data, suspend }) => {
if (firstRun) {
firstRun = false;
await suspend("Pause before removing input sources");
}

if (typeof data !== "string") {
throw new Error("Missing resume input");
}

return { value: data };
},
}),
);

registry.registerWorkflow(workflow);

const suspended = await workflow.run("missing-input", {
suspendController: createSuspendController(),
});

expect(suspended.status).toBe("suspended");

const persistedState = await memory.getWorkflowState(suspended.executionId);
expect(persistedState?.suspension).toBeDefined();

await memory.updateWorkflowState(suspended.executionId, {
input: undefined,
events: [],
suspension: persistedState?.suspension
? {
...persistedState.suspension,
checkpoint: undefined,
}
: undefined,
});

const resumed = await registry.resumeSuspendedWorkflow(workflow.id, suspended.executionId);
expect(resumed?.status).toBe("error");
expect(resumed?.error).toBeInstanceOf(Error);
if (resumed?.error instanceof Error) {
expect(resumed.error.message).toBe("Missing resume input");
}
});
});