summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-03 01:52:39 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-03 01:52:39 +0300
commitbd712c4e8842f44dc546f587bd237c456a92e9d1 (patch)
treeea8e21d30a209cf55f62ec485a39c35bdd30f82c
parent097338ec1f76f3afde9dfabab09a20005fa44e56 (diff)
jobd: support pause()/continue()
-rw-r--r--README.md6
-rwxr-xr-xsrc/jobd-master.js6
-rwxr-xr-xsrc/jobd.js104
-rw-r--r--src/lib/data-validator.js4
-rw-r--r--src/lib/server.js4
-rw-r--r--src/lib/worker.js63
6 files changed, 152 insertions, 35 deletions
diff --git a/README.md b/README.md
index 5675c5f..f1f20dd 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,12 @@ For optimization purposes, you can turn fields `target` and `slot` into `ENUM`s.
* **`poll(targets: string[])`** — get new tasks for specified `targets` from database.
If `targets` argument is not specified, get tasks for all serving targets.
+* **`pause(targets: string[])`** — pause execution of tasks of specified targets.
+ If `targets` argument is not specified, pauses all targets.
+
+* **`continue(targets: string[])`** — continue execution of tasks of specified targets.
+ If `targets` argument is not specified, continues all targets.
+
* **`status()`** — returns status of internal queues and memory usage.
* **`run-manual(ids: int[])`** — enqueue and run jobs with specified IDs and
diff --git a/src/jobd-master.js b/src/jobd-master.js
index f603b92..8ba0288 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -4,7 +4,7 @@ const loggerModule = require('./lib/logger')
const config = require('./lib/config')
const {Server, ResponseMessage} = require('./lib/server')
const WorkersList = require('./lib/workers-list')
-const {validateObjectSchema, validateTargetsList} = require('./lib/data-validator')
+const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator')
const RequestHandler = require('./lib/request-handler')
const package_json = require('../package.json')
@@ -116,7 +116,7 @@ function onRegisterWorker(data, requestNo, connection) {
// validate data
try {
- validateTargetsList(targets)
+ validateTargetsListFormat(targets)
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
@@ -143,7 +143,7 @@ function onPoke(data, requestNo, connection) {
// validate data
try {
- validateTargetsList(targets)
+ validateTargetsListFormat(targets)
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
diff --git a/src/jobd.js b/src/jobd.js
index 0b980b7..97a1f05 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -5,7 +5,7 @@ const config = require('./lib/config')
const db = require('./lib/db')
const {uniq} = require('lodash')
const {createCallablePromise} = require('./lib/util')
-const {validateTargetsList} = require('./lib/data-validator')
+const {validateTargetsListFormat} = require('./lib/data-validator')
const RequestHandler = require('./lib/request-handler')
const {
Server,
@@ -135,6 +135,8 @@ function initRequestHandler() {
requestHandler.set('poll', onPollRequest)
requestHandler.set('status', onStatus)
requestHandler.set('run-manual', onRunManual)
+ requestHandler.set('pause', onPause)
+ requestHandler.set('continue', onContinue)
}
function initServer() {
@@ -163,28 +165,9 @@ async function initDatabase() {
* @param {Connection} connection
*/
function onPollRequest(data, requestNo, connection) {
- // null means all targets
- let targets = null
-
- if (data.targets !== undefined) {
- targets = data.targets
-
- // validate data
- try {
- validateTargetsList(targets)
-
- for (const t of targets) {
- if (!worker.hasTarget(t))
- throw new Error(`invalid target '${t}'`)
- }
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return
- }
- }
+ let targets
+ if ((targets = validateInputTargets(data, requestNo, connection)) === false)
+ return
worker.setPollTargets(targets)
worker.poll()
@@ -303,6 +286,80 @@ async function onRunManual(data, requestNo, connection) {
}
}
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onPause(data, requestNo, connection) {
+ let targets
+ if ((targets = validateInputTargets(data, requestNo, connection)) === false)
+ return
+
+ worker.pauseTargets(targets)
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onContinue(data, requestNo, connection) {
+ let targets
+ if ((targets = validateInputTargets(data, requestNo, connection)) === false)
+ return
+
+ // continue queues
+ worker.continueTargets(targets)
+
+ // poll just in case
+ worker.poll()
+
+ // ok
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
+}
+
+/**
+ * @private
+ * @param data
+ * @param requestNo
+ * @param connection
+ * @return {null|boolean|string[]}
+ */
+function validateInputTargets(data, requestNo, connection) {
+ // null means all targets
+ let targets = null
+
+ if (data.targets !== undefined) {
+ targets = data.targets
+
+ // validate data
+ try {
+ validateTargetsListFormat(targets)
+
+ for (const t of targets) {
+ if (!worker.hasTarget(t))
+ throw new Error(`invalid target '${t}'`)
+ }
+ } catch (e) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(e.message)
+ )
+ return false
+ }
+ }
+
+ return targets
+}
+
function connectToMaster() {
const port = config.get('master_port')
const host = config.get('master_host')
@@ -352,7 +409,6 @@ Options:
console.log(s)
}
-
function term() {
if (logger)
logger.info('shutdown')
diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js
index 6d1d8f0..7419b34 100644
--- a/src/lib/data-validator.js
+++ b/src/lib/data-validator.js
@@ -69,7 +69,7 @@ function validateObjectSchema(data, schema) {
}
}
-function validateTargetsList(targets) {
+function validateTargetsListFormat(targets) {
if (!Array.isArray(targets))
throw new Error('targets must be array')
@@ -85,5 +85,5 @@ function validateTargetsList(targets) {
module.exports = {
validateObjectSchema,
- validateTargetsList
+ validateTargetsListFormat
} \ No newline at end of file
diff --git a/src/lib/server.js b/src/lib/server.js
index 1d923a8..618ca8c 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -449,8 +449,8 @@ class Connection extends EventEmitter {
}
}
} catch (error) {
- this.logger.error('failed to parse data as JSON')
- this.logger.debug(rawMessage)
+ this.logger.error('error while parsing message:', error, rawMessage.toString('utf-8'))
+ this.logger.trace(rawMessage)
}
}
}
diff --git a/src/lib/worker.js b/src/lib/worker.js
index a271a31..b09c2f8 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -26,7 +26,7 @@ class Worker extends EventEmitter {
super()
/**
- * @type {object.<string, {slots: object.<string, Queue>}>}
+ * @type {object.<string, {slots: object.<string, Queue>, paused: boolean}>}
*/
this.targets = {}
@@ -57,7 +57,10 @@ class Worker extends EventEmitter {
this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`)
if (this.targets[target] === undefined)
- this.targets[target] = {slots: {}}
+ this.targets[target] = {
+ slots: {},
+ paused: false
+ }
if (this.targets[target].slots[slot] !== undefined)
throw new Error(`slot ${slot} for target ${target} has already been added`)
@@ -74,6 +77,56 @@ class Worker extends EventEmitter {
}
/**
+ * Stop queues associated with specified targets.
+ *
+ * @param {null|string[]} targets
+ */
+ pauseTargets(targets) {
+ if (targets === null)
+ targets = this.getTargets()
+
+ for (const targetName of targets) {
+ const target = this.targets[targetName]
+ if (target.paused) {
+ this.logger.warn(`pauseTargets: ${targetName} is already paused`)
+ continue
+ }
+
+ for (const slotName in target.slots) {
+ this.logger.debug(`pauseTargets: stopping ${targetName}/${slotName} queue`)
+ target.slots[slotName].stop()
+ }
+
+ target.paused = true
+ }
+ }
+
+ /**
+ * Start queues associated with specified targets.
+ *
+ * @param {null|string[]} targets
+ */
+ continueTargets(targets) {
+ if (targets === null)
+ targets = this.getTargets()
+
+ for (const targetName of targets) {
+ const target = this.targets[targetName]
+ if (!target.paused) {
+ this.logger.warn(`continueTargets: ${targetName} is not paused`)
+ continue
+ }
+
+ for (const slotName in target.slots) {
+ this.logger.debug(`pauseTargets: starting ${targetName}/${slotName} queue`)
+ target.slots[slotName].start()
+ }
+
+ target.paused = false
+ }
+ }
+
+ /**
* Checks whether target is being served.
*
* @param {string} target
@@ -121,7 +174,7 @@ class Worker extends EventEmitter {
let targets = this.getPollTargets()
if (!targets.length) {
- this.poller.warn(`${LOGPREFIX} no targets`)
+ this.logger.warn(`${LOGPREFIX} no targets`)
return
}
@@ -527,8 +580,10 @@ class Worker extends EventEmitter {
onJobFinished = (target, slot) => {
this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`)
+ const targetPaused = this.targets[target].paused
const queue = this.targets[target].slots[slot]
- if (queue.length < queue.concurrency && this.hasPollTarget(target)) {
+
+ if (!targetPaused && queue.length < queue.concurrency && this.hasPollTarget(target)) {
this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`)
this.poll()
}