Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflow): Expose updateId to update handlers #1450

Merged
merged 6 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions packages/test/src/test-integration-update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,77 @@ test('Interruption of update by server long-poll timeout is invisible to client'
});
});

export const currentInfoUpdate = wf.defineUpdate<string, []>('current-info-update');

export async function workflowWithCurrentUpdateInfo(): Promise<string[]> {
const state: Promise<string>[] = [];
const getUpdateId = async (): Promise<string> => {
await wf.sleep(10);
const info = wf.currentUpdateInfo();
if (info === undefined) {
throw new Error('No current update info');
}
return info.id;
};
const updateHandler = async (): Promise<string> => {
const info = wf.currentUpdateInfo();
if (info === undefined || info.name !== 'current-info-update') {
throw new Error(`Invalid current update info in updateHandler: info ${info?.name}`);
}
const id = await getUpdateId();
if (info.id !== id) {
throw new Error(`Update id changed: before ${info.id} after ${id}`);
}

state.push(getUpdateId());
// Re-fetch and return
const infoAfter = wf.currentUpdateInfo();
if (infoAfter === undefined) {
throw new Error('Invalid current update info in updateHandler - after');
}
return infoAfter.id;
};

const validator = (): void => {
const info = wf.currentUpdateInfo();
if (info === undefined || info.name !== 'current-info-update') {
throw new Error(`Invalid current update info in validator: info ${info?.name}`);
}
};

wf.setHandler(currentInfoUpdate, updateHandler, { validator });

if (wf.currentUpdateInfo() !== undefined) {
throw new Error('Current update info not undefined outside handler');
}

await wf.condition(() => state.length === 5);

if (wf.currentUpdateInfo() !== undefined) {
throw new Error('Current update info not undefined outside handler - after');
}

return await Promise.all(state);
}

test('currentUpdateInfo returns the update id', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker();
await worker.runUntil(async () => {
const wfHandle = await startWorkflow(workflowWithCurrentUpdateInfo);
const updateIds = await Promise.all([
wfHandle.executeUpdate(currentInfoUpdate, { updateId: 'update1' }),
wfHandle.executeUpdate(currentInfoUpdate, { updateId: 'update2' }),
wfHandle.executeUpdate(currentInfoUpdate, { updateId: 'update3' }),
wfHandle.executeUpdate(currentInfoUpdate, { updateId: 'update4' }),
wfHandle.executeUpdate(currentInfoUpdate, { updateId: 'update5' }),
]);
t.deepEqual(updateIds, ['update1', 'update2', 'update3', 'update4', 'update5']);
const wfResults = await wfHandle.result();
t.deepEqual(wfResults.sort(), ['update1', 'update2', 'update3', 'update4', 'update5']);
});
});

