Skip to content

Commit

Permalink
feat: queue (#430)
Browse files Browse the repository at this point in the history
  • Loading branch information
thantos authored Sep 12, 2023
1 parent 9104996 commit 6f0dfea
Show file tree
Hide file tree
Showing 61 changed files with 2,199 additions and 177 deletions.
70 changes: 70 additions & 0 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
event,
expectSignal,
index,
queue,
sendSignal,
sendTaskHeartbeat,
signal,
Expand Down Expand Up @@ -1087,6 +1088,75 @@ export const bucketWorkflow = workflow(
}
);

const queueSignal = signal<{ n: number }>("queueSignal");
const fifoQueueSignal = signal<{ n: number }>("fifoQueueSignal");

export const testQueue = queue<{
executionId: string;
source: "workflow" | "queue";
n: number;
}>("testQueue", {}, async (messages) => {
await Promise.all(
messages.map(async (m) => {
if (m.message.source === "workflow") {
await testFifoQueue.sendMessage({
executionId: m.message.executionId,
source: "queue",
n: m.message.n + 1,
});
} else if (m.message.source === "queue") {
await queueSignal.sendSignal(m.message.executionId, {
n: m.message.n + 1,
});
}
})
);
});

export const testFifoQueue = queue<{
executionId: string;
source: "workflow" | "queue";
n: number;
}>(
"testFifoQueue",
{
fifo: true,
groupBy: (m) => m.executionId,
dedupe: { contentBasedDeduplication: true },
},
async (messages) => {
await Promise.all(
messages.map(async (m) => {
if (m.message.source === "workflow") {
await testQueue.sendMessage({
executionId: m.message.executionId,
source: "queue",
n: m.message.n + 1,
});
} else if (m.message.source === "queue") {
await fifoQueueSignal.sendSignal(m.message.executionId, {
n: m.message.n + 1,
});
}
})
);
}
);

export const queueWorkflow = workflow(
"queueWorkflow",
async (_, { execution: { id } }) => {
const [, , { n }, { n: fifo_n }] = await Promise.all([
testQueue.sendMessage({ executionId: id, source: "workflow", n: 1 }),
testFifoQueue.sendMessage({ executionId: id, source: "workflow", n: 1 }),
queueSignal.expectSignal(),
fifoQueueSignal.expectSignal(),
]);

return { n, fifo_n };
}
);

export const hello3 = api.post("/hello3", () => {
return new HttpResponse("hello?");
});
Expand Down
3 changes: 3 additions & 0 deletions apps/tests/aws-runtime/test/tester.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
failedWorkflow,
heartbeatWorkflow,
parentWorkflow,
queueWorkflow,
timedOutWorkflow,
timedWorkflow,
transactionWorkflow,
Expand Down Expand Up @@ -253,6 +254,8 @@ eventualRuntimeTestHarness(
signalResult4: { data: "hello again again again!" },
copied: "hello again again again!",
});

testCompletion("queue", queueWorkflow, { n: 3, fifo_n: 3 });
},
{
name: "s3 persist failures",
Expand Down
152 changes: 67 additions & 85 deletions packages/@eventual/aws-cdk/src/build.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { build, BuildSource, infer } from "@eventual/compiler";
import { BuildManifest } from "@eventual/core-runtime";
import { BuildManifest, QueueRuntime } from "@eventual/core-runtime";
import {
BucketNotificationHandlerSpec,
CommandSpec,
EntityStreamSpec,
EVENTUAL_SYSTEM_COMMAND_NAMESPACE,
ServiceType,
QueueHandlerSpec,
QueueSpec,
SubscriptionSpec,
TaskSpec,
} from "@eventual/core/internal";
Expand Down Expand Up @@ -70,6 +71,17 @@ export interface BuildAWSRuntimeProps {
};
}

const WORKER_ENTRY_POINTS = [
"orchestrator",
"task-worker",
"command-worker",
"subscription-worker",
"entity-stream-worker",
"bucket-handler-worker",
"queue-handler-worker",
"transaction-worker",
] as const;

