Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: queue #430

Merged
merged 22 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
event,
expectSignal,
index,
queue,
sendSignal,
sendTaskHeartbeat,
signal,
Expand Down Expand Up @@ -1087,6 +1088,75 @@ export const bucketWorkflow = workflow(
}
);

const queueSignal = signal<{ n: number }>("queueSignal");
const fifoQueueSignal = signal<{ n: number }>("fifoQueueSignal");

export const testQueue = queue<{
executionId: string;
source: "workflow" | "queue";
n: number;
}>("testQueue", {}, async (messages) => {
await Promise.all(
messages.map(async (m) => {
if (m.message.source === "workflow") {
await testFifoQueue.sendMessage({
executionId: m.message.executionId,
source: "queue",
n: m.message.n + 1,
});
} else if (m.message.source === "queue") {
await queueSignal.sendSignal(m.message.executionId, {
n: m.message.n + 1,
});
}
})
);
});

export const testFifoQueue = queue<{
executionId: string;
source: "workflow" | "queue";
n: number;
}>(
"testFifoQueue",
{
fifo: true,
groupBy: (m) => m.executionId,
dedupe: { contentBasedDeduplication: true },
},
async (messages) => {
await Promise.all(
messages.map(async (m) => {
if (m.message.source === "workflow") {
await testQueue.sendMessage({
executionId: m.message.executionId,
source: "queue",
n: m.message.n + 1,
});
} else if (m.message.source === "queue") {
await fifoQueueSignal.sendSignal(m.message.executionId, {
n: m.message.n + 1,
});
}
})
);
}
);

export const queueWorkflow = workflow(
"queueWorkflow",
async (_, { execution: { id } }) => {
const [, , { n }, { n: fifo_n }] = await Promise.all([
testQueue.sendMessage({ executionId: id, source: "workflow", n: 1 }),
testFifoQueue.sendMessage({ executionId: id, source: "workflow", n: 1 }),
queueSignal.expectSignal(),
fifoQueueSignal.expectSignal(),
]);

return { n, fifo_n };
}
);

export const hello3 = api.post("/hello3", () => {
return new HttpResponse("hello?");
});
Expand Down
3 changes: 3 additions & 0 deletions apps/tests/aws-runtime/test/tester.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
failedWorkflow,
heartbeatWorkflow,
parentWorkflow,
queueWorkflow,
timedOutWorkflow,
timedWorkflow,
transactionWorkflow,
Expand Down Expand Up @@ -253,6 +254,8 @@ eventualRuntimeTestHarness(
signalResult4: { data: "hello again again again!" },
copied: "hello again again again!",
});

testCompletion("queue", queueWorkflow, { n: 3, fifo_n: 3 });
},
{
name: "s3 persist failures",
Expand Down
152 changes: 67 additions & 85 deletions packages/@eventual/aws-cdk/src/build.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { build, BuildSource, infer } from "@eventual/compiler";
import { BuildManifest } from "@eventual/core-runtime";
import { BuildManifest, QueueRuntime } from "@eventual/core-runtime";
import {
BucketNotificationHandlerSpec,
CommandSpec,
EntityStreamSpec,
EVENTUAL_SYSTEM_COMMAND_NAMESPACE,
ServiceType,
QueueHandlerSpec,
QueueSpec,
SubscriptionSpec,
TaskSpec,
} from "@eventual/core/internal";
Expand Down Expand Up @@ -70,6 +71,17 @@ export interface BuildAWSRuntimeProps {
};
}

const WORKER_ENTRY_POINTS = [
"orchestrator",
"task-worker",
"command-worker",
"subscription-worker",
"entity-stream-worker",
"bucket-handler-worker",
"queue-handler-worker",
"transaction-worker",
] as const;

