From 613fd5fd3e7641732c54e21f771cbc2d6cc2651e Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Fri, 5 Mar 2021 03:01:06 +0300 Subject: jobd: add set-target-concurrency request --- README.md | 3 +++ src/jobd.js | 30 +++++++++++++++++++++++++++++- src/lib/worker.js | 11 +++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f26ac4e..66e6286 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,9 @@ for target is not enough for you, change it to fit your needs. * **`run-manual(ids: int[])`** — enqueue and run jobs with specified IDs and `status` set to `manual`, and return results. +* **`set-target-concurrency(target: string, concurrency: int)`** — set concurrency + of target `target`. + ### jobd-master requests * **`register-worker(targets: string[])`** — used by a jobd instance to register diff --git a/src/jobd.js b/src/jobd.js index f8be71f..de8807f 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -5,7 +5,7 @@ const config = require('./lib/config') const db = require('./lib/db') const {uniq} = require('lodash') const {createCallablePromise} = require('./lib/util') -const {validateTargetsListFormat} = require('./lib/data-validator') +const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator') const RequestHandler = require('./lib/request-handler') const { Server, @@ -136,6 +136,7 @@ function initRequestHandler() { requestHandler.set('run-manual', onRunManual) requestHandler.set('pause', onPause) requestHandler.set('continue', onContinue) + requestHandler.set('set-target-concurrency', onSetTargetConcurrency) } function initServer() { @@ -324,6 +325,33 @@ function onContinue(data, requestNo, connection) { ) } +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onSetTargetConcurrency(data, requestNo, connection) { + try { + validateObjectSchema(data, [ + // name // type // required + ['concurrency', 'i', true], + ['target', 's', true], + ]) + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return + } + + worker.setTargetConcurrency(data.target, data.concurrency) + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + /** * @private * @param data diff --git a/src/lib/worker.js b/src/lib/worker.js index beba7f9..e53d03f 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -72,6 +72,17 @@ class Worker extends EventEmitter { } } + /** + * @param {string} target + * @param {number} concurrency + */ + setTargetConcurrency(target, concurrency) { + if (!(target in this.targets)) + throw new Error(`target '${target}' not found`) + + this.targets[target].queue.concurrency = concurrency + } + /** * Stop queues associated with specified targets. * -- cgit v1.2.3