Skip to content

Commit

Permalink
Adding system tests for deduplication strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
drmarro committed Feb 7, 2024
1 parent 8fd3b7d commit 2571ff0
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 18 deletions.
18 changes: 17 additions & 1 deletion .github/workflows/code-review.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,24 @@ jobs:
echo "COSMOSDB_NAME=${{github.run_id}}" >> $GITHUB_ENV
echo "COSMOSDB_CONNECTION_STRING=${{secrets.COSMOSDB_CONNECTION_STRING}}" >> $GITHUB_ENV
- name: Stop containers if up
run: |
docker-compose -f "__integrations__/docker-compose.yml" stop
docker-compose -f "__integrations__/docker-compose.yml" down
- name: Start containers
run: docker-compose --env-path "__integrations__/environments/.env" -f "__integrations__/docker-compose.yml" up -d

- name: Sleep
run: sleep 30s

- name: System tests
run: |
cd __integrations__
yarn install --immutable
yarn test
yarn test
- name: Stop containers
run: |
docker-compose -f "docker-compose.yml" stop
docker-compose -f "docker-compose.yml" down
116 changes: 107 additions & 9 deletions __integrations__/__tests__/system/deduplication/timestamp.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { defaultLog } from "@pagopa/winston-ts";
import * as E from "fp-ts/Either";
import * as TE from "fp-ts/lib/TaskEither";
import { constVoid, pipe } from "fp-ts/lib/function";
import { pipe } from "fp-ts/lib/function";
import {
createIndexIfNotExists,
getElasticClient,
Expand All @@ -11,14 +12,31 @@ import {
getDeduplicationStrategy,
} from "../../../../src/deduplication/factory";
import { ELASTIC_NODE } from "../../../env";
import { deleteData, deleteIndex } from "../../../utils/elasticsearch";

const INDEX_NAME = "index_name";
const ID = "id";
const FIRST_ID = "first_id";

const currentTimestamp = Date.now();
const oldTimestamp = new Date(currentTimestamp - 86400000).getTime();

const newerDocument = {
id: FIRST_ID,
_timestamp: currentTimestamp,
value: "first",
};

const olderDocument = {
id: FIRST_ID,
_timestamp: oldTimestamp,
value: "first",
};

beforeAll(async () => {
await pipe(
getElasticClient(ELASTIC_NODE),
TE.fromEither,
TE.chain((client) => createIndexIfNotExists(client, INDEX_NAME)),
TE.chainFirst((client) => createIndexIfNotExists(client, INDEX_NAME)),
TE.getOrElse((e) => {
throw Error(
`Cannot initialize integration tests - ${JSON.stringify(e.message)}`,
Expand All @@ -27,8 +45,22 @@ beforeAll(async () => {
)();
}, 10000);

afterAll(async () => {
await pipe(
getElasticClient(ELASTIC_NODE),
TE.fromEither,
TE.chainFirst((client) => deleteData(client, INDEX_NAME, FIRST_ID)),
TE.chainFirst((client) => deleteIndex(client, INDEX_NAME)),
TE.getOrElse((e) => {
throw Error(
`Cannot destroy integration tests data - ${JSON.stringify(e.message)}`,
);
}),
)();
}, 10000);

describe("deduplication", () => {
it("should create the index when it doesn't exists", async () => {
it("should create the document when it doesn't exists", async () => {
await pipe(
E.Do,
E.bind("service", () => getElasticSearchService(ELASTIC_NODE)),
Expand All @@ -40,17 +72,83 @@ describe("deduplication", () => {
),
TE.fromEither,
TE.chainFirst(({ service, strategy }) =>
strategy.execute(INDEX_NAME, {
id: ID,
_timestamp: Date.now(),
})(service),
strategy.execute(INDEX_NAME, olderDocument)(service),
),
TE.bimap(
(err) =>
new Error(
`it should not fail while finding an existing index - ${err.message}`,
),
() => constVoid(),
({ service }) =>
pipe(
service.get(INDEX_NAME, { id: FIRST_ID }),
defaultLog.taskEither.info((doc) => `Doc retrieved ${doc._source}`),
TE.bimap(E.toError, (doc) =>
expect(doc._source).toEqual(olderDocument),
),
),
),
)();
});

it("should update the document when it has a greater timestamp than the one in the index", async () => {
await pipe(
E.Do,
E.bind("service", () => getElasticSearchService(ELASTIC_NODE)),
E.bind("strategy", () =>
E.tryCatch(
() => getDeduplicationStrategy(DeduplicationStrategyType.Timestamp),
E.toError,
),
),
TE.fromEither,
TE.chainFirst(({ service, strategy }) =>
strategy.execute(INDEX_NAME, newerDocument)(service),
),
TE.bimap(
(err) =>
new Error(
`it should not fail while finding an existing index - ${err.message}`,
),
({ service }) =>
pipe(
service.get(INDEX_NAME, { id: FIRST_ID }),
defaultLog.taskEither.info((doc) => `Doc retrieved ${doc._source}`),
TE.bimap(E.toError, (doc) =>
expect(doc._source).toEqual(newerDocument),
),
),
),
)();
});

it("should not update the document when it has a lower timestamp than the one in the index", async () => {
await pipe(
E.Do,
E.bind("service", () => getElasticSearchService(ELASTIC_NODE)),
E.bind("strategy", () =>
E.tryCatch(
() => getDeduplicationStrategy(DeduplicationStrategyType.Timestamp),
E.toError,
),
),
TE.fromEither,
TE.chainFirst(({ service, strategy }) =>
strategy.execute(INDEX_NAME, olderDocument)(service),
),
TE.bimap(
(err) =>
new Error(
`it should not fail while finding an existing index - ${err.message}`,
),
({ service }) =>
pipe(
service.get(INDEX_NAME, { id: FIRST_ID }),
defaultLog.taskEither.info((doc) => `Doc retrieved ${doc._source}`),
TE.bimap(E.toError, (doc) =>
expect(doc._source).toEqual(newerDocument),
),
),
),
)();
});
Expand Down
2 changes: 1 addition & 1 deletion __integrations__/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ export const COSMOSDB_KEY = process.env.COSMOSDB_KEY;
export const COSMOSDB_CONNECTION_STRING =
process.env.COSMOSDB_CONNECTION_STRING ?? "COSMOSDB_CONNECTION_STRING";
export const COSMOSDB_NAME = process.env.COSMOSDB_NAME ?? "db";
export const ELASTIC_NODE = process.env.ELASTIC_NODE ?? "localhost:9200";
export const ELASTIC_NODE = process.env.ELASTIC_NODE ?? "http://localhost:9200";
29 changes: 29 additions & 0 deletions __integrations__/utils/elasticsearch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import * as EL from "@elastic/elasticsearch";
import * as E from "fp-ts/Either";
import * as TE from "fp-ts/TaskEither";
import { constVoid, pipe } from "fp-ts/lib/function";

export const deleteIndex = (
elasticClient: EL.Client,
indexName: string,
): TE.TaskEither<Error, void> =>
pipe(
TE.tryCatch(
() => elasticClient.indices.delete({ index: indexName }),
E.toError,
),
TE.map(constVoid),
);

export const deleteData = (
elasticClient: EL.Client,
indexName: string,
id: string,
): TE.TaskEither<Error, void> =>
pipe(
TE.tryCatch(
() => elasticClient.delete({ index: indexName, id: id }),
E.toError,
),
TE.map(constVoid),
);
2 changes: 1 addition & 1 deletion src/deduplication/algorithm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export const timestampDeduplication = (
resErr => resErr,
docRead =>
pipe(
document._timestamp > docRead.fields._timestamp,
document._timestamp > docRead._source._timestamp,
B.fold(
() => constVoid(),
() =>
Expand Down
12 changes: 7 additions & 5 deletions src/deduplication/elasticsearch/elasticsearch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,20 @@ export const createIndexIfNotExists = (
export const getDocument = (elasticClient: EL.Client) => (
indexName: string,
document: IOutputDocument
): TE.TaskEither<EL.errors.ResponseError, GetResponse> =>
): TE.TaskEither<EL.errors.ResponseError, GetResponse<IOutputDocument>> =>
pipe(
TE.Do,
defaultLog.taskEither.info(`getAndIndexDocument => ${document}`),
() =>
TE.tryCatch(
() => elasticClient.get({ id: document.id, index: indexName }),
() =>
elasticClient.get<IOutputDocument>({
id: document.id,
index: indexName
}),
e => e as EL.errors.ResponseError
),
defaultLog.taskEither.infoLeft(
e => `Error getting document from index => ${String(e)}`
)
defaultLog.taskEither.infoLeft(_ => `Error getting document from index`)
);

export const indexDocument = (elasticClient: EL.Client) => (
Expand Down
2 changes: 1 addition & 1 deletion src/deduplication/elasticsearch/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from "./elasticsearch";

export type OutputClient = EL.Client;
export type OutputDataRead = GetResponse;
export type OutputDataRead = GetResponse<IOutputDocument>;
export type OutputDataWrite = EL.estypes.Result;

export interface IOutputDeduplicationService {
Expand Down

0 comments on commit 2571ff0

Please sign in to comment.