Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: socket #445

Merged
merged 40 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
89d28ea
feat: queue
thantos Aug 2, 2023
0300086
more queue work
thantos Aug 4, 2023
6d8a753
complete runtime changes
thantos Aug 9, 2023
20f6a07
add fifo to core and runtime
thantos Aug 9, 2023
e2f76d5
add aws queue client
thantos Aug 10, 2023
37b5d2a
runtime queue handler worker
thantos Aug 10, 2023
335b03d
queue handler worker aws
thantos Aug 10, 2023
9c054a9
update aws handlers with queue client
thantos Aug 10, 2023
b43b1c5
local support
thantos Aug 11, 2023
ef62792
simplify queue
thantos Aug 11, 2023
9e744c3
start cdk changes
thantos Aug 11, 2023
b261147
queue cdk service and shared props
thantos Aug 11, 2023
7c94130
runtime test
thantos Aug 14, 2023
b43fdc6
tests pass
thantos Aug 23, 2023
47d73db
suppor delay in local queue
thantos Aug 23, 2023
9402ad3
support turning off SSE
thantos Aug 23, 2023
389952c
send message batch and delete batch
thantos Aug 23, 2023
a95bab0
woops
thantos Aug 23, 2023
5b1a986
Merge branch 'main' into sussman/feat/queue
thantos Aug 24, 2023
61d1cfe
re-add comma
thantos Aug 28, 2023
cc24252
fix(local): remove local ambigous event order issue
thantos Aug 28, 2023
d2a1f88
Merge branch 'main' into sussman/feat/queue
thantos Sep 12, 2023
59d654d
feat: add task event input
thantos Sep 12, 2023
554966b
feat: socket
thantos Sep 12, 2023
050e656
more things
thantos Sep 12, 2023
a0c6438
more items done
thantos Sep 13, 2023
0c6ba62
Merge branch 'main' into sussman/feat/socket
thantos Sep 13, 2023
7945304
more
thantos Sep 13, 2023
95f2245
infer
thantos Sep 13, 2023
6d4af52
worker things
thantos Sep 13, 2023
786bb32
snapshot
thantos Sep 13, 2023
a9cffa9
tests, working, some refactors
thantos Sep 14, 2023
c9791e8
feedback
thantos Sep 14, 2023
b14c5e8
rename delete to disconnect
thantos Sep 14, 2023
cb5b2e5
feedback and middleware
thantos Sep 14, 2023
f2ef505
service data and display
thantos Sep 14, 2023
20dafc7
shapshot
thantos Sep 14, 2023
e35d43d
support middleware for all socket routes
thantos Sep 14, 2023
47ad64e
local!!!
thantos Sep 14, 2023
41d55c0
snapshot
thantos Sep 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/tests/aws-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"@eventual/cli": "workspace:^",
"@eventual/client": "workspace:^",
"@eventual/core": "workspace:^",
"ws": "^8.14.1",
"zod": "^3.21.4"
},
"devDependencies": {
Expand All @@ -30,6 +31,7 @@
"@types/aws-lambda": "^8.10.115",
"@types/jest": "^29.5.1",
"@types/node": "^18",
"@types/ws": "^8.5.5",
"aws-cdk": "^2.80.0",
"esbuild": "^0.17.4",
"jest": "^29",
Expand Down
222 changes: 205 additions & 17 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
sendSignal,
sendTaskHeartbeat,
signal,
socket,
subscription,
task,
time,
Expand All @@ -36,6 +37,7 @@ import {
} from "@eventual/core";
import type openapi from "openapi3-ts";
import { Readable } from "stream";
import { WebSocket } from "ws";
import z from "zod";
import { AsyncWriterTestEvent } from "./async-writer-handler.js";

Expand Down Expand Up @@ -1378,23 +1380,209 @@ export const searchBlog = command(
}
);

