aboutsummaryrefslogtreecommitdiff
path: root/src/jobd.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/jobd.js')
-rwxr-xr-xsrc/jobd.js319
1 files changed, 129 insertions, 190 deletions
diff --git a/src/jobd.js b/src/jobd.js
index e1331d1..bb912fc 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -5,8 +5,11 @@ const config = require('./lib/config')
const db = require('./lib/db')
const {uniq} = require('lodash')
const {createCallablePromise} = require('./lib/util')
-const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator')
-const RequestHandler = require('./lib/request-handler')
+const {
+ validateInputTargetAndConcurrency,
+ validateInputTargets
+} = require('./lib/data-validator')
+const {RequestHandler} = require('./lib/request-handler')
const {
Server,
Connection,
@@ -139,6 +142,8 @@ function initRequestHandler() {
requestHandler.set('run-manual', onRunManual)
requestHandler.set('pause', onPause)
requestHandler.set('continue', onContinue)
+ requestHandler.set('add-target', onAddTarget)
+ requestHandler.set('remove-target', onRemoveTarget)
requestHandler.set('set-target-concurrency', onSetTargetConcurrency)
}
@@ -162,69 +167,119 @@ async function initDatabase() {
logger.info('db initialized')
}
+function connectToMaster() {
+ const port = config.get('master_port')
+ const host = config.get('master_host')
+
+ if (!host || !port) {
+ logger.debug('connectToMaster: master host or port is not defined')
+ return
+ }
+
+ async function connect() {
+ const connection = new Connection()
+ await connection.connect(host, port)
+
+ try {
+ let response = await connection.sendRequest(
+ new RequestMessage('register-worker', {
+ targets: worker.getTargets()
+ })
+ )
+ logger.debug('connectToMaster: response:', response)
+ } catch (error) {
+ logger.error('connectToMaster: error while awaiting response:', error)
+ }
+
+ connection.on('close', () => {
+ logger.warn(`connectToMaster: connection closed`)
+ tryToConnect()
+ })
+
+ connection.on('request-message', (message, connection) => {
+ requestHandler.process(message, connection)
+ })
+ }
+
+ function tryToConnect(now = false) {
+ setTimeout(() => {
+ connect().catch(error => {
+ logger.warn(`connectToMaster: connection failed`, error)
+ })
+ }, now ? 0 : config.get('master_reconnect_timeout') * 1000)
+ }
+
+ tryToConnect(true)
+}
+
+function usage() {
+ let s = `${process.argv[1]} OPTIONS
+
+Options:
+ --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH}
+ --help Show this help.
+ --version Print version.`
+
+ console.log(s)
+}
+
+async function term() {
+ if (logger)
+ logger.info('shutdown')
+
+ await loggerModule.shutdown()
+ process.exit()
+}
+
+
+
+/****************************************/
+/** **/
+/** Request handlers **/
+/** **/
+/****************************************/
+
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
+ * @return {Promise<string>}
*/
-function onPollRequest(data, requestNo, connection) {
- let targets
- if ((targets = validateInputTargets(data, requestNo, connection)) === false)
- return
+async function onPollRequest(data) {
+ let targets = validateInputTargets(data, worker)
worker.setPollTargets(targets)
worker.poll()
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
+ * @return {Promise<object>}
*/
-function onStatus(data, requestNo, connection) {
- connection.send(
- new ResponseMessage(requestNo)
- .setData({
- targets: worker.getStatus(),
- jobPromisesCount: Object.keys(jobPromises).length,
- memoryUsage: process.memoryUsage()
- })
- )
+async function onStatus(data) {
+ return {
+ targets: worker.getStatus(),
+ jobPromisesCount: Object.keys(jobPromises).length,
+ memoryUsage: process.memoryUsage()
+ }
}
/**
- * @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
- * @return {Promise<void>}
+ * @param {{ids: number[]}} data
+ * @return {Promise}
*/
-async function onRunManual(data, requestNo, connection) {
+async function onRunManual(data) {
let {ids: jobIds} = data
jobIds = uniq(jobIds)
- // if at least one of the jobs is already being run, reject
- // if at least one item is not a number, reject
for (const id of jobIds) {
- if (typeof id !== 'number') {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(`all ids must be numbers, got ${typeof id}`)
- )
- return
- }
+ // if at least one item is not a number, reject
+ if (typeof id !== 'number')
+ throw new Error(`all ids must be numbers, got ${typeof id}`)
- if (id in jobPromises) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(`another client is already waiting for job ${id}`)
- )
- return
- }
+ // if at least one of the jobs is already being run, reject
+ if (id in jobPromises)
+ throw new Error(`another client is already waiting for job ${id}`)
}
// create a bunch of promises, one per job
@@ -239,7 +294,7 @@ async function onRunManual(data, requestNo, connection) {
const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds})
// wait till all jobs are done (or failed), then send a response
- Promise.allSettled(promises).then(results => {
+ const P = Promise.allSettled(promises).then(results => {
const response = {}
for (let i = 0; i < results.length; i++) {
@@ -262,10 +317,7 @@ async function onRunManual(data, requestNo, connection) {
}
}
- connection.send(
- new ResponseMessage(requestNo)
- .setData(response)
- )
+ return response
})
// reject all ignored / non-found jobs
@@ -286,33 +338,25 @@ async function onRunManual(data, requestNo, connection) {
P.reject(new Error(`job ${id} not found`))
}
}
+
+ return P
}
/**
- * @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
+ * @param {{targets: string[]}} data
*/
-function onPause(data, requestNo, connection) {
- let targets
- if ((targets = validateInputTargets(data, requestNo, connection)) === false)
- return
-
+async function onPause(data) {
+ let targets = validateInputTargets(data, worker)
worker.pauseTargets(targets)
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
- * @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
+ * @param {{targets: string[]}} data
*/
-function onContinue(data, requestNo, connection) {
+async function onContinue(data) {
let targets
- if ((targets = validateInputTargets(data, requestNo, connection)) === false)
+ if ((targets = validateInputTargets(data, worker)) === false)
return
// continue queues
@@ -321,137 +365,32 @@ function onContinue(data, requestNo, connection) {
// poll just in case
worker.poll()
- // ok
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
- * @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
+ * @param {{target: string, concurrency: int}} data
*/
-function onSetTargetConcurrency(data, requestNo, connection) {
- try {
- validateObjectSchema(data, [
- // name // type // required
- ['concurrency', 'i', true],
- ['target', 's', true],
- ])
-
- if (data.concurrency <= 0)
- throw new Error('Invalid concurrency value.')
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return
- }
-
- worker.setTargetConcurrency(data.target, data.concurrency)
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+async function onAddTarget(data) {
+ validateInputTargetAndConcurrency(data)
+ worker.addTarget(data.target, data.concurrency)
+ return 'ok'
}
/**
- * @private
- * @param data
- * @param requestNo
- * @param connection
- * @return {null|boolean|string[]}
+ * @param {{target: string}} data
*/
-function validateInputTargets(data, requestNo, connection) {
- // null means all targets
- let targets = null
-
- if (data.targets !== undefined) {
- targets = data.targets
-
- // validate data
- try {
- validateTargetsListFormat(targets)
-
- for (const t of targets) {
- if (!worker.hasTarget(t))
- throw new Error(`invalid target '${t}'`)
- }
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return false
- }
- }
-
- return targets
+async function onRemoveTarget(data) {
+ validateInputTargetAndConcurrency(data, true)
+ worker.removeTarget(data.target)
+ return 'ok'
}
-function connectToMaster() {
- const port = config.get('master_port')
- const host = config.get('master_host')
-
- if (!host || !port) {
- logger.debug('connectToMaster: master host or port is not defined')
- return
- }
-
- async function connect() {
- const connection = new Connection()
- await connection.connect(host, port)
-
- try {
- let response = await connection.sendRequest(
- new RequestMessage('register-worker', {
- targets: worker.getTargets()
- })
- )
- logger.debug('connectToMaster: response:', response)
- } catch (error) {
- logger.error('connectToMaster: error while awaiting response:', error)
- }
-
- connection.on('close', () => {
- logger.warn(`connectToMaster: connection closed`)
- tryToConnect()
- })
-
- connection.on('request-message', (message, connection) => {
- requestHandler.process(message, connection)
- })
- }
-
- function tryToConnect(now = false) {
- setTimeout(() => {
- connect().catch(error => {
- logger.warn(`connectToMaster: connection failed`, error)
- })
- }, now ? 0 : config.get('master_reconnect_timeout') * 1000)
- }
-
- tryToConnect(true)
-}
-
-function usage() {
- let s = `${process.argv[1]} OPTIONS
-
-Options:
- --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH}
- --help Show this help.
- --version Print version.`
-
- console.log(s)
-}
-
-async function term() {
- if (logger)
- logger.info('shutdown')
-
- await loggerModule.shutdown()
- process.exit()
-}
+/**
+ * @param {object} data
+ */
+async function onSetTargetConcurrency(data) {
+ validateInputTargetAndConcurrency(data)
+ worker.setTargetConcurrency(data.target, data.concurrency)
+ return 'ok'
+} \ No newline at end of file