diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-03-03 01:52:39 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-03-03 01:52:39 +0300 |
commit | bd712c4e8842f44dc546f587bd237c456a92e9d1 (patch) | |
tree | ea8e21d30a209cf55f62ec485a39c35bdd30f82c | |
parent | 097338ec1f76f3afde9dfabab09a20005fa44e56 (diff) |
jobd: support pause()/continue()
-rw-r--r-- | README.md | 6 | ||||
-rwxr-xr-x | src/jobd-master.js | 6 | ||||
-rwxr-xr-x | src/jobd.js | 104 | ||||
-rw-r--r-- | src/lib/data-validator.js | 4 | ||||
-rw-r--r-- | src/lib/server.js | 4 | ||||
-rw-r--r-- | src/lib/worker.js | 63 |
6 files changed, 152 insertions, 35 deletions
@@ -56,6 +56,12 @@ For optimization purposes, you can turn fields `target` and `slot` into `ENUM`s. * **`poll(targets: string[])`** — get new tasks for specified `targets` from database. If `targets` argument is not specified, get tasks for all serving targets. +* **`pause(targets: string[])`** — pause execution of tasks of specified targets. + If `targets` argument is not specified, pauses all targets. + +* **`continue(targets: string[])`** — continue execution of tasks of specified targets. + If `targets` argument is not specified, continues all targets. + * **`status()`** — returns status of internal queues and memory usage. * **`run-manual(ids: int[])`** — enqueue and run jobs with specified IDs and diff --git a/src/jobd-master.js b/src/jobd-master.js index f603b92..8ba0288 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -4,7 +4,7 @@ const loggerModule = require('./lib/logger') const config = require('./lib/config') const {Server, ResponseMessage} = require('./lib/server') const WorkersList = require('./lib/workers-list') -const {validateObjectSchema, validateTargetsList} = require('./lib/data-validator') +const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator') const RequestHandler = require('./lib/request-handler') const package_json = require('../package.json') @@ -116,7 +116,7 @@ function onRegisterWorker(data, requestNo, connection) { // validate data try { - validateTargetsList(targets) + validateTargetsListFormat(targets) } catch (e) { connection.send( new ResponseMessage(requestNo) @@ -143,7 +143,7 @@ function onPoke(data, requestNo, connection) { // validate data try { - validateTargetsList(targets) + validateTargetsListFormat(targets) } catch (e) { connection.send( new ResponseMessage(requestNo) diff --git a/src/jobd.js b/src/jobd.js index 0b980b7..97a1f05 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -5,7 +5,7 @@ const config = require('./lib/config') const db = require('./lib/db') const {uniq} = require('lodash') const {createCallablePromise} = require('./lib/util') -const {validateTargetsList} = require('./lib/data-validator') +const {validateTargetsListFormat} = require('./lib/data-validator') const RequestHandler = require('./lib/request-handler') const { Server, @@ -135,6 +135,8 @@ function initRequestHandler() { requestHandler.set('poll', onPollRequest) requestHandler.set('status', onStatus) requestHandler.set('run-manual', onRunManual) + requestHandler.set('pause', onPause) + requestHandler.set('continue', onContinue) } function initServer() { @@ -163,28 +165,9 @@ async function initDatabase() { * @param {Connection} connection */ function onPollRequest(data, requestNo, connection) { - // null means all targets - let targets = null - - if (data.targets !== undefined) { - targets = data.targets - - // validate data - try { - validateTargetsList(targets) - - for (const t of targets) { - if (!worker.hasTarget(t)) - throw new Error(`invalid target '${t}'`) - } - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - } + let targets + if ((targets = validateInputTargets(data, requestNo, connection)) === false) + return worker.setPollTargets(targets) worker.poll() @@ -303,6 +286,80 @@ async function onRunManual(data, requestNo, connection) { } } +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onPause(data, requestNo, connection) { + let targets + if ((targets = validateInputTargets(data, requestNo, connection)) === false) + return + + worker.pauseTargets(targets) + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onContinue(data, requestNo, connection) { + let targets + if ((targets = validateInputTargets(data, requestNo, connection)) === false) + return + + // continue queues + worker.continueTargets(targets) + + // poll just in case + worker.poll() + + // ok + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + +/** + * @private + * @param data + * @param requestNo + * @param connection + * @return {null|boolean|string[]} + */ +function validateInputTargets(data, requestNo, connection) { + // null means all targets + let targets = null + + if (data.targets !== undefined) { + targets = data.targets + + // validate data + try { + validateTargetsListFormat(targets) + + for (const t of targets) { + if (!worker.hasTarget(t)) + throw new Error(`invalid target '${t}'`) + } + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return false + } + } + + return targets +} + function connectToMaster() { const port = config.get('master_port') const host = config.get('master_host') @@ -352,7 +409,6 @@ Options: console.log(s) } - function term() { if (logger) logger.info('shutdown') diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js index 6d1d8f0..7419b34 100644 --- a/src/lib/data-validator.js +++ b/src/lib/data-validator.js @@ -69,7 +69,7 @@ function validateObjectSchema(data, schema) { } } -function validateTargetsList(targets) { +function validateTargetsListFormat(targets) { if (!Array.isArray(targets)) throw new Error('targets must be array') @@ -85,5 +85,5 @@ function validateTargetsList(targets) { module.exports = { validateObjectSchema, - validateTargetsList + validateTargetsListFormat }
\ No newline at end of file diff --git a/src/lib/server.js b/src/lib/server.js index 1d923a8..618ca8c 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -449,8 +449,8 @@ class Connection extends EventEmitter { } } } catch (error) { - this.logger.error('failed to parse data as JSON') - this.logger.debug(rawMessage) + this.logger.error('error while parsing message:', error, rawMessage.toString('utf-8')) + this.logger.trace(rawMessage) } } } diff --git a/src/lib/worker.js b/src/lib/worker.js index a271a31..b09c2f8 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -26,7 +26,7 @@ class Worker extends EventEmitter { super() /** - * @type {object.<string, {slots: object.<string, Queue>}>} + * @type {object.<string, {slots: object.<string, Queue>, paused: boolean}>} */ this.targets = {} @@ -57,7 +57,10 @@ class Worker extends EventEmitter { this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) if (this.targets[target] === undefined) - this.targets[target] = {slots: {}} + this.targets[target] = { + slots: {}, + paused: false + } if (this.targets[target].slots[slot] !== undefined) throw new Error(`slot ${slot} for target ${target} has already been added`) @@ -74,6 +77,56 @@ class Worker extends EventEmitter { } /** + * Stop queues associated with specified targets. + * + * @param {null|string[]} targets + */ + pauseTargets(targets) { + if (targets === null) + targets = this.getTargets() + + for (const targetName of targets) { + const target = this.targets[targetName] + if (target.paused) { + this.logger.warn(`pauseTargets: ${targetName} is already paused`) + continue + } + + for (const slotName in target.slots) { + this.logger.debug(`pauseTargets: stopping ${targetName}/${slotName} queue`) + target.slots[slotName].stop() + } + + target.paused = true + } + } + + /** + * Start queues associated with specified targets. + * + * @param {null|string[]} targets + */ + continueTargets(targets) { + if (targets === null) + targets = this.getTargets() + + for (const targetName of targets) { + const target = this.targets[targetName] + if (!target.paused) { + this.logger.warn(`continueTargets: ${targetName} is not paused`) + continue + } + + for (const slotName in target.slots) { + this.logger.debug(`pauseTargets: starting ${targetName}/${slotName} queue`) + target.slots[slotName].start() + } + + target.paused = false + } + } + + /** * Checks whether target is being served. * * @param {string} target @@ -121,7 +174,7 @@ class Worker extends EventEmitter { let targets = this.getPollTargets() if (!targets.length) { - this.poller.warn(`${LOGPREFIX} no targets`) + this.logger.warn(`${LOGPREFIX} no targets`) return } @@ -527,8 +580,10 @@ class Worker extends EventEmitter { onJobFinished = (target, slot) => { this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) + const targetPaused = this.targets[target].paused const queue = this.targets[target].slots[slot] - if (queue.length < queue.concurrency && this.hasPollTarget(target)) { + + if (!targetPaused && queue.length < queue.concurrency && this.hasPollTarget(target)) { this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`) this.poll() } |