From 23c16a2c80f0614d0b31cba363bca66e1a60687b Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 16 Mar 2021 01:06:14 +0300 Subject: jobd: add add-target()/remove-target(); code refactoring --- src/jobd-master.js | 214 +++++++++++++++-------------------------------------- 1 file changed, 59 insertions(+), 155 deletions(-) (limited to 'src/jobd-master.js') diff --git a/src/jobd-master.js b/src/jobd-master.js index 9827126..eeac085 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -4,8 +4,12 @@ const loggerModule = require('./lib/logger') const config = require('./lib/config') const {Server, ResponseMessage} = require('./lib/server') const WorkersList = require('./lib/workers-list') -const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator') -const RequestHandler = require('./lib/request-handler') +const { + validateObjectSchema, + validateInputTargetsListFormat, + validateInputTargets +} = require('./lib/data-validator') +const {RequestHandler} = require('./lib/request-handler') const package_json = require('../package.json') const DEFAULT_CONFIG_PATH = "/etc/jobd-master.conf" @@ -112,136 +116,94 @@ function initRequestHandler() { requestHandler.set('continue', onContinue) } +function usage() { + let s = `${process.argv[1]} OPTIONS + +Options: + --config Path to config. Default: ${DEFAULT_CONFIG_PATH} + --help Show this help. + --version Print version.` + + console.log(s) +} + +async function term() { + if (logger) + logger.info('shutdown') + + await loggerModule.shutdown() + process.exit() +} + + + +/****************************************/ +/** **/ +/** Request handlers **/ +/** **/ +/****************************************/ + /** * @param {object} data - * @param {number} requestNo * @param {Connection} connection */ -function onRegisterWorker(data, requestNo, connection) { - const targets = data.targets || [] - - // validate data - try { - validateTargetsListFormat(targets) - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - - // register worker and reply with OK +async function onRegisterWorker(data, connection) { + const targets = validateInputTargets(data, null) workers.add(connection, targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection */ -function onPoke(data, requestNo, connection) { - const targets = data.targets || [] - - // validate data - try { - validateTargetsListFormat(targets) - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - - // poke workers +async function onPoke(data) { + const targets = validateInputTargets(data, null) workers.poke(targets) - - // reply to user - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection * @return {Promise<*>} */ -async function onStatus(data, requestNo, connection) { +async function onStatus(data) { const info = await workers.getInfo(data.poll_workers || false) - - let status = { + return { workers: info, memoryUsage: process.memoryUsage() } - - connection.send( - new ResponseMessage(requestNo) - .setData(status) - ) } /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection * @return {Promise<*>} */ -async function onRunManual(data, requestNo, connection) { +async function onRunManual(data) { const {jobs} = data - // validate data - try { - if (!Array.isArray(jobs)) - throw new Error('jobs must be array') - - for (let job of jobs) { - validateObjectSchema(job, [ - // name // type // required - ['id', 'i', true], - ['target', 's', true], - ]) - } - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } + // validate input + if (!Array.isArray(jobs)) + throw new Error('jobs must be array') - // run jobs on workers - const jobsData = await workers.runManual(jobs) + for (let job of jobs) { + validateObjectSchema(job, [ + // name // type // required + ['id', 'i', true], + ['target', 's', true], + ]) + } - // send result to the client - connection.send( - new ResponseMessage(requestNo) - .setData(jobsData) - ) + // run jobs, wait for results and send a response + return await workers.runManual(jobs) } /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection */ -function onPause(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - +function onPause(data) { + const targets = validateInputTargets(data, null) workers.pauseTargets(targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** @@ -250,66 +212,8 @@ function onPause(data, requestNo, connection) { * @param {Connection} connection */ function onContinue(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - + const targets = validateInputTargets(data, null) workers.continueTargets(targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) -} - - -/** - * @private - * @param data - * @param requestNo - * @param connection - * @return {null|boolean|string[]} - */ -function validateInputTargets(data, requestNo, connection) { - // null means all targets - let targets = null - - if (data.targets !== undefined) { - targets = data.targets - - // validate data - try { - validateTargetsListFormat(targets) - - // note: we don't check target names here - // as in jobd - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return false - } - } - - return targets + return 'ok' } - -function usage() { - let s = `${process.argv[1]} OPTIONS - -Options: - --config Path to config. Default: ${DEFAULT_CONFIG_PATH} - --help Show this help. - --version Print version.` - - console.log(s) -} - -async function term() { - if (logger) - logger.info('shutdown') - - await loggerModule.shutdown() - process.exit() -} -- cgit v1.2.3