diff options
Diffstat (limited to 'src/lib/worker.js')
-rw-r--r-- | src/lib/worker.js | 63 |
1 files changed, 59 insertions, 4 deletions
diff --git a/src/lib/worker.js b/src/lib/worker.js index a271a31..b09c2f8 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -26,7 +26,7 @@ class Worker extends EventEmitter { super() /** - * @type {object.<string, {slots: object.<string, Queue>}>} + * @type {object.<string, {slots: object.<string, Queue>, paused: boolean}>} */ this.targets = {} @@ -57,7 +57,10 @@ class Worker extends EventEmitter { this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) if (this.targets[target] === undefined) - this.targets[target] = {slots: {}} + this.targets[target] = { + slots: {}, + paused: false + } if (this.targets[target].slots[slot] !== undefined) throw new Error(`slot ${slot} for target ${target} has already been added`) @@ -74,6 +77,56 @@ class Worker extends EventEmitter { } /** + * Stop queues associated with specified targets. + * + * @param {null|string[]} targets + */ + pauseTargets(targets) { + if (targets === null) + targets = this.getTargets() + + for (const targetName of targets) { + const target = this.targets[targetName] + if (target.paused) { + this.logger.warn(`pauseTargets: ${targetName} is already paused`) + continue + } + + for (const slotName in target.slots) { + this.logger.debug(`pauseTargets: stopping ${targetName}/${slotName} queue`) + target.slots[slotName].stop() + } + + target.paused = true + } + } + + /** + * Start queues associated with specified targets. + * + * @param {null|string[]} targets + */ + continueTargets(targets) { + if (targets === null) + targets = this.getTargets() + + for (const targetName of targets) { + const target = this.targets[targetName] + if (!target.paused) { + this.logger.warn(`continueTargets: ${targetName} is not paused`) + continue + } + + for (const slotName in target.slots) { + this.logger.debug(`pauseTargets: starting ${targetName}/${slotName} queue`) + target.slots[slotName].start() + } + + target.paused = false + } + } + + /** * Checks whether target is being served. * * @param {string} target @@ -121,7 +174,7 @@ class Worker extends EventEmitter { let targets = this.getPollTargets() if (!targets.length) { - this.poller.warn(`${LOGPREFIX} no targets`) + this.logger.warn(`${LOGPREFIX} no targets`) return } @@ -527,8 +580,10 @@ class Worker extends EventEmitter { onJobFinished = (target, slot) => { this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) + const targetPaused = this.targets[target].paused const queue = this.targets[target].slots[slot] - if (queue.length < queue.concurrency && this.hasPollTarget(target)) { + + if (!targetPaused && queue.length < queue.concurrency && this.hasPollTarget(target)) { this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`) this.poll() } |