aboutsummaryrefslogtreecommitdiff
path: root/src/lib/worker.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/worker.js')
-rw-r--r--src/lib/worker.js63
1 files changed, 59 insertions, 4 deletions
diff --git a/src/lib/worker.js b/src/lib/worker.js
index a271a31..b09c2f8 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -26,7 +26,7 @@ class Worker extends EventEmitter {
super()
/**
- * @type {object.<string, {slots: object.<string, Queue>}>}
+ * @type {object.<string, {slots: object.<string, Queue>, paused: boolean}>}
*/
this.targets = {}
@@ -57,7 +57,10 @@ class Worker extends EventEmitter {
this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`)
if (this.targets[target] === undefined)
- this.targets[target] = {slots: {}}
+ this.targets[target] = {
+ slots: {},
+ paused: false
+ }
if (this.targets[target].slots[slot] !== undefined)
throw new Error(`slot ${slot} for target ${target} has already been added`)
@@ -74,6 +77,56 @@ class Worker extends EventEmitter {
}
/**
+ * Stop queues associated with specified targets.
+ *
+ * @param {null|string[]} targets
+ */
+ pauseTargets(targets) {
+ if (targets === null)
+ targets = this.getTargets()
+
+ for (const targetName of targets) {
+ const target = this.targets[targetName]
+ if (target.paused) {
+ this.logger.warn(`pauseTargets: ${targetName} is already paused`)
+ continue
+ }
+
+ for (const slotName in target.slots) {
+ this.logger.debug(`pauseTargets: stopping ${targetName}/${slotName} queue`)
+ target.slots[slotName].stop()
+ }
+
+ target.paused = true
+ }
+ }
+
+ /**
+ * Start queues associated with specified targets.
+ *
+ * @param {null|string[]} targets
+ */
+ continueTargets(targets) {
+ if (targets === null)
+ targets = this.getTargets()
+
+ for (const targetName of targets) {
+ const target = this.targets[targetName]
+ if (!target.paused) {
+ this.logger.warn(`continueTargets: ${targetName} is not paused`)
+ continue
+ }
+
+ for (const slotName in target.slots) {
+ this.logger.debug(`pauseTargets: starting ${targetName}/${slotName} queue`)
+ target.slots[slotName].start()
+ }
+
+ target.paused = false
+ }
+ }
+
+ /**
* Checks whether target is being served.
*
* @param {string} target
@@ -121,7 +174,7 @@ class Worker extends EventEmitter {
let targets = this.getPollTargets()
if (!targets.length) {
- this.poller.warn(`${LOGPREFIX} no targets`)
+ this.logger.warn(`${LOGPREFIX} no targets`)
return
}
@@ -527,8 +580,10 @@ class Worker extends EventEmitter {
onJobFinished = (target, slot) => {
this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`)
+ const targetPaused = this.targets[target].paused
const queue = this.targets[target].slots[slot]
- if (queue.length < queue.concurrency && this.hasPollTarget(target)) {
+
+ if (!targetPaused && queue.length < queue.concurrency && this.hasPollTarget(target)) {
this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`)
this.poll()
}