diff options
Diffstat (limited to 'src/jobd-master.js')
-rwxr-xr-x | src/jobd-master.js | 254 |
1 files changed, 138 insertions, 116 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js index 8628764..f603b92 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -2,9 +2,10 @@ const minimist = require('minimist') const loggerModule = require('./lib/logger') const config = require('./lib/config') -const {Server, ResponseMessage, RequestMessage} = require('./lib/server') +const {Server, ResponseMessage} = require('./lib/server') const WorkersList = require('./lib/workers-list') const {validateObjectSchema, validateTargetsList} = require('./lib/data-validator') +const RequestHandler = require('./lib/request-handler') const package_json = require('../package.json') /** @@ -22,6 +23,11 @@ let server */ let workers +/** + * @type {RequestHandler} + */ +let requestHandler + main().catch(e => { console.error(e) @@ -30,6 +36,13 @@ main().catch(e => { async function main() { + await initApp('jobd-master') + initWorkers() + initRequestHandler() + initServer() +} + +async function initApp(appName) { if (process.argv.length < 3) { usage() process.exit(0) @@ -66,140 +79,149 @@ async function main() { levelFile: config.get('log_level_file'), levelConsole: config.get('log_level_console'), }) - logger = loggerModule.getLogger('jobd-master') + logger = loggerModule.getLogger(appName) - workers = new WorkersList() + process.title = appName +} - // start server +function initServer() { server = new Server() server.on('new-connection', (connection) => { - connection.on('request-message', onRequestMessage) + connection.on('request-message', (message, connection) => { + requestHandler.process(message, connection) + }) }) server.start(config.get('port'), config.get('host')) - logger.info('server started') +} + +function initWorkers() { + workers = new WorkersList() +} + +function initRequestHandler() { + requestHandler = new RequestHandler() + requestHandler.set('poke', onPoke) + requestHandler.set('register-worker', onRegisterWorker) + requestHandler.set('status', onStatus) + requestHandler.set('run-manual', onRunManual) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onRegisterWorker(data, requestNo, connection) { + const targets = data.targets || [] + + // validate data + try { + validateTargetsList(targets) + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return + } + + // register worker and reply with OK + workers.add(connection, targets) + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) } /** - * @param {RequestMessage|ResponseMessage} message + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onPoke(data, requestNo, connection) { + const targets = data.targets || [] + + // validate data + try { + validateTargetsList(targets) + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return + } + + // poke workers + workers.poke(targets) + + // reply to user + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + +/** + * @param {object} data + * @param {number} requestNo * @param {Connection} connection * @return {Promise<*>} */ -async function onRequestMessage(message, connection) { +async function onStatus(data, requestNo, connection) { + const info = await workers.getInfo(data.poll_workers || false) + + let status = { + workers: info, + memoryUsage: process.memoryUsage() + } + + connection.send( + new ResponseMessage(requestNo) + .setData(status) + ) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + * @return {Promise<*>} + */ +async function onRunManual(data, requestNo, connection) { + const {jobs} = data + + // validate data try { - logger.info('onMessage:', message) - - switch (message.requestType) { - case 'register-worker': { - const targets = message.requestData?.targets || [] - - // validate data - try { - validateTargetsList(targets) - } catch (e) { - connection.send( - new ResponseMessage(message.requestNo) - .setError(e.message) - ) - return - } - - workers.add(connection, targets) - connection.send( - new ResponseMessage(message.requestNo) - .setData('ok') - ) - break - } - - case 'poke': { - const targets = message.requestData?.targets || [] - - // validate data - try { - validateTargetsList(targets) - } catch (e) { - connection.send( - new ResponseMessage(message.requestNo) - .setError(e.message) - ) - return - } - - // poke workers - workers.poke(targets) - - // reply to user - connection.send( - new ResponseMessage(message.requestNo) - .setData('ok') - ) - break - } - - case 'status': - const info = await workers.getInfo(message.requestData?.poll_workers || false) - - let status = { - workers: info, - memoryUsage: process.memoryUsage() - } - - connection.send( - new ResponseMessage(message.requestNo) - .setData(status) - ) - - break - - case 'run-manual': - const jobs = message.requestData.jobs - - // validate data - try { - if (!Array.isArray(jobs)) - throw new Error('jobs must be array') - - for (let job of jobs) { - validateObjectSchema(job, [ - // name // type // required - ['id', 'i', true], - ['target', 's', true], - ]) - } - } catch (e) { - connection.send( - new ResponseMessage(message.requestNo) - .setError(e.message) - ) - return - } - - // run jobs on workers - const data = await workers.runManual(jobs) - - // send result to the client - connection.send( - new ResponseMessage(message.requestNo) - .setData(data) - ) - break - - default: - connection.send( - new ResponseMessage(message.requestNo) - .setError(`unknown request type: '${message.requestType}'`) - ) - break + if (!Array.isArray(jobs)) + throw new Error('jobs must be array') + + for (let job of jobs) { + validateObjectSchema(job, [ + // name // type // required + ['id', 'i', true], + ['target', 's', true], + ]) } - } catch (error) { - logger.error(`error while handling message:`, message, error) + } catch (e) { connection.send( - new ResponseMessage(message.requestNo) - .setError('server error: ' + error?.message) + new ResponseMessage(requestNo) + .setError(e.message) ) + return } + + // run jobs on workers + const jobsData = await workers.runManual(jobs) + + // send result to the client + connection.send( + new ResponseMessage(requestNo) + .setData(jobsData) + ) } + function usage() { let s = `${process.argv[1]} OPTIONS |