export async function buildService(request: BuildAWSRuntimeProps) {
const outDir = request.outDir;
const serviceSpec = await infer(request.entry, request.openApi);
Expand All @@ -82,16 +94,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
await fs.promises.writeFile(specPath, JSON.stringify(serviceSpec, null, 2));

const [
[
// bundle the default handlers first as we refer to them when bundling all of the individual handlers
orchestrator,
monoTaskFunction,
monoCommandFunction,
monoSubscriptionFunction,
monoEntityStreamWorkerFunction,
monoBucketHandlerWorkerFunction,
transactionWorkerFunction,
],
monolithFunctions,
[
// also bundle each of the internal eventual API Functions as they have no dependencies
taskFallbackHandler,
Expand Down Expand Up @@ -119,7 +122,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
subscriptions,
commands,
commandDefault: {
entry: monoCommandFunction!,
entry: monolithFunctions["command-worker"],
spec: {
name: "default",
},
Expand All @@ -142,9 +145,20 @@ export async function buildService(request: BuildAWSRuntimeProps) {
}))
),
},
queues: {
queues: await Promise.all(
serviceSpec.queues.map(
async (q) =>
({
...q,
handler: await bundleQueueHandler(q),
} satisfies QueueRuntime)
)
),
},
system: {
entityService: {
transactionWorker: { entry: transactionWorkerFunction! },
transactionWorker: { entry: monolithFunctions["transaction-worker"] },
},
taskService: {
fallbackHandler: { entry: taskFallbackHandler! },
Expand Down Expand Up @@ -192,7 +206,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
},
workflowService: {
orchestrator: {
entry: orchestrator!,
entry: monolithFunctions.orchestrator!,
},
},
},
Expand All @@ -207,14 +221,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
return await Promise.all(
commandSpecs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"command",
"command-worker",
spec.name,
monoCommandFunction!
),
entry: await bundleFile(spec, "command", "command-worker", spec.name),
spec,
};
})
Expand All @@ -226,12 +233,10 @@ export async function buildService(request: BuildAWSRuntimeProps) {
specs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"subscription",
"subscription-worker",
spec.name,
monoSubscriptionFunction!
spec.name
),
spec,
};
Expand All @@ -243,14 +248,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
return await Promise.all(
specs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"task",
"task-worker",
spec.name,
monoTaskFunction!
),
entry: await bundleFile(spec, "task", "task-worker", spec.name),
spec,
};
})
Expand All @@ -262,12 +260,10 @@ export async function buildService(request: BuildAWSRuntimeProps) {
specs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"entity-streams",
"entity-stream-worker",
spec.name,
monoEntityStreamWorkerFunction!
spec.name
),
spec,
};
Expand All @@ -280,28 +276,36 @@ export async function buildService(request: BuildAWSRuntimeProps) {
specs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"bucket-handlers",
"bucket-handler-worker",
spec.name,
monoBucketHandlerWorkerFunction!
spec.name
),
spec,
};
})
);
}

async function bundleQueueHandler(spec: QueueSpec) {
return {
entry: await bundleFile(
spec.handler,
"queue-handlers",
"queue-handler-worker",
spec.name
),
spec: spec.handler,
};
}

async function bundleFile<
Spec extends CommandSpec | SubscriptionSpec | TaskSpec
Spec extends CommandSpec | SubscriptionSpec | TaskSpec | QueueHandlerSpec
>(
specPath: string,
spec: Spec,
pathPrefix: string,
entryPoint: string,
name: string,
monoFunction: string
entryPoint: (typeof WORKER_ENTRY_POINTS)[number],
name: string
): Promise<string> {
return spec.sourceLocation?.fileName
? // we know the source location of the command, so individually build it from that
Expand All @@ -315,48 +319,26 @@ export async function buildService(request: BuildAWSRuntimeProps) {
injectedEntry: spec.sourceLocation.fileName,
injectedServiceSpec: specPath,
})
: monoFunction;
: monolithFunctions[entryPoint];
}

function bundleMonolithDefaultHandlers(specPath: string) {
return Promise.all(
[
{
name: ServiceType.OrchestratorWorker,
entry: runtimeHandlersEntrypoint("orchestrator"),
},
{
name: ServiceType.TaskWorker,
entry: runtimeHandlersEntrypoint("task-worker"),
},
{
name: ServiceType.CommandWorker,
entry: runtimeHandlersEntrypoint("command-worker"),
},
{
name: ServiceType.Subscription,
entry: runtimeHandlersEntrypoint("subscription-worker"),
},
{
name: ServiceType.EntityStreamWorker,
entry: runtimeHandlersEntrypoint("entity-stream-worker"),
},
{
name: ServiceType.BucketNotificationHandlerWorker,
entry: runtimeHandlersEntrypoint("bucket-handler-worker"),
},
{
name: ServiceType.TransactionWorker,
entry: runtimeHandlersEntrypoint("transaction-worker"),
},
]
.map((s) => ({
...s,
injectedEntry: request.entry,
injectedServiceSpec: specPath,
}))
.map(buildFunction)
);
async function bundleMonolithDefaultHandlers(specPath: string) {
return Object.fromEntries(
await Promise.all(
WORKER_ENTRY_POINTS.map(
async (name) =>
[
name,
await buildFunction({
entry: runtimeHandlersEntrypoint(name),
name,
injectedEntry: request.entry,
injectedServiceSpec: specPath,
}),
] as const
)
)
) as Record<(typeof WORKER_ENTRY_POINTS)[number], string>;
}

function bundleEventualSystemFunctions(specPath: string) {
Expand Down
Loading