aboutsummaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/data-validator.js4
-rw-r--r--src/lib/server.js4
-rw-r--r--src/lib/worker.js63
3 files changed, 63 insertions, 8 deletions
diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js
index 6d1d8f0..7419b34 100644
--- a/src/lib/data-validator.js
+++ b/src/lib/data-validator.js
@@ -69,7 +69,7 @@ function validateObjectSchema(data, schema) {
}
}
-function validateTargetsList(targets) {
+function validateTargetsListFormat(targets) {
if (!Array.isArray(targets))
throw new Error('targets must be array')
@@ -85,5 +85,5 @@ function validateTargetsList(targets) {
module.exports = {
validateObjectSchema,
- validateTargetsList
+ validateTargetsListFormat
} \ No newline at end of file
diff --git a/src/lib/server.js b/src/lib/server.js
index 1d923a8..618ca8c 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -449,8 +449,8 @@ class Connection extends EventEmitter {
}
}
} catch (error) {
- this.logger.error('failed to parse data as JSON')
- this.logger.debug(rawMessage)
+ this.logger.error('error while parsing message:', error, rawMessage.toString('utf-8'))
+ this.logger.trace(rawMessage)
}
}
}
diff --git a/src/lib/worker.js b/src/lib/worker.js
index a271a31..b09c2f8 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -26,7 +26,7 @@ class Worker extends EventEmitter {
super()
/**
- * @type {object.<string, {slots: object.<string, Queue>}>}
+ * @type {object.<string, {slots: object.<string, Queue>, paused: boolean}>}
*/
this.targets = {}
@@ -57,7 +57,10 @@ class Worker extends EventEmitter {
this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`)
if (this.targets[target] === undefined)
- this.targets[target] = {slots: {}}
+ this.targets[target] = {
+ slots: {},
+ paused: false
+ }
if (this.targets[target].slots[slot] !== undefined)
throw new Error(`slot ${slot} for target ${target} has already been added`)
@@ -74,6 +77,56 @@ class Worker extends EventEmitter {
}
/**
+ * Stop queues associated with specified targets.
+ *
+ * @param {null|string[]} targets
+ */
+ pauseTargets(targets) {
+ if (targets === null)
+ targets = this.getTargets()
+
+ for (const targetName of targets) {
+ const target = this.targets[targetName]
+ if (target.paused) {
+ this.logger.warn(`pauseTargets: ${targetName} is already paused`)
+ continue
+ }
+
+ for (const slotName in target.slots) {
+ this.logger.debug(`pauseTargets: stopping ${targetName}/${slotName} queue`)
+ target.slots[slotName].stop()
+ }
+
+ target.paused = true
+ }
+ }
+
+ /**
+ * Start queues associated with specified targets.
+ *
+ * @param {null|string[]} targets
+ */
+ continueTargets(targets) {
+ if (targets === null)
+ targets = this.getTargets()
+
+ for (const targetName of targets) {
+ const target = this.targets[targetName]
+ if (!target.paused) {
+ this.logger.warn(`continueTargets: ${targetName} is not paused`)
+ continue
+ }
+
+ for (const slotName in target.slots) {
+ this.logger.debug(`pauseTargets: starting ${targetName}/${slotName} queue`)
+ target.slots[slotName].start()
+ }
+
+ target.paused = false
+ }
+ }
+
+ /**
* Checks whether target is being served.
*
* @param {string} target
@@ -121,7 +174,7 @@ class Worker extends EventEmitter {
let targets = this.getPollTargets()
if (!targets.length) {
- this.poller.warn(`${LOGPREFIX} no targets`)
+ this.logger.warn(`${LOGPREFIX} no targets`)
return
}
@@ -527,8 +580,10 @@ class Worker extends EventEmitter {
onJobFinished = (target, slot) => {
this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`)
+ const targetPaused = this.targets[target].paused
const queue = this.targets[target].slots[slot]
- if (queue.length < queue.concurrency && this.hasPollTarget(target)) {
+
+ if (!targetPaused && queue.length < queue.concurrency && this.hasPollTarget(target)) {
this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`)
this.poll()
}