export async function buildService(request: BuildAWSRuntimeProps) {
const outDir = request.outDir;
const serviceSpec = await infer(request.entry, request.openApi);
Expand All @@ -82,16 +94,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
await fs.promises.writeFile(specPath, JSON.stringify(serviceSpec, null, 2));

const [
[
// bundle the default handlers first as we refer to them when bundling all of the individual handlers
orchestrator,
monoTaskFunction,
monoCommandFunction,
monoSubscriptionFunction,
monoEntityStreamWorkerFunction,
monoBucketHandlerWorkerFunction,
transactionWorkerFunction,
],
monolithFunctions,
[
// also bundle each of the internal eventual API Functions as they have no dependencies
taskFallbackHandler,
Expand Down Expand Up @@ -119,7 +122,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
subscriptions,
commands,
commandDefault: {
entry: monoCommandFunction!,
entry: monolithFunctions["command-worker"],
spec: {
name: "default",
},
Expand All @@ -142,9 +145,20 @@ export async function buildService(request: BuildAWSRuntimeProps) {
}))
),
},
queues: {
queues: await Promise.all(
serviceSpec.queues.map(
async (q) =>
({
...q,
handler: await bundleQueueHandler(q),
} satisfies QueueRuntime)
)
),
},
system: {
entityService: {
transactionWorker: { entry: transactionWorkerFunction! },
transactionWorker: { entry: monolithFunctions["transaction-worker"] },
},
taskService: {
fallbackHandler: { entry: taskFallbackHandler! },
Expand Down Expand Up @@ -192,7 +206,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
},
workflowService: {
orchestrator: {
entry: orchestrator!,
entry: monolithFunctions.orchestrator!,
},
},
},
Expand All @@ -207,14 +221,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
return await Promise.all(
commandSpecs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"command",
"command-worker",
spec.name,
monoCommandFunction!
),
entry: await bundleFile(spec, "command", "command-worker", spec.name),
spec,
};
})
Expand All @@ -226,12 +233,10 @@ export async function buildService(request: BuildAWSRuntimeProps) {
specs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"subscription",
"subscription-worker",
spec.name,
monoSubscriptionFunction!
spec.name
),
spec,
};
Expand All @@ -243,14 +248,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
return await Promise.all(
specs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"task",
"task-worker",
spec.name,
monoTaskFunction!
),
entry: await bundleFile(spec, "task", "task-worker", spec.name),
spec,
};
})
Expand All @@ -262,12 +260,10 @@ export async function buildService(request: BuildAWSRuntimeProps) {
specs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"entity-streams",
"entity-stream-worker",
spec.name,
monoEntityStreamWorkerFunction!
spec.name
),
spec,
};
Expand All @@ -280,28 +276,36 @@ export async function buildService(request: BuildAWSRuntimeProps) {
specs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"bucket-handlers",
"bucket-handler-worker",
spec.name,
monoBucketHandlerWorkerFunction!
spec.name
),
spec,
};
})
);
}

async function bundleQueueHandler(spec: QueueSpec) {
return {
entry: await bundleFile(
spec.handler,
"queue-handlers",
"queue-handler-worker",
spec.name
),
spec: spec.handler,
};
}

async function bundleFile<
Spec extends CommandSpec | SubscriptionSpec | TaskSpec
Spec extends CommandSpec | SubscriptionSpec | TaskSpec | QueueHandlerSpec
>(
specPath: string,
spec: Spec,
pathPrefix: string,
entryPoint: string,
name: string,
monoFunction: string
entryPoint: (typeof WORKER_ENTRY_POINTS)[number],
name: string
): Promise<string> {
return spec.sourceLocation?.fileName
? // we know the source location of the command, so individually build it from that
Expand All @@ -315,48 +319,26 @@ export async function buildService(request: BuildAWSRuntimeProps) {
injectedEntry: spec.sourceLocation.fileName,
injectedServiceSpec: specPath,
})
: monoFunction;
: monolithFunctions[entryPoint];
}

function bundleMonolithDefaultHandlers(specPath: string) {
return Promise.all(
[
{
name: ServiceType.OrchestratorWorker,
entry: runtimeHandlersEntrypoint("orchestrator"),
},
{
name: ServiceType.TaskWorker,
entry: runtimeHandlersEntrypoint("task-worker"),
},
{
name: ServiceType.CommandWorker,
entry: runtimeHandlersEntrypoint("command-worker"),
},
{
name: ServiceType.Subscription,
entry: runtimeHandlersEntrypoint("subscription-worker"),
},
{
name: ServiceType.EntityStreamWorker,
entry: runtimeHandlersEntrypoint("entity-stream-worker"),
},
{
name: ServiceType.BucketNotificationHandlerWorker,
entry: runtimeHandlersEntrypoint("bucket-handler-worker"),
},
{
name: ServiceType.TransactionWorker,
entry: runtimeHandlersEntrypoint("transaction-worker"),
},
]
.map((s) => ({
...s,
injectedEntry: request.entry,
injectedServiceSpec: specPath,
}))
.map(buildFunction)
);
async function bundleMonolithDefaultHandlers(specPath: string) {
return Object.fromEntries(
await Promise.all(
WORKER_ENTRY_POINTS.map(
async (name) =>
[
name,
await buildFunction({
entry: runtimeHandlersEntrypoint(name),
name,
injectedEntry: request.entry,
injectedServiceSpec: specPath,
}),
] as const
)
)
) as Record<(typeof WORKER_ENTRY_POINTS)[number], string>;
}

function bundleEventualSystemFunctions(specPath: string) {
Expand Down
Loading

0 comments on commit 6f0dfea

Please sign in to comment.