diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-03-03 01:05:25 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-03-03 01:05:25 +0300 |
commit | 4cc2e3029fedca4f48f4f112282a7e381b6292e4 (patch) | |
tree | 27f18feaabb6fd649e04f2a36898166db0e5f91a /src | |
parent | 04b28698f2267f324354abb44c4508ac2c5c1332 (diff) |
refactor code and set process.title
Diffstat (limited to 'src')
-rwxr-xr-x | src/jobd-master.js | 254 | ||||
-rwxr-xr-x | src/jobd.js | 354 | ||||
-rw-r--r-- | src/lib/request-handler.js | 59 | ||||
-rw-r--r-- | src/lib/server.js | 4 |
4 files changed, 384 insertions, 287 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js index 8628764..f603b92 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -2,9 +2,10 @@ const minimist = require('minimist') const loggerModule = require('./lib/logger') const config = require('./lib/config') -const {Server, ResponseMessage, RequestMessage} = require('./lib/server') +const {Server, ResponseMessage} = require('./lib/server') const WorkersList = require('./lib/workers-list') const {validateObjectSchema, validateTargetsList} = require('./lib/data-validator') +const RequestHandler = require('./lib/request-handler') const package_json = require('../package.json') /** @@ -22,6 +23,11 @@ let server */ let workers +/** + * @type {RequestHandler} + */ +let requestHandler + main().catch(e => { console.error(e) @@ -30,6 +36,13 @@ main().catch(e => { async function main() { + await initApp('jobd-master') + initWorkers() + initRequestHandler() + initServer() +} + +async function initApp(appName) { if (process.argv.length < 3) { usage() process.exit(0) @@ -66,140 +79,149 @@ async function main() { levelFile: config.get('log_level_file'), levelConsole: config.get('log_level_console'), }) - logger = loggerModule.getLogger('jobd-master') + logger = loggerModule.getLogger(appName) - workers = new WorkersList() + process.title = appName +} - // start server +function initServer() { server = new Server() server.on('new-connection', (connection) => { - connection.on('request-message', onRequestMessage) + connection.on('request-message', (message, connection) => { + requestHandler.process(message, connection) + }) }) server.start(config.get('port'), config.get('host')) - logger.info('server started') +} + +function initWorkers() { + workers = new WorkersList() +} + +function initRequestHandler() { + requestHandler = new RequestHandler() + requestHandler.set('poke', onPoke) + requestHandler.set('register-worker', onRegisterWorker) + requestHandler.set('status', onStatus) + requestHandler.set('run-manual', onRunManual) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onRegisterWorker(data, requestNo, connection) { + const targets = data.targets || [] + + // validate data + try { + validateTargetsList(targets) + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return + } + + // register worker and reply with OK + workers.add(connection, targets) + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) } /** - * @param {RequestMessage|ResponseMessage} message + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onPoke(data, requestNo, connection) { + const targets = data.targets || [] + + // validate data + try { + validateTargetsList(targets) + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return + } + + // poke workers + workers.poke(targets) + + // reply to user + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + +/** + * @param {object} data + * @param {number} requestNo * @param {Connection} connection * @return {Promise<*>} */ -async function onRequestMessage(message, connection) { +async function onStatus(data, requestNo, connection) { + const info = await workers.getInfo(data.poll_workers || false) + + let status = { + workers: info, + memoryUsage: process.memoryUsage() + } + + connection.send( + new ResponseMessage(requestNo) + .setData(status) + ) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + * @return {Promise<*>} + */ +async function onRunManual(data, requestNo, connection) { + const {jobs} = data + + // validate data try { - logger.info('onMessage:', message) - - switch (message.requestType) { - case 'register-worker': { - const targets = message.requestData?.targets || [] - - // validate data - try { - validateTargetsList(targets) - } catch (e) { - connection.send( - new ResponseMessage(message.requestNo) - .setError(e.message) - ) - return - } - - workers.add(connection, targets) - connection.send( - new ResponseMessage(message.requestNo) - .setData('ok') - ) - break - } - - case 'poke': { - const targets = message.requestData?.targets || [] - - // validate data - try { - validateTargetsList(targets) - } catch (e) { - connection.send( - new ResponseMessage(message.requestNo) - .setError(e.message) - ) - return - } - - // poke workers - workers.poke(targets) - - // reply to user - connection.send( - new ResponseMessage(message.requestNo) - .setData('ok') - ) - break - } - - case 'status': - const info = await workers.getInfo(message.requestData?.poll_workers || false) - - let status = { - workers: info, - memoryUsage: process.memoryUsage() - } - - connection.send( - new ResponseMessage(message.requestNo) - .setData(status) - ) - - break - - case 'run-manual': - const jobs = message.requestData.jobs - - // validate data - try { - if (!Array.isArray(jobs)) - throw new Error('jobs must be array') - - for (let job of jobs) { - validateObjectSchema(job, [ - // name // type // required - ['id', 'i', true], - ['target', 's', true], - ]) - } - } catch (e) { - connection.send( - new ResponseMessage(message.requestNo) - .setError(e.message) - ) - return - } - - // run jobs on workers - const data = await workers.runManual(jobs) - - // send result to the client - connection.send( - new ResponseMessage(message.requestNo) - .setData(data) - ) - break - - default: - connection.send( - new ResponseMessage(message.requestNo) - .setError(`unknown request type: '${message.requestType}'`) - ) - break + if (!Array.isArray(jobs)) + throw new Error('jobs must be array') + + for (let job of jobs) { + validateObjectSchema(job, [ + // name // type // required + ['id', 'i', true], + ['target', 's', true], + ]) } - } catch (error) { - logger.error(`error while handling message:`, message, error) + } catch (e) { connection.send( - new ResponseMessage(message.requestNo) - .setError('server error: ' + error?.message) + new ResponseMessage(requestNo) + .setError(e.message) ) + return } + + // run jobs on workers + const jobsData = await workers.runManual(jobs) + + // send result to the client + connection.send( + new ResponseMessage(requestNo) + .setData(jobsData) + ) } + function usage() { let s = `${process.argv[1]} OPTIONS diff --git a/src/jobd.js b/src/jobd.js index 4605137..0b980b7 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -6,6 +6,7 @@ const db = require('./lib/db') const {uniq} = require('lodash') const {createCallablePromise} = require('./lib/util') const {validateTargetsList} = require('./lib/data-validator') +const RequestHandler = require('./lib/request-handler') const { Server, Connection, @@ -37,6 +38,11 @@ let logger let server /** + * @type {RequestHandler} + */ +let requestHandler + +/** * @type {object.<string, Promise>} */ let jobPromises = {} @@ -49,6 +55,15 @@ main().catch(e => { async function main() { + await initApp('jobd') + await initDatabase() + initWorker() + initRequestHandler() + initServer() + connectToMaster() +} + +async function initApp(appName) { if (process.argv.length < 3) { usage() process.exit(0) @@ -85,18 +100,12 @@ async function main() { levelFile: config.get('log_level_file'), levelConsole: config.get('log_level_console'), }) - logger = loggerModule.getLogger('jobd') + logger = loggerModule.getLogger(appName) - // init database - try { - await db.init() - } catch (error) { - logger.error('failed to connect to MySQL', error) - process.exit(1) - } - logger.info('db initialized') + process.title = appName +} - // init queue +function initWorker() { worker = new Worker() for (let targetName in config.get('targets')) { let slots = config.get('targets')[targetName].slots @@ -119,186 +128,192 @@ async function main() { logger.warn(`job-done: jobPromises[${data.id}] is undefined`) } }) - logger.info('queue initialized') +} - // start server +function initRequestHandler() { + requestHandler = new RequestHandler() + requestHandler.set('poll', onPollRequest) + requestHandler.set('status', onStatus) + requestHandler.set('run-manual', onRunManual) +} + +function initServer() { server = new Server() server.on('new-connection', (connection) => { - connection.on('request-message', onRequestMessage) + connection.on('request-message', (message, connection) => { + requestHandler.process(message, connection) + }) }) server.start(config.get('port'), config.get('host')) - logger.info('server started') - - // connect to master - if (config.get('master_port') && config.get('master_host')) - connectToMaster() } +async function initDatabase() { + try { + await db.init() + } catch (error) { + logger.error('failed to connect to MySQL', error) + process.exit(1) + } + logger.info('db initialized') +} /** - * @param {RequestMessage|ResponseMessage} message + * @param {object} data + * @param {number} requestNo * @param {Connection} connection - * @return {Promise<*>} */ -async function onRequestMessage(message, connection) { - try { - logger.info('onMessage:', message) - - switch (message.requestType) { - case 'poll': { - // null means all - let targets = null - - if (message.requestData?.targets !== undefined) { - targets = message.requestData?.targets - - // validate data - try { - validateTargetsList(targets) - - for (const t of targets) { - if (!worker.hasTarget(t)) - throw new Error(`invalid target '${t}'`) - } - } catch (e) { - connection.send( - new ResponseMessage(message.requestNo) - .setError(e.message) - ) - return - } - } - - worker.setPollTargets(targets) - worker.poll() - - connection.send( - new ResponseMessage(message.requestNo) - .setData('ok') - ) - break - } +function onPollRequest(data, requestNo, connection) { + // null means all targets + let targets = null - case 'status': { - const qs = worker.getStatus() - connection.send( - new ResponseMessage(message.requestNo) - .setData({ - queue: qs, - jobDoneAwaitersCount: Object.keys(jobPromises).length, - memoryUsage: process.memoryUsage() - }) - ) - break - } + if (data.targets !== undefined) { + targets = data.targets + + // validate data + try { + validateTargetsList(targets) - case 'run-manual': { - let {ids: jobIds} = message.requestData - jobIds = uniq(jobIds) - - // if at least one of the jobs is already being run, reject - // if at least one item is not a number, reject - for (const id of jobIds) { - if (typeof id !== 'number') { - connection.send( - new ResponseMessage(message.requestNo) - .setError(`all ids must be numbers, got ${typeof id}`) - ) - return - } - - if (id in jobPromises) { - connection.send( - new ResponseMessage(message.requestNo) - .setError(`another client is already waiting for job ${id}`) - ) - return - } - } - - // create a bunch of promises, one per job - let promises = [] - for (const id of jobIds) { - const P = createCallablePromise() - jobPromises[id] = P - promises.push(P) - } - - // get jobs from database and enqueue for execution - const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds}) - - // wait till all jobs are done (or failed), then send a response - Promise.allSettled(promises).then(results => { - const response = {} - - for (let i = 0; i < results.length; i++) { - let jobId = jobIds[i] - let result = results[i] - - if (result.status === 'fulfilled') { - if (!('jobs' in response)) - response.jobs = {} - - if (result.value?.id !== undefined) - delete result.value.id - - response.jobs[jobId] = result.value - } else if (result.status === 'rejected') { - if (!('errors' in response)) - response.errors = {} - - response.errors[jobId] = result.reason?.message - } - } - - connection.send( - new ResponseMessage(message.requestNo) - .setData(response) - ) - }) - - // reject all ignored / non-found jobs - for (const [id, value] of results.entries()) { - if (!(id in jobPromises)) { - this.logger.error(`run-manual: ${id} not found in jobPromises`) - continue - } - - if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) { - const P = jobPromises[id] - delete jobPromises[id] - - if (value.result === JOB_IGNORED) - P.reject(new Error(value.reason)) - - else if (value.result === JOB_NOTFOUND) - P.reject(new Error(`job ${id} not found`)) - } - } - - break + for (const t of targets) { + if (!worker.hasTarget(t)) + throw new Error(`invalid target '${t}'`) } + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return + } + } + + worker.setPollTargets(targets) + worker.poll() - default: - connection.send( - new ResponseMessage(message.requestNo) - .setError(`unknown request type: '${message.requestType}'`) - ) - break + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onStatus(data, requestNo, connection) { + const qs = worker.getStatus() + connection.send( + new ResponseMessage(requestNo) + .setData({ + queue: qs, + jobPromisesCount: Object.keys(jobPromises).length, + memoryUsage: process.memoryUsage() + }) + ) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + * @return {Promise<void>} + */ +async function onRunManual(data, requestNo, connection) { + let {ids: jobIds} = data + jobIds = uniq(jobIds) + + // if at least one of the jobs is already being run, reject + // if at least one item is not a number, reject + for (const id of jobIds) { + if (typeof id !== 'number') { + connection.send( + new ResponseMessage(requestNo) + .setError(`all ids must be numbers, got ${typeof id}`) + ) + return } - } catch (error) { - logger.error(`error while handling message:`, message, error) + + if (id in jobPromises) { + connection.send( + new ResponseMessage(requestNo) + .setError(`another client is already waiting for job ${id}`) + ) + return + } + } + + // create a bunch of promises, one per job + let promises = [] + for (const id of jobIds) { + const P = createCallablePromise() + jobPromises[id] = P + promises.push(P) + } + + // get jobs from database and enqueue for execution + const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds}) + + // wait till all jobs are done (or failed), then send a response + Promise.allSettled(promises).then(results => { + const response = {} + + for (let i = 0; i < results.length; i++) { + let jobId = jobIds[i] + let result = results[i] + + if (result.status === 'fulfilled') { + if (!('jobs' in response)) + response.jobs = {} + + if (result.value?.id !== undefined) + delete result.value.id + + response.jobs[jobId] = result.value + } else if (result.status === 'rejected') { + if (!('errors' in response)) + response.errors = {} + + response.errors[jobId] = result.reason?.message + } + } + connection.send( - new ResponseMessage(message.requestNo) - .setError('server error: ' + error?.message) + new ResponseMessage(requestNo) + .setData(response) ) + }) + + // reject all ignored / non-found jobs + for (const [id, value] of results.entries()) { + if (!(id in jobPromises)) { + this.logger.error(`run-manual: ${id} not found in jobPromises`) + continue + } + + if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) { + const P = jobPromises[id] + delete jobPromises[id] + + if (value.result === JOB_IGNORED) + P.reject(new Error(value.reason)) + + else if (value.result === JOB_NOTFOUND) + P.reject(new Error(`job ${id} not found`)) + } } } - function connectToMaster() { + const port = config.get('master_port') + const host = config.get('master_host') + + if (!host || port) { + logger.debug('connectToMaster: master host or port is not defined') + return + } + const connection = new Connection() - connection.connect(config.get('master_host'), config.get('master_port')) + connection.connect(host, port) connection.on('connect', function() { connection.sendRequest( @@ -321,10 +336,11 @@ function connectToMaster() { }, config.get('master_reconnect_timeout') * 1000) }) - connection.on('request-message', onRequestMessage) + connection.on('request-message', (message, connection) => { + requestHandler.process(message, connection) + }) } - function usage() { let s = `${process.argv[1]} OPTIONS diff --git a/src/lib/request-handler.js b/src/lib/request-handler.js new file mode 100644 index 0000000..4ab06fb --- /dev/null +++ b/src/lib/request-handler.js @@ -0,0 +1,59 @@ +const {getLogger} = require('./logger') +const {ResponseMessage} = require('./server') + +class RequestHandler { + + constructor() { + /** + * @type {Map<string, Function>} + */ + this.handlers = new Map() + + /** + * @type {Logger} + */ + this.logger = getLogger('RequestHandler') + } + + /** + * @param {string} requestType + * @param {Function} handler + */ + set(requestType, handler) { + if (this.handlers.has(requestType)) + throw new Error(`handler for '${requestType}' has already been set`) + + this.handlers.set(requestType, handler) + } + + /** + * @param {RequestMessage} message + * @param {Connection} connection + */ + process(message, connection) { + this.logger.info('process:', message) + + if (this.handlers.has(message.requestType)) { + const f = this.handlers.get(message.requestType) + const result = f(message.requestData || {}, message.requestNo, connection) + if (result instanceof Promise) { + result.catch(error => { + this.logger.error(`${message.requestType}:`, error) + + connection.send( + new ResponseMessage(message.requestNo) + .setError('server error: ' + error?.message) + ) + }) + } + } else { + connection.send( + new ResponseMessage(message.requestNo) + .setError(`unknown request type: '${message.requestType}'`) + ) + } + } + +} + +module.exports = RequestHandler
\ No newline at end of file diff --git a/src/lib/server.js b/src/lib/server.js index 81c2c84..1d923a8 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -50,7 +50,7 @@ class ResponseMessage extends Message { this.error = null /** - * @type {null|string|number|object|array} + * @type {null|object} */ this.data = null } @@ -108,7 +108,7 @@ class RequestMessage extends Message { this.requestType = type /** - * @type any + * @type {null|string|number|object|array} */ this.requestData = data |