diff options
-rwxr-xr-x | src/jobd-master.js | 69 | ||||
-rw-r--r-- | src/lib/workers-list.js | 55 |
2 files changed, 121 insertions, 3 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js index 8ba0288..5839b1a 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -104,6 +104,8 @@ function initRequestHandler() { requestHandler.set('register-worker', onRegisterWorker) requestHandler.set('status', onStatus) requestHandler.set('run-manual', onRunManual) + requestHandler.set('pause', onPause) + requestHandler.set('continue', onContinue) } /** @@ -221,6 +223,73 @@ async function onRunManual(data, requestNo, connection) { ) } +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onPause(data, requestNo, connection) { + let targets + if ((targets = validateInputTargets(data, requestNo, connection)) === false) + return + + workers.pauseTargets(targets) + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onContinue(data, requestNo, connection) { + let targets + if ((targets = validateInputTargets(data, requestNo, connection)) === false) + return + + workers.continueTargets(targets) + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + + +/** + * @private + * @param data + * @param requestNo + * @param connection + * @return {null|boolean|string[]} + */ +function validateInputTargets(data, requestNo, connection) { + // null means all targets + let targets = null + + if (data.targets !== undefined) { + targets = data.targets + + // validate data + try { + validateTargetsListFormat(targets) + + // note: we don't check target names here + // as in jobd + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return false + } + } + + return targets +} + function usage() { let s = `${process.argv[1]} OPTIONS diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index 41b13e2..82e660d 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -67,9 +67,6 @@ class WorkersList { poke(targets) { this.logger.debug('poke:', targets) - if (!Array.isArray(targets)) - throw new Error('targets must be Array') - for (let t of targets) this.targetsToPoke[t] = true @@ -77,6 +74,20 @@ class WorkersList { } /** + * @param targets + * @return {object[]} + */ + getWorkersByTargets(targets) { + const found = [] + for (const worker of this.workers) { + const intrs = intersection(worker.targets, targets) + if (intrs.length > 0) + found.push(worker) + } + return found + } + + /** * @private */ _pokeWorkers = throttle(() => { @@ -328,6 +339,40 @@ class WorkersList { } /** + * @param {null|string[]} targets + */ + pauseTargets(targets) { + return this._pauseContinueWorkers('pause', targets) + } + + /** + * @param {null|string[]} targets + */ + continueTargets(targets) { + return this._pauseContinueWorkers('continue', targets) + } + + /** + * @param {string} action + * @param {null|string[]} targets + * @private + */ + _pauseContinueWorkers(action, targets) { + (targets === null ? this.workers : this.getWorkersByTargets(targets)) + .map(worker => { + this.logger.debug(`${action}Targets: sending ${action} request to ${worker.connection.remoteAddr()}`) + + let data = {} + if (targets !== null) + data.targets = intersection(worker.targets, targets) + + worker.connection.sendRequest( + new RequestMessage(action, data) + ).catch(this.onWorkerRequestError.bind(this, `${action}Targets`)) + }) + } + + /** * @private */ sendPings = () => { @@ -338,6 +383,10 @@ class WorkersList { }) } + onWorkerRequestError = (from, error) => { + this.logger.error(`${from}:`, error) + } + } module.exports = WorkersList
\ No newline at end of file |