diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-03-03 01:52:39 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-03-03 01:52:39 +0300 |
commit | bd712c4e8842f44dc546f587bd237c456a92e9d1 (patch) | |
tree | ea8e21d30a209cf55f62ec485a39c35bdd30f82c /src/lib | |
parent | 097338ec1f76f3afde9dfabab09a20005fa44e56 (diff) |
jobd: support pause()/continue()
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/data-validator.js | 4 | ||||
-rw-r--r-- | src/lib/server.js | 4 | ||||
-rw-r--r-- | src/lib/worker.js | 63 |
3 files changed, 63 insertions, 8 deletions
diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js index 6d1d8f0..7419b34 100644 --- a/src/lib/data-validator.js +++ b/src/lib/data-validator.js @@ -69,7 +69,7 @@ function validateObjectSchema(data, schema) { } } -function validateTargetsList(targets) { +function validateTargetsListFormat(targets) { if (!Array.isArray(targets)) throw new Error('targets must be array') @@ -85,5 +85,5 @@ function validateTargetsList(targets) { module.exports = { validateObjectSchema, - validateTargetsList + validateTargetsListFormat }
\ No newline at end of file diff --git a/src/lib/server.js b/src/lib/server.js index 1d923a8..618ca8c 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -449,8 +449,8 @@ class Connection extends EventEmitter { } } } catch (error) { - this.logger.error('failed to parse data as JSON') - this.logger.debug(rawMessage) + this.logger.error('error while parsing message:', error, rawMessage.toString('utf-8')) + this.logger.trace(rawMessage) } } } diff --git a/src/lib/worker.js b/src/lib/worker.js index a271a31..b09c2f8 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -26,7 +26,7 @@ class Worker extends EventEmitter { super() /** - * @type {object.<string, {slots: object.<string, Queue>}>} + * @type {object.<string, {slots: object.<string, Queue>, paused: boolean}>} */ this.targets = {} @@ -57,7 +57,10 @@ class Worker extends EventEmitter { this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) if (this.targets[target] === undefined) - this.targets[target] = {slots: {}} + this.targets[target] = { + slots: {}, + paused: false + } if (this.targets[target].slots[slot] !== undefined) throw new Error(`slot ${slot} for target ${target} has already been added`) @@ -74,6 +77,56 @@ class Worker extends EventEmitter { } /** + * Stop queues associated with specified targets. + * + * @param {null|string[]} targets + */ + pauseTargets(targets) { + if (targets === null) + targets = this.getTargets() + + for (const targetName of targets) { + const target = this.targets[targetName] + if (target.paused) { + this.logger.warn(`pauseTargets: ${targetName} is already paused`) + continue + } + + for (const slotName in target.slots) { + this.logger.debug(`pauseTargets: stopping ${targetName}/${slotName} queue`) + target.slots[slotName].stop() + } + + target.paused = true + } + } + + /** + * Start queues associated with specified targets. + * + * @param {null|string[]} targets + */ + continueTargets(targets) { + if (targets === null) + targets = this.getTargets() + + for (const targetName of targets) { + const target = this.targets[targetName] + if (!target.paused) { + this.logger.warn(`continueTargets: ${targetName} is not paused`) + continue + } + + for (const slotName in target.slots) { + this.logger.debug(`pauseTargets: starting ${targetName}/${slotName} queue`) + target.slots[slotName].start() + } + + target.paused = false + } + } + + /** * Checks whether target is being served. * * @param {string} target @@ -121,7 +174,7 @@ class Worker extends EventEmitter { let targets = this.getPollTargets() if (!targets.length) { - this.poller.warn(`${LOGPREFIX} no targets`) + this.logger.warn(`${LOGPREFIX} no targets`) return } @@ -527,8 +580,10 @@ class Worker extends EventEmitter { onJobFinished = (target, slot) => { this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) + const targetPaused = this.targets[target].paused const queue = this.targets[target].slots[slot] - if (queue.length < queue.concurrency && this.hasPollTarget(target)) { + + if (!targetPaused && queue.length < queue.concurrency && this.hasPollTarget(target)) { this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`) this.poll() } |