Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gracefull shutdown #7410

Open
wants to merge 1 commit into
base: production
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { tenants } from 'api/tenants';
import { TaskManager } from 'api/services/tasksmanager/TaskManager';
import { permissionsContext } from 'api/permissions/permissionsContext';
import { Logger } from 'api/log.v2/contracts/Logger';
import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger';
import { InvalidATServerResponse } from '../../errors/generateATErrors';
import { AutomaticTranslationFactory } from '../../AutomaticTranslationFactory';
import { Validator } from '../../infrastructure/Validator';
Expand All @@ -11,23 +13,29 @@ export class ATServiceListener {

private taskManager: TaskManager;

constructor(ATFactory: typeof AutomaticTranslationFactory = AutomaticTranslationFactory) {
constructor(
ATFactory: typeof AutomaticTranslationFactory = AutomaticTranslationFactory,
logger: Logger = DefaultLogger()
) {
const validator = new Validator<TranslationResult>(translationResultSchema);
this.taskManager = new TaskManager({
serviceName: ATServiceListener.SERVICE_NAME,
processResults: async result => {
if (!validator.validate(result)) {
throw new InvalidATServerResponse(validator.getErrors()[0].message, {
cause: validator.getErrors()[0],
});
}
this.taskManager = new TaskManager(
{
serviceName: ATServiceListener.SERVICE_NAME,
processResults: async result => {
if (!validator.validate(result)) {
throw new InvalidATServerResponse(validator.getErrors()[0].message, {
cause: validator.getErrors()[0],
});
}

await tenants.run(async () => {
permissionsContext.setCommandContext();
await ATFactory.defaultSaveEntityTranslations().execute(result);
}, result.key[0]);
await tenants.run(async () => {
permissionsContext.setCommandContext();
await ATFactory.defaultSaveEntityTranslations().execute(result);
}, result.key[0]);
},
},
});
logger
);
}

start(interval = 500) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import RedisSMQ from 'rsmq';
import { UserSchema } from 'shared/types/userType';
import waitForExpect from 'wait-for-expect';
import { ATServiceListener } from '../ATServiceListener';
import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger';

const prepareATFactory = (executeSpy: jest.Mock<any, any, any>) => {
// @ts-ignore
Expand Down Expand Up @@ -39,7 +40,7 @@ describe('ATServiceListener', () => {
userInContext = permissionsContext.getUserInContext();
});

listener = new ATServiceListener(prepareATFactory(executeSpy));
listener = new ATServiceListener(prepareATFactory(executeSpy), createMockLogger());
redisClient = Redis.createClient(redisUrl);
redisSMQ = new RedisSMQ({ client: redisClient });

Expand Down
4 changes: 3 additions & 1 deletion app/api/files/ocrRoutes.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Application, Request, Response, NextFunction } from 'express';
import { storage } from 'api/files';
import needsAuthorization from 'api/auth/authMiddleware';
import { isOcrEnabled, ocrManager, getOcrStatus } from 'api/services/ocr/OcrManager';
import { isOcrEnabled, getOcrStatus, OcrManager } from 'api/services/ocr/OcrManager';
import { files } from './files';
import { validation, createError } from '../utils';

Expand Down Expand Up @@ -57,9 +57,11 @@ const ocrRoutes = (app: Application) => {
needsAuthorization(['admin', 'editor']),
validation.validateRequest(ocrRequestDecriptor),
async (req, res) => {
const ocrManager = new OcrManager();
const file = await fileFromRequest(req);

await ocrManager.addToQueue(file);
await ocrManager.stop();

res.sendStatus(200);
}
Expand Down
5 changes: 0 additions & 5 deletions app/api/files/specs/ocrRoutes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import request from 'supertest';
import { storage } from 'api/files';
import relationships from 'api/relationships';
import { search } from 'api/search';
import { ocrManager } from 'api/services/ocr/OcrManager';
import settings from 'api/settings/settings';
import { getFixturesFactory } from 'api/utils/fixturesFactory';
import db, { DBFixture } from 'api/utils/testing_db';
Expand Down Expand Up @@ -94,10 +93,6 @@ describe('OCR service', () => {
jest.spyOn(setupSockets, 'emitToTenant').mockImplementation(() => {});
});

beforeAll(() => {
ocrManager.start();
});

afterAll(async () => {
jest.restoreAllMocks();
await testingEnvironment.tearDown();
Expand Down
78 changes: 43 additions & 35 deletions app/api/services/convertToPDF/ConvertToPdfWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import path from 'path';
import { pipeline } from 'stream/promises';
import { TaskManager } from '../tasksmanager/TaskManager';
import { convertToPDFService } from './convertToPdfService';
import { Logger } from 'api/log.v2/contracts/Logger';
import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger';

const ajv = new Ajv();

Expand Down Expand Up @@ -38,47 +40,53 @@ export class ConvertToPdfWorker {

taskManager: TaskManager;

constructor() {
this.taskManager = new TaskManager({
serviceName: this.SERVICE_NAME,
processResults: async result => {
if (result.success === false) {
throw new Error(result.error_message);
}
if (!validateResult(result)) {
throw new ValidationError(validateResult.errors || [{ message: 'validation failed' }]);
}
await tenants.run(async () => {
permissionsContext.setCommandContext();
const [attachment] = await files.get({ filename: result.params.filename });
if (!attachment.entity) {
throw new Error('attachment does not have an entity');
constructor(logger: Logger = DefaultLogger()) {
this.taskManager = new TaskManager(
{
serviceName: this.SERVICE_NAME,
processResults: async result => {
if (result.success === false) {
throw new Error(result.error_message);
}
await files.save({ ...attachment, status: 'ready' });
if (!validateResult(result)) {
throw new ValidationError(validateResult.errors || [{ message: 'validation failed' }]);
}
await tenants.run(async () => {
permissionsContext.setCommandContext();
const [attachment] = await files.get({ filename: result.params.filename });
if (!attachment.entity) {
throw new Error('attachment does not have an entity');
}
await files.save({ ...attachment, status: 'ready' });

const filename = `${generateFileName({})}.pdf`;
const filename = `${generateFileName({})}.pdf`;

await storage.storeFile(
filename,
await convertToPDFService.download(new URL(result.file_url)),
'document'
);
await pipeline(
await storage.readableFile(filename, 'document'),
createWriteStream(path.join(os.tmpdir(), filename))
);
await storage.storeFile(
filename,
await convertToPDFService.download(new URL(result.file_url)),
'document'
);
await pipeline(
await storage.readableFile(filename, 'document'),
createWriteStream(path.join(os.tmpdir(), filename))
);

await processDocument(attachment.entity, {
filename,
destination: os.tmpdir(),
originalname: chageFileExtesion(attachment.originalname || generateFileName({}), 'pdf'),
mimetype: 'application/pdf',
});
await processDocument(attachment.entity, {
filename,
destination: os.tmpdir(),
originalname: chageFileExtesion(
attachment.originalname || generateFileName({}),
'pdf'
),
mimetype: 'application/pdf',
});

emitToTenant(result.params.namespace, 'documentProcessed', attachment.entity);
}, result.params.namespace);
emitToTenant(result.params.namespace, 'documentProcessed', attachment.entity);
}, result.params.namespace);
},
},
});
logger
);
}

start(interval = 500) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import * as handleError from 'api/utils/handleError.js';
import { ObjectId } from 'mongodb';
import Redis from 'redis';
import RedisSMQ from 'rsmq';
import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger';
import waitForExpect from 'wait-for-expect';
import { convertToPDFService } from '../convertToPdfService';
import { ConvertToPdfWorker } from '../ConvertToPdfWorker';

describe('convertToPdfWorker', () => {
const worker = new ConvertToPdfWorker();
const worker = new ConvertToPdfWorker(createMockLogger());
const redisUrl = `redis://${config.redis.host}:${config.redis.port}`;
const redisClient = Redis.createClient(redisUrl);
const redisSMQ = new RedisSMQ({ client: redisClient });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class InformationExtraction {
this.taskManager.subscribeToResults();
}

async stop() {
await this.taskManager.stop();
}

requestResults = async (message: InternalIXResultsMessage) => {
const response = await request.get(message.data_url);

Expand Down
18 changes: 11 additions & 7 deletions app/api/services/ocr/OcrManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import {
markError,
markReady,
} from './ocrRecords';
import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger';
import { Logger } from 'api/log.v2/contracts/Logger';

interface OcrSettings {
url: string;
Expand Down Expand Up @@ -182,11 +184,14 @@ class OcrManager {

ocrTaskManager: TaskManager;

constructor() {
this.ocrTaskManager = new TaskManager({
serviceName: this.SERVICE_NAME,
processResults,
});
constructor(logger: Logger = DefaultLogger()) {
this.ocrTaskManager = new TaskManager(
{
serviceName: this.SERVICE_NAME,
processResults,
},
logger
);
}

start() {
Expand Down Expand Up @@ -227,5 +232,4 @@ class OcrManager {
}
}

const ocrManager = new OcrManager();
export { ocrManager, OcrManager, isEnabled as isOcrEnabled, getStatus as getOcrStatus };
export { OcrManager, isEnabled as isOcrEnabled, getStatus as getOcrStatus };
6 changes: 4 additions & 2 deletions app/api/services/ocr/specs/OcrManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { ResultsMessage, TaskManager } from '../../tasksmanager/TaskManager';
import { mockTaskManagerImpl } from '../../tasksmanager/specs/TaskManagerImplementationMocker';
import { fixtures, fixturesFactory } from './fixtures/fixtures';
import { cleanupRecordsOfFiles } from '../ocrRecords';
import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger';

jest.mock('api/services/tasksmanager/TaskManager.ts');

Expand Down Expand Up @@ -99,11 +100,11 @@ describe('OcrManager', () => {
success: true,
};

ocrManager = new OcrManager();
ocrManager = new OcrManager(createMockLogger());
ocrManager.start();
});

beforeEach(() => {
beforeEach(async () => {
mocks.jestMocks['storage.fileContents'] = jest
.spyOn(storage, 'fileContents')
.mockResolvedValue(Buffer.from('file_content'));
Expand All @@ -112,6 +113,7 @@ describe('OcrManager', () => {
afterAll(async () => {
mocks.release();
await testingEnvironment.tearDown();
await ocrManager.stop();
});

describe('on success', () => {
Expand Down
4 changes: 4 additions & 0 deletions app/api/services/pdfsegmentation/PDFSegmentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class PDFSegmentation {
this.segmentationTaskManager.subscribeToResults();
}

async stop() {
await this.segmentationTaskManager.stop();
}

segmentOnePdf = async (
file: { filename: string; _id: ObjectIdSchema },
serviceUrl: string,
Expand Down
Loading
Loading