Skip to content

Commit

Permalink
feat: socket
Browse files Browse the repository at this point in the history
  • Loading branch information
thantos committed Sep 12, 2023
1 parent 8f6feb0 commit 172358e
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 15 deletions.
25 changes: 25 additions & 0 deletions packages/@eventual/aws-cdk/src/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
EntityStreamSpec,
EVENTUAL_SYSTEM_COMMAND_NAMESPACE,
ServiceType,
SocketSpec,
SubscriptionSpec,
TaskSpec,
} from "@eventual/core/internal";
Expand Down Expand Up @@ -91,6 +92,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
monoEntityStreamWorkerFunction,
monoBucketHandlerWorkerFunction,
transactionWorkerFunction,
monoSocketWorkerFunction,
],
[
// also bundle each of the internal eventual API Functions as they have no dependencies
Expand Down Expand Up @@ -124,6 +126,7 @@ export async function buildService(request: BuildAWSRuntimeProps) {
name: "default",
},
},
sockets: await bundleSocketHandlers(serviceSpec.sockets),
search: serviceSpec.search,
entities: {
entities: await Promise.all(
Expand Down Expand Up @@ -293,6 +296,24 @@ export async function buildService(request: BuildAWSRuntimeProps) {
);
}

async function bundleSocketHandlers(specs: SocketSpec[]) {
return await Promise.all(
specs.map(async (spec) => {
return {
entry: await bundleFile(
specPath,
spec,
"socket",
"socket-worker",
spec.name,
monoSocketWorkerFunction!
),
spec,
};
})
);
}

async function bundleFile<
Spec extends CommandSpec | SubscriptionSpec | TaskSpec
>(
Expand Down Expand Up @@ -349,6 +370,10 @@ export async function buildService(request: BuildAWSRuntimeProps) {
name: ServiceType.TransactionWorker,
entry: runtimeHandlersEntrypoint("transaction-worker"),
},
{
name: ServiceType.SocketWorker,
entry: runtimeHandlersEntrypoint("socket-worker"),
},
]
.map((s) => ({
...s,
Expand Down
123 changes: 123 additions & 0 deletions packages/@eventual/aws-cdk/src/socket-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { IWebSocketApi, WebSocketApi } from "@aws-cdk/aws-apigatewayv2-alpha";
import { WebSocketLambdaIntegration } from "@aws-cdk/aws-apigatewayv2-integrations-alpha";
import { ENV_NAMES, socketServiceSocketName } from "@eventual/aws-runtime";
import { SocketFunction } from "@eventual/core-runtime";
import { IPrincipal } from "aws-cdk-lib/aws-iam";
import type { Function, FunctionProps } from "aws-cdk-lib/aws-lambda";
import { Duration } from "aws-cdk-lib/core";
import { Construct } from "constructs";
import type openapi from "openapi3-ts";
import { SpecHttpApiProps } from "./constructs/spec-http-api.js";
import { EventualResource } from "./resource.js";
import { WorkerServiceConstructProps } from "./service-common.js";
import { ServiceFunction } from "./service-function.js";
import { ServiceLocal } from "./service.js";
import { ServiceEntityProps } from "./utils.js";
import { DeepCompositePrincipal } from "./deep-composite-principal.js";

export type ApiOverrides = Omit<SpecHttpApiProps, "apiDefinition">;

export type Sockets<Service> = ServiceEntityProps<Service, "Socket", Socket>;

export type SocketOverrides<Service> = Partial<
ServiceEntityProps<Service, "Socket", SocketHandlerProps>
>;

export interface SocketsProps<Service = any>
extends WorkerServiceConstructProps {
local: ServiceLocal | undefined;
openApi: {
info: openapi.InfoObject;
};
overrides?: SocketOverrides<Service>;
}

/**
* Properties that can be overridden for an individual API handler Function.
*/
export type SocketHandlerProps = Partial<
Omit<FunctionProps, "code" | "runtime" | "functionName">
>;

export class SocketService<Service = any> {
/**
* API Gateway for providing service api
*/
public readonly sockets: Sockets<Service>;

constructor(props: SocketsProps<Service>) {
const socketsScope = new Construct(props.serviceScope, "Sockets");

this.sockets = Object.fromEntries(
Object.entries(props.build.sockets).map(([name, socket]) => [
name,
new Socket(socketsScope, {
serviceProps: props,
socketService: this,
socket,
local: props.local,
}),
])
) as Sockets<Service>;
}
}

interface SocketProps {
serviceProps: SocketsProps<any>;
socketService: SocketService<any>;
socket: SocketFunction;
local: ServiceLocal | undefined;
}

class Socket extends Construct implements EventualResource {
public grantPrincipal: IPrincipal;
public gateway: IWebSocketApi;
public handler: Function;

constructor(scope: Construct, props: SocketProps) {
const socketName = props.socket.spec.name;

super(scope, socketName);

this.handler = new ServiceFunction(this, "DefaultHandler", {
build: props.serviceProps.build,
bundledFunction: props.socket,
functionNameSuffix: `socket-${socketName}-default`,
serviceName: props.serviceProps.serviceName,
defaults: {
timeout: Duration.minutes(1),
environment: {
[ENV_NAMES.SOCKET_NAME]: socketName,
...props.serviceProps.environment,
},
},
runtimeProps: props.socket.spec.options,
overrides: props.serviceProps.overrides?.[socketName],
});

const integration = new WebSocketLambdaIntegration("default", this.handler);

this.gateway = new WebSocketApi(this, "Gateway", {
apiName: socketServiceSocketName(
props.serviceProps.serviceName,
socketName
),
defaultRouteOptions: {
integration,
},
connectRouteOptions: {
integration,
},
disconnectRouteOptions: {
integration,
},
});

this.grantPrincipal = props.local
? new DeepCompositePrincipal(
this.handler.grantPrincipal,
props.local.environmentRole
)
: this.handler.grantPrincipal;
}
}
2 changes: 2 additions & 0 deletions packages/@eventual/aws-runtime/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export const ENV_NAMES = {
WORKFLOW_EXECUTION_LOG_GROUP_NAME:
"EVENTUAL_WORKFLOW_EXECUTION_LOG_GROUP_NAME",
DEFAULT_LOG_LEVEL: "EVENTUAL_LOG_LEVEL",
SOCKET_NAME: "EVENTUAL_SOCKET_NAME",
ENTITY_NAME: "EVENTUAL_ENTITY_NAME",
ENTITY_STREAM_NAME: "EVENTUAL_ENTITY_STREAM_NAME",
BUCKET_NAME: "EVENTUAL_BUCKET_NAME",
Expand Down Expand Up @@ -60,6 +61,7 @@ export const serviceLogGroupName = () =>
export const serviceUrl = () => tryGetEnv<string>(ENV_NAMES.SERVICE_URL);
export const defaultLogLevel = () =>
tryGetEnv<LogLevel>(ENV_NAMES.DEFAULT_LOG_LEVEL) ?? LogLevel.INFO;
export const socketName = () => tryGetEnv(ENV_NAMES.SOCKET_NAME);
export const entityName = () => tryGetEnv(ENV_NAMES.ENTITY_NAME);
export const entityStreamName = () => tryGetEnv(ENV_NAMES.ENTITY_STREAM_NAME);
export const bucketName = () => tryGetEnv(ENV_NAMES.BUCKET_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export default {
transactions: [],
events: [],
commands: [],
sockets: [],
tasks: [],
subscriptions: [],
buckets: { buckets: [] },
Expand Down
22 changes: 22 additions & 0 deletions packages/@eventual/aws-runtime/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ export function serviceFunctionName(serviceName: string, suffix: string) {
);
}

export function serviceWebSocketName(serviceName: string, suffix: string) {
const serviceNameAndSeparatorLength = serviceName.length + 1;
const remaining = 128 - serviceNameAndSeparatorLength;
return sanitizeFunctionName(
`${serviceName}-${suffix.substring(0, remaining)}`
);
}

/**
* Bucket names must:
* * be between 3 and 63 characters long (inc)
Expand Down Expand Up @@ -258,6 +266,10 @@ export function bucketServiceBucketSuffix(bucketName: string) {
return `bucket-${bucketName}`;
}

export function socketServiceSocketSuffix(socketName: string) {
return `socket-${socketName}`;
}

export function taskServiceFunctionName(
serviceName: string,
taskId: string
Expand All @@ -282,6 +294,16 @@ export function bucketServiceBucketName(
return serviceBucketName(serviceName, bucketServiceBucketSuffix(bucketName));
}

export function socketServiceSocketName(
serviceName: string,
socketName: string
) {
return serviceWebSocketName(
serviceName,
socketServiceSocketSuffix(socketName)
);
}

/**
* Valid lambda function names contains letters, numbers, dash, or underscore and no spaces.
*/
Expand Down
4 changes: 4 additions & 0 deletions packages/@eventual/core-runtime/src/build-manifest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
TaskSpec,
TransactionSpec,
IndexSpec,
SocketSpec,
} from "@eventual/core/internal";

export interface BuildManifest {
Expand All @@ -23,6 +24,7 @@ export interface BuildManifest {
* The events and their schema.
*/
events: EventSpec[];
sockets: SocketFunction[];
/**
* All subscriptions to events declared within the service.
*/
Expand Down Expand Up @@ -114,3 +116,5 @@ export type EntityStreamFunction = BundledFunction<EntityStreamSpec>;

export type BucketNotificationHandlerFunction =
BundledFunction<BucketNotificationHandlerSpec>;

export type SocketFunction = BundledFunction<SocketSpec>;
25 changes: 10 additions & 15 deletions packages/@eventual/core/src/http/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { CommandSpec, isSourceLocation } from "../internal/service-spec.js";
import type { ServiceContext } from "../service.js";
import type { Middleware } from "./middleware.js";
import type { ParsePath } from "./path.js";
import { parseArgs } from "../internal/util.js";

export interface CommandContext {
service: ServiceContext;
Expand Down Expand Up @@ -209,7 +210,7 @@ export function command<
Output = void,
Context extends CommandContext = CommandContext
>(...args: any[]): Command<Name, Input, Output, Context, any, any> {
const [sourceLocation, name, options, handler] = parseCommandArgs<
const { sourceLocation, name, options, handler } = parseCommandArgs<
Name,
Input,
Output
Expand All @@ -236,18 +237,12 @@ export function parseCommandArgs<
Output = void,
Context extends CommandContext = CommandContext
>(args: any[]) {
return [
// TODO: is this 4x scan too inefficient, or is the trade-off between simplicity and performance worth it here?
// i think it would be marginal looping over a small array multiple times but i could be wrong
args.find(isSourceLocation),
args.find((a): a is Name => typeof a === "string")!,
args.find((a) => typeof a === "object" && !isSourceLocation(a)) as
| CommandOptions<Input, Output, any, any>
| undefined,
args.find((a) => typeof a === "function") as CommandHandler<
Input,
Output,
Context
>,
] as const;
return parseArgs(args, {
sourceLocation: isSourceLocation,
name: (a: any): a is Name => typeof a === "string",
options: (a: any): a is CommandOptions<Input, Output, any, any> =>
typeof a === "object" && !isSourceLocation(a),
handler: (a: any): a is CommandHandler<Input, Output, Context> =>
typeof a === "function",
});
}
10 changes: 10 additions & 0 deletions packages/@eventual/core/src/internal/service-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export interface ServiceSpec {
transactions: TransactionSpec[];
tasks: TaskSpec[];
commands: CommandSpec<any, any, any, any>[];
sockets: SocketSpec<any>[];
/**
* Open API 3 schema definitions for all known Events in this Service.
*/
Expand Down Expand Up @@ -289,3 +290,12 @@ export interface EntityIndexSpec<Name extends string = string> {
export interface TransactionSpec<Name extends string = string> {
name: Name;
}

export type SocketOptions = FunctionRuntimeProps;

export interface SocketSpec<Name extends string = string> {
name: Name;
path: string;
sourceLocation?: SourceLocation;
options: SocketOptions;
}
1 change: 1 addition & 0 deletions packages/@eventual/core/src/internal/service-type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { PropertyKind, createEventualProperty } from "./properties.js";

export enum ServiceType {
CommandWorker = "CommandWorker",
SocketWorker = "SocketWorker",
Subscription = "Subscription",
OrchestratorWorker = "OrchestratorWorker",
EntityStreamWorker = "EntityStreamWorker",
Expand Down
15 changes: 15 additions & 0 deletions packages/@eventual/core/src/internal/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,18 @@ export function or<F extends ((a: any) => a is any)[]>(
export function encodeExecutionId(executionId: string) {
return Buffer.from(executionId, "utf-8").toString("base64");
}

export function parseArgs<Args extends Record<string, any | undefined>>(
args: any[],
predicates: Predicates<Args>
): Args {
return Object.fromEntries(
Object.entries(predicates).map(
([name, predicate]) => [name, args.find(predicate)] as const
)
) as Args;
}

export type Predicates<Args extends Record<string, any | undefined>> = {
[arg in keyof Args]: (value: any) => value is Args[arg];
};
Loading

0 comments on commit 172358e

Please sign in to comment.