aboutsummaryrefslogtreecommitdiff
path: root/src/jobd-master.js
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-16 01:06:14 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-16 01:06:14 +0300
commit23c16a2c80f0614d0b31cba363bca66e1a60687b (patch)
tree6d019e63b5bc4d703d4319f444927ccc7c93f208 /src/jobd-master.js
parentcbbe60df32bc591758dd98e624a9f3b3c53f1d23 (diff)
jobd: add add-target()/remove-target(); code refactoring
Diffstat (limited to 'src/jobd-master.js')
-rwxr-xr-xsrc/jobd-master.js214
1 files changed, 59 insertions, 155 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 9827126..eeac085 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -4,8 +4,12 @@ const loggerModule = require('./lib/logger')
const config = require('./lib/config')
const {Server, ResponseMessage} = require('./lib/server')
const WorkersList = require('./lib/workers-list')
-const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator')
-const RequestHandler = require('./lib/request-handler')
+const {
+ validateObjectSchema,
+ validateInputTargetsListFormat,
+ validateInputTargets
+} = require('./lib/data-validator')
+const {RequestHandler} = require('./lib/request-handler')
const package_json = require('../package.json')
const DEFAULT_CONFIG_PATH = "/etc/jobd-master.conf"
@@ -112,136 +116,94 @@ function initRequestHandler() {
requestHandler.set('continue', onContinue)
}
+function usage() {
+ let s = `${process.argv[1]} OPTIONS
+
+Options:
+ --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH}
+ --help Show this help.
+ --version Print version.`
+
+ console.log(s)
+}
+
+async function term() {
+ if (logger)
+ logger.info('shutdown')
+
+ await loggerModule.shutdown()
+ process.exit()
+}
+
+
+
+/****************************************/
+/** **/
+/** Request handlers **/
+/** **/
+/****************************************/
+
/**
* @param {object} data
- * @param {number} requestNo
* @param {Connection} connection
*/
-function onRegisterWorker(data, requestNo, connection) {
- const targets = data.targets || []
-
- // validate data
- try {
- validateTargetsListFormat(targets)
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return
- }
-
- // register worker and reply with OK
+async function onRegisterWorker(data, connection) {
+ const targets = validateInputTargets(data, null)
workers.add(connection, targets)
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
*/
-function onPoke(data, requestNo, connection) {
- const targets = data.targets || []
-
- // validate data
- try {
- validateTargetsListFormat(targets)
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return
- }
-
- // poke workers
+async function onPoke(data) {
+ const targets = validateInputTargets(data, null)
workers.poke(targets)
-
- // reply to user
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
* @return {Promise<*>}
*/
-async function onStatus(data, requestNo, connection) {
+async function onStatus(data) {
const info = await workers.getInfo(data.poll_workers || false)
-
- let status = {
+ return {
workers: info,
memoryUsage: process.memoryUsage()
}
-
- connection.send(
- new ResponseMessage(requestNo)
- .setData(status)
- )
}
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
* @return {Promise<*>}
*/
-async function onRunManual(data, requestNo, connection) {
+async function onRunManual(data) {
const {jobs} = data
- // validate data
- try {
- if (!Array.isArray(jobs))
- throw new Error('jobs must be array')
-
- for (let job of jobs) {
- validateObjectSchema(job, [
- // name // type // required
- ['id', 'i', true],
- ['target', 's', true],
- ])
- }
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return
- }
+ // validate input
+ if (!Array.isArray(jobs))
+ throw new Error('jobs must be array')
- // run jobs on workers
- const jobsData = await workers.runManual(jobs)
+ for (let job of jobs) {
+ validateObjectSchema(job, [
+ // name // type // required
+ ['id', 'i', true],
+ ['target', 's', true],
+ ])
+ }
- // send result to the client
- connection.send(
- new ResponseMessage(requestNo)
- .setData(jobsData)
- )
+ // run jobs, wait for results and send a response
+ return await workers.runManual(jobs)
}
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
*/
-function onPause(data, requestNo, connection) {
- let targets
- if ((targets = validateInputTargets(data, requestNo, connection)) === false)
- return
-
+function onPause(data) {
+ const targets = validateInputTargets(data, null)
workers.pauseTargets(targets)
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
@@ -250,66 +212,8 @@ function onPause(data, requestNo, connection) {
* @param {Connection} connection
*/
function onContinue(data, requestNo, connection) {
- let targets
- if ((targets = validateInputTargets(data, requestNo, connection)) === false)
- return
-
+ const targets = validateInputTargets(data, null)
workers.continueTargets(targets)
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
-}
-
-
-/**
- * @private
- * @param data
- * @param requestNo
- * @param connection
- * @return {null|boolean|string[]}
- */
-function validateInputTargets(data, requestNo, connection) {
- // null means all targets
- let targets = null
-
- if (data.targets !== undefined) {
- targets = data.targets
-
- // validate data
- try {
- validateTargetsListFormat(targets)
-
- // note: we don't check target names here
- // as in jobd
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return false
- }
- }
-
- return targets
+ return 'ok'
}
-
-function usage() {
- let s = `${process.argv[1]} OPTIONS
-
-Options:
- --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH}
- --help Show this help.
- --version Print version.`
-
- console.log(s)
-}
-
-async function term() {
- if (logger)
- logger.info('shutdown')
-
- await loggerModule.shutdown()
- process.exit()
-}