test('startUpdate throws WorkflowUpdateRPCTimeoutOrCancelledError with no worker', async (t) => {
const { startWorkflow } = helpers(t);
const wfHandle = await startWorkflow(workflowWithUpdates);
Expand Down
17 changes: 17 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,23 @@ export interface UnsafeWorkflowInfo {
readonly isReplaying: boolean;
}

/**
* Information about a workflow update.
antlai-temporal marked this conversation as resolved.
Show resolved Hide resolved
*
* @experimental
*/
export interface UpdateInfo {
/**
* A workflow-unique identifier for this update.
*/
readonly id: string;

/**
* The update type name.
*/
readonly name: string;
}

export interface ParentWorkflowInfo {
workflowId: string;
runId: string;
Expand Down
44 changes: 24 additions & 20 deletions packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { checkExtends } from '@temporalio/common/lib/type-helpers';
import type { coresdk, temporal } from '@temporalio/proto';
import { alea, RNG } from './alea';
import { RootCancellationScope } from './cancellation-scope';
import { UpdateScope } from './update-scope';
import { DeterminismViolationError, LocalActivityDoBackoff, isCancellation } from './errors';
import { QueryInput, SignalInput, UpdateInput, WorkflowExecuteInput, WorkflowInterceptors } from './interceptors';
import {
Expand Down Expand Up @@ -629,34 +630,37 @@ export class Activator implements ActivationHandler {
//
// Note that there is a deliberately unhandled promise rejection below.
// These are caught elsewhere and fail the corresponding activation.
let input: UpdateInput;
try {
if (runValidator && this.updateHandlers.get(name)?.validator) {
const validate = composeInterceptors(
this.interceptors.inbound,
'validateUpdate',
this.validateUpdateNextHandler.bind(this)
);
validate(makeInput());
const doUpdateImpl = async () => {
let input: UpdateInput;
try {
if (runValidator && this.updateHandlers.get(name)?.validator) {
const validate = composeInterceptors(
this.interceptors.inbound,
'validateUpdate',
this.validateUpdateNextHandler.bind(this)
);
validate(makeInput());
}
input = makeInput();
} catch (error) {
this.rejectUpdate(protocolInstanceId, error);
return;
}
input = makeInput();
} catch (error) {
this.rejectUpdate(protocolInstanceId, error);
return;
}
const execute = composeInterceptors(this.interceptors.inbound, 'handleUpdate', this.updateNextHandler.bind(this));
this.acceptUpdate(protocolInstanceId);
untrackPromise(
execute(input)
const execute = composeInterceptors(this.interceptors.inbound, 'handleUpdate', this.updateNextHandler.bind(this));
this.acceptUpdate(protocolInstanceId);
const res = execute(input)
.then((result) => this.completeUpdate(protocolInstanceId, result))
.catch((error) => {
if (error instanceof TemporalFailure) {
this.rejectUpdate(protocolInstanceId, error);
} else {
throw error;
}
})
);
});
untrackPromise(res);
return res;
};
untrackPromise(UpdateScope.updateWithInfo(updateId, name, doUpdateImpl));
}

protected async updateNextHandler({ name, args }: UpdateInput): Promise<unknown> {
Expand Down
67 changes: 67 additions & 0 deletions packages/workflow/src/update-scope.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type { AsyncLocalStorage as ALS } from 'node:async_hooks';

/**
* Option for constructing a UpdateScope
*/
export interface UpdateScopeOptions {
/**
* A workflow-unique identifier for this update.
*/
id: string;

/**
* The update type name.
*/
name: string;
}

// AsyncLocalStorage is injected via vm module into global scope.
// In case Workflow code is imported in Node.js context, replace with an empty class.
export const AsyncLocalStorage: new <T>() => ALS<T> = (globalThis as any).AsyncLocalStorage ?? class {};

export class UpdateScope {
/**
* A workflow-unique identifier for this update.
*/
public readonly id: string;

/**
* The update type name.
*/
public readonly name: string;

constructor(options: UpdateScopeOptions) {
this.id = options.id;
this.name = options.name;
}

/**
* Activate the scope as current and run the update handler `fn`.
*
* @return the result of `fn`
*/
run<T>(fn: () => Promise<T>): Promise<T> {
return storage.run(this, fn);
}

/**
* Get the current "active" update scope.
*/
static current(): UpdateScope | undefined {
return storage.getStore();
}

/** Alias to `new UpdateScope({ id, name }).run(fn)` */
static updateWithInfo<T>(id: string, name: string, fn: () => Promise<T>): Promise<T> {
return new this({ id, name }).run(fn);
}
}

const storage = new AsyncLocalStorage<UpdateScope>();

/**
* Disable the async local storage for updates.
*/
export function disableUpdateStorage(): void {
storage.disable();
}
2 changes: 2 additions & 0 deletions packages/workflow/src/worker-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { msToTs, tsToMs } from '@temporalio/common/lib/time';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import { coresdk } from '@temporalio/proto';
import { disableStorage } from './cancellation-scope';
import { disableUpdateStorage } from './update-scope';
import { DeterminismViolationError } from './errors';
import { WorkflowInterceptorsFactory } from './interceptors';
import { WorkflowCreateOptionsInternal } from './interfaces';
Expand Down Expand Up @@ -295,6 +296,7 @@ export function shouldUnblockConditions(job: coresdk.workflow_activation.IWorkfl
export function dispose(): void {
const dispose = composeInterceptors(getActivator().interceptors.internals, 'dispose', async () => {
disableStorage();
disableUpdateStorage();
});
dispose({});
}
15 changes: 15 additions & 0 deletions packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { Duration, msOptionalToTs, msToNumber, msToTs, tsToMs } from '@temporali
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import { temporal } from '@temporalio/proto';
import { CancellationScope, registerSleepImplementation } from './cancellation-scope';
import { UpdateScope } from './update-scope';
import {
ActivityInput,
LocalActivityInput,
Expand All @@ -44,6 +45,7 @@ import {
SignalHandlerOptions,
UpdateHandlerOptions,
WorkflowInfo,
UpdateInfo,
} from './interfaces';
import { LocalActivityDoBackoff } from './errors';
import { assertInWorkflowContext, getActivator, maybeGetActivator } from './global-attributes';
Expand Down Expand Up @@ -868,6 +870,19 @@ export function workflowInfo(): WorkflowInfo {
return activator.info;
}

/**
* Get information about the current update if any.
*
* @return Info for the current update handler the code calling this is executing
* within if any.
*
* @experimental
*/
export function currentUpdateInfo(): UpdateInfo | undefined {
antlai-temporal marked this conversation as resolved.
Show resolved Hide resolved
assertInWorkflowContext('Workflow.currentUpdateInfo(...) may only be used from a Workflow Execution.');
return UpdateScope.current();
}

/**
* Returns whether or not code is executing in workflow context
*/
Expand Down
Loading