diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-03-16 01:06:14 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-03-16 01:06:14 +0300 |
commit | 23c16a2c80f0614d0b31cba363bca66e1a60687b (patch) | |
tree | 6d019e63b5bc4d703d4319f444927ccc7c93f208 /src | |
parent | cbbe60df32bc591758dd98e624a9f3b3c53f1d23 (diff) |
jobd: add add-target()/remove-target(); code refactoring
Diffstat (limited to 'src')
-rwxr-xr-x | src/jobd-master.js | 214 | ||||
-rwxr-xr-x | src/jobd.js | 319 | ||||
-rw-r--r-- | src/lib/data-validator.js | 58 | ||||
-rw-r--r-- | src/lib/logger.js | 3 | ||||
-rw-r--r-- | src/lib/request-handler.js | 17 | ||||
-rw-r--r-- | src/lib/worker.js | 19 |
6 files changed, 275 insertions, 355 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js index 9827126..eeac085 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -4,8 +4,12 @@ const loggerModule = require('./lib/logger') const config = require('./lib/config') const {Server, ResponseMessage} = require('./lib/server') const WorkersList = require('./lib/workers-list') -const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator') -const RequestHandler = require('./lib/request-handler') +const { + validateObjectSchema, + validateInputTargetsListFormat, + validateInputTargets +} = require('./lib/data-validator') +const {RequestHandler} = require('./lib/request-handler') const package_json = require('../package.json') const DEFAULT_CONFIG_PATH = "/etc/jobd-master.conf" @@ -112,136 +116,94 @@ function initRequestHandler() { requestHandler.set('continue', onContinue) } +function usage() { + let s = `${process.argv[1]} OPTIONS + +Options: + --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH} + --help Show this help. + --version Print version.` + + console.log(s) +} + +async function term() { + if (logger) + logger.info('shutdown') + + await loggerModule.shutdown() + process.exit() +} + + + +/****************************************/ +/** **/ +/** Request handlers **/ +/** **/ +/****************************************/ + /** * @param {object} data - * @param {number} requestNo * @param {Connection} connection */ -function onRegisterWorker(data, requestNo, connection) { - const targets = data.targets || [] - - // validate data - try { - validateTargetsListFormat(targets) - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - - // register worker and reply with OK +async function onRegisterWorker(data, connection) { + const targets = validateInputTargets(data, null) workers.add(connection, targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection */ -function onPoke(data, requestNo, connection) { - const targets = data.targets || [] - - // validate data - try { - validateTargetsListFormat(targets) - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - - // poke workers +async function onPoke(data) { + const targets = validateInputTargets(data, null) workers.poke(targets) - - // reply to user - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection * @return {Promise<*>} */ -async function onStatus(data, requestNo, connection) { +async function onStatus(data) { const info = await workers.getInfo(data.poll_workers || false) - - let status = { + return { workers: info, memoryUsage: process.memoryUsage() } - - connection.send( - new ResponseMessage(requestNo) - .setData(status) - ) } /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection * @return {Promise<*>} */ -async function onRunManual(data, requestNo, connection) { +async function onRunManual(data) { const {jobs} = data - // validate data - try { - if (!Array.isArray(jobs)) - throw new Error('jobs must be array') - - for (let job of jobs) { - validateObjectSchema(job, [ - // name // type // required - ['id', 'i', true], - ['target', 's', true], - ]) - } - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } + // validate input + if (!Array.isArray(jobs)) + throw new Error('jobs must be array') - // run jobs on workers - const jobsData = await workers.runManual(jobs) + for (let job of jobs) { + validateObjectSchema(job, [ + // name // type // required + ['id', 'i', true], + ['target', 's', true], + ]) + } - // send result to the client - connection.send( - new ResponseMessage(requestNo) - .setData(jobsData) - ) + // run jobs, wait for results and send a response + return await workers.runManual(jobs) } /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection */ -function onPause(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - +function onPause(data) { + const targets = validateInputTargets(data, null) workers.pauseTargets(targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** @@ -250,66 +212,8 @@ function onPause(data, requestNo, connection) { * @param {Connection} connection */ function onContinue(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - + const targets = validateInputTargets(data, null) workers.continueTargets(targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) -} - - -/** - * @private - * @param data - * @param requestNo - * @param connection - * @return {null|boolean|string[]} - */ -function validateInputTargets(data, requestNo, connection) { - // null means all targets - let targets = null - - if (data.targets !== undefined) { - targets = data.targets - - // validate data - try { - validateTargetsListFormat(targets) - - // note: we don't check target names here - // as in jobd - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return false - } - } - - return targets + return 'ok' } - -function usage() { - let s = `${process.argv[1]} OPTIONS - -Options: - --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH} - --help Show this help. - --version Print version.` - - console.log(s) -} - -async function term() { - if (logger) - logger.info('shutdown') - - await loggerModule.shutdown() - process.exit() -} diff --git a/src/jobd.js b/src/jobd.js index e1331d1..bb912fc 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -5,8 +5,11 @@ const config = require('./lib/config') const db = require('./lib/db') const {uniq} = require('lodash') const {createCallablePromise} = require('./lib/util') -const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator') -const RequestHandler = require('./lib/request-handler') +const { + validateInputTargetAndConcurrency, + validateInputTargets +} = require('./lib/data-validator') +const {RequestHandler} = require('./lib/request-handler') const { Server, Connection, @@ -139,6 +142,8 @@ function initRequestHandler() { requestHandler.set('run-manual', onRunManual) requestHandler.set('pause', onPause) requestHandler.set('continue', onContinue) + requestHandler.set('add-target', onAddTarget) + requestHandler.set('remove-target', onRemoveTarget) requestHandler.set('set-target-concurrency', onSetTargetConcurrency) } @@ -162,69 +167,119 @@ async function initDatabase() { logger.info('db initialized') } +function connectToMaster() { + const port = config.get('master_port') + const host = config.get('master_host') + + if (!host || !port) { + logger.debug('connectToMaster: master host or port is not defined') + return + } + + async function connect() { + const connection = new Connection() + await connection.connect(host, port) + + try { + let response = await connection.sendRequest( + new RequestMessage('register-worker', { + targets: worker.getTargets() + }) + ) + logger.debug('connectToMaster: response:', response) + } catch (error) { + logger.error('connectToMaster: error while awaiting response:', error) + } + + connection.on('close', () => { + logger.warn(`connectToMaster: connection closed`) + tryToConnect() + }) + + connection.on('request-message', (message, connection) => { + requestHandler.process(message, connection) + }) + } + + function tryToConnect(now = false) { + setTimeout(() => { + connect().catch(error => { + logger.warn(`connectToMaster: connection failed`, error) + }) + }, now ? 0 : config.get('master_reconnect_timeout') * 1000) + } + + tryToConnect(true) +} + +function usage() { + let s = `${process.argv[1]} OPTIONS + +Options: + --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH} + --help Show this help. + --version Print version.` + + console.log(s) +} + +async function term() { + if (logger) + logger.info('shutdown') + + await loggerModule.shutdown() + process.exit() +} + + + +/****************************************/ +/** **/ +/** Request handlers **/ +/** **/ +/****************************************/ + /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection + * @return {Promise<string>} */ -function onPollRequest(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return +async function onPollRequest(data) { + let targets = validateInputTargets(data, worker) worker.setPollTargets(targets) worker.poll() - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection + * @return {Promise<object>} */ -function onStatus(data, requestNo, connection) { - connection.send( - new ResponseMessage(requestNo) - .setData({ - targets: worker.getStatus(), - jobPromisesCount: Object.keys(jobPromises).length, - memoryUsage: process.memoryUsage() - }) - ) +async function onStatus(data) { + return { + targets: worker.getStatus(), + jobPromisesCount: Object.keys(jobPromises).length, + memoryUsage: process.memoryUsage() + } } /** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - * @return {Promise<void>} + * @param {{ids: number[]}} data + * @return {Promise} */ -async function onRunManual(data, requestNo, connection) { +async function onRunManual(data) { let {ids: jobIds} = data jobIds = uniq(jobIds) - // if at least one of the jobs is already being run, reject - // if at least one item is not a number, reject for (const id of jobIds) { - if (typeof id !== 'number') { - connection.send( - new ResponseMessage(requestNo) - .setError(`all ids must be numbers, got ${typeof id}`) - ) - return - } + // if at least one item is not a number, reject + if (typeof id !== 'number') + throw new Error(`all ids must be numbers, got ${typeof id}`) - if (id in jobPromises) { - connection.send( - new ResponseMessage(requestNo) - .setError(`another client is already waiting for job ${id}`) - ) - return - } + // if at least one of the jobs is already being run, reject + if (id in jobPromises) + throw new Error(`another client is already waiting for job ${id}`) } // create a bunch of promises, one per job @@ -239,7 +294,7 @@ async function onRunManual(data, requestNo, connection) { const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds}) // wait till all jobs are done (or failed), then send a response - Promise.allSettled(promises).then(results => { + const P = Promise.allSettled(promises).then(results => { const response = {} for (let i = 0; i < results.length; i++) { @@ -262,10 +317,7 @@ async function onRunManual(data, requestNo, connection) { } } - connection.send( - new ResponseMessage(requestNo) - .setData(response) - ) + return response }) // reject all ignored / non-found jobs @@ -286,33 +338,25 @@ async function onRunManual(data, requestNo, connection) { P.reject(new Error(`job ${id} not found`)) } } + + return P } /** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection + * @param {{targets: string[]}} data */ -function onPause(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - +async function onPause(data) { + let targets = validateInputTargets(data, worker) worker.pauseTargets(targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection + * @param {{targets: string[]}} data */ -function onContinue(data, requestNo, connection) { +async function onContinue(data) { let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) + if ((targets = validateInputTargets(data, worker)) === false) return // continue queues @@ -321,137 +365,32 @@ function onContinue(data, requestNo, connection) { // poll just in case worker.poll() - // ok - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection + * @param {{target: string, concurrency: int}} data */ -function onSetTargetConcurrency(data, requestNo, connection) { - try { - validateObjectSchema(data, [ - // name // type // required - ['concurrency', 'i', true], - ['target', 's', true], - ]) - - if (data.concurrency <= 0) - throw new Error('Invalid concurrency value.') - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - - worker.setTargetConcurrency(data.target, data.concurrency) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) +async function onAddTarget(data) { + validateInputTargetAndConcurrency(data) + worker.addTarget(data.target, data.concurrency) + return 'ok' } /** - * @private - * @param data - * @param requestNo - * @param connection - * @return {null|boolean|string[]} + * @param {{target: string}} data */ -function validateInputTargets(data, requestNo, connection) { - // null means all targets - let targets = null - - if (data.targets !== undefined) { - targets = data.targets - - // validate data - try { - validateTargetsListFormat(targets) - - for (const t of targets) { - if (!worker.hasTarget(t)) - throw new Error(`invalid target '${t}'`) - } - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return false - } - } - - return targets +async function onRemoveTarget(data) { + validateInputTargetAndConcurrency(data, true) + worker.removeTarget(data.target) + return 'ok' } -function connectToMaster() { - const port = config.get('master_port') - const host = config.get('master_host') - - if (!host || !port) { - logger.debug('connectToMaster: master host or port is not defined') - return - } - - async function connect() { - const connection = new Connection() - await connection.connect(host, port) - - try { - let response = await connection.sendRequest( - new RequestMessage('register-worker', { - targets: worker.getTargets() - }) - ) - logger.debug('connectToMaster: response:', response) - } catch (error) { - logger.error('connectToMaster: error while awaiting response:', error) - } - - connection.on('close', () => { - logger.warn(`connectToMaster: connection closed`) - tryToConnect() - }) - - connection.on('request-message', (message, connection) => { - requestHandler.process(message, connection) - }) - } - - function tryToConnect(now = false) { - setTimeout(() => { - connect().catch(error => { - logger.warn(`connectToMaster: connection failed`, error) - }) - }, now ? 0 : config.get('master_reconnect_timeout') * 1000) - } - - tryToConnect(true) -} - -function usage() { - let s = `${process.argv[1]} OPTIONS - -Options: - --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH} - --help Show this help. - --version Print version.` - - console.log(s) -} - -async function term() { - if (logger) - logger.info('shutdown') - - await loggerModule.shutdown() - process.exit() -} +/** + * @param {object} data + */ +async function onSetTargetConcurrency(data) { + validateInputTargetAndConcurrency(data) + worker.setTargetConcurrency(data.target, data.concurrency) + return 'ok' +}
\ No newline at end of file diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js index 7419b34..74827c1 100644 --- a/src/lib/data-validator.js +++ b/src/lib/data-validator.js @@ -11,6 +11,11 @@ const typeNames = { const logger = getLogger('data-validator') + +/**************************************/ +/** Common Functions **/ +/**************************************/ + /** * @param {string} expectedType * @param value @@ -69,7 +74,12 @@ function validateObjectSchema(data, schema) { } } -function validateTargetsListFormat(targets) { + +/********************************************/ +/** Request input data validators */ +/********************************************/ + +function validateInputTargetsListFormat(targets) { if (!Array.isArray(targets)) throw new Error('targets must be array') @@ -83,7 +93,51 @@ function validateTargetsListFormat(targets) { } } +function validateInputTargetAndConcurrency(data, onlyTarget = false) { + const schema = [ + ['target', 's', true], + ] + + if (!onlyTarget) { + schema.push( + ['concurrency', 'i', true] + ) + } + + validateObjectSchema(data, schema) + + if (!onlyTarget && data.concurrency <= 0) + throw new Error('Invalid concurrency value.') +} + +/** + * @param data + * @param {Worker|null} worker + * @return {null|string[]} + */ +function validateInputTargets(data, worker) { + // null means all targets + let targets = null + + if (data.targets !== undefined) { + targets = data.targets + + validateInputTargetsListFormat(targets) + + if (worker !== null) { + for (const t of targets) { + if (!worker.hasTarget(t)) + throw new Error(`invalid target '${t}'`) + } + } + } + + return targets +} + module.exports = { validateObjectSchema, - validateTargetsListFormat + validateInputTargetsListFormat, + validateInputTargetAndConcurrency, + validateInputTargets, }
\ No newline at end of file diff --git a/src/lib/logger.js b/src/lib/logger.js index f886e0c..b71020c 100644 --- a/src/lib/logger.js +++ b/src/lib/logger.js @@ -101,10 +101,9 @@ module.exports = { }, /** - * @param cb * @return {Promise} */ - shutdown(cb) { + shutdown() { return new Promise((resolve, reject) => { log4js.shutdown(resolve) }) diff --git a/src/lib/request-handler.js b/src/lib/request-handler.js index 4330b6b..e7f9fe2 100644 --- a/src/lib/request-handler.js +++ b/src/lib/request-handler.js @@ -35,14 +35,17 @@ class RequestHandler { if (this.handlers.has(message.requestType)) { const f = this.handlers.get(message.requestType) - const result = f(message.requestData || {}, message.requestNo, connection) + const result = f(message.requestData || {}, connection) if (result instanceof Promise) { - result.catch(error => { - this.logger.error(`${message.requestType}:`, error) - + result.then(data => { + connection.send( + new ResponseMessage(message.requestNo) + .setData(data) + ) + }).catch(error => { connection.send( new ResponseMessage(message.requestNo) - .setError('server error: ' + error?.message) + .setError(error?.message) ) }) } @@ -56,4 +59,6 @@ class RequestHandler { } -module.exports = RequestHandler
\ No newline at end of file +module.exports = { + RequestHandler +}
\ No newline at end of file diff --git a/src/lib/worker.js b/src/lib/worker.js index e53d03f..0be6a19 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -73,6 +73,25 @@ class Worker extends EventEmitter { } /** + * Deletes a queue. + * + * @param {string} target + */ + removeTarget(target) { + if (!(target in this.targets)) + throw new Error(`target '${target}' not found`) + + const {queue} = this.targets[target] + if (queue.length > 0) + throw new Error(`queue is not empty`) + + this.logger.debug(`deleteTarget: deleting target' ${target}'`) + queue.removeAllListeners() + queue.end() + delete this.targets[target] + } + + /** * @param {string} target * @param {number} concurrency */ |