diff --git a/__tests__/core/connection.ts b/__tests__/core/connection.ts index a81d14cb..d40289a0 100644 --- a/__tests__/core/connection.ts +++ b/__tests__/core/connection.ts @@ -1,4 +1,4 @@ -import * as Ioredis from "ioredis"; +import * as IORedis from "ioredis"; import { Connection } from "../../src"; import specHelper from "../utils/specHelper"; @@ -21,6 +21,7 @@ describe("connection", () => { test("it has loaded Lua commands", async () => { const connection = new Connection(specHelper.cleanConnectionDetails()); await connection.connect(); + //@ts-ignore expect(typeof connection.redis["popAndStoreJob"]).toBe("function"); connection.end(); }); @@ -33,10 +34,10 @@ describe("connection", () => { await connection.connect(); }); - let prefixedConnection; - let prefixedRedis; + let prefixedConnection: Connection; + let prefixedRedis: IORedis.Redis; beforeAll(async () => { - prefixedRedis = new Ioredis(null, null, { + prefixedRedis = new IORedis(null, null, { keyPrefix: "customNamespace:", db: db, }); diff --git a/__tests__/core/connectionError.ts b/__tests__/core/connectionError.ts index 758af59b..f2ab45ce 100644 --- a/__tests__/core/connectionError.ts +++ b/__tests__/core/connectionError.ts @@ -1,7 +1,7 @@ import { Connection } from "../../src"; import specHelper from "../utils/specHelper"; -describe("queue", () => { +describe("connection error", () => { test("can provide an error if connection failed", async () => { await new Promise(async (resolve) => { const connectionDetails = { diff --git a/__tests__/core/multiWorker.ts b/__tests__/core/multiWorker.ts index 58683e40..70591baf 100644 --- a/__tests__/core/multiWorker.ts +++ b/__tests__/core/multiWorker.ts @@ -7,7 +7,7 @@ const checkTimeout = specHelper.timeout / 10; const minTaskProcessors = 1; const maxTaskProcessors = 5; -const blockingSleep = (naptime) => { +const blockingSleep = (naptime: number) => { let sleeping = true; const now = new Date(); let alarm; @@ -23,7 +23,7 @@ const blockingSleep = (naptime) => { const jobs = { slowSleepJob: { - plugins: [], + plugins: [] as string[], pluginOptions: {}, perform: async () => { await new Promise((resolve) => { @@ -34,7 +34,7 @@ const jobs = { }, }, slowCPUJob: { - plugins: [], + plugins: [] as string[], pluginOptions: {}, perform: async () => { blockingSleep(1000); diff --git a/__tests__/core/queue.ts b/__tests__/core/queue.ts index 27d74e14..00d76183 100644 --- a/__tests__/core/queue.ts +++ b/__tests__/core/queue.ts @@ -1,4 +1,10 @@ -import { Queue, Worker } from "../../src"; +import { + ParsedJob, + ParsedFailedJobPayload, + Queue, + Worker, + Job, +} from "../../src"; import specHelper from "../utils/specHelper"; let queue: Queue; @@ -50,13 +56,13 @@ describe("queue", () => { ); expect(String(score)).toBe("10"); - let obj = await specHelper.redis.lpop( + const str = await specHelper.redis.lpop( specHelper.namespace + ":delayed:" + "10" ); - expect(obj).toBeDefined(); - obj = JSON.parse(obj); - expect(obj.class).toBe("someJob"); - expect(obj.args).toEqual([1, 2, 3]); + expect(str).toBeDefined(); + const job = JSON.parse(str) as ParsedJob; + expect(job.class).toBe("someJob"); + expect(job.args).toEqual([1, 2, 3]); }); test("can add delayed job whose timestamp is a string (enqueueAt)", async () => { @@ -68,13 +74,13 @@ describe("queue", () => { ); expect(String(score)).toBe("10"); - let obj = await specHelper.redis.lpop( + let str = await specHelper.redis.lpop( specHelper.namespace + ":delayed:" + "10" ); - expect(obj).toBeDefined(); - obj = JSON.parse(obj); - expect(obj.class).toBe("someJob"); - expect(obj.args).toEqual([1, 2, 3]); + expect(str).toBeDefined(); + const job = JSON.parse(str) as ParsedJob; + expect(job.class).toBe("someJob"); + expect(job.args).toEqual([1, 2, 3]); }); test("will not enqueue a delayed job at the same time with matching params with error", async () => { @@ -108,17 +114,17 @@ describe("queue", () => { await queue.enqueueIn(5 * 1000, specHelper.queue, "someJob", [1, 2, 3]); const score = await specHelper.redis.zscore( specHelper.namespace + ":delayed_queue_schedule", - now + now.toString() ); expect(String(score)).toBe(String(now)); - let obj = await specHelper.redis.lpop( + let str = await specHelper.redis.lpop( specHelper.namespace + ":delayed:" + now ); - expect(obj).toBeDefined(); - obj = JSON.parse(obj); - expect(obj.class).toBe("someJob"); - expect(obj.args).toEqual([1, 2, 3]); + expect(str).toBeDefined(); + const job = JSON.parse(str) as ParsedJob; + expect(job.class).toBe("someJob"); + expect(job.args).toEqual([1, 2, 3]); }); test("can add a delayed job whose time is a string (enqueueIn)", async () => { @@ -128,17 +134,17 @@ describe("queue", () => { await queue.enqueueIn(time, specHelper.queue, "someJob", [1, 2, 3]); const score = await specHelper.redis.zscore( specHelper.namespace + ":delayed_queue_schedule", - now + now.toString() ); expect(String(score)).toBe(String(now)); - let obj = await specHelper.redis.lpop( + let str = await specHelper.redis.lpop( specHelper.namespace + ":delayed:" + now ); - expect(obj).toBeDefined(); - obj = JSON.parse(obj); - expect(obj.class).toBe("someJob"); - expect(obj.args).toEqual([1, 2, 3]); + expect(str).toBeDefined(); + const job = JSON.parse(str) as ParsedJob; + expect(job.class).toBe("someJob"); + expect(job.args).toEqual([1, 2, 3]); }); test("can get the number of jobs currently enqueued", async () => { @@ -165,7 +171,7 @@ describe("queue", () => { [1, 2, 3] ); expect(timestamps.length).toBe(1); - expect(timestamps[0]).toBe("10"); + expect(timestamps[0]).toBe(10); }); test("will not match previously scheduled jobs with differnt args", async () => { @@ -367,7 +373,7 @@ describe("queue", () => { describe("failed job managment", () => { beforeEach(async () => { - const errorPayload = function (id) { + const errorPayload = function (id: number) { return JSON.stringify({ worker: "busted-worker-" + id, queue: "busted-queue", @@ -494,8 +500,8 @@ describe("queue", () => { }); describe("worker status", () => { - let workerA; - let workerB; + let workerA: Worker; + let workerB: Worker; const timeout = 500; const jobs = { @@ -505,7 +511,7 @@ describe("queue", () => { setTimeout(resolve, timeout); }); }, - }, + } as Job, }; beforeEach(async () => { @@ -599,10 +605,10 @@ describe("queue", () => { expect(cleanData.workerA.payload.class).toBe("slowJob"); expect(cleanData.workerA.payload.args[0].a).toBe(1); - let failedData = await specHelper.redis.rpop( + let str = await specHelper.redis.rpop( specHelper.namespace + ":" + "failed" ); - failedData = JSON.parse(failedData); + const failedData = JSON.parse(str) as ParsedFailedJobPayload; expect(failedData.queue).toBe(specHelper.queue); expect(failedData.exception).toBe( "Worker Timeout (killed manually)" diff --git a/__tests__/core/scheduler.ts b/__tests__/core/scheduler.ts index 26b5674b..476025e4 100644 --- a/__tests__/core/scheduler.ts +++ b/__tests__/core/scheduler.ts @@ -1,4 +1,10 @@ -import { Queue, Scheduler, Worker } from "../../src"; +import { + Queue, + Scheduler, + Worker, + Job, + ParsedFailedJobPayload, +} from "../../src"; import specHelper from "../utils/specHelper"; let scheduler: Scheduler; @@ -118,7 +124,7 @@ describe("scheduler", () => { }); describe("stuck workers", () => { - let worker; + let worker: Worker; const jobs = { stuck: { perform: async function () { @@ -128,7 +134,7 @@ describe("scheduler", () => { clearTimeout(this.pingTimer); }); }, - }, + } as Job, }; beforeAll(async () => { @@ -154,7 +160,7 @@ describe("scheduler", () => { await worker.start(); const workers = await queue.allWorkingOn(); - const h = {}; + const h: { [key: string]: any } = {}; h[worker.name] = "started"; expect(workers).toEqual(h); @@ -174,10 +180,10 @@ describe("scheduler", () => { expect(await queue.allWorkingOn()).toEqual({}); // check the failed list - let failed = await specHelper.redis.rpop( + const str = await specHelper.redis.rpop( specHelper.namespace + ":" + "failed" ); - failed = JSON.parse(failed); + const failed = JSON.parse(str) as ParsedFailedJobPayload; expect(failed.queue).toBe("stuckJobs"); expect(failed.exception).toBe( "Worker Timeout (killed manually)" diff --git a/__tests__/core/worker.ts b/__tests__/core/worker.ts index a191bdce..a99bcbd6 100644 --- a/__tests__/core/worker.ts +++ b/__tests__/core/worker.ts @@ -1,23 +1,24 @@ -import { Queue, Worker } from "../../src"; +import { ParsedFailedJobPayload, Job, Queue, Worker } from "../../src"; import specHelper from "../utils/specHelper"; -const jobs = { +const jobs: { [key: string]: Job } = { add: { perform: (a, b) => { return a + b; }, - }, + } as Job, + //@ts-ignore badAdd: { perform: () => { throw new Error("Blue Smoke"); }, - }, + } as Job, messWithData: { perform: (a) => { a.data = "new thing"; return a; }, - }, + } as Job, async: { perform: async () => { await new Promise((resolve) => { @@ -25,7 +26,7 @@ const jobs = { }); return "yay"; }, - }, + } as Job, twoSeconds: { perform: async () => { await new Promise((resolve) => { @@ -33,7 +34,8 @@ const jobs = { }); return "slow"; }, - }, + } as Job, + //@ts-ignore quickDefine: async () => { return "ok"; }, @@ -268,10 +270,10 @@ describe("worker", () => { }); test("will place failed jobs in the failed queue", async () => { - let data = await specHelper.redis.rpop( + let str = await specHelper.redis.rpop( specHelper.namespace + ":" + "failed" ); - data = JSON.parse(data); + const data = JSON.parse(str) as ParsedFailedJobPayload; expect(data.queue).toBe(specHelper.queue); expect(data.exception).toBe("Error"); expect(data.error).toBe('No job defined for class "somethingFake"'); diff --git a/__tests__/integration/ioredis-mock.ts b/__tests__/integration/ioredis-mock.ts index 0fe9dd72..a38174a0 100644 --- a/__tests__/integration/ioredis-mock.ts +++ b/__tests__/integration/ioredis-mock.ts @@ -1,7 +1,10 @@ -import { Queue, Worker, Scheduler } from "../../src"; -import * as RedisMock from "ioredis-mock"; +import { Queue, Worker, Scheduler, Job } from "../../src"; import specHelper from "../utils/specHelper"; +// import * as RedisMock from "ioredis-mock"; // TYPE HACK! +import * as IORedis from "ioredis"; +const RedisMock: typeof IORedis = require("ioredis-mock"); + // for ioredis-mock, we need to re-use a shared connection // setting "pkg" is important! const REDIS = new RedisMock(); @@ -13,7 +16,7 @@ const jobs = { const response = a + b; return response; }, - }, + } as Job, }; describe("testing with ioredis-mock package", () => { @@ -28,18 +31,12 @@ describe("testing with ioredis-mock package", () => { }); test("a queue can be created", async () => { - queue = new Queue( - { connection: connectionDetails, queues: ["math"] }, - jobs - ); + queue = new Queue({ connection: connectionDetails }, jobs); await queue.connect(); }); test("a scheduler can be created", async () => { - scheduler = new Scheduler( - { connection: connectionDetails, queues: ["math"] }, - jobs - ); + scheduler = new Scheduler({ connection: connectionDetails }, jobs); await scheduler.connect(); // await scheduler.start(); }); diff --git a/__tests__/integration/ioredis.ts b/__tests__/integration/ioredis.ts index 3f158cf9..1058ef4a 100644 --- a/__tests__/integration/ioredis.ts +++ b/__tests__/integration/ioredis.ts @@ -1,10 +1,9 @@ -import { Queue, Worker, Scheduler } from "../../src"; +import { Queue, Worker, Scheduler, Job } from "../../src"; import specHelper from "../utils/specHelper"; const connectionDetails = { pkg: "ioredis", host: "127.0.0.1", - password: null, port: 6379, database: parseInt(process.env.JEST_WORKER_ID || "0"), }; @@ -15,7 +14,7 @@ const jobs = { const response = a + b; return response; }, - }, + } as Job, }; describe("testing with ioredis package", () => { @@ -30,18 +29,12 @@ describe("testing with ioredis package", () => { }); test("a queue can be created", async () => { - queue = new Queue( - { connection: connectionDetails, queues: ["math"] }, - jobs - ); + queue = new Queue({ connection: connectionDetails }, jobs); await queue.connect(); }); test("a scheduler can be created", async () => { - scheduler = new Scheduler( - { connection: connectionDetails, queues: ["math"] }, - jobs - ); + scheduler = new Scheduler({ connection: connectionDetails }, jobs); await scheduler.connect(); // await scheduler.start(); }); diff --git a/__tests__/plugins/custom_plugins.ts b/__tests__/plugins/custom_plugins.ts index 2316bec7..34228908 100644 --- a/__tests__/plugins/custom_plugins.ts +++ b/__tests__/plugins/custom_plugins.ts @@ -1,17 +1,18 @@ import specHelper from "../utils/specHelper"; -import { Queue } from "../../src"; +import { Queue, Job } from "../../src"; import { CustomPlugin } from "../utils/custom-plugin"; describe("plugins", () => { describe("custom plugins", () => { test("runs a custom plugin outside of the plugins directory", async () => { const jobs = { + //@ts-ignore myJob: { plugins: [CustomPlugin], - perform: function (a, b, callback) { + perform: async () => { throw new Error("should not get here"); }, - }, + } as Job, }; const queue = new Queue( diff --git a/__tests__/plugins/delayedQueueLock.ts b/__tests__/plugins/delayedQueueLock.ts index 17c008eb..7e37f307 100644 --- a/__tests__/plugins/delayedQueueLock.ts +++ b/__tests__/plugins/delayedQueueLock.ts @@ -1,14 +1,14 @@ import specHelper from "../utils/specHelper"; -import { Plugin, Queue, Plugins, Worker } from "../../src"; +import { Queue, Plugins } from "../../src"; -let queue; +let queue: Queue; const jobDelay = 100; const jobs = { slowAdd: { plugins: [Plugins.JobLock], pluginOptions: { jobLock: {} }, - perform: async (a, b) => { + perform: async (a: number, b: number) => { const answer = a + b; await new Promise((resolve) => { setTimeout(resolve, jobDelay); @@ -19,7 +19,7 @@ const jobs = { uniqueJob: { plugins: [Plugins.DelayQueueLock], pluginOptions: { queueLock: {}, delayQueueLock: {} }, - perform: async (a, b) => { + perform: async (a: number, b: number) => { const answer = a + b; return answer; }, @@ -33,7 +33,7 @@ describe("plugins", () => { queue = new Queue( { connection: specHelper.cleanConnectionDetails(), - queues: [specHelper.queue], + queue: [specHelper.queue], }, jobs ); diff --git a/__tests__/plugins/jobLock.ts b/__tests__/plugins/jobLock.ts index 38e36db3..24fdcfa6 100644 --- a/__tests__/plugins/jobLock.ts +++ b/__tests__/plugins/jobLock.ts @@ -1,16 +1,16 @@ import specHelper from "../utils/specHelper"; -import { Queue, Plugins, Worker } from "../../src"; +import { Queue, Plugins, Worker, ParsedJob, Job } from "../../src"; -let queue; +let queue: Queue; const jobDelay = 1000; -let worker1; -let worker2; +let worker1: Worker; +let worker2: Worker; const jobs = { slowAdd: { plugins: [Plugins.JobLock], pluginOptions: { jobLock: {} }, - perform: async (a, b) => { + perform: async (a: number, b: number) => { const answer = a + b; await new Promise((resolve) => { setTimeout(resolve, jobDelay); @@ -36,7 +36,7 @@ describe("plugins", () => { queue = new Queue( { connection: specHelper.cleanConnectionDetails(), - queues: [specHelper.queue], + queue: [specHelper.queue], }, jobs ); @@ -85,7 +85,7 @@ describe("plugins", () => { const startTime = new Date().getTime(); let completed = 0; - const onComplete = function (q, job, result) { + const onComplete = function () { completed++; if (completed === 2) { worker1.end(); @@ -117,6 +117,7 @@ describe("plugins", () => { let calls = 0; const functionJobs = { + //@ts-ignore jobLockAdd: { plugins: [Plugins.JobLock], pluginOptions: { @@ -135,10 +136,10 @@ describe("plugins", () => { }, }, }, - perform: (a, b) => { + perform: (a: number, b: number) => { return a + b; }, - }, + } as Job, }; worker1 = new Worker( @@ -195,13 +196,13 @@ describe("plugins", () => { await worker2.end(); const timestamps = await queue.timestamps(); - let dealyedJob = await specHelper.redis.lpop( + let str = await specHelper.redis.lpop( specHelper.namespace + ":delayed:" + Math.round(timestamps[0] / 1000) ); - expect(dealyedJob).toBeDefined(); - dealyedJob = JSON.parse(dealyedJob); + expect(str).toBeDefined(); + const dealyedJob = JSON.parse(str) as ParsedJob; expect(dealyedJob.class).toBe("slowAdd"); expect(dealyedJob.args).toEqual([1, 2]); @@ -304,7 +305,7 @@ describe("plugins", () => { const startTime = new Date().getTime(); let completed = 0; - const onComplete = async function (q, job, result) { + const onComplete = async function () { completed++; if (completed === 2) { await worker1.end(); diff --git a/__tests__/plugins/noop.ts b/__tests__/plugins/noop.ts index 78bbf486..47357f42 100644 --- a/__tests__/plugins/noop.ts +++ b/__tests__/plugins/noop.ts @@ -1,8 +1,8 @@ import specHelper from "../utils/specHelper"; -import { Scheduler, Plugins, Queue, Worker } from "../../src"; +import { Scheduler, Plugins, Queue, Worker, Job } from "../../src"; -let queue; -let scheduler; +let queue: Queue; +let scheduler: Scheduler; let loggedErrors = []; const jobs = { @@ -10,7 +10,7 @@ const jobs = { plugins: [Plugins.Noop], pluginOptions: { Noop: { - logger: (error) => { + logger: (error: Error) => { loggedErrors.push(error); }, }, @@ -18,20 +18,20 @@ const jobs = { perform: () => { throw new Error("BUSTED"); }, - }, + } as Job, happyJob: { plugins: [Plugins.Noop], pluginOptions: { Noop: { - logger: (error) => { + logger: (error: Error) => { loggedErrors.push(error); }, }, }, - perform: function () { + perform: async () => { // nothing }, - }, + } as Job, }; describe("plugins", () => { diff --git a/__tests__/plugins/queueLock.ts b/__tests__/plugins/queueLock.ts index 9714649f..da589c47 100644 --- a/__tests__/plugins/queueLock.ts +++ b/__tests__/plugins/queueLock.ts @@ -1,22 +1,21 @@ import specHelper from "../utils/specHelper"; -import { Plugin, Plugins, Queue, Worker } from "../../src"; - -let queue; +import { Plugin, Plugins, Queue, Worker, Job } from "../../src"; +let queue: Queue; class NeverRunPlugin extends Plugin { - beforeEnqueue() { + async beforeEnqueue() { return true; } - afterEnqueue() { + async afterEnqueue() { return true; } - beforePerform() { + async beforePerform() { return false; } - afterPerform() { + async afterPerform() { return true; } } @@ -26,11 +25,11 @@ const jobs = { plugins: [Plugins.QueueLock], pluginOptions: { queueLock: {}, delayQueueLock: {} }, perform: (a, b) => a + b, - }, + } as Job, blockingJob: { plugins: [Plugins.QueueLock, NeverRunPlugin], perform: (a, b) => a + b, - }, + } as Job, jobWithLockTimeout: { plugins: [Plugins.QueueLock], pluginOptions: { @@ -39,7 +38,7 @@ const jobs = { }, }, perform: (a, b) => a + b, - }, + } as Job, stuckJob: { plugins: [Plugins.QueueLock], pluginOptions: { @@ -47,10 +46,10 @@ const jobs = { lockTimeout: specHelper.smallTimeout, }, }, - perform: (a, b) => { + perform: async (a, b) => { a + b; }, - }, + } as Job, }; describe("plugins", () => { @@ -153,7 +152,7 @@ describe("plugins", () => { }); describe("with worker", () => { - let worker; + let worker: Worker; beforeEach(async () => { worker = new Worker( diff --git a/__tests__/plugins/retry.ts b/__tests__/plugins/retry.ts index b12938f6..ebc7abd3 100644 --- a/__tests__/plugins/retry.ts +++ b/__tests__/plugins/retry.ts @@ -1,10 +1,17 @@ import specHelper from "../utils/specHelper"; -import { Scheduler, Plugins, Queue, Worker } from "../../src"; - -let queue; -let scheduler; - -const jobs = { +import { + Scheduler, + Plugins, + Queue, + Worker, + Job, + ParsedFailedJobPayload, +} from "../../src"; + +let queue: Queue; +let scheduler: Scheduler; + +const jobs: { [key: string]: Job } = { brokenJob: { plugins: [Plugins.Retry], pluginOptions: { @@ -25,6 +32,7 @@ const jobs = { retryDelay: 100, }, }, + //@ts-ignore perform: () => { // no return }, @@ -198,6 +206,7 @@ describe("plugins", () => { test("can have custom retry times set", async () => { await new Promise(async (resolve) => { const customJobs = { + //@ts-ignore jobWithBackoffStrategy: { plugins: [Plugins.Retry], pluginOptions: { @@ -209,7 +218,7 @@ describe("plugins", () => { perform: function (a, b, callback) { callback(new Error("BUSTED"), null); }, - }, + } as Job, }; await queue.enqueue(specHelper.queue, "jobWithBackoffStrategy", [1, 2]); @@ -347,11 +356,11 @@ describe("plugins", () => { `${specHelper.namespace}:failure-resque-retry:brokenJob:1-2` ); expect(String(retryAttempts)).toBe("0"); - failureData = JSON.parse(failureData); - expect(failureData.payload).toEqual([1, 2]); - expect(failureData.exception).toBe("Error: BUSTED"); - expect(failureData.worker).toBe("brokenJob"); - expect(failureData.queue).toBe("test_queue"); + const failure = JSON.parse(failureData) as ParsedFailedJobPayload; + expect(failure.payload).toEqual([1, 2]); + expect(failure.exception).toBe("Error: BUSTED"); + expect(failure.worker).toBe("brokenJob"); + expect(failure.queue).toBe("test_queue"); await worker.end(); resolve(null); }); diff --git a/__tests__/utils/specHelper.ts b/__tests__/utils/specHelper.ts index b08208b4..f1c7133c 100644 --- a/__tests__/utils/specHelper.ts +++ b/__tests__/utils/specHelper.ts @@ -1,4 +1,4 @@ -import * as Redis from "ioredis"; +import * as IORedis from "ioredis"; import * as NodeResque from "../../src/index"; const namespace = `resque-test-${process.env.JEST_WORKER_ID || 0}`; @@ -11,7 +11,7 @@ const SpecHelper = { queue: queue, timeout: 500, smallTimeout: 3, - redis: null, + redis: null as IORedis.Redis, connectionDetails: { pkg: pkg, host: process.env.REDIS_HOST || "127.0.0.1", @@ -26,7 +26,7 @@ const SpecHelper = { if (!this.connectionDetails.options) this.connectionDetails.options = {}; this.connectionDetails.options.db = this.connectionDetails?.options?.database; - this.redis = new Redis( + this.redis = new IORedis( this.connectionDetails.port, this.connectionDetails.host, this.connectionDetails.options @@ -59,13 +59,14 @@ const SpecHelper = { delete this.connectionDetails.redis; }, - startAll: async function (jobs) { + startAll: async function (jobs: NodeResque.Jobs) { const Worker = NodeResque.Worker; const Scheduler = NodeResque.Scheduler; const Queue = NodeResque.Queue; this.worker = new Worker( { + //@ts-ignore connection: { redis: this.redis }, queues: this.queue, timeout: this.timeout, @@ -78,6 +79,7 @@ const SpecHelper = { connection: { redis: this.redis }, timeout: this.timeout, }); + await this.scheduler.connect(); this.queue = new Queue({ connection: { redis: this.redis } }); @@ -106,6 +108,7 @@ const SpecHelper = { for (const i in this.connectionDetails) { if (i !== "redis") { + //@ts-ignore out[i] = this.connectionDetails[i]; } } diff --git a/src/core/connection.ts b/src/core/connection.ts index cf294451..a957eefd 100644 --- a/src/core/connection.ts +++ b/src/core/connection.ts @@ -9,7 +9,7 @@ interface EventListeners { } export class Connection extends EventEmitter { - options: ConnectionOptions | null; + options: ConnectionOptions; private eventListeners: EventListeners; connected: boolean; redis: IORedis.Redis | IORedis.Cluster; @@ -17,21 +17,13 @@ export class Connection extends EventEmitter { constructor(options: ConnectionOptions = {}) { super(); - const defaults = { - pkg: "ioredis", - host: "127.0.0.1", - port: 6379, - database: 0, - namespace: "resque", - options: {}, - scanCount: 10, - }; - - for (const i in defaults) { - if (options[i] === null || options[i] === undefined) { - options[i] = defaults[i]; - } - } + options.pkg = options.pkg ?? "ioredis"; + options.host = options.host ?? "127.0.0.1"; + options.port = options.port ?? 6379; + options.database = options.database ?? 0; + options.namespace = options.namespace ?? "resque"; + options.scanCount = options.scanCount ?? 10; + options.options = options.options ?? {}; this.options = options; this.eventListeners = {}; @@ -77,7 +69,7 @@ export class Connection extends EventEmitter { } } - this.eventListeners.error = (error) => { + this.eventListeners.error = (error: Error) => { this.emit("error", error); }; this.eventListeners.end = () => { @@ -114,7 +106,12 @@ export class Connection extends EventEmitter { } } - async getKeys(match: string, count: number = null, keysAry = [], cursor = 0) { + async getKeys( + match: string, + count: number = null, + keysAry: string[] = [], + cursor = 0 + ): Promise { if (count === null || count === undefined) { count = this.options.scanCount || 10; } @@ -131,10 +128,7 @@ export class Connection extends EventEmitter { keysAry = keysAry.concat(matches); } - if (newCursor === "0") { - return keysAry; - } - + if (newCursor === "0") return keysAry; return this.getKeys(match, count, keysAry, parseInt(newCursor)); } @@ -148,7 +142,7 @@ export class Connection extends EventEmitter { end() { Object.keys(this.listeners).forEach((eventName) => { - this.redis.removeListener(eventName, this.listeners[eventName]); + this.redis.removeAllListeners(eventName); }); // Only disconnect if we established the redis connection on our own. @@ -172,7 +166,7 @@ export class Connection extends EventEmitter { } else { args.unshift(this.options.namespace); } - args = args.filter((e) => { + args = args.filter((e: any) => { return String(e).trim(); }); return args.join(":"); diff --git a/src/core/multiWorker.ts b/src/core/multiWorker.ts index c24fdd3d..45e59073 100644 --- a/src/core/multiWorker.ts +++ b/src/core/multiWorker.ts @@ -75,24 +75,15 @@ export declare interface MultiWorker { } export class MultiWorker extends EventEmitter { - constructor(options, jobs) { + constructor(options: MultiWorkerOptions, jobs: Jobs) { super(); - const defaults = { - // all times in ms - minTaskProcessors: 1, - maxTaskProcessors: 10, - timeout: 5000, - checkTimeout: 500, - maxEventLoopDelay: 10, - name: os.hostname(), - }; - - for (const i in defaults) { - if (options[i] === null || options[i] === undefined) { - options[i] = defaults[i]; - } - } + options.name = options.name ?? os.hostname(); + options.minTaskProcessors = options.minTaskProcessors ?? 1; + options.maxTaskProcessors = options.maxTaskProcessors ?? 10; + options.timeout = options.timeout ?? 5000; + options.checkTimeout = options.checkTimeout ?? 500; + options.maxEventLoopDelay = options.maxEventLoopDelay ?? 10; if ( options.connection.redis && @@ -122,7 +113,7 @@ export class MultiWorker extends EventEmitter { EventLoopDelay( this.options.maxEventLoopDelay, this.options.checkTimeout, - (blocked, ms) => { + (blocked: boolean, ms: number) => { this.eventLoopBlocked = blocked; this.eventLoopDelay = ms; this.eventLoopCheckCounter++; @@ -247,7 +238,7 @@ export class MultiWorker extends EventEmitter { if (verb === "--") { this.stopInProcess = true; - const promises = []; + const promises: Promise[] = []; this.workers.forEach((worker) => { promises.push( new Promise(async (resolve) => { @@ -271,7 +262,7 @@ export class MultiWorker extends EventEmitter { } } - private async cleanupWorker(worker) { + private async cleanupWorker(worker: Worker) { [ "start", "end", @@ -286,7 +277,7 @@ export class MultiWorker extends EventEmitter { "pause", "internalError", "multiWorkerAction", - ].forEach(function (e) { + ].forEach((e) => { worker.removeAllListeners(e); }); } @@ -314,7 +305,7 @@ export class MultiWorker extends EventEmitter { return this.stop(); } - private async stopWait() { + private async stopWait(): Promise { if ( this.workers.length === 0 && this.working === false && diff --git a/src/core/plugin.ts b/src/core/plugin.ts index e2373e34..34443e6c 100644 --- a/src/core/plugin.ts +++ b/src/core/plugin.ts @@ -1,6 +1,6 @@ import { Worker } from "./worker"; import { Connection } from "./connection"; -import { Queue } from "./queue"; +import { ParsedJob, Queue } from "./queue"; export abstract class Plugin { name: string; @@ -8,15 +8,22 @@ export abstract class Plugin { queueObject: Queue; queue: string; func: string; - job: { - [key: string]: any; - }; + job: ParsedJob; args: Array; options: { [key: string]: any; }; - constructor(worker, func, queue, job, args, options) { + constructor( + worker: Queue | Worker, + func: string, + queue: string, + job: ParsedJob, + args: Array, + options: { + [key: string]: any; + } + ) { this.name = this?.constructor?.name || "Node Resque Plugin"; this.worker = worker; this.queue = queue; @@ -32,8 +39,8 @@ export abstract class Plugin { } } - abstract beforeEnqueue?(): void; - abstract afterEnqueue?(): void; - abstract beforePerform?(): void; - abstract afterPerform?(): void; + abstract beforeEnqueue?(): Promise; + abstract afterEnqueue?(): Promise; + abstract beforePerform?(): Promise; + abstract afterPerform?(): Promise; } diff --git a/src/core/pluginRunner.ts b/src/core/pluginRunner.ts index aa0d13a1..5a4d3771 100644 --- a/src/core/pluginRunner.ts +++ b/src/core/pluginRunner.ts @@ -1,12 +1,21 @@ +import { Job } from "../types/options"; +import { Worker } from "./worker"; +import { Queue } from "./queue"; +import { Plugin } from "./plugin"; + +type PluginConstructor = new (...args: any[]) => T & { + [key: string]: Plugin; +}; + export async function RunPlugins( - self, - type, - func, - queue, - job, - args, - pluginCounter? -) { + self: Queue | Worker, + type: string, + func: string, + queue: string, + job: Job, + args: Array, + pluginCounter?: number +): Promise { if (!job) return true; if (!pluginCounter) pluginCounter = 0; if ( @@ -35,21 +44,22 @@ export async function RunPlugins( } export async function RunPlugin( - self, - PluginRefrence, - type, - func, - queue, - job, - args -) { + self: Queue | Worker, + PluginReference: string | PluginConstructor, + type: string, + func: string, + queue: string, + job: Job, + args: Array +): Promise { if (!job) return true; - let pluginName = PluginRefrence; - if (typeof PluginRefrence === "function") { - pluginName = new PluginRefrence(self, func, queue, job, args, {}).name; + let pluginName: string; + if (typeof PluginReference === "function") { + // @ts-ignore + pluginName = new PluginReference(self, func, queue, job, args, {}).name; } else if (typeof pluginName === "function") { - pluginName = pluginName.name; + pluginName = pluginName["name"]; } let pluginOptions = null; @@ -63,14 +73,15 @@ export async function RunPlugin( pluginOptions = {}; } - let plugin = null; - if (typeof PluginRefrence === "string") { - const PluginConstructor = require(`./../plugins/${PluginRefrence}`)[ - PluginRefrence + let plugin: { [key: string]: Plugin }; + if (typeof PluginReference === "string") { + const PluginConstructor = require(`./../plugins/${PluginReference}`)[ + PluginReference ]; plugin = new PluginConstructor(self, func, queue, job, args, pluginOptions); - } else if (typeof PluginRefrence === "function") { - plugin = new PluginRefrence(self, func, queue, job, args, pluginOptions); + } else if (typeof PluginReference === "function") { + // @ts-ignore + plugin = new PluginReference(self, func, queue, job, args, pluginOptions); } else { throw new Error("Plugin must be the constructor name or an object"); } @@ -83,5 +94,6 @@ export async function RunPlugin( return true; } + // @ts-ignore return plugin[type](); } diff --git a/src/core/queue.ts b/src/core/queue.ts index 57845aff..23645886 100644 --- a/src/core/queue.ts +++ b/src/core/queue.ts @@ -1,10 +1,35 @@ import { EventEmitter } from "events"; import * as os from "os"; import { ErrorPayload, Jobs, ConnectionOptions } from ".."; +import { QueueOptions } from "../types/options"; import { Connection } from "./connection"; import { RunPlugins } from "./pluginRunner"; -function arrayify(o) { +export type ParsedJob = { + class: string; + queue: string; + args: Array; + pluginOptions?: { [key: string]: any }; +}; + +export type ParsedWorkerPayload = { + run_at: string; + queue: string; + worker: string; + payload: ParsedJob; +}; + +export type ParsedFailedJobPayload = { + worker: string; + queue: string; + payload: ParsedJob; + failed_at: string; + exception: string; + error: string; + backtrace: string[]; +}; + +function arrayify(o: T): T[] { if (Array.isArray(o)) { return o; } else { @@ -14,14 +39,14 @@ function arrayify(o) { export declare interface Queue { connection: Connection; - options: ConnectionOptions; + options: QueueOptions; jobs: Jobs; on(event: "error", cb: (error: Error, queue: string) => void): this; once(event: "error", cb: (error: Error, queue: string) => void): this; } export class Queue extends EventEmitter { - constructor(options, jobs = {}) { + constructor(options: QueueOptions, jobs: Jobs = {}) { super(); this.options = options; @@ -59,9 +84,7 @@ export class Queue extends EventEmitter { const job = this.jobs[func]; const toRun = await RunPlugins(this, "beforeEnqueue", func, q, job, args); - if (toRun === false) { - return toRun; - } + if (toRun === false) return toRun; await this.connection.redis.sadd(this.connection.key("queues"), q); await this.connection.redis.rpush( @@ -197,7 +220,7 @@ export class Queue extends EventEmitter { let numJobsDeleted: number = 0; for (let i = 0; i < jobs.length; i++) { const jobEncoded = jobs[i]; - const { class: jobFunc } = JSON.parse(jobEncoded); + const { class: jobFunc } = JSON.parse(jobEncoded) as ParsedJob; if (jobFunc === func) { await this.connection.redis.lrem( this.connection.key("queue", q), @@ -243,7 +266,7 @@ export class Queue extends EventEmitter { * - `timestampsForJob` is an array of integers */ async scheduledAt(q: string, func: string, args: Array = []) { - const timestamps = []; + const timestamps: number[] = []; args = arrayify(args); const search = this.encode(q, func, args); @@ -251,7 +274,7 @@ export class Queue extends EventEmitter { this.connection.key("timestamps:" + search) ); members.forEach((key) => { - timestamps.push(key.split(":")[key.split(":").length - 1]); + timestamps.push(parseInt(key.split(":")[key.split(":").length - 1])); }); return timestamps; @@ -261,7 +284,7 @@ export class Queue extends EventEmitter { * - `timestamps` is an array of integers for all timestamps which have at least one job scheduled in the future */ async timestamps() { - const results = []; + const results: number[] = []; const timestamps = await this.connection.getKeys( this.connection.key("delayed:*") ); @@ -285,7 +308,7 @@ export class Queue extends EventEmitter { -1 ); const tasks = items.map((i) => { - return JSON.parse(i); + return JSON.parse(i) as ParsedJob; }); return { tasks, rTimestamp }; } @@ -311,13 +334,13 @@ export class Queue extends EventEmitter { * - note that this operation can be very slow and very ram-heavy */ async allDelayed() { - const results = {}; + const results: { [key: string]: any[] } = {}; const timestamps = await this.timestamps(); for (const i in timestamps) { const timestamp = timestamps[i]; const { tasks, rTimestamp } = await this.delayedAt(timestamp); - results[rTimestamp * 1000] = tasks; + results[(rTimestamp * 1000).toString(10)] = tasks; } return results; @@ -329,7 +352,7 @@ export class Queue extends EventEmitter { */ async locks() { let keys: Array = []; - const data = {}; + const data: { [key: string]: string } = {}; let _keys: Array; let values = []; @@ -364,7 +387,7 @@ export class Queue extends EventEmitter { /** * - `count` is an integer. You might delete more than one lock by the name. */ - async delLock(key) { + async delLock(key: string) { return this.connection.redis.del(this.connection.key(key)); } @@ -401,7 +424,7 @@ export class Queue extends EventEmitter { /** * - returns: `{"run_at":"Fri Dec 12 2014 14:01:16 GMT-0800 (PST)","queue":"test_queue","payload":{"class":"slowJob","queue":"test_queue","args":[null]},"worker":"workerA"}` */ - async workingOn(workerName, queues) { + async workingOn(workerName: string, queues: string[]) { const fullWorkerName = workerName + ":" + queues; return this.connection.redis.get( this.connection.key("worker", fullWorkerName) @@ -412,11 +435,12 @@ export class Queue extends EventEmitter { * - returns a hash of the results of `queue.workingOn` with the worker names as keys. */ async allWorkingOn() { - const results: { [key: string]: any } = {}; + const results: { [key: string]: ParsedWorkerPayload } = {}; const workers = await this.workers(); for (const i in Object.keys(workers)) { const w = Object.keys(workers)[i]; + //@ts-ignore results[w] = "started"; let data = await this.workingOn(w, workers[w]); if (data) { @@ -500,9 +524,12 @@ export class Queue extends EventEmitter { const data = await this.allWorkingOn(); for (const i in Object.keys(data)) { const workerName = Object.keys(data)[i]; + const payload = data[workerName]; + if ( - data[workerName].run_at && - Date.now() - Date.parse(data[workerName].run_at) > age + typeof payload !== "string" && + payload.run_at && + Date.now() - Date.parse(payload.run_at) > age ) { const errorPayload = await this.forceCleanWorker(workerName); if (errorPayload && errorPayload.worker) { @@ -535,8 +562,9 @@ export class Queue extends EventEmitter { stop ); const results = data.map((i) => { - return JSON.parse(i); + return JSON.parse(i) as ParsedFailedJobPayload; }); + return results; } @@ -567,7 +595,7 @@ export class Queue extends EventEmitter { async retryStuckJobs(upperLimit = Infinity) { let start = 0; let batchSize = 100; - let failedJobs = []; + let failedJobs: ParsedFailedJobPayload[] = []; const loadFailedJobs = async () => { failedJobs = await this.failed(start, start + batchSize); diff --git a/src/core/scheduler.ts b/src/core/scheduler.ts index b3116d72..d584d67a 100644 --- a/src/core/scheduler.ts +++ b/src/core/scheduler.ts @@ -57,22 +57,14 @@ export type SchedulerEvent = | "transferredJob"; export class Scheduler extends EventEmitter { - constructor(options, jobs = {}) { + constructor(options: SchedulerOptions, jobs: Jobs = {}) { super(); - const defaults = { - timeout: 5000, // in ms - stuckWorkerTimeout: 60 * 60 * 1000, // 60 minutes in ms - leaderLockTimeout: 60 * 3, // in seconds - name: os.hostname() + ":" + process.pid, // assumes only one worker per node process - retryStuckJobs: false, - }; - - for (const i in defaults) { - if (options[i] === null || options[i] === undefined) { - options[i] = defaults[i]; - } - } + options.timeout = options.timeout ?? 5000; // in ms + options.stuckWorkerTimeout = options.stuckWorkerTimeout ?? 60 * 60 * 1000; // 60 minutes in ms + options.leaderLockTimeout = options.leaderLockTimeout ?? 60 * 3; // in seconds + options.name = options.name ?? os.hostname() + ":" + process.pid; // assumes only one worker per node process + options.retryStuckJobs = options.retryStuckJobs ?? false; this.options = options; this.name = this.options.name; @@ -135,7 +127,7 @@ export class Scheduler extends EventEmitter { } } - async poll() { + async poll(): Promise { this.processing = true; clearTimeout(this.timer); const isLeader = await this.tryForLeader(); @@ -311,7 +303,7 @@ export class Scheduler extends EventEmitter { } } - async forceCleanWorker(workerName, delta) { + async forceCleanWorker(workerName: string, delta: number) { const errorPayload = await this.queue.forceCleanWorker(workerName); this.emit("cleanStuckWorker", workerName, errorPayload, delta); } diff --git a/src/core/worker.ts b/src/core/worker.ts index 8ba1d155..00eceafd 100644 --- a/src/core/worker.ts +++ b/src/core/worker.ts @@ -4,10 +4,10 @@ import { Job, JobEmit, Jobs } from ".."; import { WorkerOptions } from "../types/options"; import { Connection } from "./connection"; import { RunPlugins } from "./pluginRunner"; -import { Queue } from "./queue"; +import { ParsedJob, Queue } from "./queue"; -function prepareJobs(jobs) { - return Object.keys(jobs).reduce(function (h, k) { +function prepareJobs(jobs: Jobs) { + return Object.keys(jobs).reduce((h: { [key: string]: any }, k) => { const job = jobs[k]; h[k] = typeof job === "function" ? { perform: job } : job; return h; @@ -19,7 +19,7 @@ export declare interface Worker { jobs: Jobs; started: boolean; name: string; - queues: Array; + queues: Array | string; queue: string; originalQueue: string | null; error: Error | null; @@ -30,7 +30,7 @@ export declare interface Worker { pollTimer: NodeJS.Timeout; endTimer: NodeJS.Timeout; pingTimer: NodeJS.Timeout; - job: Job; + job: ParsedJob; connection: Connection; queueObject: Queue; id: number; @@ -85,7 +85,7 @@ export declare interface Worker { cb: (error: Error, queue: string, job: Job | JobEmit) => void ): this; - removeAllListeners(event: WorkerEvent): this; + removeAllListeners(event: string): this; } export type WorkerEvent = @@ -102,22 +102,14 @@ export type WorkerEvent = | "pause"; export class Worker extends EventEmitter { - constructor(options, jobs = {}) { + constructor(options: WorkerOptions, jobs: Jobs = {}) { super(); - const defaults = { - name: os.hostname() + ":" + process.pid, // assumes only one worker per node process - queues: "*", - timeout: 5000, - looping: true, - id: 1, - }; - - for (const i in defaults) { - if (options[i] === undefined || options[i] === null) { - options[i] = defaults[i]; - } - } + options.name = options.name ?? os.hostname() + ":" + process.pid; // assumes only one worker per node process + options.id = options.id ?? 1; + options.queues = options.queues ?? "*"; + options.timeout = options.timeout ?? 5000; + options.looping = options.looping ?? true; this.options = options; this.jobs = prepareJobs(jobs); @@ -157,7 +149,7 @@ export class Worker extends EventEmitter { } } - private async init() { + async init() { await this.track(); await this.connection.redis.set( this.connection.key("worker", this.name, this.stringQueues(), "started"), @@ -167,7 +159,7 @@ export class Worker extends EventEmitter { this.pingTimer = setInterval(this.ping.bind(this), this.options.timeout); } - async end() { + async end(): Promise { this.running = false; if (this.working === true) { @@ -196,7 +188,7 @@ export class Worker extends EventEmitter { this.emit("end", new Date()); } - private async poll(nQueue = 0) { + private async poll(nQueue = 0): Promise { if (!this.running) return; this.queue = this.queues[nQueue]; @@ -221,7 +213,7 @@ export class Worker extends EventEmitter { if (currentJob) { if (this.options.looping) { this.result = null; - return this.perform(currentJob); + await this.perform(currentJob); } else { return currentJob; } @@ -243,7 +235,7 @@ export class Worker extends EventEmitter { } } - private async perform(job) { + private async perform(job: ParsedJob) { this.job = job; this.error = null; let toRun; @@ -322,7 +314,7 @@ export class Worker extends EventEmitter { // #performInline is used to run a job payload directly. // If you are planning on running a job via #performInline, this worker should also not be started, nor should be using event emitters to monitor this worker. // This method will also not write to redis at all, including logging errors, modify resque's stats, etc. - async performInline(func, args = []) { + async performInline(func: string, args: any[] = []) { const q = "_direct-queue-" + this.name; let toRun; @@ -389,7 +381,7 @@ export class Worker extends EventEmitter { } } - private async succeed(job, duration: number) { + private async succeed(job: ParsedJob, duration: number) { await this.connection.redis.incr(this.connection.key("stat", "processed")); await this.connection.redis.incr( this.connection.key("stat", "processed", this.name) @@ -397,7 +389,7 @@ export class Worker extends EventEmitter { this.emit("success", this.queue, job, this.result, duration); } - private async fail(err, duration: number) { + private async fail(err: Error, duration: number) { await this.connection.redis.incr(this.connection.key("stat", "failed")); await this.connection.redis.incr( this.connection.key("stat", "failed", this.name) @@ -420,7 +412,7 @@ export class Worker extends EventEmitter { } private async getJob() { - let currentJob: { [key: string]: any } = null; + let currentJob: ParsedJob; const queueKey = this.connection.key("queue", this.queue); const workerKey = this.connection.key( "worker", @@ -430,7 +422,9 @@ export class Worker extends EventEmitter { let encodedJob: string; + //@ts-ignore if (this.connection.redis["popAndStoreJob"]) { + //@ts-ignore encodedJob = await this.connection.redis["popAndStoreJob"]( queueKey, workerKey, @@ -531,7 +525,7 @@ export class Worker extends EventEmitter { } } - private failurePayload(err, job) { + private failurePayload(err: Error, job: ParsedJob) { return { worker: this.name, queue: this.queue, @@ -548,7 +542,7 @@ export class Worker extends EventEmitter { return ["*"].join(","); } else { try { - return this.queues.join(","); + return Array.isArray(this.queues) ? this.queues.join(",") : this.queues; } catch (e) { return ""; } diff --git a/src/index.ts b/src/index.ts index a77f115d..e4db4bbb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,10 @@ export { Connection } from "./core/connection"; -export { Queue } from "./core/queue"; +export { + Queue, + ParsedJob, + ParsedWorkerPayload, + ParsedFailedJobPayload, +} from "./core/queue"; export { Scheduler } from "./core/scheduler"; export { Worker } from "./core/worker"; export { MultiWorker } from "./core/multiWorker"; diff --git a/src/plugins/DelayQueueLock.ts b/src/plugins/DelayQueueLock.ts index 300382c7..16fe2897 100644 --- a/src/plugins/DelayQueueLock.ts +++ b/src/plugins/DelayQueueLock.ts @@ -16,15 +16,15 @@ export class DelayQueueLock extends Plugin { } } - afterEnqueue() { + async afterEnqueue() { return true; } - beforePerform() { + async beforePerform() { return true; } - afterPerform() { + async afterPerform() { return true; } } diff --git a/src/plugins/JobLock.ts b/src/plugins/JobLock.ts index 27215cdc..a4ab5e71 100644 --- a/src/plugins/JobLock.ts +++ b/src/plugins/JobLock.ts @@ -2,11 +2,11 @@ import { Plugin } from ".."; export class JobLock extends Plugin { - beforeEnqueue() { + async beforeEnqueue() { return true; } - afterEnqueue() { + async afterEnqueue() { return true; } diff --git a/src/plugins/Noop.ts b/src/plugins/Noop.ts index 18977416..bb9c4913 100644 --- a/src/plugins/Noop.ts +++ b/src/plugins/Noop.ts @@ -1,7 +1,7 @@ import { Plugin } from ".."; export class Noop extends Plugin { - afterPerform() { + async afterPerform() { if (this.worker.error) { if (typeof this.options.logger === "function") { this.options.logger(this.worker.error); @@ -14,15 +14,15 @@ export class Noop extends Plugin { return true; } - beforeEnqueue() { + async beforeEnqueue() { return true; } - afterEnqueue() { + async afterEnqueue() { return true; } - beforePerform() { + async beforePerform() { return true; } } diff --git a/src/plugins/QueueLock.ts b/src/plugins/QueueLock.ts index 44c8dc8f..d19f2353 100644 --- a/src/plugins/QueueLock.ts +++ b/src/plugins/QueueLock.ts @@ -26,7 +26,7 @@ export class QueueLock extends Plugin { return true; } - afterEnqueue() { + async afterEnqueue() { return true; } @@ -36,7 +36,7 @@ export class QueueLock extends Plugin { return true; } - afterPerform() { + async afterPerform() { return true; } diff --git a/src/plugins/Retry.ts b/src/plugins/Retry.ts index b0b72d01..9c5eb5f7 100644 --- a/src/plugins/Retry.ts +++ b/src/plugins/Retry.ts @@ -2,10 +2,18 @@ // a port of some of the features in https://github.com/lantins/resque-retry import * as os from "os"; -import { Plugin } from ".."; - +import { Plugin, Worker, ParsedJob, Queue } from ".."; export class Retry extends Plugin { - constructor(worker, func, queue, job, args, options) { + constructor( + worker: Queue | Worker, + func: string, + queue: string, + job: ParsedJob, + args: Array, + options: { + [key: string]: any; + } + ) { super(worker, func, queue, job, args, options); if (!this.options.retryLimit) { @@ -19,15 +27,15 @@ export class Retry extends Plugin { } } - beforeEnqueue() { + async beforeEnqueue() { return true; } - afterEnqueue() { + async afterEnqueue() { return true; } - beforePerform() { + async beforePerform() { return true; } diff --git a/src/types/options.ts b/src/types/options.ts index 3a7e42d0..9c5eff29 100644 --- a/src/types/options.ts +++ b/src/types/options.ts @@ -6,26 +6,34 @@ export interface ConnectionOptions { host?: string; port?: number; database?: number; - namespace?: string; + namespace?: string | string[]; looping?: boolean; options?: any; redis?: IORedis.Redis | IORedis.Cluster; scanCount?: number; } +export interface QueueOptions extends ConnectionOptions { + connection?: ConnectionOptions; + queue?: string | string[]; +} + export interface WorkerOptions extends ConnectionOptions { name?: string; - queues?: Array; + queues?: Array | string; timeout?: number; looping?: boolean; + id?: number; + connection?: ConnectionOptions; } export interface SchedulerOptions extends ConnectionOptions { name?: string; timeout?: number; - leaderLockTimeout: number; - stuckWorkerTimeout: number; - retryStuckJobs: boolean; + leaderLockTimeout?: number; + stuckWorkerTimeout?: number; + retryStuckJobs?: boolean; + connection?: ConnectionOptions; } export interface MultiWorkerOptions extends ConnectionOptions { @@ -34,13 +42,13 @@ export interface MultiWorkerOptions extends ConnectionOptions { timeout?: number; maxEventLoopDelay?: number; checkTimeout?: number; - connection?: Connection; + connection?: ConnectionOptions; minTaskProcessors?: number; maxTaskProcessors?: number; } -export interface Job { +export interface Job { plugins?: string[]; pluginOptions?: { [pluginName: string]: any }; - perform: (...args: any[]) => Promise; + perform: (...args: any[]) => Promise; } diff --git a/src/utils/eventLoopDelay.ts b/src/utils/eventLoopDelay.ts index 1eed4d38..1f518485 100644 --- a/src/utils/eventLoopDelay.ts +++ b/src/utils/eventLoopDelay.ts @@ -1,6 +1,10 @@ // inspired by https://github.com/tj/node-blocked -export function EventLoopDelay(limit: number, interval: number, fn: Function) { +export function EventLoopDelay( + limit: number, + interval: number, + fn: (blocked: boolean, delay: number) => any +) { let start = process.hrtime(); const timeout = setInterval(() => { diff --git a/tsconfig.json b/tsconfig.json index 39e39573..3d46e05c 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -3,9 +3,8 @@ "outDir": "./dist", "allowJs": true, "module": "commonjs", - "target": "es2018" + "target": "es2018", + "noImplicitAny": true }, - "include": [ - "./src/**/*" - ] + "include": ["./src/**/*"] }