Skip to content

Commit

Permalink
tests, working, some refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
thantos committed Sep 14, 2023
1 parent 786bb32 commit a9cffa9
Show file tree
Hide file tree
Showing 26 changed files with 618 additions and 149 deletions.
2 changes: 2 additions & 0 deletions apps/tests/aws-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"@eventual/cli": "workspace:^",
"@eventual/client": "workspace:^",
"@eventual/core": "workspace:^",
"ws": "^8.14.1",
"zod": "^3.21.4"
},
"devDependencies": {
Expand All @@ -30,6 +31,7 @@
"@types/aws-lambda": "^8.10.115",
"@types/jest": "^29.5.1",
"@types/node": "^18",
"@types/ws": "^8.5.5",
"aws-cdk": "^2.80.0",
"esbuild": "^0.17.4",
"jest": "^29",
Expand Down
211 changes: 194 additions & 17 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
sendSignal,
sendTaskHeartbeat,
signal,
socket,
subscription,
task,
time,
Expand All @@ -36,6 +37,7 @@ import {
} from "@eventual/core";
import type openapi from "openapi3-ts";
import { Readable } from "stream";
import { WebSocket } from "ws";
import z from "zod";
import { AsyncWriterTestEvent } from "./async-writer-handler.js";

Expand Down Expand Up @@ -1378,23 +1380,198 @@ export const searchBlog = command(
}
);

// check types of entity
function streamShouldHaveEmptyIfNoInclude() {
counter.stream("", {}, (item) => {
if (item.operation === "modify") {
// @ts-expect-error - no oldValue without includeOld: true
item.oldValue!.namespace;
/**
* Socket Test!
*
* 1. a command start a workflow that will communicate with the sockets.
* 2. the command starts 2 web sockets against socket1
* 3. on each connection, the socket will send a signal to the workflow
* 4. when the workflow get the connection, it will send a start signal to the socket
* 5. the socket will send a "n" (local id for the connection) and value to the socket, which will forward the message to the workflow
* 6. when the workflow has received 2 connections with n and v values, it will send a message to each socket with the n and v incremented by 1
* 7. the socket will resolve the value in the connection, completing the test
*/

interface SocketMessage {
id: string;
v: number;
}

export const socket1 = socket("socket1", {
$connect: async ({ connectionId, query }) => {
console.log(query);
const { id, n } = (query ?? {}) as { n?: string; id?: string };
if (!id || !n) {
throw new Error("Missing ID");
}
});
counter.stream(
"",
{
includeOld: true,
},
(item) => {
if (item.operation === "modify") {
item.oldValue.namespace;
}
console.log("sending signal to", id);
await socketConnectSignal.sendSignal(id, {
connectionId,
n: Number(n),
});
console.log("signal sent to", id);
},
$disconnect: async () => undefined,
$default: async ({ connectionId, body }) => {
console.log(body);
const data = body ? (JSON.parse(body) as SocketMessage) : undefined;
console.log("data", data);
if (!data) {
throw new Error("Expected data");
}
);
console.log("sending signal to", data.id);
await socketMessageSignal.sendSignal(data.id, {
...data,
connectionId,
});
console.log("signal sent to", data.id);
},
});

export const socketConnectSignal = signal<{ connectionId: string; n: number }>(
"socketConnectSignal"
);
export const socketMessageSignal = signal<{
connectionId: string;
v: number;
}>("socketMessageSignal");

interface StartSocketEvent {
type: "start";
n: number;
v: number;
}

interface DataSocketEvent {
type: "data";
n: number;
v: number;
}

export const socketWorkflow = workflow(
"socketWorkflow",
{ timeout: duration(15, "minutes") },
async () => {
const connections: Record<string, { n: number; v?: number }> = {};

socketConnectSignal.onSignal(async ({ connectionId, n }) => {
connections[connectionId] = { n };
await socket1.send(
connectionId,
JSON.stringify({
type: "start",
n,
v: n + 1,
} satisfies StartSocketEvent)
);
});

socketMessageSignal.onSignal(({ connectionId, v }) => {
const entry = connections[connectionId];
if (entry) {
entry.v = v;
}
});

await condition(
() =>
Object.keys(connections).length > 1 &&
Object.values(connections).every((c) => c.v !== undefined)
);

await Promise.all(
Object.entries(connections).map(async ([connectionId, { n, v }]) => {
await socket1.send(
connectionId,
JSON.stringify({
type: "data",
n,
v: (v ?? 0) + 1,
} satisfies DataSocketEvent)
);
})
);

// close the connections
await Promise.all(
Object.entries(connections).map(async ([connectionId]) => {
await socket1.delete(connectionId);
})
);
}
);

export const socketTest = command(
"socketTest",
{ handlerTimeout: duration(5, "minute") },
async () => {
// start workflow that waits for connections
const { executionId } = await socketWorkflow.startExecution({
input: undefined,
});
const encodedId = encodeURIComponent(executionId);

console.log("pre-socket");

const ws1 = new WebSocket(`${socket1.wssEndpoint}?id=${encodedId}&n=0`);
const ws2 = new WebSocket(`${socket1.wssEndpoint}?id=${encodedId}&n=1`);

console.log("setup-socket");

const running1 = setupWS(executionId, ws1);
const running2 = setupWS(executionId, ws2);

console.log("waiting...");

return await Promise.all([running1, running2]);
}
);

function setupWS(executionId: string, ws: WebSocket) {
let n: number | undefined;
let v: number | undefined;
return new Promise<number>((resolve, reject) => {
ws.on("error", (err) => {
console.log("error", err);
reject(err);
});
ws.on("message", (data) => {
try {
console.log(n, "message");
const d = (data as Buffer).toString("utf8");
console.log(d);
const event = JSON.parse(d) as StartSocketEvent | DataSocketEvent;
if (event.type === "start") {
n = event.n;
// after connecting, we will send our "n" and incremented "value" back.
ws.send(
JSON.stringify({
id: executionId,
v: event.v + 1,
} satisfies SocketMessage)
);
} else if (event.type === "data") {
v = event.v;
} else {
console.log("unexpected event", event);
reject(event);
}
} catch (err) {
console.error(err);
reject(err);
}
});
ws.on("close", (code, reason) => {
try {
console.log(code, reason.toString("utf-8"));
console.log(n, "close", v);
if (n === undefined) {
throw new Error("n was not set");
}
resolve(v ?? -1);
} catch (err) {
reject(err);
}
});
});
}
10 changes: 10 additions & 0 deletions apps/tests/aws-runtime/test/tester.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,16 @@ test("test service context", async () => {
});
});

test("socket test", async () => {
const rpcResponse = await (
await fetch(`${url}/${commandRpcPath({ name: "socketTest" })}`, {
method: "POST",
})
).json();

expect(rpcResponse).toEqual([3, 4]);
});

if (!process.env.TEST_LOCAL) {
test("index.search", async () => {
const serviceClient = new ServiceClient<typeof TestService>({
Expand Down
3 changes: 3 additions & 0 deletions packages/@eventual/aws-cdk/src/entity-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ export class EntityService<Service> {
this.configureReadWriteEntityTable(this.transactionWorker);
props.workflowService.configureSendSignal(this.transactionWorker);
props.eventService.configureEmit(this.transactionWorker);
props.socketService.configureInvokeSocketEndpoints(
this.transactionWorker
);
}
}

Expand Down
Loading

0 comments on commit a9cffa9

Please sign in to comment.