// check types of entity
function streamShouldHaveEmptyIfNoInclude() {
counter.stream("", {}, (item) => {
if (item.operation === "modify") {
// @ts-expect-error - no oldValue without includeOld: true
item.oldValue!.namespace;
/**
* Socket Test!
*
* 1. a command start a workflow that will communicate with the sockets.
* 2. the command starts 2 web sockets against socket1
* 3. on each connection, the socket will send a signal to the workflow
* 4. when the workflow get the connection, it will send a start signal to the socket
* 5. the socket will send a "n" (local id for the connection) and value to the socket, which will forward the message to the workflow
* 6. when the workflow has received 2 connections with n and v values, it will send a message to each socket with the n and v incremented by 1
* 7. the socket will resolve the value in the connection, completing the test
*/

interface SocketMessage {
id: string;
v: number;
}

const jsonSocket = socket.use({
message: ({ request: { body }, context, next }) => {
console.log(body);
const data = body
? body instanceof Buffer
? (JSON.parse(body.toString("utf-8")) as SocketMessage)
: (JSON.parse(body) as SocketMessage)
: undefined;
console.log("data", data);
if (!data) {
throw new Error("Expected data");
}
});
counter.stream(
"",
{
includeOld: true,
},
(item) => {
if (item.operation === "modify") {
item.oldValue.namespace;
return next({ ...context, data });
},
});

export const socket1 = jsonSocket.use(({ request, context, next }) => {
const { id, n } = (request.query ?? {}) as { n?: string; id?: string };
if (!id || !n) {
throw new Error("Missing ID");
}
return next({ ...context, id, n });
})("socket1", {
$connect: async ({ connectionId }, { id, n }) => {
console.log("sending signal to", id);
await socketConnectSignal.sendSignal(id, {
connectionId,
n: Number(n),
});
console.log("signal sent to", id);
},
$disconnect: async () => undefined,
$default: async ({ connectionId }, { data }) => {
console.log("sending signal to", data.id);
await socketMessageSignal.sendSignal(data.id, {
...data,
connectionId,
});
console.log("signal sent to", data.id);
},
});

export const socketConnectSignal = signal<{ connectionId: string; n: number }>(
"socketConnectSignal"
);
export const socketMessageSignal = signal<{
connectionId: string;
v: number;
}>("socketMessageSignal");

interface StartSocketEvent {
type: "start";
n: number;
v: number;
}

interface DataSocketEvent {
type: "data";
n: number;
v: number;
}

export const socketWorkflow = workflow(
"socketWorkflow",
{ timeout: duration(15, "minutes") },
async () => {
const connections: Record<string, { n: number; v?: number }> = {};

socketConnectSignal.onSignal(async ({ connectionId, n }) => {
connections[connectionId] = { n };
await socket1.send(
connectionId,
JSON.stringify({
type: "start",
n,
v: n + 1,
} satisfies StartSocketEvent)
);
});

socketMessageSignal.onSignal(({ connectionId, v }) => {
const entry = connections[connectionId];
if (entry) {
entry.v = v;
}
}
);
});

await condition(
() =>
Object.keys(connections).length > 1 &&
Object.values(connections).every((c) => c.v !== undefined)
);

await Promise.all(
Object.entries(connections).map(async ([connectionId, { n, v }]) => {
await socket1.send(
connectionId,
JSON.stringify({
type: "data",
n,
v: (v ?? 0) + 1,
} satisfies DataSocketEvent)
);
})
);

// close the connections
await Promise.all(
Object.entries(connections).map(async ([connectionId]) => {
await socket1.disconnect(connectionId);
})
);
}
);

export const socketTest = command(
"socketTest",
{ handlerTimeout: duration(5, "minute") },
async () => {
// start workflow that waits for connections
const { executionId } = await socketWorkflow.startExecution({
input: undefined,
});
const encodedId = encodeURIComponent(executionId);

console.log("pre-socket");

const ws1 = new WebSocket(`${socket1.wssEndpoint}?id=${encodedId}&n=0`);
const ws2 = new WebSocket(`${socket1.wssEndpoint}?id=${encodedId}&n=1`);

console.log("setup-socket");

const running1 = setupWS(executionId, ws1);
const running2 = setupWS(executionId, ws2);

console.log("waiting...");

return await Promise.all([running1, running2]);
}
);

function setupWS(executionId: string, ws: WebSocket) {
let n: number | undefined;
let v: number | undefined;
return new Promise<number>((resolve, reject) => {
ws.on("error", (err) => {
console.log("error", err);
reject(err);
});
ws.on("message", (data) => {
try {
console.log(n, "message");
const d = (data as Buffer).toString("utf8");
console.log(d);
const event = JSON.parse(d) as StartSocketEvent | DataSocketEvent;
if (event.type === "start") {
n = event.n;
// after connecting, we will send our "n" and incremented "value" back.
ws.send(
JSON.stringify({
id: executionId,
v: event.v + 1,
} satisfies SocketMessage)
);
} else if (event.type === "data") {
v = event.v;
} else {
console.log("unexpected event", event);
reject(event);
}
} catch (err) {
console.error(err);
reject(err);
}
});
ws.on("close", (code, reason) => {
try {
console.log(code, reason.toString("utf-8"));
console.log(n, "close", v);
if (n === undefined) {
throw new Error("n was not set");
}
resolve(v ?? -1);
} catch (err) {
reject(err);
}
});
});
}
10 changes: 10 additions & 0 deletions apps/tests/aws-runtime/test/tester.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,16 @@ test("test service context", async () => {
});
});

test("socket test", async () => {
const rpcResponse = await (
await fetch(`${url}/${commandRpcPath({ name: "socketTest" })}`, {
method: "POST",
})
).json();

expect(rpcResponse).toEqual([3, 4]);
});

if (!process.env.TEST_LOCAL) {
test("index.search", async () => {
const serviceClient = new ServiceClient<typeof TestService>({
Expand Down
14 changes: 14 additions & 0 deletions packages/@eventual/aws-cdk/src/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
EVENTUAL_SYSTEM_COMMAND_NAMESPACE,
QueueHandlerSpec,
QueueSpec,
SocketSpec,
SubscriptionSpec,
TaskSpec,
} from "@eventual/core/internal";
Expand Down Expand Up @@ -80,6 +81,7 @@ const WORKER_ENTRY_POINTS = [
"bucket-handler-worker",
"queue-handler-worker",
"transaction-worker",
"socket-worker",
] as const;

export async function buildService(request: BuildAWSRuntimeProps) {
Expand Down Expand Up @@ -127,6 +129,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
name: "default",
},
},
sockets: await bundleSocketHandlers(serviceSpec.sockets),
search: serviceSpec.search,
entities: {
entities: await Promise.all(
Expand Down Expand Up @@ -299,6 +302,17 @@ export async function buildService(request: BuildAWSRuntimeProps) {
};
}

async function bundleSocketHandlers(specs: SocketSpec[]) {
return await Promise.all(
specs.map(async (spec) => {
return {
entry: await bundleFile(spec, "socket", "socket-worker", spec.name),
spec,
};
})
);
}

async function bundleFile<
Spec extends CommandSpec | SubscriptionSpec | TaskSpec | QueueHandlerSpec
>(
Expand Down
3 changes: 3 additions & 0 deletions packages/@eventual/aws-cdk/src/entity-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ export class EntityService<Service> {
this.configureReadWriteEntityTable(this.transactionWorker);
props.workflowService.configureSendSignal(this.transactionWorker);
props.eventService.configureEmit(this.transactionWorker);
props.socketService.configureInvokeSocketEndpoints(
this.transactionWorker
);
}
}

Expand Down
3 changes: 3 additions & 0 deletions packages/@eventual/aws-cdk/src/service-common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { LazyInterface } from "./proxy-construct";
import { QueueService } from "./queue-service";
import { SearchService } from "./search/search-service";
import { Service } from "./service";
import { SocketService } from "./socket-service";

export interface ServiceConstructProps {
/**
Expand Down Expand Up @@ -38,6 +39,7 @@ export interface WorkerServiceConstructProps extends ServiceConstructProps {
bucketService: LazyInterface<BucketService<Service>>;
entityService: LazyInterface<EntityService<Service>>;
searchService: LazyInterface<SearchService<Service>> | undefined;
socketService: LazyInterface<SocketService<Service>>;
}

export function configureWorkerCalls(
Expand All @@ -50,4 +52,5 @@ export function configureWorkerCalls(
serviceProps.bucketService.configureReadWriteBuckets(func);
serviceProps.entityService.configureReadWriteEntityTable(func);
serviceProps.entityService.configureInvokeTransactions(func);
serviceProps.socketService.configureInvokeSocketEndpoints(func);
}
Loading