diff --git a/__integrations__/__tests__/system/deduplication/table.test.ts b/__integrations__/__tests__/system/deduplication/table.test.ts index f109a23..2234f8f 100644 --- a/__integrations__/__tests__/system/deduplication/table.test.ts +++ b/__integrations__/__tests__/system/deduplication/table.test.ts @@ -3,6 +3,7 @@ import * as E from "fp-ts/Either"; import * as TE from "fp-ts/lib/TaskEither"; import { pipe } from "fp-ts/lib/function"; import { + IOutputDocument, createIndexIfNotExists, getElasticClient, } from "../../../../src/output/elasticsearch/elasticsearch"; @@ -14,33 +15,38 @@ import { } from "../../../../src/output/factory"; import { ELASTIC_NODE, STORAGE_CONN_STRING } from "../../../env"; import { - createTable, - deleteTable, getTableClient, + getTableDocument, } from "../../../../src/utils/tableStorage"; -import { deleteData, deleteIndex } from "../../../utils/elasticsearch"; +import { deleteIndex } from "../../../utils/elasticsearch"; +import { createTableIfNotExists, deleteTableWithAbort } from "../../../utils/table"; +import { NonEmptyString } from "@pagopa/ts-commons/lib/strings"; -const INDEX_NAME = "index"; +const INDEX_NAME = "index" as NonEmptyString; const FIRST_ID = "first_id"; const currentTimestamp = Date.now(); const oldTimestamp = new Date(currentTimestamp - 86400000).getTime(); -const newerDocument = { - id: FIRST_ID, +const getNewerDocument = (id: string = FIRST_ID) => ({ + id, _timestamp: currentTimestamp, value: "first", -}; +}); -const olderDocument = { - id: FIRST_ID, +const getOlderDocument = (id: string = FIRST_ID) => ({ + id, _timestamp: oldTimestamp, value: "first", -}; +}); const tableDeduplicationStrategyConfig: DeduplicationStrategyConfig = { type: DeduplicationStrategyType.TableStorage, + tableName: INDEX_NAME, storageConnectionString: STORAGE_CONN_STRING, + opts: { + allowInsecureConnection: true + } }; describe("table deduplication", () => { @@ -50,7 +56,7 @@ describe("table deduplication", () => { TE.fromEither, TE.chainFirst((client) => createIndexIfNotExists(client, INDEX_NAME)), TE.chain(() => - createTable( + createTableIfNotExists( getTableClient(INDEX_NAME, { allowInsecureConnection: true })( STORAGE_CONN_STRING, ), @@ -63,17 +69,19 @@ describe("table deduplication", () => { }), )(); }, 10000); - + afterAll(async () => { await pipe( getElasticClient(ELASTIC_NODE), TE.fromEither, TE.chainFirst((client) => deleteIndex(client, INDEX_NAME)), - TE.chain(() => deleteTable( - getTableClient(INDEX_NAME, { allowInsecureConnection: true })( - STORAGE_CONN_STRING, + TE.chain(() => + deleteTableWithAbort( + getTableClient(INDEX_NAME, { allowInsecureConnection: true })( + STORAGE_CONN_STRING, + ), ), - )), + ), TE.getOrElse((e) => { throw Error( `Cannot destroy integration tests data - ${JSON.stringify(e.message)}`, @@ -81,7 +89,8 @@ describe("table deduplication", () => { }), )(); }, 10000); - it("should create the document when it doesn't exists", async () => { + it("should create the document if it doesn't exists", async () => { + const olderDocument = getOlderDocument(); await pipe( E.Do, E.bind("service", () => getElasticSearchService(ELASTIC_NODE)), @@ -97,7 +106,7 @@ describe("table deduplication", () => { ), TE.bimap( (err) => - new Error( + Error( `it should not fail while finding an existing index - ${err.message}`, ), ({ service }) => @@ -107,12 +116,35 @@ describe("table deduplication", () => { TE.bimap(E.toError, (doc) => expect(doc._source).toEqual(olderDocument), ), + TE.chain(() => + pipe( + getTableClient(INDEX_NAME, { allowInsecureConnection: true })( + STORAGE_CONN_STRING, + ), + TE.of, + TE.chain((tableClient) => + getTableDocument( + tableClient, + INDEX_NAME, + olderDocument.id, + ), + ), + TE.chain( + TE.fromOption(() => Error("Table document should exists")), + ), + TE.map((tableDoc) => expect(tableDoc).toEqual(olderDocument)), + ), + ), ), ), + TE.mapLeft((e) => { + throw e; + }), )(); }); it("should update the document when it has a greater timestamp than the one in the index", async () => { + const newerDocument = getNewerDocument(); await pipe( E.Do, E.bind("service", () => getElasticSearchService(ELASTIC_NODE)), @@ -144,6 +176,7 @@ describe("table deduplication", () => { }); it("should not update the document when it has a lower timestamp than the one in the index", async () => { + const [olderDocument, newerDocument] = [getOlderDocument(), getNewerDocument()] await pipe( E.Do, E.bind("service", () => getElasticSearchService(ELASTIC_NODE)), diff --git a/__integrations__/package.json b/__integrations__/package.json index be369f9..16b54a4 100644 --- a/__integrations__/package.json +++ b/__integrations__/package.json @@ -6,13 +6,15 @@ "keywords": [], "author": "", "scripts": { - "start": "docker-compose up -d", + "start": "docker-compose --env-file environments/.env up -d", "stop": "docker-compose down", "build": "tsc", "test": "jest --runInBand" }, "dependencies": { + "@azure/abort-controller": "^2.0.0", "@azure/cosmos": "^4.0.0", + "@azure/data-tables": "^13.2.2", "dotenv": "^16.3.1", "fp-ts": "^2.16.1" }, diff --git a/__integrations__/utils/table.ts b/__integrations__/utils/table.ts new file mode 100644 index 0000000..8f0dbfa --- /dev/null +++ b/__integrations__/utils/table.ts @@ -0,0 +1,24 @@ +import * as DT from "@azure/data-tables"; +import * as E from "fp-ts/lib/Either"; +import * as TE from "fp-ts/lib/TaskEither"; +import { pipe } from "fp-ts/lib/function"; + +export const createTableIfNotExists = ( + tableClient: DT.TableClient, +): TE.TaskEither => + pipe(new AbortController(), (controller) => + TE.tryCatch( + () => tableClient.createTable({ abortSignal: controller.signal }), + E.toError, + ), + ); + +export const deleteTableWithAbort = ( + tableClient: DT.TableClient, +): TE.TaskEither => + pipe(new AbortController(), (controller) => + TE.tryCatch( + () => tableClient.deleteTable({ abortSignal: controller.signal }), + E.toError, + ), + ); diff --git a/__integrations__/yarn.lock b/__integrations__/yarn.lock index 6c00ca4..642d496 100644 --- a/__integrations__/yarn.lock +++ b/__integrations__/yarn.lock @@ -17,6 +17,13 @@ dependencies: tslib "^2.2.0" +"@azure/abort-controller@^2.0.0": + version "2.0.0" + resolved "https://registry.yarnpkg.com/@azure/abort-controller/-/abort-controller-2.0.0.tgz#a66d26c7f64977e3ff4b9e0b136296cb4bd47e8b" + integrity sha512-RP/mR/WJchR+g+nQFJGOec+nzeN/VvjlwbinccoqfhTsTHbb8X5+mLDp48kHT0ueyum0BNSwGm0kX0UZuIqTGg== + dependencies: + tslib "^2.2.0" + "@azure/core-auth@^1.3.0", "@azure/core-auth@^1.4.0": version "1.5.0" resolved "https://registry.npmjs.org/@azure/core-auth/-/core-auth-1.5.0.tgz#a41848c5c31cb3b7c84c409885267d55a2c92e44" @@ -26,6 +33,40 @@ "@azure/core-util" "^1.1.0" tslib "^2.2.0" +"@azure/core-client@^1.0.0": + version "1.8.0" + resolved "https://registry.yarnpkg.com/@azure/core-client/-/core-client-1.8.0.tgz#fce9b0af62ba469510e4ed6169b75622d31e2216" + integrity sha512-+gHS3gEzPlhyQBMoqVPOTeNH031R5DM/xpCvz72y38C09rg4Hui/1sJS/ujoisDZbbSHyuRLVWdFlwL0pIFwbg== + dependencies: + "@azure/abort-controller" "^2.0.0" + "@azure/core-auth" "^1.4.0" + "@azure/core-rest-pipeline" "^1.9.1" + "@azure/core-tracing" "^1.0.0" + "@azure/core-util" "^1.0.0" + "@azure/logger" "^1.0.0" + tslib "^2.2.0" + +"@azure/core-paging@^1.1.1": + version "1.5.0" + resolved "https://registry.yarnpkg.com/@azure/core-paging/-/core-paging-1.5.0.tgz#5a5b09353e636072e6a7fc38f7879e11d0afb15f" + integrity sha512-zqWdVIt+2Z+3wqxEOGzR5hXFZ8MGKK52x4vFLw8n58pR6ZfKRx3EXYTxTaYxYHc/PexPUTyimcTWFJbji9Z6Iw== + dependencies: + tslib "^2.2.0" + +"@azure/core-rest-pipeline@^1.1.0", "@azure/core-rest-pipeline@^1.9.1": + version "1.14.0" + resolved "https://registry.yarnpkg.com/@azure/core-rest-pipeline/-/core-rest-pipeline-1.14.0.tgz#9ff394941580a6dee9f0c8a759e16065c524bcfc" + integrity sha512-Tp4M6NsjCmn9L5p7HsW98eSOS7A0ibl3e5ntZglozT0XuD/0y6i36iW829ZbBq0qihlGgfaeFpkLjZ418KDm1Q== + dependencies: + "@azure/abort-controller" "^2.0.0" + "@azure/core-auth" "^1.4.0" + "@azure/core-tracing" "^1.0.1" + "@azure/core-util" "^1.3.0" + "@azure/logger" "^1.0.0" + http-proxy-agent "^5.0.0" + https-proxy-agent "^5.0.0" + tslib "^2.2.0" + "@azure/core-rest-pipeline@^1.2.0": version "1.13.0" resolved "https://registry.npmjs.org/@azure/core-rest-pipeline/-/core-rest-pipeline-1.13.0.tgz#770b003c351b4869e3f1c85800bacb947c98cd33" @@ -47,6 +88,14 @@ dependencies: tslib "^2.2.0" +"@azure/core-util@^1.0.0": + version "1.7.0" + resolved "https://registry.yarnpkg.com/@azure/core-util/-/core-util-1.7.0.tgz#3a2f73e8c7eed0666e8b6ff9ca2c1951e175feba" + integrity sha512-Zq2i3QO6k9DA8vnm29mYM4G8IE9u1mhF1GUabVEqPNX8Lj833gdxQ2NAFxt2BZsfAL+e9cT8SyVN7dFVJ/Hf0g== + dependencies: + "@azure/abort-controller" "^2.0.0" + tslib "^2.2.0" + "@azure/core-util@^1.1.0", "@azure/core-util@^1.3.0": version "1.6.1" resolved "https://registry.npmjs.org/@azure/core-util/-/core-util-1.6.1.tgz#fea221c4fa43c26543bccf799beb30c1c7878f5a" @@ -55,6 +104,14 @@ "@azure/abort-controller" "^1.0.0" tslib "^2.2.0" +"@azure/core-xml@^1.0.0": + version "1.3.4" + resolved "https://registry.yarnpkg.com/@azure/core-xml/-/core-xml-1.3.4.tgz#516092f948b1b609b6c2afb8437acd204527e323" + integrity sha512-B1xI79Ur/u+KR69fGTcsMNj8KDjBSqAy0Ys6Byy4Qm1CqoUy7gCT5A7Pej0EBWRskuH6bpCwrAnosfmQEalkcg== + dependencies: + fast-xml-parser "^4.2.4" + tslib "^2.2.0" + "@azure/cosmos@^4.0.0": version "4.0.0" resolved "https://registry.npmjs.org/@azure/cosmos/-/cosmos-4.0.0.tgz#5fda8b35cb62bbcda52159b96c4c3981a843d5b9" @@ -74,6 +131,21 @@ universal-user-agent "^6.0.0" uuid "^8.3.0" +"@azure/data-tables@^13.2.2": + version "13.2.2" + resolved "https://registry.yarnpkg.com/@azure/data-tables/-/data-tables-13.2.2.tgz#9aa82992d7317a779ecf66436ffa96b887d8e616" + integrity sha512-Dq2Aq0mMMF0BPzYQKdBY/OtO7VemP/foh6z+mJpUO1hRL+65C1rGQUJf20LJHotSyU8wHb4HJzOs+Z50GXSy1w== + dependencies: + "@azure/core-auth" "^1.3.0" + "@azure/core-client" "^1.0.0" + "@azure/core-paging" "^1.1.1" + "@azure/core-rest-pipeline" "^1.1.0" + "@azure/core-tracing" "^1.0.0" + "@azure/core-xml" "^1.0.0" + "@azure/logger" "^1.0.0" + tslib "^2.2.0" + uuid "^8.3.0" + "@azure/logger@^1.0.0": version "1.0.4" resolved "https://registry.npmjs.org/@azure/logger/-/logger-1.0.4.tgz#28bc6d0e5b3c38ef29296b32d35da4e483593fa1" @@ -1144,6 +1216,13 @@ fast-json-stable-stringify@2.x, fast-json-stable-stringify@^2.1.0: resolved "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz#874bf69c6f404c2b5d99c481341399fd55892633" integrity sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw== +fast-xml-parser@^4.2.4: + version "4.3.5" + resolved "https://registry.yarnpkg.com/fast-xml-parser/-/fast-xml-parser-4.3.5.tgz#e2f2a2ae8377e9c3dc321b151e58f420ca7e5ccc" + integrity sha512-sWvP1Pl8H03B8oFJpFR3HE31HUfwtX7Rlf9BNsvdpujD4n7WMhfmu8h9wOV2u+c1k0ZilTADhPqypzx2J690ZQ== + dependencies: + strnum "^1.0.5" + fb-watchman@^2.0.0: version "2.0.2" resolved "https://registry.npmjs.org/fb-watchman/-/fb-watchman-2.0.2.tgz#e9524ee6b5c77e9e5001af0f85f3adbb8623255c" @@ -2174,6 +2253,11 @@ strip-json-comments@^3.1.1: resolved "https://registry.npmjs.org/strip-json-comments/-/strip-json-comments-3.1.1.tgz#31f1281b3832630434831c310c01cccda8cbe006" integrity sha512-6fPc+R4ihwqP6N/aIv2f1gMH8lOVtWQHoqC4yK6oSDVVocumAsfCqjkXnqiYMhmMwS/mEHLp7Vehlt3ql6lEig== +strnum@^1.0.5: + version "1.0.5" + resolved "https://registry.yarnpkg.com/strnum/-/strnum-1.0.5.tgz#5c4e829fe15ad4ff0d20c3db5ac97b73c9b072db" + integrity sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA== + supports-color@^5.3.0: version "5.5.0" resolved "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz#e2e69a44ac8772f78a1ec0b35b689df6530efc8f" diff --git a/src/output/__tests__/tablestorage-deduplication.test.ts b/src/output/__tests__/tablestorage-deduplication.test.ts index adbcf69..964644c 100644 --- a/src/output/__tests__/tablestorage-deduplication.test.ts +++ b/src/output/__tests__/tablestorage-deduplication.test.ts @@ -25,7 +25,6 @@ const mockService: IOutputService = { update: mockUpdate }; -const getTableClient = jest.spyOn(tableUtils, "getTableClient"); const getTableDocumentSpy = jest.spyOn(tableUtils, "getTableDocument"); const upsertTableDocumentSpy = jest .spyOn(tableUtils, "upsertTableDocument") @@ -37,13 +36,12 @@ describe("tableStorageDeduplication", () => { }); it("should handle error when getting document from table", async () => { - getTableClient.mockImplementationOnce(() => () => mockTableClient); getTableDocumentSpy.mockImplementationOnce(() => TE.left(new Error("Failed to get document")) ); await pipe( - tableStorageDeduplication(tableStorageConnectionString)("testIndex", mockDocument)(mockService), + tableStorageDeduplication(mockTableClient)("testIndex", mockDocument)(mockService), TE.bimap( result => { expect(result).toEqual(new Error("Failed to get document")); @@ -64,13 +62,12 @@ describe("tableStorageDeduplication", () => { }); it("should handle error when indexing document", async () => { - getTableClient.mockImplementationOnce(() => () => mockTableClient); getTableDocumentSpy.mockImplementationOnce(() => TE.right(O.none)); mockInsert.mockImplementationOnce(() => TE.left(new Error("Failed to index document")) ); await pipe( - tableStorageDeduplication(tableStorageConnectionString)("testIndex", mockDocument)(mockService), + tableStorageDeduplication(mockTableClient)("testIndex", mockDocument)(mockService), TE.bimap( result => { expect(result).toEqual(new Error("Failed to index document")); @@ -91,7 +88,6 @@ describe("tableStorageDeduplication", () => { }); it("should handle error when updating document index", async () => { - getTableClient.mockImplementationOnce(() => () => mockTableClient); getTableDocumentSpy.mockImplementationOnce(() => TE.right(O.some({ _timestamp: 1 })) ); @@ -100,7 +96,7 @@ describe("tableStorageDeduplication", () => { ); await pipe( - tableStorageDeduplication(tableStorageConnectionString)("testIndex", mockDocument)(mockService), + tableStorageDeduplication(mockTableClient)("testIndex", mockDocument)(mockService), TE.bimap( result => { expect(result).toEqual( @@ -123,14 +119,13 @@ describe("tableStorageDeduplication", () => { }); it("should handle error when inserting on table storage", async () => { - getTableClient.mockImplementationOnce(() => () => mockTableClient); getTableDocumentSpy.mockImplementationOnce(() => TE.right(O.none)); upsertTableDocumentSpy.mockImplementationOnce(() => TE.left(new Error("Failed to insert on table storage")) ); await pipe( - tableStorageDeduplication(tableStorageConnectionString)("testIndex", mockDocument)(mockService), + tableStorageDeduplication(mockTableClient)("testIndex", mockDocument)(mockService), TE.bimap( result => { expect(result).toEqual( @@ -151,13 +146,12 @@ describe("tableStorageDeduplication", () => { )(); }); it("should index a new document and store it in table if document does not exist", async () => { - getTableClient.mockImplementationOnce(() => () => mockTableClient); getTableDocumentSpy.mockImplementationOnce(() => TE.right(O.none)); mockInsert.mockImplementationOnce(() => TE.right(void 0)); upsertTableDocumentSpy.mockImplementationOnce(() => TE.right(void 0)); await pipe( - tableStorageDeduplication(tableStorageConnectionString)("testIndex", mockDocument)(mockService), + tableStorageDeduplication(mockTableClient)("testIndex", mockDocument)(mockService), TE.bimap( () => { throw new Error("it should not fail"); @@ -182,7 +176,6 @@ describe("tableStorageDeduplication", () => { }); it("should update existing document in index and store it in table if existing document is older than new document", async () => { - getTableClient.mockImplementationOnce(() => () => mockTableClient); getTableDocumentSpy.mockImplementationOnce(() => TE.right(O.some({ _timestamp: 1 } as Record)) ); @@ -190,7 +183,7 @@ describe("tableStorageDeduplication", () => { upsertTableDocumentSpy.mockImplementationOnce(() => TE.right(void 0)); await pipe( - tableStorageDeduplication(tableStorageConnectionString)("testIndex", mockDocument)(mockService), + tableStorageDeduplication(mockTableClient)("testIndex", mockDocument)(mockService), TE.bimap( () => { throw new Error("it should not fail"); @@ -215,7 +208,6 @@ describe("tableStorageDeduplication", () => { }); it("should not upsert document in index and store it in table if existing document is newer than occuring document", async () => { - getTableClient.mockImplementationOnce(() => () => mockTableClient); getTableDocumentSpy.mockImplementationOnce(() => TE.right(O.some({ _timestamp: 3 })) ); @@ -223,7 +215,7 @@ describe("tableStorageDeduplication", () => { upsertTableDocumentSpy.mockImplementationOnce(() => TE.right(void 0)); await pipe( - tableStorageDeduplication(tableStorageConnectionString)("testIndex", mockDocument)(mockService), + tableStorageDeduplication(mockTableClient)("testIndex", mockDocument)(mockService), TE.bimap( () => { throw new Error("it should not fail"); diff --git a/src/output/factory.ts b/src/output/factory.ts index 3046805..a39b1c6 100644 --- a/src/output/factory.ts +++ b/src/output/factory.ts @@ -15,10 +15,21 @@ export const IndexerDeduplicationStrategyConfig = t.type({ type: t.literal(DeduplicationStrategyType.Indexer) }); -export const TableDeduplicationStrategyConfig = t.type({ - storageConnectionString: NonEmptyString, - type: t.literal(DeduplicationStrategyType.TableStorage) -}); +export const TableDeduplicationStrategyConfig = t.intersection([ + t.type({ + storageConnectionString: NonEmptyString, + tableName: NonEmptyString, + type: t.literal(DeduplicationStrategyType.TableStorage) + }), + t.partial({ + opts: t.type({ + allowInsecureConnection: t.boolean + }) + }) +]); +export type TableDeduplicationStrategyConfig = t.TypeOf< + typeof TableDeduplicationStrategyConfig +>; export const DeduplicationStrategyConfig = t.union([ TableDeduplicationStrategyConfig, @@ -35,7 +46,7 @@ export const getDeduplicationStrategy = ( case DeduplicationStrategyType.Indexer: return indexerDeduplicationStrategy; case DeduplicationStrategyType.TableStorage: - return tableStorageDeduplicationStrategy(config.storageConnectionString); + return tableStorageDeduplicationStrategy(config); default: throw new Error("Strategy not supported"); } diff --git a/src/output/service.ts b/src/output/service.ts index fd527d1..aa6aeda 100644 --- a/src/output/service.ts +++ b/src/output/service.ts @@ -1,9 +1,11 @@ import * as TE from "fp-ts/TaskEither"; -import { NonEmptyString } from "@pagopa/ts-commons/lib/strings"; +import { pipe } from "fp-ts/lib/function"; +import { getTableClient } from "../utils/tableStorage"; import { IOutputDocument } from "./elasticsearch/elasticsearch"; import { IOutputService } from "./elasticsearch/service"; import { indexerDeduplication } from "./indexer-deduplication"; import { tableStorageDeduplication } from "./tablestorage-deduplication"; +import { TableDeduplicationStrategyConfig } from "./factory"; export interface IDeduplicationStrategy { readonly execute: ( @@ -17,7 +19,14 @@ export const indexerDeduplicationStrategy: IDeduplicationStrategy = { }; export const tableStorageDeduplicationStrategy = ( - storageConnectionString: NonEmptyString -): IDeduplicationStrategy => ({ - execute: tableStorageDeduplication(storageConnectionString) -}); + config: TableDeduplicationStrategyConfig +): IDeduplicationStrategy => + pipe( + getTableClient( + config.tableName, + config.opts + )(config.storageConnectionString), + tableClient => ({ + execute: tableStorageDeduplication(tableClient) + }) + ); diff --git a/src/output/tablestorage-deduplication.ts b/src/output/tablestorage-deduplication.ts index 7715831..6d58598 100644 --- a/src/output/tablestorage-deduplication.ts +++ b/src/output/tablestorage-deduplication.ts @@ -3,72 +3,58 @@ import { defaultLog } from "@pagopa/winston-ts"; import * as O from "fp-ts/Option"; import * as TE from "fp-ts/TaskEither"; import { constVoid, flow, pipe } from "fp-ts/lib/function"; -import { NonEmptyString } from "@pagopa/ts-commons/lib/strings"; -import { - getTableClient, - getTableDocument, - upsertTableDocument -} from "../utils/tableStorage"; +import * as DT from "@azure/data-tables"; +import { getTableDocument, upsertTableDocument } from "../utils/tableStorage"; import { IOutputDocument } from "./elasticsearch/elasticsearch"; import { IOutputService } from "./elasticsearch/service"; -export const tableStorageDeduplication = ( - tableStorageConnectionString: NonEmptyString -) => (indexName: string, document: IOutputDocument) => ( - service: IOutputService -): TE.TaskEither => +export const tableStorageDeduplication = (tableClient: DT.TableClient) => ( + indexName: string, + document: IOutputDocument +) => (service: IOutputService): TE.TaskEither => pipe( - TE.Do, - TE.bind("tableClient", () => - TE.of(getTableClient(indexName)(tableStorageConnectionString)) + getTableDocument(tableClient, indexName, document.id), + defaultLog.taskEither.infoLeft( + e => `Error getting document from index table => ${String(e)}` ), - defaultLog.taskEither.info(`tableStorageDeduplication => ${document}`), - TE.chain(({ tableClient }) => - pipe( - getTableDocument(tableClient, indexName, document.id), - defaultLog.taskEither.infoLeft( - e => `Error getting document from index table => ${String(e)}` + defaultLog.taskEither.info("indexing document"), + TE.chain( + flow( + O.fold( + () => + pipe( + service.insert(indexName, document), + TE.map(() => O.some(document)) + ), + flow( + O.fromPredicate( + // eslint-disable-next-line no-underscore-dangle + retrievedDoc => retrievedDoc._timestamp < document._timestamp + ), + O.map(() => + pipe( + service.update(indexName, document), + TE.map(() => O.some(document)) + ) + ), + O.getOrElse(() => TE.right(O.none)) + ) ), - defaultLog.taskEither.info("indexing document"), TE.chain( flow( - O.fold( - () => - pipe( - service.insert(indexName, document), - TE.map(() => O.some(document)) - ), - flow( - O.fromPredicate( + O.map(() => + pipe( + upsertTableDocument(tableClient, { + rowKey: document.id, + partitionKey: indexName, + id: document.id, // eslint-disable-next-line no-underscore-dangle - retrievedDoc => retrievedDoc._timestamp < document._timestamp - ), - O.map(() => - pipe( - service.update(indexName, document), - TE.map(() => O.some(document)) - ) - ), - O.getOrElse(() => TE.right(O.none)) + _timestamp: document._timestamp + }), + TE.map(constVoid) ) ), - TE.chain( - flow( - O.map(() => - pipe( - upsertTableDocument(tableClient, { - rowKey: document.id, - partitionKey: indexName, - id: document.id, - // eslint-disable-next-line no-underscore-dangle - _timestamp: document._timestamp - }), - TE.map(constVoid) - ) - ), - O.getOrElse(() => TE.right(void 0)) - ) - ) + O.getOrElse(() => TE.right(void 0)) ) ) )