From 5beb64a5d44cced56f89fa7bc30aa46e3af5a2d2 Mon Sep 17 00:00:00 2001 From: Antonio Lain Date: Thu, 20 Jun 2024 00:09:11 -0700 Subject: [PATCH 1/4] First implementation with no tests --- packages/workflow/src/interfaces.ts | 15 +++++ packages/workflow/src/internals.ts | 46 +++++++++------- packages/workflow/src/update-scope.ts | 67 +++++++++++++++++++++++ packages/workflow/src/worker-interface.ts | 2 + packages/workflow/src/workflow.ts | 15 +++++ 5 files changed, 126 insertions(+), 19 deletions(-) create mode 100644 packages/workflow/src/update-scope.ts diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index c6c4e63c6..f05d9ee0d 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -190,6 +190,21 @@ export interface UnsafeWorkflowInfo { readonly isReplaying: boolean; } +/** + * Information about a workflow update. + */ +export interface UpdateInfo { + /** + * A workflow-unique identifier for this update. + */ + readonly id: string; + + /** + * The update type name. + */ + readonly name: string; +} + export interface ParentWorkflowInfo { workflowId: string; runId: string; diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 172005c53..0412fa663 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -21,6 +21,7 @@ import { checkExtends } from '@temporalio/common/lib/type-helpers'; import type { coresdk, temporal } from '@temporalio/proto'; import { alea, RNG } from './alea'; import { RootCancellationScope } from './cancellation-scope'; +import { UpdateScope } from './update-scope'; import { DeterminismViolationError, LocalActivityDoBackoff, isCancellation } from './errors'; import { QueryInput, SignalInput, UpdateInput, WorkflowExecuteInput, WorkflowInterceptors } from './interceptors'; import { @@ -624,25 +625,25 @@ export class Activator implements ActivationHandler { // // Note that there is a deliberately unhandled promise rejection below. // These are caught elsewhere and fail the corresponding activation. - let input: UpdateInput; - try { - if (runValidator && this.updateHandlers.get(name)?.validator) { - const validate = composeInterceptors( - this.interceptors.inbound, - 'validateUpdate', - this.validateUpdateNextHandler.bind(this) - ); - validate(makeInput()); + const doUpdateImpl = async () => { + let input: UpdateInput; + try { + if (runValidator && this.updateHandlers.get(name)?.validator) { + const validate = composeInterceptors( + this.interceptors.inbound, + 'validateUpdate', + this.validateUpdateNextHandler.bind(this) + ); + validate(makeInput()); + } + input = makeInput(); + } catch (error) { + this.rejectUpdate(protocolInstanceId, error); + return; } - input = makeInput(); - } catch (error) { - this.rejectUpdate(protocolInstanceId, error); - return; - } - const execute = composeInterceptors(this.interceptors.inbound, 'handleUpdate', this.updateNextHandler.bind(this)); - this.acceptUpdate(protocolInstanceId); - untrackPromise( - execute(input) + const execute = composeInterceptors(this.interceptors.inbound, 'handleUpdate', this.updateNextHandler.bind(this)); + this.acceptUpdate(protocolInstanceId); + const res = execute(input) .then((result) => this.completeUpdate(protocolInstanceId, result)) .catch((error) => { if (error instanceof TemporalFailure) { @@ -650,7 +651,14 @@ export class Activator implements ActivationHandler { } else { throw error; } - }) + }); + untrackPromise(res); + return res; + }; + untrackPromise( + UpdateScope.updateWithInfo(updateId, name, doUpdateImpl).catch((error: any) => { + throw error; + }) ); } diff --git a/packages/workflow/src/update-scope.ts b/packages/workflow/src/update-scope.ts new file mode 100644 index 000000000..de6a75b05 --- /dev/null +++ b/packages/workflow/src/update-scope.ts @@ -0,0 +1,67 @@ +import type { AsyncLocalStorage as ALS } from 'node:async_hooks'; + +/** + * Option for constructing a UpdateScope + */ +export interface UpdateScopeOptions { + /** + * A workflow-unique identifier for this update. + */ + id: string; + + /** + * The update type name. + */ + name: string; +} + +// AsyncLocalStorage is injected via vm module into global scope. +// In case Workflow code is imported in Node.js context, replace with an empty class. +export const AsyncLocalStorage: new () => ALS = (globalThis as any).AsyncLocalStorage ?? class {}; + +export class UpdateScope { + /** + * A workflow-unique identifier for this update. + */ + public readonly id: string; + + /** + * The update type name. + */ + public readonly name: string; + + constructor(options: UpdateScopeOptions) { + this.id = options.id; + this.name = options.name; + } + + /** + * Activate the scope as current and run the update handler `fn`. + * + * @return the result of `fn` + */ + run(fn: () => Promise): Promise { + return storage.run(this, fn); + } + + /** + * Get the current "active" update scope. + */ + static current(): UpdateScope | undefined { + return storage.getStore(); + } + + /** Alias to `new UpdateScope({ id, name }).run(fn)` */ + static updateWithInfo(id: string, name: string, fn: () => Promise): Promise { + return new this({ id, name }).run(fn); + } +} + +const storage = new AsyncLocalStorage(); + +/** + * Disable the async local storage for updates. + */ +export function disableUpdateStorage(): void { + storage.disable(); +} diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index ba02098bf..9e4166df2 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -8,6 +8,7 @@ import { msToTs, tsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { coresdk } from '@temporalio/proto'; import { disableStorage } from './cancellation-scope'; +import { disableUpdateStorage } from './update-scope'; import { DeterminismViolationError } from './errors'; import { WorkflowInterceptorsFactory } from './interceptors'; import { WorkflowCreateOptionsInternal } from './interfaces'; @@ -295,6 +296,7 @@ export function shouldUnblockConditions(job: coresdk.workflow_activation.IWorkfl export function dispose(): void { const dispose = composeInterceptors(getActivator().interceptors.internals, 'dispose', async () => { disableStorage(); + disableUpdateStorage(); }); dispose({}); } diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 1ecb4b323..2229f620f 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -24,6 +24,7 @@ import { Duration, msOptionalToTs, msToNumber, msToTs, tsToMs } from '@temporali import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { temporal } from '@temporalio/proto'; import { CancellationScope, registerSleepImplementation } from './cancellation-scope'; +import { UpdateScope } from './update-scope'; import { ActivityInput, LocalActivityInput, @@ -44,6 +45,7 @@ import { SignalHandlerOptions, UpdateHandlerOptions, WorkflowInfo, + UpdateInfo, } from './interfaces'; import { LocalActivityDoBackoff } from './errors'; import { assertInWorkflowContext, getActivator, maybeGetActivator } from './global-attributes'; @@ -868,6 +870,19 @@ export function workflowInfo(): WorkflowInfo { return activator.info; } +/** + * Get information about the current update if any. + * + * @return Info for the current update handler the code calling this is executing + * within if any. + * + * @experimental + */ +export function currentUpdateInfo(): UpdateInfo | undefined { + assertInWorkflowContext('Workflow.currentUpdateInfo(...) may only be used from a Workflow Execution.'); + return UpdateScope.current(); +} + /** * Returns whether or not code is executing in workflow context */ From 11904efb2dd1fe27b87fa241419066972a979e87 Mon Sep 17 00:00:00 2001 From: Antonio Lain Date: Tue, 25 Jun 2024 13:06:57 +0100 Subject: [PATCH 2/4] Add tests --- packages/test/src/test-integration-update.ts | 71 ++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/packages/test/src/test-integration-update.ts b/packages/test/src/test-integration-update.ts index f7763678c..7748ffb89 100644 --- a/packages/test/src/test-integration-update.ts +++ b/packages/test/src/test-integration-update.ts @@ -461,3 +461,74 @@ test('Interruption of update by server long-poll timeout is invisible to client' t.deepEqual(wfResult, [arg, 'done', '$']); }); }); + +export const currentInfoUpdate = wf.defineUpdate('current-info-update'); + +export async function workflowWithCurrentUpdateInfo(): Promise { + const state: Promise[] = []; + const getUpdateId = async (): Promise => { + await wf.sleep(10); + const info = wf.currentUpdateInfo(); + if (info === undefined) { + throw new Error('No current update info'); + } + return info.id; + }; + const updateHandler = async (): Promise => { + const info = wf.currentUpdateInfo(); + if (info === undefined || info.name !== 'current-info-update') { + throw new Error(`Invalid current update info in updateHandler: info ${info?.name}`); + } + const id = await getUpdateId(); + if (info.id !== id) { + throw new Error(`Update id changed: before ${info.id} after ${id}`); + } + + state.push(getUpdateId()); + // Re-fetch and return + const infoAfter = wf.currentUpdateInfo(); + if (infoAfter === undefined) { + throw new Error('Invalid current update info in updateHandler - after'); + } + return infoAfter.id; + }; + + const validator = (): void => { + const info = wf.currentUpdateInfo(); + if (info === undefined || info.name !== 'current-info-update') { + throw new Error(`Invalid current update info in validator: info ${info?.name}`); + } + }; + + wf.setHandler(currentInfoUpdate, updateHandler, { validator }); + + if (wf.currentUpdateInfo() !== undefined) { + throw new Error('Current update info not undefined outside handler'); + } + + await wf.condition(() => state.length === 5); + + if (wf.currentUpdateInfo() !== undefined) { + throw new Error('Current update info not undefined outside handler - after'); + } + + return await Promise.all(state); +} + +test('currentUpdateInfo returns the update id', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const wfHandle = await startWorkflow(workflowWithCurrentUpdateInfo); + const updateIds = await Promise.all([ + wfHandle.executeUpdate(currentInfoUpdate, { updateId: 'update1' }), + wfHandle.executeUpdate(currentInfoUpdate, { updateId: 'update2' }), + wfHandle.executeUpdate(currentInfoUpdate, { updateId: 'update3' }), + wfHandle.executeUpdate(currentInfoUpdate, { updateId: 'update4' }), + wfHandle.executeUpdate(currentInfoUpdate, { updateId: 'update5' }), + ]); + t.deepEqual(updateIds, ['update1', 'update2', 'update3', 'update4', 'update5']); + const wfResults = await wfHandle.result(); + t.deepEqual(wfResults.sort(), ['update1', 'update2', 'update3', 'update4', 'update5']); + }); +}); From 447bb20d9f5f66af341cfec293475f1cf8a9bb8b Mon Sep 17 00:00:00 2001 From: Antonio Lain Date: Tue, 25 Jun 2024 17:52:43 +0100 Subject: [PATCH 3/4] Remove catch() --- packages/workflow/src/interfaces.ts | 2 ++ packages/workflow/src/internals.ts | 4 +--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index f05d9ee0d..a60ef93a0 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -192,6 +192,8 @@ export interface UnsafeWorkflowInfo { /** * Information about a workflow update. + * + * @experimental */ export interface UpdateInfo { /** diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 788c4ebee..12d970eec 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -661,9 +661,7 @@ export class Activator implements ActivationHandler { return res; }; untrackPromise( - UpdateScope.updateWithInfo(updateId, name, doUpdateImpl).catch((error: any) => { - throw error; - }) + UpdateScope.updateWithInfo(updateId, name, doUpdateImpl) ); } From e45bfaefe4a2fba3a82fb3e180bf2f41021eec43 Mon Sep 17 00:00:00 2001 From: Antonio Lain Date: Tue, 25 Jun 2024 18:00:26 +0100 Subject: [PATCH 4/4] Run prettier --- packages/workflow/src/internals.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 12d970eec..1915ccfb7 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -660,9 +660,7 @@ export class Activator implements ActivationHandler { untrackPromise(res); return res; }; - untrackPromise( - UpdateScope.updateWithInfo(updateId, name, doUpdateImpl) - ); + untrackPromise(UpdateScope.updateWithInfo(updateId, name, doUpdateImpl)); } protected async updateNextHandler({ name, args }: UpdateInput): Promise {