Skip to content

Commit

Permalink
feat: refactor call exec (#415)
Browse files Browse the repository at this point in the history
  • Loading branch information
thantos authored Aug 11, 2023
1 parent b02c774 commit 3121cdf
Show file tree
Hide file tree
Showing 126 changed files with 4,288 additions and 3,842 deletions.
2 changes: 0 additions & 2 deletions packages/@eventual/aws-cdk/src/workflow-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ export class WorkflowService {
"WorkflowService"
);

// TODO: move in the table

this.logGroup =
props.overrides?.logGroup ??
new LogGroup(props.serviceScope, "WorkflowExecutionLogs", {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { OpenSearchClient } from "@eventual/core-runtime";
import {
Client,
Connection,
NodeOptions,
} from "@opensearch-project/opensearch";
import aws4 from "aws4";

export class AWSOpenSearchClient {
export class AWSOpenSearchClient implements OpenSearchClient {
public readonly client: Client;
constructor({
node,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { EventualServiceClient, HttpMethod, HttpRequest } from "@eventual/core";
import {
HttpMethod,
HttpRequest,
type EventualServiceClient,
} from "@eventual/core";
import {
createCommandWorker,
getLazy,
LazyValue,
registerWorkerIntrinsics,
type CommandWorker,
type ApiHandlerDependencies,
} from "@eventual/core-runtime";
import type { ServiceSpec } from "@eventual/core/internal";
import type {
APIGatewayProxyEventV2,
APIGatewayProxyHandlerV2,
Expand All @@ -19,17 +21,17 @@ import { Buffer } from "buffer";
* Each webhook registers routes on the central router that
* then handles the request.
*/
export function createApiGCommandAdaptor({
commandWorker,
export function createApiGCommandWorker({
serviceClientBuilder,
serviceSpec,
serviceName: _serviceName,
...deps
}: {
commandWorker: CommandWorker;
serviceName: LazyValue<string>;
serviceSpec?: ServiceSpec;
serviceClientBuilder?: (serviceUrl: string) => EventualServiceClient;
}): APIGatewayProxyHandlerV2 {
} & Omit<
ApiHandlerDependencies,
"serviceClient" | "serviceUrl"
>): APIGatewayProxyHandlerV2 {
const serviceName = getLazy(_serviceName);
return async function (
event: APIGatewayProxyEventV2
Expand All @@ -40,15 +42,17 @@ export function createApiGCommandAdaptor({
const serviceClient = serviceClientBuilder
? serviceClientBuilder(serviceUrl)
: undefined;
registerWorkerIntrinsics({
openSearchClient: undefined,
bucketStore: undefined,
entityStore: undefined,

const commandWorker = createCommandWorker({
bucketStore: deps.bucketStore,
entityStore: deps.entityStore,
openSearchClient: deps.openSearchClient,
serviceClient,
serviceSpec,
serviceName,
serviceUrl,
serviceName: getLazy(serviceName),
});

const requestBody = event.body
? event.isBase64Encoded
? Buffer.from(event.body, "base64")
Expand Down
22 changes: 8 additions & 14 deletions packages/@eventual/aws-runtime/src/handlers/command-worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import "@eventual/injected/entry";
import serviceSpec from "@eventual/injected/spec";

import { createCommandWorker } from "@eventual/core-runtime";
import {
createBucketStore,
createEntityStore,
Expand All @@ -10,32 +9,27 @@ import {
createServiceClient,
createTransactionClient,
} from "../create.js";
import { createApiGCommandAdaptor } from "./apig-command-adapter.js";
import { serviceName } from "../env.js";
import { createApiGCommandWorker } from "./apig-command-worker.js";

/**
* Handle inbound command and rest api requests.
*
* Each command registers routes on the central router that
* then handles the request.
*/
export default createApiGCommandAdaptor({
commandWorker: createCommandWorker({
bucketStore: createBucketStore(),
entityStore: createEntityStore(),
openSearchClient: await createOpenSearchClient(),
// the service client, spec, and service url will be created at runtime, using a computed uri from the apigateway request
serviceClient: undefined,
serviceSpec: undefined,
serviceName,
}),
serviceName,
serviceSpec,
export default createApiGCommandWorker({
bucketStore: createBucketStore(),
entityStore: createEntityStore(),
openSearchClient: await createOpenSearchClient(),
// the service client, spec, and service url will be created at runtime, using a computed uri from the apigateway request
// pulls the service url from the request instead of env variables to reduce the circular dependency between commands and the gateway.
serviceClientBuilder: (serviceUrl) =>
createServiceClient({
serviceUrl,
eventClient: createEventClient(),
transactionClient: createTransactionClient(),
}),
serviceName,
serviceSpec,
});
5 changes: 3 additions & 2 deletions packages/@eventual/aws-runtime/src/handlers/orchestrator.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import "@eventual/injected/entry";

import {
createDefaultWorkflowCallExecutor,
createOrchestrator,
ExecutionQueueEventEnvelope,
RemoteExecutorProvider,
WorkflowCallExecutor,
} from "@eventual/core-runtime";
import type { SQSEvent, SQSRecord } from "aws-lambda";
import { AWSMetricsClient } from "../clients/metrics-client.js";
Expand All @@ -30,13 +30,14 @@ import { serviceName } from "../env.js";
* from within an AWS Lambda Function attached to a SQS FIFO queue.
*/
const orchestrate = createOrchestrator({
bucketStore: createBucketStore(),
executionHistoryStore: createExecutionHistoryStore(),
executorProvider: new RemoteExecutorProvider({
executionHistoryStateStore: createExecutionHistoryStateStore(),
}),
logAgent: createLogAgent(),
metricsClient: AWSMetricsClient,
callExecutor: new WorkflowCallExecutor({
callExecutor: createDefaultWorkflowCallExecutor({
bucketStore: createBucketStore(),
entityStore: createEntityStore(),
eventClient: createEventClient(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import serviceSpec from "@eventual/injected/spec";

import type { AnyCommand } from "@eventual/core";
import {
createCommandWorker,
createEmitEventsCommand,
createExecuteTransactionCommand,
createGetExecutionCommand,
Expand All @@ -28,18 +27,15 @@ import {
createWorkflowClient,
} from "../create.js";
import { serviceName } from "../env.js";
import { createApiGCommandAdaptor } from "./apig-command-adapter.js";
import { createApiGCommandWorker } from "./apig-command-worker.js";

function systemCommandWorker(
..._commands: AnyCommand[]
): APIGatewayProxyHandlerV2<Response> {
return createApiGCommandAdaptor({
commandWorker: createCommandWorker({
bucketStore: createBucketStore(),
entityStore: undefined,
serviceClient: undefined,
serviceSpec: undefined,
}),
return createApiGCommandWorker({
bucketStore: createBucketStore(),
entityStore: undefined,
openSearchClient: undefined,
serviceSpec,
serviceName,
});
Expand Down
11 changes: 7 additions & 4 deletions packages/@eventual/aws-runtime/src/stores/bucket-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
S3Client,
} from "@aws-sdk/client-s3";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
import {
import type {
Bucket,
BucketGeneratePresignedResult,
CopyBucketObjectOptions,
Expand All @@ -24,10 +24,11 @@ import {
PutBucketOptions,
} from "@eventual/core";
import {
BucketStore,
LazyValue,
computeDurationSeconds,
getLazy,
type BucketStore,
type LazyValue,
streamToBuffer,
} from "@eventual/core-runtime";
import { assertNever } from "@eventual/core/internal";
import { Readable } from "stream";
Expand All @@ -48,6 +49,7 @@ export interface AWSBucketStoreProps {

export class AWSBucketStore implements BucketStore {
constructor(private props: AWSBucketStoreProps) {}

public async get(
bucketName: string,
key: string,
Expand Down Expand Up @@ -117,7 +119,8 @@ export class AWSBucketStore implements BucketStore {
new PutObjectCommand({
Bucket: this.physicalName(bucketName),
Key: key,
Body: data,
// S3 requires the content length when given a stream, we'll just give them a buffer instead
Body: data instanceof Readable ? await streamToBuffer(data) : data,
CacheControl: options?.cacheControl,
ContentEncoding: options?.contentEncoding,
ContentMD5: options?.contentMD5,
Expand Down
50 changes: 33 additions & 17 deletions packages/@eventual/cli/src/commands/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,18 @@ import {
isSucceededExecution,
} from "@eventual/core";
import {
AllPropertyRetriever,
Result,
UnsupportedPropertyRetriever,
WorkflowExecutor,
isFailed,
isResolved,
normalizeFailedResult,
parseWorkflowName,
resultToString,
runExecutor,
serviceTypeScope,
} from "@eventual/core-runtime";
import {
Result,
ServiceType,
getEventualResource,
} from "@eventual/core/internal";
import { ServiceType, getEventualResource } from "@eventual/core/internal";
import { discoverEventualConfig } from "@eventual/project";
import path from "path";
import { Argv } from "yargs";
Expand All @@ -43,7 +41,12 @@ export const replay = (yargs: Argv) =>
type: "string",
}),
serviceAction(
async (spinner, serviceClient, { entry, execution, service }) => {
async (
spinner,
serviceClient,
{ entry, execution, service },
{ serviceName, serviceData }
) => {
spinner.start("Constructing replay...");
const config = await discoverEventualConfig();

Expand Down Expand Up @@ -79,19 +82,32 @@ export const replay = (yargs: Argv) =>
}
spinner.start("Running program");

await serviceTypeScope(ServiceType.OrchestratorWorker, async () => {
const executor = new WorkflowExecutor<any, any, any>(
workflow,
events
);
const unsupportedPropertyRetriever = new UnsupportedPropertyRetriever(
"Replay Workflow Executor"
);

const executor = new WorkflowExecutor<any, any, any>(
workflow,
events,
// TODO: these properties should come from the history
new AllPropertyRetriever({
ServiceClient: serviceClient,
ServiceName: serviceName ?? unsupportedPropertyRetriever,
OpenSearchClient: unsupportedPropertyRetriever,
BucketPhysicalName: unsupportedPropertyRetriever,
ServiceSpec: unsupportedPropertyRetriever,
ServiceType: ServiceType.OrchestratorWorker,
ServiceUrl: serviceData.apiEndpoint ?? unsupportedPropertyRetriever,
TaskToken: unsupportedPropertyRetriever,
})
);

const res = await runExecutor(executor, [], new Date());
const res = await runExecutor(executor, [], new Date());

assertExpectedResult(executionObj, res.result);
assertExpectedResult(executionObj, res.result);

spinner.succeed();
process.stdout.write(`result: ${resultToString(res.result)}\n`);
});
spinner.succeed();
process.stdout.write(`result: ${resultToString(res.result)}\n`);
}
)
);
Expand Down
Loading

0 comments on commit 3121cdf

Please sign in to comment.