From 03cda643ad0e248902e18b1073240d15b0345d33 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Wed, 3 Mar 2021 02:16:38 +0300 Subject: jobd-master: support pause()/continue() --- src/jobd-master.js | 69 +++++++++++++++++++++++++++++++++++++++++++++++++ src/lib/workers-list.js | 55 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 121 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/jobd-master.js b/src/jobd-master.js index 8ba0288..5839b1a 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -104,6 +104,8 @@ function initRequestHandler() { requestHandler.set('register-worker', onRegisterWorker) requestHandler.set('status', onStatus) requestHandler.set('run-manual', onRunManual) + requestHandler.set('pause', onPause) + requestHandler.set('continue', onContinue) } /** @@ -221,6 +223,73 @@ async function onRunManual(data, requestNo, connection) { ) } +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onPause(data, requestNo, connection) { + let targets + if ((targets = validateInputTargets(data, requestNo, connection)) === false) + return + + workers.pauseTargets(targets) + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onContinue(data, requestNo, connection) { + let targets + if ((targets = validateInputTargets(data, requestNo, connection)) === false) + return + + workers.continueTargets(targets) + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + + +/** + * @private + * @param data + * @param requestNo + * @param connection + * @return {null|boolean|string[]} + */ +function validateInputTargets(data, requestNo, connection) { + // null means all targets + let targets = null + + if (data.targets !== undefined) { + targets = data.targets + + // validate data + try { + validateTargetsListFormat(targets) + + // note: we don't check target names here + // as in jobd + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return false + } + } + + return targets +} + function usage() { let s = `${process.argv[1]} OPTIONS diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index 41b13e2..82e660d 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -67,15 +67,26 @@ class WorkersList { poke(targets) { this.logger.debug('poke:', targets) - if (!Array.isArray(targets)) - throw new Error('targets must be Array') - for (let t of targets) this.targetsToPoke[t] = true this._pokeWorkers() } + /** + * @param targets + * @return {object[]} + */ + getWorkersByTargets(targets) { + const found = [] + for (const worker of this.workers) { + const intrs = intersection(worker.targets, targets) + if (intrs.length > 0) + found.push(worker) + } + return found + } + /** * @private */ @@ -327,6 +338,40 @@ class WorkersList { return response } + /** + * @param {null|string[]} targets + */ + pauseTargets(targets) { + return this._pauseContinueWorkers('pause', targets) + } + + /** + * @param {null|string[]} targets + */ + continueTargets(targets) { + return this._pauseContinueWorkers('continue', targets) + } + + /** + * @param {string} action + * @param {null|string[]} targets + * @private + */ + _pauseContinueWorkers(action, targets) { + (targets === null ? this.workers : this.getWorkersByTargets(targets)) + .map(worker => { + this.logger.debug(`${action}Targets: sending ${action} request to ${worker.connection.remoteAddr()}`) + + let data = {} + if (targets !== null) + data.targets = intersection(worker.targets, targets) + + worker.connection.sendRequest( + new RequestMessage(action, data) + ).catch(this.onWorkerRequestError.bind(this, `${action}Targets`)) + }) + } + /** * @private */ @@ -338,6 +383,10 @@ class WorkersList { }) } + onWorkerRequestError = (from, error) => { + this.logger.error(`${from}:`, error) + } + } module.exports = WorkersList \ No newline at end of file -- cgit v1.2.3