Skip to content

Commit

Permalink
IMN-797 Add events service v1 in catalog-platformstate-writer (#943)
Browse files Browse the repository at this point in the history
Co-authored-by: Stefano Hu <[email protected]>
  • Loading branch information
taglioni-r and shuyec authored Oct 4, 2024
1 parent 5ca0bc4 commit 0a94f11
Show file tree
Hide file tree
Showing 5 changed files with 912 additions and 13 deletions.
152 changes: 149 additions & 3 deletions packages/catalog-platformstate-writer/src/consumerServiceV1.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,149 @@
import { match } from "ts-pattern";
import { EServiceEventEnvelopeV1 } from "pagopa-interop-models";
import {
Descriptor,
descriptorState,
EServiceDescriptorV1,
EServiceEventEnvelopeV1,
EServiceId,
fromDescriptorV1,
makeGSIPKEServiceIdDescriptorId,
makePlatformStatesEServiceDescriptorPK,
missingKafkaMessageDataError,
PlatformStatesCatalogEntry,
unsafeBrandId,
} from "pagopa-interop-models";
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
deleteCatalogEntry,
descriptorStateToItemState,
readCatalogEntry,
updateDescriptorStateInPlatformStatesEntry,
updateDescriptorStateInTokenGenerationStatesTable,
writeCatalogEntry,
} from "./utils.js";

export async function handleMessageV1(
message: EServiceEventEnvelopeV1,
_dynamoDBClient: DynamoDBClient
dynamoDBClient: DynamoDBClient
): Promise<void> {
await match(message)
.with({ type: "EServiceDescriptorUpdated" }, async (msg) => {
const eserviceId = unsafeBrandId<EServiceId>(msg.data.eserviceId);
const descriptor = parseDescriptor(msg.data.eserviceDescriptor, msg.type);

const eserviceDescriptorPK = makePlatformStatesEServiceDescriptorPK({
eserviceId,
descriptorId: descriptor.id,
});
await match(descriptor.state)
.with(descriptorState.published, async () => {
const existingCatalogEntry = await readCatalogEntry(
eserviceDescriptorPK,
dynamoDBClient
);

if (existingCatalogEntry) {
if (existingCatalogEntry.version > msg.version) {
// Stops processing if the message is older than the catalog entry
return Promise.resolve();
} else {
// suspended->published

await updateDescriptorStateInPlatformStatesEntry(
dynamoDBClient,
eserviceDescriptorPK,
descriptorStateToItemState(descriptor.state),
msg.version
);
}
} else {
// draft -> published

const catalogEntry: PlatformStatesCatalogEntry = {
PK: eserviceDescriptorPK,
state: descriptorStateToItemState(descriptor.state),
descriptorAudience: descriptor.audience,
descriptorVoucherLifespan: descriptor.voucherLifespan,
version: msg.version,
updatedAt: new Date().toISOString(),
};

await writeCatalogEntry(catalogEntry, dynamoDBClient);
}

// token-generation-states
const eserviceId_descriptorId = makeGSIPKEServiceIdDescriptorId({
eserviceId,
descriptorId: descriptor.id,
});
await updateDescriptorStateInTokenGenerationStatesTable(
eserviceId_descriptorId,
descriptorStateToItemState(descriptor.state),
dynamoDBClient
);
})
.with(descriptorState.suspended, async () => {
const existingCatalogEntry = await readCatalogEntry(
eserviceDescriptorPK,
dynamoDBClient
);

if (
!existingCatalogEntry ||
existingCatalogEntry.version > msg.version
) {
return Promise.resolve();
} else {
// platform-states
await updateDescriptorStateInPlatformStatesEntry(
dynamoDBClient,
eserviceDescriptorPK,
descriptorStateToItemState(descriptor.state),
msg.version
);

// token-generation-states
const eserviceId_descriptorId = makeGSIPKEServiceIdDescriptorId({
eserviceId,
descriptorId: descriptor.id,
});
await updateDescriptorStateInTokenGenerationStatesTable(
eserviceId_descriptorId,
descriptorStateToItemState(descriptor.state),
dynamoDBClient
);
}
})
.with(descriptorState.archived, async () => {
const eserviceId = unsafeBrandId<EServiceId>(msg.data.eserviceId);
const descriptor = parseDescriptor(
msg.data.eserviceDescriptor,
msg.type
);

// platform-states
const primaryKey = makePlatformStatesEServiceDescriptorPK({
eserviceId,
descriptorId: descriptor.id,
});
await deleteCatalogEntry(primaryKey, dynamoDBClient);

// token-generation-states
const eserviceId_descriptorId = makeGSIPKEServiceIdDescriptorId({
eserviceId,
descriptorId: descriptor.id,
});
await updateDescriptorStateInTokenGenerationStatesTable(
eserviceId_descriptorId,
descriptorStateToItemState(descriptor.state),
dynamoDBClient
);
})
.with(descriptorState.draft, descriptorState.deprecated, () =>
Promise.resolve()
)
.exhaustive();
})
.with(
{ type: "EServiceAdded" },
{ type: "ClonedEServiceAdded" },
Expand All @@ -20,9 +157,18 @@ export async function handleMessageV1(
{ type: "EServiceDocumentAdded" },
{ type: "EServiceDocumentDeleted" },
{ type: "EServiceDescriptorAdded" },
{ type: "EServiceDescriptorUpdated" },
{ type: "EServiceRiskAnalysisDeleted" },
() => Promise.resolve()
)
.exhaustive();
}

export const parseDescriptor = (
descriptorV1: EServiceDescriptorV1 | undefined,
eventType: string
): Descriptor => {
if (!descriptorV1) {
throw missingKafkaMessageDataError("descriptor", eventType);
}
return fromDescriptorV1(descriptorV1);
};
Loading

0 comments on commit 0a94f11

Please sign in to comment.