diff --git a/readme.md b/readme.md index 3bc8d09..6158214 100644 --- a/readme.md +++ b/readme.md @@ -112,6 +112,8 @@ Minimum: `1` The max number of runs in the given interval of time. +Can be changed later by setting `queue.intervalCap`. If changed in the middle of a running interval, the new cap applies immediately. If the new cap is lower than the used cap, the "debt" is not carried over into the next interval. + ##### interval Type: `number`\ diff --git a/source/index.ts b/source/index.ts index 266be21..87b95ab 100644 --- a/source/index.ts +++ b/source/index.ts @@ -16,11 +16,9 @@ Promise queue with concurrency control. export default class PQueue = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter { // eslint-disable-line @typescript-eslint/naming-convention readonly #carryoverConcurrencyCount: boolean; - readonly #isIntervalIgnored: boolean; - #intervalCount = 0; - readonly #intervalCap: number; + #intervalCap: number; #rateLimitedInInterval = false; #rateLimitFlushScheduled = false; @@ -58,6 +56,12 @@ export default class PQueue= 1)) { + throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${value?.toString() ?? ''}\` (${typeof value})`); + } + } + constructor(options?: Options) { super(); @@ -72,16 +76,13 @@ export default class PQueue; - if (!(typeof options.intervalCap === 'number' && options.intervalCap >= 1)) { - throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${options.intervalCap?.toString() ?? ''}\` (${typeof options.intervalCap})`); - } + PQueue.#assertValidIntervalCap(options.intervalCap); if (options.interval === undefined || !(Number.isFinite(options.interval) && options.interval >= 0)) { throw new TypeError(`Expected \`interval\` to be a finite number >= 0, got \`${options.interval?.toString() ?? ''}\` (${typeof options.interval})`); } this.#carryoverConcurrencyCount = options.carryoverConcurrencyCount!; - this.#isIntervalIgnored = options.intervalCap === Number.POSITIVE_INFINITY || options.interval === 0; this.#intervalCap = options.intervalCap; this.#interval = options.interval; this.#queue = new options.queueClass!(); @@ -98,6 +99,25 @@ export default class PQueue, QueueOpt /** The max number of runs in the given interval of time. + Can be changed later by setting `queue.intervalCap`. If changed in the middle of a running interval, the new cap applies immediately. If the new cap is lower than the used cap, the "debt" is not carried over into the next interval. + Minimum: `1`. @default Infinity diff --git a/test-d/index.test-d.ts b/test-d/index.test-d.ts index 78da841..9ff52f2 100644 --- a/test-d/index.test-d.ts +++ b/test-d/index.test-d.ts @@ -4,3 +4,6 @@ import PQueue from '../source/index.js'; const queue = new PQueue(); expectType>(queue.add(async () => '🦄')); + +expectType(queue.intervalCap); +queue.intervalCap = 5; diff --git a/test/basic.ts b/test/basic.ts index a439fc2..61bd6ae 100644 --- a/test/basic.ts +++ b/test/basic.ts @@ -1095,3 +1095,307 @@ test('pause should work when throttled', async () => { await delay(2500); }); + +test('.intervalCap - changed while not running', async () => { + const result: number[] = []; + + const queue = new PQueue({ + intervalCap: 1, + interval: 500, + }); + + assert.equal(queue.intervalCap, 1); + + queue.add(async () => { + result.push(1); + }); + queue.add(async () => { + result.push(2); + }); + + await delay(300); + + assert.deepEqual(result, [1]); + + await delay(300); + + assert.deepEqual(result, [1, 2]); + + await delay(600); + + queue.intervalCap = 2; + assert.equal(queue.intervalCap, 2); + + queue.add(async () => { + result.push(3); + }); + queue.add(async () => { + result.push(4); + }); + + await delay(300); + + assert.deepEqual(result, [1, 2, 3, 4]); +}); + +test('.intervalCap - changed while running, larger than full cap', async () => { + const result: number[] = []; + + const queue = new PQueue({ + intervalCap: 1, + interval: 500, + }); + + assert.equal(queue.intervalCap, 1); + + queue.add(async () => { + result.push(1); + }); + queue.add(async () => { + result.push(2); + }); + + queue.intervalCap = 2; + assert.equal(queue.intervalCap, 2); + + await delay(300); + + assert.deepEqual(result, [1, 2]); +}); + +test('.intervalCap - changed while running, larger than not full cap', async () => { + const result: number[] = []; + + const queue = new PQueue({ + intervalCap: 3, + interval: 500, + }); + assert.equal(queue.intervalCap, 3); + + queue.add(async () => { + result.push(1); + }); + queue.add(async () => { + result.push(2); + }); + + await delay(200); + + assert.deepEqual(result, [1, 2]); + + queue.intervalCap = 4; + assert.equal(queue.intervalCap, 4); + + queue.add(async () => { + result.push(3); + }); + + await delay(200); + + assert.deepEqual(result, [1, 2, 3]); +}); + +test('.intervalCap - changed while running, smaller than full cap', async () => { + const result: number[] = []; + + const queue = new PQueue({ + intervalCap: 3, + interval: 500, + }); + + assert.equal(queue.intervalCap, 3); + + queue.add(async () => { + result.push(1); + }); + queue.add(async () => { + await delay(300); + result.push(2); + }); + queue.add(async () => { + result.push(3); + }); + + await delay(100); + + assert.deepEqual(result, [1, 3]); + + queue.intervalCap = 2; + assert.equal(queue.intervalCap, 2); + + queue.add(async () => { + result.push(4); + }); + + await delay(200); + + assert.deepEqual(result, [1, 3, 2]); + + await delay(300); + + assert.deepEqual(result, [1, 3, 2, 4]); +}); + +test('.intervalCap - changed while running, smaller than not full cap and can run more', async () => { + const result: number[] = []; + + const queue = new PQueue({ + intervalCap: 4, + interval: 500, + }); + + assert.equal(queue.intervalCap, 4); + + queue.add(async () => { + result.push(1); + }); + queue.add(async () => { + await delay(300); + result.push(2); + }); + + await delay(100); + + assert.deepEqual(result, [1]); + + queue.intervalCap = 3; + assert.equal(queue.intervalCap, 3); + + queue.add(async () => { + result.push(3); + }); + + await delay(200); + assert.deepEqual(result, [1, 3, 2]); +}); + +test('.intervalCap - changed while running, smaller than not full cap and cannot run more', async () => { + const result: number[] = []; + + const queue = new PQueue({ + intervalCap: 4, + interval: 500, + }); + + assert.equal(queue.intervalCap, 4); + + queue.add(async () => { + result.push(1); + }); + queue.add(async () => { + await delay(200); + result.push(2); + }); + + await delay(100); + + assert.deepEqual(result, [1]); + + queue.intervalCap = 2; + assert.equal(queue.intervalCap, 2); + + queue.add(async () => { + result.push(3); + }); + + await delay(200); + assert.deepEqual(result, [1, 2]); + + await delay(300); + + assert.deepEqual(result, [1, 2, 3]); +}); + +test('.intervalCap - changed while running, larger than full cap and should run multiple more', async () => { + const result: number[] = []; + + const queue = new PQueue({ + intervalCap: 2, + interval: 500, + }); + + assert.equal(queue.intervalCap, 2); + + queue.add(async () => { + result.push(1); + }); + queue.add(async () => { + result.push(2); + }); + queue.add(async () => { + await delay(200); + result.push(3); + }); + queue.add(async () => { + await delay(200); + result.push(4); + }); + + await delay(100); + + assert.deepEqual(result, [1, 2]); + + queue.intervalCap = 4; + assert.equal(queue.intervalCap, 4); + + await delay(300); + + assert.deepEqual(result, [1, 2, 3, 4]); +}); + +test('.intervalCap - removed while not running', async () => { + const result: number[] = []; + + const queue = new PQueue({ + intervalCap: 1, + interval: 500, + }); + + assert.equal(queue.intervalCap, 1); + + queue.add(async () => { + result.push(1); + }); + queue.add(async () => { + result.push(2); + }); + + await delay(100); + + assert.deepEqual(result, [1]); + + queue.intervalCap = Number.POSITIVE_INFINITY; + assert.equal(queue.intervalCap, Number.POSITIVE_INFINITY); + + await delay(100); + + assert.deepEqual(result, [1, 2]); +}); + +test('.intervalCap - removed while running', async () => { + const result: number[] = []; + + const queue = new PQueue({ + intervalCap: 1, + interval: 500, + }); + + assert.equal(queue.intervalCap, 1); + + queue.add(async () => { + await delay(200); + result.push(1); + }); + queue.add(async () => { + result.push(2); + }); + + await delay(100); + + queue.intervalCap = Number.POSITIVE_INFINITY; + assert.equal(queue.intervalCap, Number.POSITIVE_INFINITY); + + await delay(200); + + assert.deepEqual(result, [2, 1]); +});