summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-03 02:16:38 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-03 02:16:38 +0300
commit03cda643ad0e248902e18b1073240d15b0345d33 (patch)
tree1683eaadaa828463d8110b513b2f92222d5763a6
parentc497fd50e82cdc2928eff0bdd520db496374ba02 (diff)
jobd-master: support pause()/continue()
-rwxr-xr-xsrc/jobd-master.js69
-rw-r--r--src/lib/workers-list.js55
2 files changed, 121 insertions, 3 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 8ba0288..5839b1a 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -104,6 +104,8 @@ function initRequestHandler() {
requestHandler.set('register-worker', onRegisterWorker)
requestHandler.set('status', onStatus)
requestHandler.set('run-manual', onRunManual)
+ requestHandler.set('pause', onPause)
+ requestHandler.set('continue', onContinue)
}
/**
@@ -221,6 +223,73 @@ async function onRunManual(data, requestNo, connection) {
)
}
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onPause(data, requestNo, connection) {
+ let targets
+ if ((targets = validateInputTargets(data, requestNo, connection)) === false)
+ return
+
+ workers.pauseTargets(targets)
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onContinue(data, requestNo, connection) {
+ let targets
+ if ((targets = validateInputTargets(data, requestNo, connection)) === false)
+ return
+
+ workers.continueTargets(targets)
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
+}
+
+
+/**
+ * @private
+ * @param data
+ * @param requestNo
+ * @param connection
+ * @return {null|boolean|string[]}
+ */
+function validateInputTargets(data, requestNo, connection) {
+ // null means all targets
+ let targets = null
+
+ if (data.targets !== undefined) {
+ targets = data.targets
+
+ // validate data
+ try {
+ validateTargetsListFormat(targets)
+
+ // note: we don't check target names here
+ // as in jobd
+ } catch (e) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(e.message)
+ )
+ return false
+ }
+ }
+
+ return targets
+}
+
function usage() {
let s = `${process.argv[1]} OPTIONS
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
index 41b13e2..82e660d 100644
--- a/src/lib/workers-list.js
+++ b/src/lib/workers-list.js
@@ -67,9 +67,6 @@ class WorkersList {
poke(targets) {
this.logger.debug('poke:', targets)
- if (!Array.isArray(targets))
- throw new Error('targets must be Array')
-
for (let t of targets)
this.targetsToPoke[t] = true
@@ -77,6 +74,20 @@ class WorkersList {
}
/**
+ * @param targets
+ * @return {object[]}
+ */
+ getWorkersByTargets(targets) {
+ const found = []
+ for (const worker of this.workers) {
+ const intrs = intersection(worker.targets, targets)
+ if (intrs.length > 0)
+ found.push(worker)
+ }
+ return found
+ }
+
+ /**
* @private
*/
_pokeWorkers = throttle(() => {
@@ -328,6 +339,40 @@ class WorkersList {
}
/**
+ * @param {null|string[]} targets
+ */
+ pauseTargets(targets) {
+ return this._pauseContinueWorkers('pause', targets)
+ }
+
+ /**
+ * @param {null|string[]} targets
+ */
+ continueTargets(targets) {
+ return this._pauseContinueWorkers('continue', targets)
+ }
+
+ /**
+ * @param {string} action
+ * @param {null|string[]} targets
+ * @private
+ */
+ _pauseContinueWorkers(action, targets) {
+ (targets === null ? this.workers : this.getWorkersByTargets(targets))
+ .map(worker => {
+ this.logger.debug(`${action}Targets: sending ${action} request to ${worker.connection.remoteAddr()}`)
+
+ let data = {}
+ if (targets !== null)
+ data.targets = intersection(worker.targets, targets)
+
+ worker.connection.sendRequest(
+ new RequestMessage(action, data)
+ ).catch(this.onWorkerRequestError.bind(this, `${action}Targets`))
+ })
+ }
+
+ /**
* @private
*/
sendPings = () => {
@@ -338,6 +383,10 @@ class WorkersList {
})
}
+ onWorkerRequestError = (from, error) => {
+ this.logger.error(`${from}:`, error)
+ }
+
}
module.exports = WorkersList \ No newline at end of file