summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md3
-rwxr-xr-xsrc/jobd.js30
-rw-r--r--src/lib/worker.js11
3 files changed, 43 insertions, 1 deletions
diff --git a/README.md b/README.md
index f26ac4e..66e6286 100644
--- a/README.md
+++ b/README.md
@@ -67,6 +67,9 @@ for target is not enough for you, change it to fit your needs.
* **`run-manual(ids: int[])`** — enqueue and run jobs with specified IDs and
`status` set to `manual`, and return results.
+* **`set-target-concurrency(target: string, concurrency: int)`** — set concurrency
+ of target `target`.
+
### jobd-master requests
* **`register-worker(targets: string[])`** — used by a jobd instance to register
diff --git a/src/jobd.js b/src/jobd.js
index f8be71f..de8807f 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -5,7 +5,7 @@ const config = require('./lib/config')
const db = require('./lib/db')
const {uniq} = require('lodash')
const {createCallablePromise} = require('./lib/util')
-const {validateTargetsListFormat} = require('./lib/data-validator')
+const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator')
const RequestHandler = require('./lib/request-handler')
const {
Server,
@@ -136,6 +136,7 @@ function initRequestHandler() {
requestHandler.set('run-manual', onRunManual)
requestHandler.set('pause', onPause)
requestHandler.set('continue', onContinue)
+ requestHandler.set('set-target-concurrency', onSetTargetConcurrency)
}
function initServer() {
@@ -325,6 +326,33 @@ function onContinue(data, requestNo, connection) {
}
/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onSetTargetConcurrency(data, requestNo, connection) {
+ try {
+ validateObjectSchema(data, [
+ // name // type // required
+ ['concurrency', 'i', true],
+ ['target', 's', true],
+ ])
+ } catch (e) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(e.message)
+ )
+ return
+ }
+
+ worker.setTargetConcurrency(data.target, data.concurrency)
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
+}
+
+/**
* @private
* @param data
* @param requestNo
diff --git a/src/lib/worker.js b/src/lib/worker.js
index beba7f9..e53d03f 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -73,6 +73,17 @@ class Worker extends EventEmitter {
}
/**
+ * @param {string} target
+ * @param {number} concurrency
+ */
+ setTargetConcurrency(target, concurrency) {
+ if (!(target in this.targets))
+ throw new Error(`target '${target}' not found`)
+
+ this.targets[target].queue.concurrency = concurrency
+ }
+
+ /**
* Stop queues associated with specified targets.
*
* @param {null|string[]} targets