Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: log errors in entity stream worker, fix includeOld types, do not remove key from oldValue #440

Merged
merged 6 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1286,3 +1286,24 @@ export const searchBlog = command(
};
}
);

// check types of entity
function streamShouldHaveEmptyIfNoInclude() {
counter.stream("", {}, (item) => {
if (item.operation === "modify") {
// @ts-expect-error - no oldValue without includeOld: true
item.oldValue!.namespace;
}
});
counter.stream(
"",
{
includeOld: true,
},
(item) => {
if (item.operation === "modify") {
item.oldValue.namespace;
}
}
);
}
1 change: 1 addition & 0 deletions packages/@eventual/aws-cdk/src/entity-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ export class EntityStream extends Construct implements EventualResource {
: Duration.seconds(0),
reportBatchItemFailures: true,
startingPosition: StartingPosition.TRIM_HORIZON,

...(filters.length > 0 ? { filters } : {}),
}),
],
Expand Down
25 changes: 13 additions & 12 deletions packages/@eventual/aws-runtime/src/handlers/entity-stream-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,18 @@ export default (async (event) => {
const normalizedKey = normalizeCompositeKey(entity, bestValue);
const keyMap = convertNormalizedEntityKeyToMap(normalizedKey);

if (newValue) {
delete newValue[EntityEntityRecord.VERSION_FIELD];
delete newValue[normalizedKey.partition.keyAttribute];
if (normalizedKey.sort) {
delete newValue[normalizedKey.sort.keyAttribute];
}
}
if (oldValue) {
delete oldValue[EntityEntityRecord.VERSION_FIELD];
delete oldValue[normalizedKey.partition.keyAttribute];
if (normalizedKey.sort) {
delete oldValue[normalizedKey.sort.keyAttribute];
removeSyntheticKey(newValue);
removeSyntheticKey(oldValue);

function removeSyntheticKey(value: Record<string, any> | undefined) {
if (value) {
delete value[EntityEntityRecord.VERSION_FIELD];
if (normalizedKey.partition.attributes.length > 0) {
delete value[normalizedKey.partition.keyAttribute];
}
if (normalizedKey.sort && normalizedKey.sort.attributes.length > 0) {
delete value[normalizedKey.sort.keyAttribute];
}
}
}

Expand All @@ -104,6 +104,7 @@ export default (async (event) => {
newValue: newValue as any,
newVersion,
operation,
// @ts-ignore
oldValue,
oldVersion,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ export function createEntityStreamWorker(
if (result !== false) {
continue;
}
} catch {}
} catch (err) {
console.error(err);
}
// if the handler fails or returns false, return the rest of the items
return itemGroup.slice(Number(i)).map((i) => i.id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class LocalEntityStore extends EntityStore {
item: {
key: convertNormalizedEntityKeyToMap(key),
operation: "remove" as const,
oldValue: item.value,
oldValue: item.value as any,
oldVersion: item.version,
} as EntityStreamItem,
});
Expand Down
7 changes: 4 additions & 3 deletions packages/@eventual/core/src/entity/entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,12 @@ export interface Entity<
): EntityIndexMapper<Name, Attr, Partition, IndexPartition, IndexSort>;
stream<
Name extends string = string,
Operations extends EntityStreamOperation[] = EntityStreamOperation[]
Operations extends EntityStreamOperation[] = EntityStreamOperation[],
IncludeOld extends boolean = false
>(
name: Name,
options: EntityStreamOptions<Attr, Partition, Sort, Operations>,
handler: EntityStreamHandler<Attr, Partition, Sort, Operations>
options: EntityStreamOptions<Attr, Partition, Sort, Operations, IncludeOld>,
handler: EntityStreamHandler<Attr, Partition, Sort, Operations, IncludeOld>
): EntityStream<Name, Attr, Partition, Sort>;
stream<Name extends string = string>(
name: string,
Expand Down
24 changes: 14 additions & 10 deletions packages/@eventual/core/src/entity/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ export interface EntityStreamHandler<
Sort extends EntityCompositeKeyPart<Attr> | undefined =
| EntityCompositeKeyPart<Attr>
| undefined,
Operations extends EntityStreamOperation[] = EntityStreamOperation[]
Operations extends EntityStreamOperation[] = EntityStreamOperation[],
IncludeOld extends boolean = false
> {
/**
* Provides the keys, new value
*/
(
item: EntityStreamItem<Attr, Partition, Sort, Operations>,
item: EntityStreamItem<Attr, Partition, Sort, Operations, IncludeOld>,
context: EntityStreamContext
): Promise<void | false> | void | false;
}
Expand Down Expand Up @@ -76,11 +77,12 @@ export type EntityStreamItem<
Sort extends EntityCompositeKeyPart<Attr> | undefined =
| EntityCompositeKeyPart<Attr>
| undefined,
Operations extends EntityStreamOperation[] = EntityStreamOperation[]
Operations extends EntityStreamOperation[] = EntityStreamOperation[],
IncludeOld extends boolean = false
> = (
| EntityStreamInsertItem<Attr, Partition, Sort>
| EntityStreamModifyItem<Attr, Partition, Sort>
| EntityStreamRemoveItem<Attr, Partition, Sort>
| EntityStreamModifyItem<Attr, Partition, Sort, IncludeOld>
| EntityStreamRemoveItem<Attr, Partition, Sort, IncludeOld>
) & { id: string; operation: Operations[number] };

export interface EntityStreamInsertItem<
Expand All @@ -100,24 +102,26 @@ export interface EntityStreamModifyItem<
Partition extends EntityCompositeKeyPart<Attr> = EntityCompositeKeyPart<Attr>,
Sort extends EntityCompositeKeyPart<Attr> | undefined =
| EntityCompositeKeyPart<Attr>
| undefined
| undefined,
IncludeOld extends boolean = false
> extends EntityStreamItemBase<Attr, Partition, Sort> {
operation: "modify";
newValue: Attr;
newVersion: number;
oldValue?: Attr;
oldVersion?: number;
oldValue: IncludeOld extends true ? Attr : undefined;
oldVersion: number;
}

export interface EntityStreamRemoveItem<
Attr extends Attributes = Attributes,
Partition extends EntityCompositeKeyPart<Attr> = EntityCompositeKeyPart<Attr>,
Sort extends EntityCompositeKeyPart<Attr> | undefined =
| EntityCompositeKeyPart<Attr>
| undefined
| undefined,
IncludeOld extends boolean = false
> extends EntityStreamItemBase<Attr, Partition, Sort> {
operation: "remove";
oldValue?: Attr;
oldValue?: IncludeOld extends true ? Attr : undefined;
oldVersion?: number;
}

Expand Down
5 changes: 3 additions & 2 deletions packages/@eventual/core/src/internal/service-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ export interface EntityStreamOptions<
Sort extends EntityCompositeKeyPart<Attr> | undefined =
| EntityCompositeKeyPart<Attr>
| undefined,
Operations extends EntityStreamOperation[] = EntityStreamOperation[]
Operations extends EntityStreamOperation[] = EntityStreamOperation[],
IncludeOld extends boolean = false
> extends FunctionRuntimeProps {
/**
* A list of operations to be send to the stream.
Expand All @@ -238,7 +239,7 @@ export interface EntityStreamOptions<
/**
* When true, the old value will be sent with the new value.
*/
includeOld?: boolean;
includeOld?: IncludeOld;
/**
* One or more key queries that will be included in the stream.
*/
Expand Down
2 changes: 2 additions & 0 deletions packages/@eventual/core/src/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ export function task<Name extends string, Input = any, Output = any>(
return sendTaskHeartbeat(request.taskToken);
};
func.sourceLocation = sourceLocation;
func.options = opts;
func.kind = "Task";

// @ts-ignore
func.handler = handler;
Expand Down