Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: workflow API #1264

Open
wants to merge 39 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
b21a22c
refactor: reimplement Workflow API
himself65 Sep 26, 2024
498c734
revert: get / set api
himself65 Sep 26, 2024
ad81885
fix: type
himself65 Sep 27, 2024
06f0e5e
fix: step call
himself65 Sep 27, 2024
e4bab4e
fix: type
himself65 Sep 27, 2024
044054c
fix: stream event
himself65 Sep 27, 2024
72a26c4
fix: bring things back
himself65 Sep 27, 2024
18b93b3
fix: lint
himself65 Sep 27, 2024
5109e7b
fix: update example
himself65 Sep 27, 2024
1118029
feat: better handling of multi input
himself65 Sep 27, 2024
2d2047c
feat: in out checker
himself65 Sep 27, 2024
1ce4a55
fix: more test
himself65 Sep 27, 2024
3c623e1
feat: chat engine example
himself65 Sep 28, 2024
6e008fc
feat: workflow example
himself65 Sep 28, 2024
0ab163f
feat: loading
himself65 Sep 28, 2024
5eaafc7
fix: remove validation.ts
himself65 Sep 28, 2024
a2a242c
feat: recover api
himself65 Sep 28, 2024
def035b
Merge branch 'main' into himself65/20240925/signal
himself65 Sep 28, 2024
53c50b0
fix: pnpm
himself65 Sep 28, 2024
ee2a61f
chore: remove waku example
himself65 Sep 28, 2024
3310467
Merge remote-tracking branch 'upstream/main' into himself65/20240925/…
himself65 Sep 28, 2024
4bef7e4
feat: update examples
himself65 Sep 28, 2024
eae934a
fix: upload file
himself65 Sep 28, 2024
c64fa6c
Update giant-jars-grab.md
himself65 Sep 28, 2024
e75db73
feat: update example
himself65 Sep 29, 2024
568c5a7
feat: update example
himself65 Sep 29, 2024
cac0ab7
fix: lock
himself65 Sep 29, 2024
3a0913b
fix: utils
himself65 Sep 29, 2024
02788cc
fix: tsconfig
himself65 Sep 29, 2024
72c23a4
Merge remote-tracking branch 'upstream/main' into himself65/20240925/…
himself65 Sep 30, 2024
ee03495
fix: concurrency streaming
himself65 Sep 30, 2024
e1a3398
fix: concurrency regression
himself65 Sep 30, 2024
5d5a170
fix: type check
himself65 Sep 30, 2024
b7a7c7b
text: add cycle test
marcusschiesser Sep 30, 2024
51058ad
Merge remote-tracking branch 'upstream/ms/add-concurrent-workflowtest…
himself65 Sep 30, 2024
0145eb5
feat: update test sample
himself65 Sep 30, 2024
be46c7f
feat: implement `or` API
himself65 Sep 30, 2024
999832f
feat: `sendEvent`
himself65 Sep 30, 2024
c021076
feat: support requireEvent and sendEvent API
himself65 Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/giant-jars-grab.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@llamaindex/core": patch
---

feat: update workflow implementation

- combine await behavior with async iterator, now there's only one queue to handle the `startEvent`.
- each Context will have its own step map, so the step can be executed in parallel.
9 changes: 9 additions & 0 deletions packages/core/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ export function stringifyJSONToMessageContent(value: JSONValue): string {
return JSON.stringify(value, null, 2).replace(/"([^"]*)"/g, "$1");
}

export function assertExists<T>(
value: T | null | undefined,
message: string = "Value does not exist",
): asserts value is T {
if (value === null || value === undefined) {
throw new Error(message);
}
}

