From 169f91de370e4efd852ab8f0b52471015e0b10df Mon Sep 17 00:00:00 2001 From: sam Date: Thu, 7 Sep 2023 09:40:31 -0700 Subject: [PATCH 1/6] fix: log errors in entity stream worker --- .../core-runtime/src/handlers/entity-stream-worker.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/@eventual/core-runtime/src/handlers/entity-stream-worker.ts b/packages/@eventual/core-runtime/src/handlers/entity-stream-worker.ts index af3dfa6f..0eca90c2 100644 --- a/packages/@eventual/core-runtime/src/handlers/entity-stream-worker.ts +++ b/packages/@eventual/core-runtime/src/handlers/entity-stream-worker.ts @@ -60,7 +60,9 @@ export function createEntityStreamWorker( if (result !== false) { continue; } - } catch {} + } catch (err) { + console.error(err); + } // if the handler fails or returns false, return the rest of the items return itemGroup.slice(Number(i)).map((i) => i.id); } From 200c939ede5b17d91eb60944dcd16c6a6192814e Mon Sep 17 00:00:00 2001 From: sam Date: Thu, 7 Sep 2023 10:18:14 -0700 Subject: [PATCH 2/6] fix: accurately type includeOld --- apps/tests/aws-runtime/test/test-service.ts | 22 +++++++++++++++++ .../@eventual/aws-cdk/src/entity-service.ts | 1 + .../src/handlers/entity-stream-worker.ts | 1 + .../src/local/stores/entity-store.ts | 2 +- packages/@eventual/core/src/entity/entity.ts | 7 +++--- packages/@eventual/core/src/entity/stream.ts | 24 +++++++++++-------- .../core/src/internal/service-spec.ts | 5 ++-- 7 files changed, 46 insertions(+), 16 deletions(-) diff --git a/apps/tests/aws-runtime/test/test-service.ts b/apps/tests/aws-runtime/test/test-service.ts index d7b2a24a..9b6926fb 100644 --- a/apps/tests/aws-runtime/test/test-service.ts +++ b/apps/tests/aws-runtime/test/test-service.ts @@ -616,6 +616,7 @@ export const counterWatcher = counter.stream( console.log(item); if (item.operation === "remove") { const { n } = item.oldValue!; + item.operation; await entitySignal2.sendSignal(item.key.id, { n: n + 1 }); } else if (item.newValue.namespace === "default") { const { n } = item.newValue; @@ -1286,3 +1287,24 @@ export const searchBlog = command( }; } ); + +// check types of entity +function streamShouldHaveEmptyIfNoInclude() { + counter.stream("", {}, (item) => { + if (item.operation === "modify") { + // @ts-expect-error - no oldValue without includeOld: true + item.oldValue!.namespace; + } + }); + counter.stream( + "", + { + includeOld: true, + }, + (item) => { + if (item.operation === "modify") { + item.oldValue.namespace; + } + } + ); +} diff --git a/packages/@eventual/aws-cdk/src/entity-service.ts b/packages/@eventual/aws-cdk/src/entity-service.ts index 63bed1e0..d48d148e 100644 --- a/packages/@eventual/aws-cdk/src/entity-service.ts +++ b/packages/@eventual/aws-cdk/src/entity-service.ts @@ -368,6 +368,7 @@ export class EntityStream extends Construct implements EventualResource { : Duration.seconds(0), reportBatchItemFailures: true, startingPosition: StartingPosition.TRIM_HORIZON, + ...(filters.length > 0 ? { filters } : {}), }), ], diff --git a/packages/@eventual/aws-runtime/src/handlers/entity-stream-worker.ts b/packages/@eventual/aws-runtime/src/handlers/entity-stream-worker.ts index f5866dee..ebbc3f97 100644 --- a/packages/@eventual/aws-runtime/src/handlers/entity-stream-worker.ts +++ b/packages/@eventual/aws-runtime/src/handlers/entity-stream-worker.ts @@ -104,6 +104,7 @@ export default (async (event) => { newValue: newValue as any, newVersion, operation, + // @ts-ignore oldValue, oldVersion, }; diff --git a/packages/@eventual/core-runtime/src/local/stores/entity-store.ts b/packages/@eventual/core-runtime/src/local/stores/entity-store.ts index 65b9e618..bacf845b 100644 --- a/packages/@eventual/core-runtime/src/local/stores/entity-store.ts +++ b/packages/@eventual/core-runtime/src/local/stores/entity-store.ts @@ -131,7 +131,7 @@ export class LocalEntityStore extends EntityStore { item: { key: convertNormalizedEntityKeyToMap(key), operation: "remove" as const, - oldValue: item.value, + oldValue: item.value as any, oldVersion: item.version, } as EntityStreamItem, }); diff --git a/packages/@eventual/core/src/entity/entity.ts b/packages/@eventual/core/src/entity/entity.ts index f58d0713..29c2bf27 100644 --- a/packages/@eventual/core/src/entity/entity.ts +++ b/packages/@eventual/core/src/entity/entity.ts @@ -153,11 +153,12 @@ export interface Entity< ): EntityIndexMapper; stream< Name extends string = string, - Operations extends EntityStreamOperation[] = EntityStreamOperation[] + Operations extends EntityStreamOperation[] = EntityStreamOperation[], + IncludeOld extends boolean = false >( name: Name, - options: EntityStreamOptions, - handler: EntityStreamHandler + options: EntityStreamOptions, + handler: EntityStreamHandler ): EntityStream; stream( name: string, diff --git a/packages/@eventual/core/src/entity/stream.ts b/packages/@eventual/core/src/entity/stream.ts index 4274af99..d7723851 100644 --- a/packages/@eventual/core/src/entity/stream.ts +++ b/packages/@eventual/core/src/entity/stream.ts @@ -29,13 +29,14 @@ export interface EntityStreamHandler< Sort extends EntityCompositeKeyPart | undefined = | EntityCompositeKeyPart | undefined, - Operations extends EntityStreamOperation[] = EntityStreamOperation[] + Operations extends EntityStreamOperation[] = EntityStreamOperation[], + IncludeOld extends boolean = false > { /** * Provides the keys, new value */ ( - item: EntityStreamItem, + item: EntityStreamItem, context: EntityStreamContext ): Promise | void | false; } @@ -76,11 +77,12 @@ export type EntityStreamItem< Sort extends EntityCompositeKeyPart | undefined = | EntityCompositeKeyPart | undefined, - Operations extends EntityStreamOperation[] = EntityStreamOperation[] + Operations extends EntityStreamOperation[] = EntityStreamOperation[], + IncludeOld extends boolean = false > = ( | EntityStreamInsertItem - | EntityStreamModifyItem - | EntityStreamRemoveItem + | EntityStreamModifyItem + | EntityStreamRemoveItem ) & { id: string; operation: Operations[number] }; export interface EntityStreamInsertItem< @@ -100,13 +102,14 @@ export interface EntityStreamModifyItem< Partition extends EntityCompositeKeyPart = EntityCompositeKeyPart, Sort extends EntityCompositeKeyPart | undefined = | EntityCompositeKeyPart - | undefined + | undefined, + IncludeOld extends boolean = false > extends EntityStreamItemBase { operation: "modify"; newValue: Attr; newVersion: number; - oldValue?: Attr; - oldVersion?: number; + oldValue: IncludeOld extends true ? Attr : undefined; + oldVersion: number; } export interface EntityStreamRemoveItem< @@ -114,10 +117,11 @@ export interface EntityStreamRemoveItem< Partition extends EntityCompositeKeyPart = EntityCompositeKeyPart, Sort extends EntityCompositeKeyPart | undefined = | EntityCompositeKeyPart - | undefined + | undefined, + IncludeOld extends boolean = false > extends EntityStreamItemBase { operation: "remove"; - oldValue?: Attr; + oldValue?: IncludeOld extends true ? Attr : undefined; oldVersion?: number; } diff --git a/packages/@eventual/core/src/internal/service-spec.ts b/packages/@eventual/core/src/internal/service-spec.ts index 63ff19bc..f2d27c28 100644 --- a/packages/@eventual/core/src/internal/service-spec.ts +++ b/packages/@eventual/core/src/internal/service-spec.ts @@ -227,7 +227,8 @@ export interface EntityStreamOptions< Sort extends EntityCompositeKeyPart | undefined = | EntityCompositeKeyPart | undefined, - Operations extends EntityStreamOperation[] = EntityStreamOperation[] + Operations extends EntityStreamOperation[] = EntityStreamOperation[], + IncludeOld extends boolean = false > extends FunctionRuntimeProps { /** * A list of operations to be send to the stream. @@ -238,7 +239,7 @@ export interface EntityStreamOptions< /** * When true, the old value will be sent with the new value. */ - includeOld?: boolean; + includeOld?: IncludeOld; /** * One or more key queries that will be included in the stream. */ From 018df1f019cc24057ed8c29c41086bebf972e855 Mon Sep 17 00:00:00 2001 From: sam Date: Thu, 7 Sep 2023 10:20:37 -0700 Subject: [PATCH 3/6] fix: do not remove user's key from values --- .../src/handlers/entity-stream-worker.ts | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/@eventual/aws-runtime/src/handlers/entity-stream-worker.ts b/packages/@eventual/aws-runtime/src/handlers/entity-stream-worker.ts index ebbc3f97..ce4333ce 100644 --- a/packages/@eventual/aws-runtime/src/handlers/entity-stream-worker.ts +++ b/packages/@eventual/aws-runtime/src/handlers/entity-stream-worker.ts @@ -82,18 +82,18 @@ export default (async (event) => { const normalizedKey = normalizeCompositeKey(entity, bestValue); const keyMap = convertNormalizedEntityKeyToMap(normalizedKey); - if (newValue) { - delete newValue[EntityEntityRecord.VERSION_FIELD]; - delete newValue[normalizedKey.partition.keyAttribute]; - if (normalizedKey.sort) { - delete newValue[normalizedKey.sort.keyAttribute]; - } - } - if (oldValue) { - delete oldValue[EntityEntityRecord.VERSION_FIELD]; - delete oldValue[normalizedKey.partition.keyAttribute]; - if (normalizedKey.sort) { - delete oldValue[normalizedKey.sort.keyAttribute]; + removeSyntheticKey(newValue); + removeSyntheticKey(oldValue); + + function removeSyntheticKey(value: Record | undefined) { + if (value) { + delete value[EntityEntityRecord.VERSION_FIELD]; + if (normalizedKey.partition.attributes.length > 0) { + delete value[normalizedKey.partition.keyAttribute]; + } + if (normalizedKey.sort && normalizedKey.sort.attributes.length > 0) { + delete value[normalizedKey.sort.keyAttribute]; + } } } From 0a3ed6c00747f1cdbd0061422c1a1061c1b61d27 Mon Sep 17 00:00:00 2001 From: sam Date: Thu, 7 Sep 2023 10:37:04 -0700 Subject: [PATCH 4/6] refactor --- apps/tests/aws-runtime/test/test-service.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/tests/aws-runtime/test/test-service.ts b/apps/tests/aws-runtime/test/test-service.ts index 9b6926fb..d5bbe155 100644 --- a/apps/tests/aws-runtime/test/test-service.ts +++ b/apps/tests/aws-runtime/test/test-service.ts @@ -616,7 +616,6 @@ export const counterWatcher = counter.stream( console.log(item); if (item.operation === "remove") { const { n } = item.oldValue!; - item.operation; await entitySignal2.sendSignal(item.key.id, { n: n + 1 }); } else if (item.newValue.namespace === "default") { const { n } = item.newValue; From 59112eb1ab6335d8ca4328959e41a3da045dc441 Mon Sep 17 00:00:00 2001 From: sam Date: Thu, 7 Sep 2023 10:44:05 -0700 Subject: [PATCH 5/6] fix: set options on task --- packages/@eventual/core/src/task.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/@eventual/core/src/task.ts b/packages/@eventual/core/src/task.ts index f1fbd6f7..7ea59c1f 100644 --- a/packages/@eventual/core/src/task.ts +++ b/packages/@eventual/core/src/task.ts @@ -321,6 +321,7 @@ export function task( return sendTaskHeartbeat(request.taskToken); }; func.sourceLocation = sourceLocation; + func.options = opts; // @ts-ignore func.handler = handler; From 2f0188ace7c3eec14a4b11676bc41fa212dd0014 Mon Sep 17 00:00:00 2001 From: sam Date: Thu, 7 Sep 2023 10:44:31 -0700 Subject: [PATCH 6/6] fix: set kind --- packages/@eventual/core/src/task.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/@eventual/core/src/task.ts b/packages/@eventual/core/src/task.ts index 7ea59c1f..c348a00b 100644 --- a/packages/@eventual/core/src/task.ts +++ b/packages/@eventual/core/src/task.ts @@ -322,6 +322,7 @@ export function task( }; func.sourceLocation = sourceLocation; func.options = opts; + func.kind = "Task"; // @ts-ignore func.handler = handler;