diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-03-03 02:16:38 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-03-03 02:16:38 +0300 |
commit | 03cda643ad0e248902e18b1073240d15b0345d33 (patch) | |
tree | 1683eaadaa828463d8110b513b2f92222d5763a6 /src/lib | |
parent | c497fd50e82cdc2928eff0bdd520db496374ba02 (diff) |
jobd-master: support pause()/continue()
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/workers-list.js | 55 |
1 files changed, 52 insertions, 3 deletions
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index 41b13e2..82e660d 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -67,9 +67,6 @@ class WorkersList { poke(targets) { this.logger.debug('poke:', targets) - if (!Array.isArray(targets)) - throw new Error('targets must be Array') - for (let t of targets) this.targetsToPoke[t] = true @@ -77,6 +74,20 @@ class WorkersList { } /** + * @param targets + * @return {object[]} + */ + getWorkersByTargets(targets) { + const found = [] + for (const worker of this.workers) { + const intrs = intersection(worker.targets, targets) + if (intrs.length > 0) + found.push(worker) + } + return found + } + + /** * @private */ _pokeWorkers = throttle(() => { @@ -328,6 +339,40 @@ class WorkersList { } /** + * @param {null|string[]} targets + */ + pauseTargets(targets) { + return this._pauseContinueWorkers('pause', targets) + } + + /** + * @param {null|string[]} targets + */ + continueTargets(targets) { + return this._pauseContinueWorkers('continue', targets) + } + + /** + * @param {string} action + * @param {null|string[]} targets + * @private + */ + _pauseContinueWorkers(action, targets) { + (targets === null ? this.workers : this.getWorkersByTargets(targets)) + .map(worker => { + this.logger.debug(`${action}Targets: sending ${action} request to ${worker.connection.remoteAddr()}`) + + let data = {} + if (targets !== null) + data.targets = intersection(worker.targets, targets) + + worker.connection.sendRequest( + new RequestMessage(action, data) + ).catch(this.onWorkerRequestError.bind(this, `${action}Targets`)) + }) + } + + /** * @private */ sendPings = () => { @@ -338,6 +383,10 @@ class WorkersList { }) } + onWorkerRequestError = (from, error) => { + this.logger.error(`${from}:`, error) + } + } module.exports = WorkersList
\ No newline at end of file |