From 2571ff02a4ff76c5d3a9b38dee110d9085a965d7 Mon Sep 17 00:00:00 2001 From: Davide Marro Date: Wed, 7 Feb 2024 16:32:23 +0100 Subject: [PATCH] Adding system tests for deduplication strategy --- .github/workflows/code-review.yaml | 18 ++- .../system/deduplication/timestamp.test.ts | 116 ++++++++++++++++-- __integrations__/env.ts | 2 +- __integrations__/utils/elasticsearch.ts | 29 +++++ src/deduplication/algorithm.ts | 2 +- .../elasticsearch/elasticsearch.ts | 12 +- src/deduplication/elasticsearch/service.ts | 2 +- 7 files changed, 163 insertions(+), 18 deletions(-) create mode 100644 __integrations__/utils/elasticsearch.ts diff --git a/.github/workflows/code-review.yaml b/.github/workflows/code-review.yaml index 58c4125..cc52782 100644 --- a/.github/workflows/code-review.yaml +++ b/.github/workflows/code-review.yaml @@ -53,8 +53,24 @@ jobs: echo "COSMOSDB_NAME=${{github.run_id}}" >> $GITHUB_ENV echo "COSMOSDB_CONNECTION_STRING=${{secrets.COSMOSDB_CONNECTION_STRING}}" >> $GITHUB_ENV + - name: Stop containers if up + run: | + docker-compose -f "__integrations__/docker-compose.yml" stop + docker-compose -f "__integrations__/docker-compose.yml" down + + - name: Start containers + run: docker-compose --env-path "__integrations__/environments/.env" -f "__integrations__/docker-compose.yml" up -d + + - name: Sleep + run: sleep 30s + - name: System tests run: | cd __integrations__ yarn install --immutable - yarn test \ No newline at end of file + yarn test + + - name: Stop containers + run: | + docker-compose -f "docker-compose.yml" stop + docker-compose -f "docker-compose.yml" down \ No newline at end of file diff --git a/__integrations__/__tests__/system/deduplication/timestamp.test.ts b/__integrations__/__tests__/system/deduplication/timestamp.test.ts index 29de043..3bed3c3 100644 --- a/__integrations__/__tests__/system/deduplication/timestamp.test.ts +++ b/__integrations__/__tests__/system/deduplication/timestamp.test.ts @@ -1,6 +1,7 @@ +import { defaultLog } from "@pagopa/winston-ts"; import * as E from "fp-ts/Either"; import * as TE from "fp-ts/lib/TaskEither"; -import { constVoid, pipe } from "fp-ts/lib/function"; +import { pipe } from "fp-ts/lib/function"; import { createIndexIfNotExists, getElasticClient, @@ -11,14 +12,31 @@ import { getDeduplicationStrategy, } from "../../../../src/deduplication/factory"; import { ELASTIC_NODE } from "../../../env"; +import { deleteData, deleteIndex } from "../../../utils/elasticsearch"; const INDEX_NAME = "index_name"; -const ID = "id"; +const FIRST_ID = "first_id"; + +const currentTimestamp = Date.now(); +const oldTimestamp = new Date(currentTimestamp - 86400000).getTime(); + +const newerDocument = { + id: FIRST_ID, + _timestamp: currentTimestamp, + value: "first", +}; + +const olderDocument = { + id: FIRST_ID, + _timestamp: oldTimestamp, + value: "first", +}; + beforeAll(async () => { await pipe( getElasticClient(ELASTIC_NODE), TE.fromEither, - TE.chain((client) => createIndexIfNotExists(client, INDEX_NAME)), + TE.chainFirst((client) => createIndexIfNotExists(client, INDEX_NAME)), TE.getOrElse((e) => { throw Error( `Cannot initialize integration tests - ${JSON.stringify(e.message)}`, @@ -27,8 +45,22 @@ beforeAll(async () => { )(); }, 10000); +afterAll(async () => { + await pipe( + getElasticClient(ELASTIC_NODE), + TE.fromEither, + TE.chainFirst((client) => deleteData(client, INDEX_NAME, FIRST_ID)), + TE.chainFirst((client) => deleteIndex(client, INDEX_NAME)), + TE.getOrElse((e) => { + throw Error( + `Cannot destroy integration tests data - ${JSON.stringify(e.message)}`, + ); + }), + )(); +}, 10000); + describe("deduplication", () => { - it("should create the index when it doesn't exists", async () => { + it("should create the document when it doesn't exists", async () => { await pipe( E.Do, E.bind("service", () => getElasticSearchService(ELASTIC_NODE)), @@ -40,17 +72,83 @@ describe("deduplication", () => { ), TE.fromEither, TE.chainFirst(({ service, strategy }) => - strategy.execute(INDEX_NAME, { - id: ID, - _timestamp: Date.now(), - })(service), + strategy.execute(INDEX_NAME, olderDocument)(service), ), TE.bimap( (err) => new Error( `it should not fail while finding an existing index - ${err.message}`, ), - () => constVoid(), + ({ service }) => + pipe( + service.get(INDEX_NAME, { id: FIRST_ID }), + defaultLog.taskEither.info((doc) => `Doc retrieved ${doc._source}`), + TE.bimap(E.toError, (doc) => + expect(doc._source).toEqual(olderDocument), + ), + ), + ), + )(); + }); + + it("should update the document when it has a greater timestamp than the one in the index", async () => { + await pipe( + E.Do, + E.bind("service", () => getElasticSearchService(ELASTIC_NODE)), + E.bind("strategy", () => + E.tryCatch( + () => getDeduplicationStrategy(DeduplicationStrategyType.Timestamp), + E.toError, + ), + ), + TE.fromEither, + TE.chainFirst(({ service, strategy }) => + strategy.execute(INDEX_NAME, newerDocument)(service), + ), + TE.bimap( + (err) => + new Error( + `it should not fail while finding an existing index - ${err.message}`, + ), + ({ service }) => + pipe( + service.get(INDEX_NAME, { id: FIRST_ID }), + defaultLog.taskEither.info((doc) => `Doc retrieved ${doc._source}`), + TE.bimap(E.toError, (doc) => + expect(doc._source).toEqual(newerDocument), + ), + ), + ), + )(); + }); + + it("should not update the document when it has a lower timestamp than the one in the index", async () => { + await pipe( + E.Do, + E.bind("service", () => getElasticSearchService(ELASTIC_NODE)), + E.bind("strategy", () => + E.tryCatch( + () => getDeduplicationStrategy(DeduplicationStrategyType.Timestamp), + E.toError, + ), + ), + TE.fromEither, + TE.chainFirst(({ service, strategy }) => + strategy.execute(INDEX_NAME, olderDocument)(service), + ), + TE.bimap( + (err) => + new Error( + `it should not fail while finding an existing index - ${err.message}`, + ), + ({ service }) => + pipe( + service.get(INDEX_NAME, { id: FIRST_ID }), + defaultLog.taskEither.info((doc) => `Doc retrieved ${doc._source}`), + TE.bimap(E.toError, (doc) => + expect(doc._source).toEqual(newerDocument), + ), + ), ), )(); }); diff --git a/__integrations__/env.ts b/__integrations__/env.ts index 0ed1f9a..ae34f1f 100644 --- a/__integrations__/env.ts +++ b/__integrations__/env.ts @@ -7,4 +7,4 @@ export const COSMOSDB_KEY = process.env.COSMOSDB_KEY; export const COSMOSDB_CONNECTION_STRING = process.env.COSMOSDB_CONNECTION_STRING ?? "COSMOSDB_CONNECTION_STRING"; export const COSMOSDB_NAME = process.env.COSMOSDB_NAME ?? "db"; -export const ELASTIC_NODE = process.env.ELASTIC_NODE ?? "localhost:9200"; +export const ELASTIC_NODE = process.env.ELASTIC_NODE ?? "http://localhost:9200"; diff --git a/__integrations__/utils/elasticsearch.ts b/__integrations__/utils/elasticsearch.ts new file mode 100644 index 0000000..04f948c --- /dev/null +++ b/__integrations__/utils/elasticsearch.ts @@ -0,0 +1,29 @@ +import * as EL from "@elastic/elasticsearch"; +import * as E from "fp-ts/Either"; +import * as TE from "fp-ts/TaskEither"; +import { constVoid, pipe } from "fp-ts/lib/function"; + +export const deleteIndex = ( + elasticClient: EL.Client, + indexName: string, +): TE.TaskEither => + pipe( + TE.tryCatch( + () => elasticClient.indices.delete({ index: indexName }), + E.toError, + ), + TE.map(constVoid), + ); + +export const deleteData = ( + elasticClient: EL.Client, + indexName: string, + id: string, +): TE.TaskEither => + pipe( + TE.tryCatch( + () => elasticClient.delete({ index: indexName, id: id }), + E.toError, + ), + TE.map(constVoid), + ); diff --git a/src/deduplication/algorithm.ts b/src/deduplication/algorithm.ts index 89417c7..233f74e 100644 --- a/src/deduplication/algorithm.ts +++ b/src/deduplication/algorithm.ts @@ -18,7 +18,7 @@ export const timestampDeduplication = ( resErr => resErr, docRead => pipe( - document._timestamp > docRead.fields._timestamp, + document._timestamp > docRead._source._timestamp, B.fold( () => constVoid(), () => diff --git a/src/deduplication/elasticsearch/elasticsearch.ts b/src/deduplication/elasticsearch/elasticsearch.ts index 4bbdc0c..46d3e79 100644 --- a/src/deduplication/elasticsearch/elasticsearch.ts +++ b/src/deduplication/elasticsearch/elasticsearch.ts @@ -49,18 +49,20 @@ export const createIndexIfNotExists = ( export const getDocument = (elasticClient: EL.Client) => ( indexName: string, document: IOutputDocument -): TE.TaskEither => +): TE.TaskEither> => pipe( TE.Do, defaultLog.taskEither.info(`getAndIndexDocument => ${document}`), () => TE.tryCatch( - () => elasticClient.get({ id: document.id, index: indexName }), + () => + elasticClient.get({ + id: document.id, + index: indexName + }), e => e as EL.errors.ResponseError ), - defaultLog.taskEither.infoLeft( - e => `Error getting document from index => ${String(e)}` - ) + defaultLog.taskEither.infoLeft(_ => `Error getting document from index`) ); export const indexDocument = (elasticClient: EL.Client) => ( diff --git a/src/deduplication/elasticsearch/service.ts b/src/deduplication/elasticsearch/service.ts index dc0d0f2..5836fc0 100644 --- a/src/deduplication/elasticsearch/service.ts +++ b/src/deduplication/elasticsearch/service.ts @@ -12,7 +12,7 @@ import { } from "./elasticsearch"; export type OutputClient = EL.Client; -export type OutputDataRead = GetResponse; +export type OutputDataRead = GetResponse; export type OutputDataWrite = EL.estypes.Result; export interface IOutputDeduplicationService {