Skip to content

Commit

Permalink
Refactor for blob and table
Browse files Browse the repository at this point in the history
  • Loading branch information
AleDore committed Mar 11, 2024
1 parent bdd3bad commit 77dcb7f
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { pipe } from "fp-ts/lib/function";
import * as TE from "fp-ts/TaskEither";
import * as O from "fp-ts/Option";
import { blobEnrich } from "../blob";
import * as blobUtils from "../../utils/blobStorage";
import * as blobUtils from "../../../utils/blobStorage";
const input = {
blobName: "pk",
foo: "foo",
Expand All @@ -15,7 +15,7 @@ const containerClientMock = {} as any;
describe("blobEnrich", () => {
it("should raise an error if blobName Field is not strings", async () => {
await pipe(
blobEnrich(containerClientMock, "bar")(input),
blobEnrich(containerClientMock)("bar")(input),
TE.bimap(
err => {
expect(err).toBeDefined();
Expand All @@ -35,7 +35,7 @@ describe("blobEnrich", () => {
TE.left(Error("Cannot read Blob"))
);
await pipe(
blobEnrich(containerClientMock, "foo")(input),
blobEnrich(containerClientMock)("foo")(input),
TE.bimap(
err => {
expect(err).toBeDefined();
Expand All @@ -51,7 +51,7 @@ describe("blobEnrich", () => {
it("should return unmodified input if Blob Document is missing", async () => {
getBlobDocumentMock.mockImplementationOnce(() => TE.right(O.none));
await pipe(
blobEnrich(containerClientMock, "foo")(input),
blobEnrich(containerClientMock)("foo")(input),
TE.bimap(
() => fail("it should not fail"),
result => expect(result).toEqual(input)
Expand All @@ -64,7 +64,7 @@ describe("blobEnrich", () => {
TE.right(O.some({ baz: "baz" }))
);
await pipe(
blobEnrich(containerClientMock, "blobName", "enrichedFieldName")(input),
blobEnrich(containerClientMock)("blobName", "enrichedFieldName")(input),
TE.bimap(
() => fail("it should not fail"),
result =>
Expand All @@ -81,7 +81,7 @@ describe("blobEnrich", () => {
TE.right(O.some({ baz: "baz" }))
);
await pipe(
blobEnrich(containerClientMock, "blobName")(input),
blobEnrich(containerClientMock)("blobName")(input),
TE.bimap(
() => fail("it should not fail"),
result =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as TE from "fp-ts/TaskEither";
import * as O from "fp-ts/Option";
import { pipe } from "fp-ts/lib/function";
import { tableEnrich } from "../table";
import * as tableUtils from "../../utils/tableStorage";
import * as tableUtils from "../../../utils/tableStorage";

const input = {
partitionKey: "pk",
Expand All @@ -17,7 +17,7 @@ const tableClientMock = {} as any;
describe("tableEnrich", () => {
it("should raise an error if input Key Fields are not strings", async () => {
await pipe(
tableEnrich(tableClientMock, "foo", "bar")(input),
tableEnrich(tableClientMock)("foo", "bar")(input),
TE.bimap(
err => {
expect(err).toBeDefined();
Expand All @@ -37,7 +37,7 @@ describe("tableEnrich", () => {
TE.left(Error("Table unreachable"))
);
await pipe(
tableEnrich(tableClientMock, "partitionKey", "partitionKey")(input),
tableEnrich(tableClientMock)("partitionKey", "partitionKey")(input),
TE.bimap(
err => {
expect(err).toBeDefined();
Expand All @@ -51,7 +51,7 @@ describe("tableEnrich", () => {
it("should return unmodified input if table document is missing", async () => {
getTableDocumentMock.mockImplementationOnce(() => TE.right(O.none));
await pipe(
tableEnrich(tableClientMock, "partitionKey", "rowKey")(input),
tableEnrich(tableClientMock)("partitionKey", "rowKey")(input),
TE.bimap(
() => fail("it should not fail"),
result => expect(result).toEqual(input)
Expand All @@ -64,7 +64,7 @@ describe("tableEnrich", () => {
TE.right(O.some({ baz: "baz" }))
);
await pipe(
tableEnrich(tableClientMock, "partitionKey", "rowKey")(input),
tableEnrich(tableClientMock)("partitionKey", "rowKey")(input),
TE.bimap(
() => fail("it should not fail"),
result => expect(result).toEqual({ ...input, baz: "baz" })
Expand All @@ -77,12 +77,9 @@ describe("tableEnrich", () => {
TE.right(O.some({ baz: "baz" }))
);
await pipe(
tableEnrich(
tableClientMock,
"partitionKey",
"rowKey",
"enrichedField"
)(input),
tableEnrich(tableClientMock)("partitionKey", "rowKey", "enrichedField")(
input
),
TE.bimap(
() => fail("it should not fail"),
result =>
Expand All @@ -96,7 +93,7 @@ describe("tableEnrich", () => {
TE.right(O.some({ baz: "baz" }))
);
await pipe(
tableEnrich(tableClientMock, "partitionKey", "rowKey")(input),
tableEnrich(tableClientMock)("partitionKey", "rowKey")(input),
TE.bimap(
() => fail("it should not fail"),
result => expect(result).toEqual({ ...input, baz: "baz" })
Expand Down
14 changes: 7 additions & 7 deletions src/enrichment/blob.ts → src/enrichment/storage/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import * as BS from "@azure/storage-blob";
import { errorsToReadableMessages } from "@pagopa/ts-commons/lib/reporters";

import { NonEmptyString } from "@pagopa/ts-commons/lib/strings";
import { flattenField } from "../formatter/flatten";
import { getBlobDocument } from "../utils/blobStorage";
import { toJsonObject } from "../utils/data";
import { flattenField } from "../../formatter/flatten";
import { getBlobDocument } from "../../utils/blobStorage";
import { toJsonObject } from "../../utils/data";

export const blobEnrich = <T extends Record<string, unknown>>(
blobContainerClient: BS.ContainerClient,
blobNameField: keyof T,
outputFieldName?: string
) => (input: T): TE.TaskEither<Error, T> =>
blobContainerClient: BS.ContainerClient
) => (blobNameField: keyof T, outputFieldName?: string) => (
input: T
): TE.TaskEither<Error, T> =>
pipe(
input[blobNameField],
NonEmptyString.decode,
Expand Down
22 changes: 22 additions & 0 deletions src/enrichment/storage/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { pipe } from "fp-ts/lib/function";
import { getBlobContainerClient } from "../../utils/blobStorage";
import { getTableClient } from "../../utils/tableStorage";
import { blobEnrich } from "./blob";
import { tableEnrich } from "./table";

export const createBlobStorageEnrichmentService = (
connectionString: string,
containerName: string
): ReturnType<typeof blobEnrich> =>
pipe(
getBlobContainerClient(connectionString, containerName),
containerClient => blobEnrich(containerClient)
);

export const createTableStorageEnrichmentService = (
connectionString: string,
tableName: string
): ReturnType<typeof tableEnrich> =>
pipe(getTableClient(connectionString)(tableName), tableClient =>
tableEnrich(tableClient)
);
7 changes: 4 additions & 3 deletions src/enrichment/table.ts → src/enrichment/storage/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import * as DT from "@azure/data-tables";
import * as t from "io-ts";
import { NonEmptyString } from "@pagopa/ts-commons/lib/strings";
import { errorsToReadableMessages } from "@pagopa/ts-commons/lib/reporters";
import { flattenField } from "../formatter/flatten";
import { getTableDocument } from "../utils/tableStorage";
import { flattenField } from "../../formatter/flatten";
import { getTableDocument } from "../../utils/tableStorage";

export const InputKeyFields = t.type({
partitionKey: NonEmptyString,
Expand All @@ -17,7 +17,8 @@ export const InputKeyFields = t.type({
export type InputKeyFields = t.TypeOf<typeof InputKeyFields>;

export const tableEnrich = <T extends Record<string, unknown>>(
tableClient: DT.TableClient,
tableClient: DT.TableClient
) => (
partitionKeyField: keyof T,
rowKeyField: keyof T,
outputFieldName?: string
Expand Down
4 changes: 4 additions & 0 deletions src/formatter/types.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
import * as E from "fp-ts/Either";
import * as TE from "fp-ts/TaskEither";

export type MappingFormatter<I, O> = (input: I) => E.Either<Error, O>;

export type MappingEnrichment<I, O> = (input: I) => TE.TaskEither<Error, O>;
55 changes: 48 additions & 7 deletions src/utils/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ import {
SelectInputMapping
} from "@pagopa/data-indexer-commons";
import { SingleInputMapping } from "@pagopa/data-indexer-commons/lib/types/mapping/singleInput";
import { EnrichmentDataSource } from "@pagopa/data-indexer-commons/lib/types/enrichment/enrichment";

import { flow, pipe } from "fp-ts/lib/function";
import * as O from "fp-ts/lib/Option";
import * as E from "fp-ts/lib/Either";
import * as RA from "fp-ts/lib/ReadonlyArray";
import * as AR from "fp-ts/lib/Array";
import { MappingFormatter } from "../formatter/types";
import {
createBlobStorageEnrichmentService,
createTableStorageEnrichmentService
} from "../enrichment/storage/service";
import { MappingEnrichment, MappingFormatter } from "../formatter/types";
import { applySingleInput } from "../formatter/apply";
import {
IFormatterMapping,
Expand All @@ -22,6 +27,9 @@ import {
singleInputFormatterHandlerMappings
} from "./mappings";

const NOT_MAPPED_ERROR = "Not Mapped!";
const NOT_IMPLEMENTED_ERROR = "Not Implemented!";

const getHandler = <T>(
arr: ReadonlyArray<IFormatterMapping<T>>,
dataMapping: DataMapping
Expand All @@ -34,7 +42,7 @@ const getHandler = <T>(
AR.rights,
AR.head,
O.getOrElse(() => {
throw Error("Not mapped!");
throw Error(NOT_MAPPED_ERROR);
})
);

Expand Down Expand Up @@ -84,7 +92,34 @@ export const mapFormatting = <T extends Record<string, unknown>>(
case "EXCLUDE_INPUT":
return getExcludeInputHandler(mapping);
default:
throw Error("Not mapped!");
throw Error(NOT_MAPPED_ERROR);
}
};

export const mapEnrichment = <T extends Record<string, unknown>>(
enrichment: EnrichmentDataSource
): MappingEnrichment<T, unknown> => {
switch (enrichment.type) {
case "BlobStorage":
return createBlobStorageEnrichmentService(
enrichment.params.connectionString,
enrichment.params.containerName
)(enrichment.params.blobFilename);
case "TableStorage":
return createTableStorageEnrichmentService(
enrichment.params.connectionString,
enrichment.params.tableName
)(enrichment.params.partitionKey, enrichment.params.rowKey);
case "API":
throw Error(NOT_IMPLEMENTED_ERROR);
case "CosmosDB":
throw Error(NOT_IMPLEMENTED_ERROR);
case "MongoDB":
throw Error(NOT_IMPLEMENTED_ERROR);
case "PosgresDB":
throw Error(NOT_IMPLEMENTED_ERROR);
default:
throw Error(NOT_MAPPED_ERROR);
}
};

Expand All @@ -98,14 +133,20 @@ export const constructDataPipelineHandlers = (config: Configuration) =>
flow(
O.fromNullable,
O.chain(O.fromPredicate(RA.isEmpty)),
O.map(_ =>
pipe(
_.dataMapping,
O.map(step => ({
enrichs: pipe(
step.dataEnrichment,
O.fromNullable,
O.map(RA.map(mapEnrichment)),
O.getOrElse(() => [])
),
mappings: pipe(
step.dataMapping,
O.fromNullable,
O.map(RA.map(mapFormatting)),
O.getOrElse(() => [])
)
)
}))
)
)
)
Expand Down

0 comments on commit 77dcb7f

Please sign in to comment.