export {
extractDataUrlComponents,
extractImage,
Expand Down
224 changes: 187 additions & 37 deletions packages/core/src/workflow/context.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,81 @@
import { type EventTypes, type WorkflowEvent } from "./events";
import { type StepFunction, type Workflow } from "./workflow";
import { assertExists } from "../utils";
import {
type EventTypes,
StartEvent,
StopEvent,
WorkflowEvent,
} from "./events";

export type StepFunction<
Start = string,
T extends WorkflowEvent = WorkflowEvent,
> = (context: Context<Start>, ev: T) => Promise<WorkflowEvent>;

export type StepMap = Map<
StepFunction<any>,
{ inputs: EventTypes[]; outputs: EventTypes[] | undefined }
>;

export type ReadonlyStepMap = ReadonlyMap<
StepFunction<any>,
{ inputs: EventTypes[]; outputs: EventTypes[] | undefined }
>;

export type ContextParams<Start> = {
startEvent: StartEvent<Start>;
steps: StepMap;
timeout: number | null;
verbose: boolean;
};

export class Context<Start = string>
implements AsyncIterable<WorkflowEvent, void, void>, Promise<StopEvent>
{
readonly #steps: ReadonlyStepMap;
// reverse map of #steps, helper for get the next step
readonly #eventMap: WeakMap<typeof WorkflowEvent, StepFunction<Start>>;

readonly #startEvent: StartEvent<Start>;
readonly #queue: WorkflowEvent[] = [];
readonly #globals: Map<string, any> = new Map();

export class Context {
#workflow: Workflow;
#queues: Map<StepFunction, WorkflowEvent[]> = new Map();
#eventBuffer: Map<EventTypes, WorkflowEvent[]> = new Map();
#globals: Map<string, any> = new Map();
#streamingQueue: WorkflowEvent[] = [];
running: boolean = true;

#running: boolean = true;
#timeout: number | null = null;
#verbose: boolean = false;

constructor(params: { workflow: Workflow; verbose?: boolean }) {
this.#workflow = params.workflow;
#getStepFunction(event: WorkflowEvent): StepFunction<Start> | undefined {
return this.#eventMap.get(event.constructor as EventTypes);
}

get running(): boolean {
return this.#running;
}

constructor(params: ContextParams<Start>) {
this.#steps = params.steps;
this.#startEvent = params.startEvent;
// init eventMap
this.#eventMap = new WeakMap();
for (const [step, { inputs }] of this.#steps) {
for (const input of inputs) {
this.#eventMap.set(input, step);
}
}

if (typeof params.timeout === "number") {
this.#timeout = params.timeout;
}
this.#verbose = params.verbose ?? false;

// push start event to the queue
const step = this.#getStepFunction(this.#startEvent);
assertExists(step, "No step found for start event");
this.#pushEvent(this.#startEvent, step);
}

// these two API is not type safe
set(key: string, value: any): void {
himself65 marked this conversation as resolved.
Show resolved Hide resolved
this.#globals.set(key, value);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the globals here. because I think it's not really used in the code. Im thinking wheter to support this by adding extend syntax

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see globals removed? But in any case, the get/set api (which stores stuff into globals) is a pretty useful api for sharing data between steps

}
Expand All @@ -25,7 +86,6 @@ export class Context {
} else if (defaultValue !== undefined) {
return defaultValue;
}
throw new Error(`Key '${key}' not found in Context`);
}

collectEvents(
Expand Down Expand Up @@ -62,50 +122,140 @@ export class Context {
return null;
}

sendEvent(message: WorkflowEvent, step?: StepFunction): void {
const stepName = step?.name ? `step ${step.name}` : "all steps";
#pushEvent(event: WorkflowEvent, step: StepFunction<Start>): void {
const stepName = step.name ? `step ${step.name}` : "all steps";
if (this.#verbose) {
console.log(`Sending event ${message} to ${stepName}`);
console.log(`Sending event ${event} to ${stepName}`);
}
if (step === undefined) {
for (const queue of this.#queues.values()) {
queue.push(message);
}
} else {
if (!this.#workflow.hasStep(step)) {
throw new Error(`Step ${step} does not exist`);
}

if (!this.#queues.has(step)) {
this.#queues.set(step, []);
}
this.#queues.get(step)!.push(message);
if (!this.#steps.has(step)) {
throw new Error(`Step ${step} does not exist`);
}

this.#queue.push(event);
}

getNextEvent(step: StepFunction): WorkflowEvent | undefined {
const queue = this.#queues.get(step);
if (queue && queue.length > 0) {
return queue.shift();
// make sure it will only be called once
#iterator: AsyncGenerator<WorkflowEvent, void, void> | null = null;
#signal: AbortSignal | null = null;
get #iteratorSingleton(): AsyncGenerator<WorkflowEvent, void, void> {
if (this.#iterator === null) {
this.#iterator = this.#createStreamEvents();
}
return undefined;
return this.#iterator;
}

writeEventToStream(event: WorkflowEvent): void {
this.#streamingQueue.push(event);
himself65 marked this conversation as resolved.
Show resolved Hide resolved
[Symbol.asyncIterator](): AsyncGenerator<WorkflowEvent, void, void> {
return this.#iteratorSingleton;
}

async *streamEvents(): AsyncGenerator<WorkflowEvent, void, void> {
async *#createStreamEvents(): AsyncGenerator<WorkflowEvent, void, void> {
while (true) {
const event = this.#streamingQueue.shift();
const event = this.#queue.shift();
if (event) {
yield event;
// event handling
const step = this.#getStepFunction(event);
assertExists(step, `No step found for event ${event.displayName}`);
const nextEvent = await step.call(null, this, event);
if (nextEvent instanceof StopEvent) {
himself65 marked this conversation as resolved.
Show resolved Hide resolved
this.#running = false;
yield nextEvent;
return;
} else {
const nextStep = this.#getStepFunction(nextEvent);
assertExists(
nextStep,
`No step found for event ${nextEvent.displayName}`,
);
this.#pushEvent(nextEvent, nextStep);
}
} else {
if (!this.running) {
break;
if (!this.#running) {
return;
}
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
}

then<TResult1, TResult2 = never>(
onfulfilled?:
| ((value: StopEvent) => TResult1 | PromiseLike<TResult1>)
| null
| undefined,
onrejected?:
| ((reason: any) => TResult2 | PromiseLike<TResult2>)
| null
| undefined,
) {
const stopWorkflow = () => {
if (this.#running) {
this.#running = false;
}
};

if (this.#timeout !== null) {
const timeout = this.#timeout;
this.#signal = AbortSignal.timeout(timeout * 1000);
this.#signal.addEventListener(
"abort",
() => {
stopWorkflow();
},
{
once: true,
},
);
}

return new Promise<StopEvent>(async (resolve, reject) => {
this.#signal?.addEventListener("abort", () => {
reject(new Error(`Operation timed out after ${this.#timeout} seconds`));
});
try {
for await (const event of this.#iteratorSingleton) {
if (event instanceof StartEvent) {
if (this.#verbose) {
console.log(`Starting workflow with event ${event}`);
}
}
if (event instanceof StopEvent) {
resolve(event);
}
}
const nextValue = await this.#iteratorSingleton.next();
if (nextValue.done === false) {
reject(new Error("Workflow did not complete"));
}
} catch (err) {
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
reject(err);
}
})
.then(onfulfilled)
.catch(onrejected);
}

catch<TResult = never>(
onrejected?:
| ((reason: any) => TResult | PromiseLike<TResult>)
| null
| undefined,
) {
return this.then((v) => v, onrejected);
}

finally(onfinally?: (() => void) | undefined | null) {
return this.then(
() => {
onfinally?.();
},
() => {
onfinally?.();
},
) as Promise<any>;
}

[Symbol.toStringTag]: string = "Context";
}
4 changes: 3 additions & 1 deletion packages/core/src/workflow/events.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
export class WorkflowEvent<T extends Record<string, any> = any> {
displayName: string;
data: T;

constructor(data: T) {
this.data = data;
this.displayName = this.constructor.name;
}

toString() {
return `${this.constructor.name}(${JSON.stringify(this.data)})`;
return this.displayName;
himself65 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
11 changes: 8 additions & 3 deletions packages/core/src/workflow/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
export * from "./context";
export * from "./events";
export * from "./workflow";
export { Context } from "./context";
export {
StartEvent,
StopEvent,
WorkflowEvent,
type EventTypes,
} from "./events";
export { Workflow } from "./workflow";
Loading
Loading