From 4cc2e3029fedca4f48f4f112282a7e381b6292e4 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Wed, 3 Mar 2021 01:05:25 +0300 Subject: refactor code and set process.title --- src/jobd.js | 354 +++++++++++++++++++++++++++++++----------------------------- 1 file changed, 185 insertions(+), 169 deletions(-) (limited to 'src/jobd.js') diff --git a/src/jobd.js b/src/jobd.js index 4605137..0b980b7 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -6,6 +6,7 @@ const db = require('./lib/db') const {uniq} = require('lodash') const {createCallablePromise} = require('./lib/util') const {validateTargetsList} = require('./lib/data-validator') +const RequestHandler = require('./lib/request-handler') const { Server, Connection, @@ -36,6 +37,11 @@ let logger */ let server +/** + * @type {RequestHandler} + */ +let requestHandler + /** * @type {object.} */ @@ -49,6 +55,15 @@ main().catch(e => { async function main() { + await initApp('jobd') + await initDatabase() + initWorker() + initRequestHandler() + initServer() + connectToMaster() +} + +async function initApp(appName) { if (process.argv.length < 3) { usage() process.exit(0) @@ -85,18 +100,12 @@ async function main() { levelFile: config.get('log_level_file'), levelConsole: config.get('log_level_console'), }) - logger = loggerModule.getLogger('jobd') + logger = loggerModule.getLogger(appName) - // init database - try { - await db.init() - } catch (error) { - logger.error('failed to connect to MySQL', error) - process.exit(1) - } - logger.info('db initialized') + process.title = appName +} - // init queue +function initWorker() { worker = new Worker() for (let targetName in config.get('targets')) { let slots = config.get('targets')[targetName].slots @@ -119,186 +128,192 @@ async function main() { logger.warn(`job-done: jobPromises[${data.id}] is undefined`) } }) - logger.info('queue initialized') +} - // start server +function initRequestHandler() { + requestHandler = new RequestHandler() + requestHandler.set('poll', onPollRequest) + requestHandler.set('status', onStatus) + requestHandler.set('run-manual', onRunManual) +} + +function initServer() { server = new Server() server.on('new-connection', (connection) => { - connection.on('request-message', onRequestMessage) + connection.on('request-message', (message, connection) => { + requestHandler.process(message, connection) + }) }) server.start(config.get('port'), config.get('host')) - logger.info('server started') - - // connect to master - if (config.get('master_port') && config.get('master_host')) - connectToMaster() } +async function initDatabase() { + try { + await db.init() + } catch (error) { + logger.error('failed to connect to MySQL', error) + process.exit(1) + } + logger.info('db initialized') +} /** - * @param {RequestMessage|ResponseMessage} message + * @param {object} data + * @param {number} requestNo * @param {Connection} connection - * @return {Promise<*>} */ -async function onRequestMessage(message, connection) { - try { - logger.info('onMessage:', message) - - switch (message.requestType) { - case 'poll': { - // null means all - let targets = null - - if (message.requestData?.targets !== undefined) { - targets = message.requestData?.targets - - // validate data - try { - validateTargetsList(targets) - - for (const t of targets) { - if (!worker.hasTarget(t)) - throw new Error(`invalid target '${t}'`) - } - } catch (e) { - connection.send( - new ResponseMessage(message.requestNo) - .setError(e.message) - ) - return - } - } - - worker.setPollTargets(targets) - worker.poll() - - connection.send( - new ResponseMessage(message.requestNo) - .setData('ok') - ) - break - } +function onPollRequest(data, requestNo, connection) { + // null means all targets + let targets = null - case 'status': { - const qs = worker.getStatus() - connection.send( - new ResponseMessage(message.requestNo) - .setData({ - queue: qs, - jobDoneAwaitersCount: Object.keys(jobPromises).length, - memoryUsage: process.memoryUsage() - }) - ) - break - } + if (data.targets !== undefined) { + targets = data.targets + + // validate data + try { + validateTargetsList(targets) - case 'run-manual': { - let {ids: jobIds} = message.requestData - jobIds = uniq(jobIds) - - // if at least one of the jobs is already being run, reject - // if at least one item is not a number, reject - for (const id of jobIds) { - if (typeof id !== 'number') { - connection.send( - new ResponseMessage(message.requestNo) - .setError(`all ids must be numbers, got ${typeof id}`) - ) - return - } - - if (id in jobPromises) { - connection.send( - new ResponseMessage(message.requestNo) - .setError(`another client is already waiting for job ${id}`) - ) - return - } - } - - // create a bunch of promises, one per job - let promises = [] - for (const id of jobIds) { - const P = createCallablePromise() - jobPromises[id] = P - promises.push(P) - } - - // get jobs from database and enqueue for execution - const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds}) - - // wait till all jobs are done (or failed), then send a response - Promise.allSettled(promises).then(results => { - const response = {} - - for (let i = 0; i < results.length; i++) { - let jobId = jobIds[i] - let result = results[i] - - if (result.status === 'fulfilled') { - if (!('jobs' in response)) - response.jobs = {} - - if (result.value?.id !== undefined) - delete result.value.id - - response.jobs[jobId] = result.value - } else if (result.status === 'rejected') { - if (!('errors' in response)) - response.errors = {} - - response.errors[jobId] = result.reason?.message - } - } - - connection.send( - new ResponseMessage(message.requestNo) - .setData(response) - ) - }) - - // reject all ignored / non-found jobs - for (const [id, value] of results.entries()) { - if (!(id in jobPromises)) { - this.logger.error(`run-manual: ${id} not found in jobPromises`) - continue - } - - if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) { - const P = jobPromises[id] - delete jobPromises[id] - - if (value.result === JOB_IGNORED) - P.reject(new Error(value.reason)) - - else if (value.result === JOB_NOTFOUND) - P.reject(new Error(`job ${id} not found`)) - } - } - - break + for (const t of targets) { + if (!worker.hasTarget(t)) + throw new Error(`invalid target '${t}'`) } + } catch (e) { + connection.send( + new ResponseMessage(requestNo) + .setError(e.message) + ) + return + } + } + + worker.setPollTargets(targets) + worker.poll() - default: - connection.send( - new ResponseMessage(message.requestNo) - .setError(`unknown request type: '${message.requestType}'`) - ) - break + connection.send( + new ResponseMessage(requestNo) + .setData('ok') + ) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + */ +function onStatus(data, requestNo, connection) { + const qs = worker.getStatus() + connection.send( + new ResponseMessage(requestNo) + .setData({ + queue: qs, + jobPromisesCount: Object.keys(jobPromises).length, + memoryUsage: process.memoryUsage() + }) + ) +} + +/** + * @param {object} data + * @param {number} requestNo + * @param {Connection} connection + * @return {Promise} + */ +async function onRunManual(data, requestNo, connection) { + let {ids: jobIds} = data + jobIds = uniq(jobIds) + + // if at least one of the jobs is already being run, reject + // if at least one item is not a number, reject + for (const id of jobIds) { + if (typeof id !== 'number') { + connection.send( + new ResponseMessage(requestNo) + .setError(`all ids must be numbers, got ${typeof id}`) + ) + return } - } catch (error) { - logger.error(`error while handling message:`, message, error) + + if (id in jobPromises) { + connection.send( + new ResponseMessage(requestNo) + .setError(`another client is already waiting for job ${id}`) + ) + return + } + } + + // create a bunch of promises, one per job + let promises = [] + for (const id of jobIds) { + const P = createCallablePromise() + jobPromises[id] = P + promises.push(P) + } + + // get jobs from database and enqueue for execution + const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds}) + + // wait till all jobs are done (or failed), then send a response + Promise.allSettled(promises).then(results => { + const response = {} + + for (let i = 0; i < results.length; i++) { + let jobId = jobIds[i] + let result = results[i] + + if (result.status === 'fulfilled') { + if (!('jobs' in response)) + response.jobs = {} + + if (result.value?.id !== undefined) + delete result.value.id + + response.jobs[jobId] = result.value + } else if (result.status === 'rejected') { + if (!('errors' in response)) + response.errors = {} + + response.errors[jobId] = result.reason?.message + } + } + connection.send( - new ResponseMessage(message.requestNo) - .setError('server error: ' + error?.message) + new ResponseMessage(requestNo) + .setData(response) ) + }) + + // reject all ignored / non-found jobs + for (const [id, value] of results.entries()) { + if (!(id in jobPromises)) { + this.logger.error(`run-manual: ${id} not found in jobPromises`) + continue + } + + if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) { + const P = jobPromises[id] + delete jobPromises[id] + + if (value.result === JOB_IGNORED) + P.reject(new Error(value.reason)) + + else if (value.result === JOB_NOTFOUND) + P.reject(new Error(`job ${id} not found`)) + } } } - function connectToMaster() { + const port = config.get('master_port') + const host = config.get('master_host') + + if (!host || port) { + logger.debug('connectToMaster: master host or port is not defined') + return + } + const connection = new Connection() - connection.connect(config.get('master_host'), config.get('master_port')) + connection.connect(host, port) connection.on('connect', function() { connection.sendRequest( @@ -321,10 +336,11 @@ function connectToMaster() { }, config.get('master_reconnect_timeout') * 1000) }) - connection.on('request-message', onRequestMessage) + connection.on('request-message', (message, connection) => { + requestHandler.process(message, connection) + }) } - function usage() { let s = `${process.argv[1]} OPTIONS -- cgit v1.2.3