From cf37af38d97c8c3516f2d6aa9885a75d8789bc8c Mon Sep 17 00:00:00 2001 From: Sam Sussman Date: Tue, 12 Sep 2023 00:39:40 -0500 Subject: [PATCH] feat: add task event input (#444) --- packages/@eventual/cli/src/display/event.ts | 3 +- .../call-executors-and-factories/task-call.ts | 1 + .../@eventual/core-runtime/test/call-util.ts | 3 +- .../test/workflow-executor.test.ts | 192 +++++++++--------- .../core/src/internal/workflow-events.ts | 1 + 5 files changed, 102 insertions(+), 98 deletions(-) diff --git a/packages/@eventual/cli/src/display/event.ts b/packages/@eventual/cli/src/display/event.ts index 169d3ac9..d7f1f211 100644 --- a/packages/@eventual/cli/src/display/event.ts +++ b/packages/@eventual/cli/src/display/event.ts @@ -44,7 +44,8 @@ export function displayEvent(event: WorkflowEvent) { : []), ...("signalId" in event ? [`Signal Id: ${event.signalId}`] : []), ...((isChildWorkflowScheduled(event.event) || - isTransactionRequest(event.event)) && + isTransactionRequest(event.event) || + isTaskScheduled(event.event)) && event.event.input ? [`Payload: ${JSON.stringify(event.event.input)}`] : []), diff --git a/packages/@eventual/core-runtime/src/workflow/call-executors-and-factories/task-call.ts b/packages/@eventual/core-runtime/src/workflow/call-executors-and-factories/task-call.ts index b1440614..366d5d4d 100644 --- a/packages/@eventual/core-runtime/src/workflow/call-executors-and-factories/task-call.ts +++ b/packages/@eventual/core-runtime/src/workflow/call-executors-and-factories/task-call.ts @@ -67,6 +67,7 @@ export class TaskCallEventualFactory implements EventualFactory { return { name: call.name, seq, + input: call.input, type: WorkflowCallHistoryType.TaskScheduled, }; }, diff --git a/packages/@eventual/core-runtime/test/call-util.ts b/packages/@eventual/core-runtime/test/call-util.ts index 5a3b2b17..2d56774f 100644 --- a/packages/@eventual/core-runtime/test/call-util.ts +++ b/packages/@eventual/core-runtime/test/call-util.ts @@ -154,11 +154,12 @@ export function workflowFailed(error: any, seq: number): ChildWorkflowFailed { }; } -export function taskScheduled(name: string, seq: number) { +export function taskScheduled(name: string, seq: number, input?: any) { return callEvent({ type: WorkflowCallHistoryType.TaskScheduled, name, seq, + input, }); } diff --git a/packages/@eventual/core-runtime/test/workflow-executor.test.ts b/packages/@eventual/core-runtime/test/workflow-executor.test.ts index 0fc0bd9b..642569ae 100644 --- a/packages/@eventual/core-runtime/test/workflow-executor.test.ts +++ b/packages/@eventual/core-runtime/test/workflow-executor.test.ts @@ -130,7 +130,7 @@ test("should continue with result of completed Task", async () => { await expect( execute( myWorkflow, - [taskScheduled(myTask.name, 0), taskSucceeded("result", 0)], + [taskScheduled(myTask.name, 0, eventName), taskSucceeded("result", 0)], eventName ) ).resolves.toMatchObject({ @@ -146,7 +146,7 @@ test("should fail on workflow timeout event", async () => { await expect( execute( myWorkflow, - [taskScheduled(myTask.name, 0), workflowTimedOut()], + [taskScheduled(myTask.name, 0, eventName), workflowTimedOut()], eventName ) ).resolves.toMatchObject({ @@ -160,7 +160,7 @@ test("should not continue on workflow timeout event", async () => { execute( myWorkflow, [ - taskScheduled(myTask.name, 0), + taskScheduled(myTask.name, 0, eventName), workflowTimedOut(), taskSucceeded("result", 0), ], @@ -176,7 +176,7 @@ test("should catch error of failed Task", async () => { await expect( execute( myWorkflow, - [taskScheduled(myTask.name, 0), taskFailed("error", 0)], + [taskScheduled(myTask.name, 0, eventName), taskFailed("error", 0)], eventName ) ).resolves.toMatchObject({ @@ -202,7 +202,7 @@ test("should catch error of timing out Task", async () => { [ timerScheduled(0, time(testTime)), timerCompleted(0), - taskScheduled(myTask.name, 1), + taskScheduled(myTask.name, 1, eventName), ], eventName ) @@ -218,7 +218,7 @@ test("immediately abort task on invalid timeout", async () => { }); await expect( - execute(myWorkflow, [taskScheduled(myTask.name, 0)], eventName) + execute(myWorkflow, [taskScheduled(myTask.name, 0, eventName)], eventName) ).resolves.toMatchObject({ result: Result.failed(new Timeout("Task Timed Out")), }); @@ -238,8 +238,8 @@ test("timeout multiple tasks at once", async () => { myWorkflow, [ timerScheduled(0, time(testTime)), - taskScheduled(myTask.name, 1), - taskScheduled(myTask.name, 2), + taskScheduled(myTask.name, 1, eventName), + taskScheduled(myTask.name, 2, eventName), timerCompleted(0), ], eventName @@ -261,7 +261,7 @@ test("timeout multiple tasks at once", async () => { test("task times out task", async () => { const myWorkflow = workflow(async (event) => { - const z = myTask("my-task", event); + const z = myTask(event); const a = myTask(event, { timeout: z }); const b = myTask(event, { timeout: a }); @@ -272,9 +272,9 @@ test("task times out task", async () => { execute( myWorkflow, [ - taskScheduled(myTask.name, 0), - taskScheduled(myTask.name, 1), - taskScheduled(myTask.name, 2), + taskScheduled(myTask.name, 0, eventName), + taskScheduled(myTask.name, 1, eventName), + taskScheduled(myTask.name, 2, eventName), taskSucceeded("woo", 0), ], eventName @@ -303,11 +303,11 @@ test("should return final result", async () => { execute( myWorkflow, [ - taskScheduled(myTask.name, 0), + taskScheduled(myTask.name, 0, eventName), taskSucceeded("result", 0), - taskScheduled(myTask0.name, 1), + taskScheduled(myTask0.name, 1, eventName), timerScheduled(2, time(testTime)), - taskScheduled(myTask2.name, 3), + taskScheduled(myTask2.name, 3, eventName), taskSucceeded("result-0", 1), timerCompleted(2), taskSucceeded("result-2", 3), @@ -338,9 +338,9 @@ test("should handle partial blocks", async () => { execute( myWorkflow, [ - taskScheduled(myTask.name, 0), + taskScheduled(myTask.name, 0, eventName), taskSucceeded("result", 0), - taskScheduled(myTask0.name, 1), + taskScheduled(myTask0.name, 1, eventName), ], eventName ) @@ -357,9 +357,9 @@ test("should handle partial blocks with partial completes", async () => { execute( myWorkflow, [ - taskScheduled(myTask.name, 0), + taskScheduled(myTask.name, 0, eventName), taskSucceeded("result", 0), - taskScheduled(myTask0.name, 1), + taskScheduled(myTask0.name, 1, eventName), taskSucceeded("result", 1), ], eventName @@ -466,7 +466,7 @@ test("should throw when a completed precedes workflow state", async () => { execute( myWorkflow, [ - taskScheduled(myTask.name, 0), + taskScheduled(myTask.name, 0, eventName), taskScheduled("result", 1), // the workflow does not return a seq: 2, where does this go? // note: a completed event can be accepted without a "scheduled" counterpart, @@ -571,7 +571,7 @@ test.skip("dangling promise failure", async () => { const executor = new WorkflowExecutor( wf, - [taskScheduled(myTask.name, 0)], + [taskScheduled(myTask.name, 0, eventName)], testPropertyRetriever ); @@ -598,7 +598,7 @@ test.skip("dangling promise failure with then", async () => { const executor = new WorkflowExecutor( wf, - [taskScheduled(myTask.name, 0)], + [taskScheduled(myTask.name, 0, eventName)], testPropertyRetriever ); @@ -623,7 +623,7 @@ test("dangling promise success", async () => { const executor = new WorkflowExecutor( wf, - [taskScheduled(myTask.name, 0)], + [taskScheduled(myTask.name, 0, undefined)], testPropertyRetriever ); @@ -654,11 +654,11 @@ test("should wait if partial results", async () => { execute( myWorkflow, [ - taskScheduled(myTask.name, 0), + taskScheduled(myTask.name, 0, eventName), taskSucceeded("result", 0), - taskScheduled(myTask0.name, 1), + taskScheduled(myTask0.name, 1, eventName), timerScheduled(2), - taskScheduled(myTask2.name, 3), + taskScheduled(myTask2.name, 3, eventName), taskSucceeded("result-0", 1), timerCompleted(2), ], @@ -1074,8 +1074,8 @@ describe("AwaitAll", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskSucceeded("A", 0), taskSucceeded("B", 1), ], @@ -1150,8 +1150,8 @@ describe("AwaitAny", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskSucceeded("A", 0), taskSucceeded("B", 1), ], @@ -1165,8 +1165,8 @@ describe("AwaitAny", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskSucceeded("A", 0), ], ["a", "b"] @@ -1179,8 +1179,8 @@ describe("AwaitAny", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskSucceeded("B", 1), ], ["a", "b"] @@ -1203,8 +1203,8 @@ describe("AwaitAny", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskFailed("A", 0), taskSucceeded("B", 1), ], @@ -1218,8 +1218,8 @@ describe("AwaitAny", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskSucceeded("A", 0), taskFailed("B", 1), ], @@ -1233,8 +1233,8 @@ describe("AwaitAny", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskFailed("B", 1), taskSucceeded("A", 0), ], @@ -1258,8 +1258,8 @@ describe("AwaitAny", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskFailed("A", 0), ], ["a", "b"] @@ -1272,8 +1272,8 @@ describe("AwaitAny", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskFailed("A", 0), taskFailed("B", 1), ], @@ -1328,8 +1328,8 @@ describe("Race", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskSucceeded("A", 0), taskSucceeded("B", 1), ], @@ -1343,8 +1343,8 @@ describe("Race", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskSucceeded("A", 0), ], ["a", "b"] @@ -1357,8 +1357,8 @@ describe("Race", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskSucceeded("B", 1), ], ["a", "b"] @@ -1381,8 +1381,8 @@ describe("Race", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskFailed("A", 0), taskSucceeded("B", 1), ], @@ -1396,8 +1396,8 @@ describe("Race", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskFailed("B", 1), ], ["a", "b"] @@ -1451,8 +1451,8 @@ describe("AwaitAllSettled", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskSucceeded("A", 0), taskSucceeded("B", 1), ], @@ -1470,8 +1470,8 @@ describe("AwaitAllSettled", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskFailed("A", 0), taskFailed("B", 1), ], @@ -1489,8 +1489,8 @@ describe("AwaitAllSettled", () => { execute( wf, [ - taskScheduled(processItemTask.name, 0), - taskScheduled(processItemTask.name, 1), + taskScheduled(processItemTask.name, 0, "a"), + taskScheduled(processItemTask.name, 1, "b"), taskFailed("A", 0), taskSucceeded("B", 1), ], @@ -1585,8 +1585,8 @@ test("throw error within nested function", async () => { execute( wf, [ - taskScheduled("inside", 0), - taskScheduled("inside", 1), + taskScheduled("inside", 0, "good"), + taskScheduled("inside", 1, "bad"), taskSucceeded("good", 0), taskSucceeded("bad", 1), ], @@ -1599,8 +1599,8 @@ test("throw error within nested function", async () => { execute( wf, [ - taskScheduled("inside", 0), - taskScheduled("inside", 1), + taskScheduled("inside", 0, "good"), + taskScheduled("inside", 1, "bad"), taskSucceeded("good", 0), taskSucceeded("bad", 1), taskScheduled("catch", 2), @@ -1615,8 +1615,8 @@ test("throw error within nested function", async () => { execute( wf, [ - taskScheduled("inside", 0), - taskScheduled("inside", 1), + taskScheduled("inside", 0, "good"), + taskScheduled("inside", 1, "bad"), taskSucceeded("good", 0), taskSucceeded("bad", 1), taskScheduled("catch", 2), @@ -1657,8 +1657,8 @@ test("properly evaluate await of sub-programs", async () => { execute( wf, [ - taskScheduled(myTask0.name, 0), - taskScheduled(myTask2.name, 1), + taskScheduled(myTask0.name, 0, undefined), + taskScheduled(myTask2.name, 1, undefined), taskSucceeded("a", 0), taskSucceeded("b", 1), ], @@ -1691,8 +1691,8 @@ test("properly evaluate await of Promise.all", async () => { execute( wf, [ - taskScheduled(myTask0.name, 0), - taskScheduled(myTask2.name, 1), + taskScheduled(myTask0.name, 0, undefined), + taskScheduled(myTask2.name, 1, undefined), taskSucceeded("a", 0), taskSucceeded("b", 1), ], @@ -1721,7 +1721,7 @@ test("generator function returns a taskCall", async () => { await expect( execute( wf, - [taskScheduled(myTask.name, 0), taskSucceeded("result", 0)], + [taskScheduled(myTask.name, 0, undefined), taskSucceeded("result", 0)], undefined ) ).resolves.toMatchObject({ @@ -1768,7 +1768,7 @@ test("workflow calling other workflow", async () => { [ workflowScheduled(wf1.name, 0), workflowSucceeded("result", 0), - taskScheduled(myTask0.name, 1), + taskScheduled(myTask0.name, 1, undefined), ], undefined ) @@ -1782,7 +1782,7 @@ test("workflow calling other workflow", async () => { [ workflowScheduled(wf1.name, 0), workflowSucceeded("result", 0), - taskScheduled(myTask0.name, 1), + taskScheduled(myTask0.name, 1, undefined), taskSucceeded(undefined, 1), ], undefined @@ -2166,7 +2166,7 @@ describe("signals", () => { signalReceived("MyOtherSignal", "hi"), timerScheduled(2, time(testTime)), timerCompleted(2), - taskScheduled(myTask.name, 3), + taskScheduled(myTask.name, 3, "hi"), timerScheduled(6, time(testTime)), timerCompleted(6), ], @@ -2189,7 +2189,7 @@ describe("signals", () => { [ signalReceived("MyOtherSignal", "hi"), timerScheduled(2, time(testTime)), - taskScheduled(myTask.name, 3), + taskScheduled(myTask.name, 3, "hi"), taskSucceeded("task1", 3), timerCompleted(2), timerScheduled(6, time(testTime)), @@ -2215,7 +2215,7 @@ describe("signals", () => { signalReceived("MyOtherSignal", "hi"), timerScheduled(2, time(testTime)), timerCompleted(2), - taskScheduled(myTask.name, 3), + taskScheduled(myTask.name, 3, "hi"), taskSucceeded("task1", 3), timerScheduled(6, time(testTime)), timerCompleted(6), @@ -2653,15 +2653,15 @@ test("mixing closure types", async () => { execute( workflow4, [ - taskScheduled(helloTask.name, 0), - taskScheduled(helloTask.name, 1), - taskScheduled(helloTask.name, 2), - taskScheduled(helloTask.name, 3), - taskScheduled(helloTask.name, 4), - taskScheduled(helloTask.name, 5), - taskScheduled(helloTask.name, 6), - taskScheduled(helloTask.name, 7), - taskScheduled(helloTask.name, 8), + taskScheduled(helloTask.name, 0, ["sam"]), + taskScheduled(helloTask.name, 1, ["chris"]), + taskScheduled(helloTask.name, 2, ["sam"]), + taskScheduled(helloTask.name, 3, ["sam"]), + taskScheduled(helloTask.name, 4, ["chris"]), + taskScheduled(helloTask.name, 5, ["sam"]), + taskScheduled(helloTask.name, 6, ["sam"]), + taskScheduled(helloTask.name, 7, ["chris"]), + taskScheduled(helloTask.name, 8, ["sam"]), ], undefined ) @@ -2673,15 +2673,15 @@ test("mixing closure types", async () => { execute( workflow4, [ - taskScheduled(helloTask.name, 0), - taskScheduled(helloTask.name, 1), - taskScheduled(helloTask.name, 2), - taskScheduled(helloTask.name, 3), - taskScheduled(helloTask.name, 4), - taskScheduled(helloTask.name, 5), - taskScheduled(helloTask.name, 6), - taskScheduled(helloTask.name, 7), - taskScheduled(helloTask.name, 8), + taskScheduled(helloTask.name, 0, ["sam"]), + taskScheduled(helloTask.name, 1, ["chris"]), + taskScheduled(helloTask.name, 2, ["sam"]), + taskScheduled(helloTask.name, 3, ["sam"]), + taskScheduled(helloTask.name, 4, ["chris"]), + taskScheduled(helloTask.name, 5, ["sam"]), + taskScheduled(helloTask.name, 6, ["sam"]), + taskScheduled(helloTask.name, 7, ["chris"]), + taskScheduled(helloTask.name, 8, ["sam"]), taskSucceeded(1, 0), taskSucceeded(2, 1), taskSucceeded(3, 2), @@ -2819,7 +2819,7 @@ describe("continue", () => { test("start a workflow with events and feed it one after", async () => { const executor = new WorkflowExecutor( myWorkflow, - [taskScheduled(myTask.name, 0), taskSucceeded("result", 0)], + [taskScheduled(myTask.name, 0, eventName), taskSucceeded("result", 0)], testPropertyRetriever ); await expect( @@ -2959,7 +2959,7 @@ describe("continue", () => { test("filters duplicate events", async () => { const executor = new WorkflowExecutor( myWorkflow, - [taskScheduled(myTask.name, 0), taskSucceeded("result", 0)], + [taskScheduled(myTask.name, 0, [eventName]), taskSucceeded("result", 0)], testPropertyRetriever ); await executor.start([eventName], context); diff --git a/packages/@eventual/core/src/internal/workflow-events.ts b/packages/@eventual/core/src/internal/workflow-events.ts index 5d136e00..d1622a07 100644 --- a/packages/@eventual/core/src/internal/workflow-events.ts +++ b/packages/@eventual/core/src/internal/workflow-events.ts @@ -220,6 +220,7 @@ export type WorkflowRunStarted = export interface TaskScheduled extends CallEventBase { name: string; + input?: any; } export interface TaskSucceeded