diff --git a/.changeset/smart-carpets-taste.md b/.changeset/smart-carpets-taste.md new file mode 100644 index 000000000..2442fa77f --- /dev/null +++ b/.changeset/smart-carpets-taste.md @@ -0,0 +1,5 @@ +--- +"@voltagent/core": patch +--- + +Fix workflow state persistence for userId/conversationId and make resume robust when input is missing by falling back to workflow-start event input. diff --git a/packages/core/src/workflow/core.spec.ts b/packages/core/src/workflow/core.spec.ts index 7579c8610..2ede8be79 100644 --- a/packages/core/src/workflow/core.spec.ts +++ b/packages/core/src/workflow/core.spec.ts @@ -133,6 +133,44 @@ describe.sequential("workflow.run", () => { plan: "pro", }); }); + + it("should persist userId and conversationId in workflow state", async () => { + const memory = new Memory({ storage: new InMemoryStorageAdapter() }); + + const workflow = createWorkflow( + { + id: "workflow-user-context", + name: "Workflow User Context", + input: z.object({ + value: z.string(), + }), + result: z.object({ + value: z.string(), + }), + memory, + }, + andThen({ + id: "echo", + execute: async ({ data }) => data, + }), + ); + + const registry = WorkflowRegistry.getInstance(); + registry.registerWorkflow(workflow); + + const result = await workflow.run( + { value: "ok" }, + { + userId: "user-test-1", + conversationId: "conv-test-1", + }, + ); + + const persistedState = await memory.getWorkflowState(result.executionId); + + expect(persistedState?.userId).toBe("user-test-1"); + expect(persistedState?.conversationId).toBe("conv-test-1"); + }); }); describe.sequential("workflow streaming", () => { diff --git a/packages/core/src/workflow/core.ts b/packages/core/src/workflow/core.ts index 01a0ec9ef..572b00bb5 100644 --- a/packages/core/src/workflow/core.ts +++ b/packages/core/src/workflow/core.ts @@ -925,6 +925,8 @@ export function createWorkflow< input, context: options?.context ? Array.from(options.context.entries()) : undefined, workflowState: workflowStateStore, + userId: options?.userId, + conversationId: options?.conversationId, metadata: { traceId: rootSpan.spanContext().traceId, spanId: rootSpan.spanContext().spanId, diff --git a/packages/core/src/workflow/registry.ts b/packages/core/src/workflow/registry.ts index ffadb0644..4569ff57b 100644 --- a/packages/core/src/workflow/registry.ts +++ b/packages/core/src/workflow/registry.ts @@ -189,8 +189,8 @@ export class WorkflowRegistry extends SimpleEventEmitter { // Run the workflow with resume options const resumeOptions: any = { executionId, - userId: workflowState.metadata?.userId, - conversationId: workflowState.metadata?.conversationId, + userId: workflowState.userId ?? workflowState.metadata?.userId, + conversationId: workflowState.conversationId ?? workflowState.metadata?.conversationId, suspendController: suspendController, resumeFrom: { executionId, @@ -219,8 +219,12 @@ export class WorkflowRegistry extends SimpleEventEmitter { this.logger.debug(`Resuming workflow from step ${resumeOptions.resumeFrom.resumeStepIndex}`); try { - // Always use original workflow input - resumeData is passed through resumeOptions - const inputToUse = workflowState.input; + // Prefer persisted workflow input; fall back to the workflow-start event payload. + // This keeps resume compatible with adapters that don't store input in a dedicated column. + const workflowStartEventInput = workflowState.events?.find( + (event) => event.type === "workflow-start", + )?.input; + const inputToUse = workflowState.input ?? workflowStartEventInput; // Add resumeData to resumeOptions if provided if (resumeData !== undefined) { diff --git a/packages/core/src/workflow/suspend-resume.spec.ts b/packages/core/src/workflow/suspend-resume.spec.ts index efe3b3c1b..8a79251e0 100644 --- a/packages/core/src/workflow/suspend-resume.spec.ts +++ b/packages/core/src/workflow/suspend-resume.spec.ts @@ -849,4 +849,211 @@ describe.sequential("workflow suspend/resume functionality", () => { expect(resumed.status).toBe("completed"); expect(resumed.result).toBe("resumed successfully"); }); + + it("should resume using workflow-start event input when persisted input is missing", async () => { + const { memory } = createTestStores(); + let firstRun = true; + + const workflow = createWorkflow( + { + id: "test-resume-input-from-events", + name: "Test Resume Input From Events", + result: z.object({ value: z.string() }), + memory, + }, + andThen({ + id: "step", + name: "Step", + execute: async ({ data, suspend }) => { + if (firstRun) { + firstRun = false; + await suspend("Pause for event-input fallback test"); + } + return { value: data as string }; + }, + }), + ); + + registry.registerWorkflow(workflow); + + const controller = createSuspendController(); + const suspended = await workflow.run("event-fallback-input", { + suspendController: controller, + }); + + expect(suspended.status).toBe("suspended"); + + const persistedState = await memory.getWorkflowState(suspended.executionId); + expect(persistedState?.events?.some((event) => event.type === "workflow-start")).toBe(true); + + await memory.updateWorkflowState(suspended.executionId, { + input: undefined, + }); + + const resumed = await registry.resumeSuspendedWorkflow(workflow.id, suspended.executionId); + + expect(resumed?.status).toBe("completed"); + expect(resumed?.result).toEqual({ value: "event-fallback-input" }); + }); + + it("should prefer top-level resume user context and fall back to metadata when missing", async () => { + const { memory } = createTestStores(); + + let firstRunA = true; + const workflowA = createWorkflow( + { + id: "test-resume-user-context-precedence", + name: "Test Resume User Context Precedence", + result: z.object({ userId: z.string(), conversationId: z.string() }), + memory, + }, + andThen({ + id: "step", + execute: async ({ state, suspend }) => { + if (firstRunA) { + firstRunA = false; + await suspend("Pause for precedence test"); + } + return { + userId: state.userId as string, + conversationId: state.conversationId as string, + }; + }, + }), + ); + + registry.registerWorkflow(workflowA); + + const suspendedA = await workflowA.run("test", { + userId: "original-user", + conversationId: "original-conversation", + suspendController: createSuspendController(), + }); + + expect(suspendedA.status).toBe("suspended"); + + await memory.updateWorkflowState(suspendedA.executionId, { + userId: "top-user", + conversationId: "top-conversation", + metadata: { + userId: "meta-user", + conversationId: "meta-conversation", + }, + }); + + const resumedA = await registry.resumeSuspendedWorkflow(workflowA.id, suspendedA.executionId); + expect(resumedA?.status).toBe("completed"); + expect(resumedA?.result).toEqual({ + userId: "top-user", + conversationId: "top-conversation", + }); + + let firstRunB = true; + const workflowB = createWorkflow( + { + id: "test-resume-user-context-metadata-fallback", + name: "Test Resume User Context Metadata Fallback", + result: z.object({ userId: z.string(), conversationId: z.string() }), + memory, + }, + andThen({ + id: "step", + execute: async ({ state, suspend }) => { + if (firstRunB) { + firstRunB = false; + await suspend("Pause for metadata fallback test"); + } + return { + userId: state.userId as string, + conversationId: state.conversationId as string, + }; + }, + }), + ); + + registry.registerWorkflow(workflowB); + + const suspendedB = await workflowB.run("test", { + userId: "original-user-b", + conversationId: "original-conversation-b", + suspendController: createSuspendController(), + }); + + expect(suspendedB.status).toBe("suspended"); + + await memory.updateWorkflowState(suspendedB.executionId, { + userId: undefined, + conversationId: undefined, + metadata: { + userId: "meta-user-b", + conversationId: "meta-conversation-b", + }, + }); + + const resumedB = await registry.resumeSuspendedWorkflow(workflowB.id, suspendedB.executionId); + expect(resumedB?.status).toBe("completed"); + expect(resumedB?.result).toEqual({ + userId: "meta-user-b", + conversationId: "meta-conversation-b", + }); + }); + + it("should fail resume when both persisted input and workflow-start event input are missing", async () => { + const { memory } = createTestStores(); + let firstRun = true; + + const workflow = createWorkflow( + { + id: "test-resume-missing-input-sources", + name: "Test Resume Missing Input Sources", + result: z.object({ value: z.string() }), + memory, + }, + andThen({ + id: "step", + name: "Step", + execute: async ({ data, suspend }) => { + if (firstRun) { + firstRun = false; + await suspend("Pause before removing input sources"); + } + + if (typeof data !== "string") { + throw new Error("Missing resume input"); + } + + return { value: data }; + }, + }), + ); + + registry.registerWorkflow(workflow); + + const suspended = await workflow.run("missing-input", { + suspendController: createSuspendController(), + }); + + expect(suspended.status).toBe("suspended"); + + const persistedState = await memory.getWorkflowState(suspended.executionId); + expect(persistedState?.suspension).toBeDefined(); + + await memory.updateWorkflowState(suspended.executionId, { + input: undefined, + events: [], + suspension: persistedState?.suspension + ? { + ...persistedState.suspension, + checkpoint: undefined, + } + : undefined, + }); + + const resumed = await registry.resumeSuspendedWorkflow(workflow.id, suspended.executionId); + expect(resumed?.status).toBe("error"); + expect(resumed?.error).toBeInstanceOf(Error); + if (resumed?.error instanceof Error) { + expect(resumed.error.message).toBe("Missing resume input"); + } + }); });