diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-03-05 03:01:06 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-03-05 03:01:06 +0300 |
commit | 613fd5fd3e7641732c54e21f771cbc2d6cc2651e (patch) | |
tree | 67df2f07c1daad45a7789acdf5a52a93606d5f15 | |
parent | a191c5c82c833c370d1e8af63e920d4d7ebf58e3 (diff) |
jobd: add set-target-concurrency request
-rw-r--r-- | README.md | 3 | ||||
-rwxr-xr-x | src/jobd.js | 30 | ||||
-rw-r--r-- | src/lib/worker.js | 11 |
3 files changed, 43 insertions, 1 deletions
@@ -67,6 +67,9 @@ for target is not enough for you, change it to fit your needs. * **`run-manual(ids: int[])`** — enqueue and run jobs with specified IDs and `status` set to `manual`, and return results. +* **`set-target-concurrency(target: string, concurrency: int)`** — set concurrency + of target `target`. + ### jobd-master requests * **`register-worker(targets: string[])`** — used by a jobd instance to register diff --git a/src/jobd.js b/src/jobd.js index f8be71f..de8807f 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -5,7 +5,7 @@ const config = require('./lib/config') const db = require('./lib/db') const {uniq} = require('lodash') const {createCallablePromise} = require('./lib/util') -const {validateTargetsListFormat} = require('./lib/data-validator') +const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator') const RequestHandler = require('./lib/request-handler') const { Server, @@ -136,6 +136,7 @@ function initRequestHandler() { requestHandler.set('run-manual', onRunManual) requestHandler.set('pause', onPause) requestHandler.set('continue', onContinue) + requestHandler.set('set-target-concurrency', onSetTargetConcurrency) } function initServer() { @@ -325,6 +326,33 @@ function onContinue(data, requestNo, connection) { } /** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onSetTargetConcurrency(data, requestNo, connection) { + try { + validateObjectSchema(data, [ + // name // type // required + ['concurrency', 'i', true], + ['target', 's', true], + ]) + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return + } + + worker.setTargetConcurrency(data.target, data.concurrency) + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + +/** * @private * @param data * @param requestNo diff --git a/src/lib/worker.js b/src/lib/worker.js index beba7f9..e53d03f 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -73,6 +73,17 @@ class Worker extends EventEmitter { } /** + * @param {string} target + * @param {number} concurrency + */ + setTargetConcurrency(target, concurrency) { + if (!(target in this.targets)) + throw new Error(`target '${target}' not found`) + + this.targets[target].queue.concurrency = concurrency + } + + /** * Stop queues associated with specified targets. * * @param {null|string[]} targets |