From 23c16a2c80f0614d0b31cba363bca66e1a60687b Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 16 Mar 2021 01:06:14 +0300 Subject: jobd: add add-target()/remove-target(); code refactoring --- src/jobd.js | 319 ++++++++++++++++++++++++------------------------------------ 1 file changed, 129 insertions(+), 190 deletions(-) (limited to 'src/jobd.js') diff --git a/src/jobd.js b/src/jobd.js index e1331d1..bb912fc 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -5,8 +5,11 @@ const config = require('./lib/config') const db = require('./lib/db') const {uniq} = require('lodash') const {createCallablePromise} = require('./lib/util') -const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator') -const RequestHandler = require('./lib/request-handler') +const { + validateInputTargetAndConcurrency, + validateInputTargets +} = require('./lib/data-validator') +const {RequestHandler} = require('./lib/request-handler') const { Server, Connection, @@ -139,6 +142,8 @@ function initRequestHandler() { requestHandler.set('run-manual', onRunManual) requestHandler.set('pause', onPause) requestHandler.set('continue', onContinue) + requestHandler.set('add-target', onAddTarget) + requestHandler.set('remove-target', onRemoveTarget) requestHandler.set('set-target-concurrency', onSetTargetConcurrency) } @@ -162,69 +167,119 @@ async function initDatabase() { logger.info('db initialized') } +function connectToMaster() { + const port = config.get('master_port') + const host = config.get('master_host') + + if (!host || !port) { + logger.debug('connectToMaster: master host or port is not defined') + return + } + + async function connect() { + const connection = new Connection() + await connection.connect(host, port) + + try { + let response = await connection.sendRequest( + new RequestMessage('register-worker', { + targets: worker.getTargets() + }) + ) + logger.debug('connectToMaster: response:', response) + } catch (error) { + logger.error('connectToMaster: error while awaiting response:', error) + } + + connection.on('close', () => { + logger.warn(`connectToMaster: connection closed`) + tryToConnect() + }) + + connection.on('request-message', (message, connection) => { + requestHandler.process(message, connection) + }) + } + + function tryToConnect(now = false) { + setTimeout(() => { + connect().catch(error => { + logger.warn(`connectToMaster: connection failed`, error) + }) + }, now ? 0 : config.get('master_reconnect_timeout') * 1000) + } + + tryToConnect(true) +} + +function usage() { + let s = `${process.argv[1]} OPTIONS + +Options: + --config Path to config. Default: ${DEFAULT_CONFIG_PATH} + --help Show this help. + --version Print version.` + + console.log(s) +} + +async function term() { + if (logger) + logger.info('shutdown') + + await loggerModule.shutdown() + process.exit() +} + + + +/****************************************/ +/** **/ +/** Request handlers **/ +/** **/ +/****************************************/ + /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection + * @return {Promise} */ -function onPollRequest(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return +async function onPollRequest(data) { + let targets = validateInputTargets(data, worker) worker.setPollTargets(targets) worker.poll() - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** * @param {object} data - * @param {number} requestNo - * @param {Connection} connection + * @return {Promise} */ -function onStatus(data, requestNo, connection) { - connection.send( - new ResponseMessage(requestNo) - .setData({ - targets: worker.getStatus(), - jobPromisesCount: Object.keys(jobPromises).length, - memoryUsage: process.memoryUsage() - }) - ) +async function onStatus(data) { + return { + targets: worker.getStatus(), + jobPromisesCount: Object.keys(jobPromises).length, + memoryUsage: process.memoryUsage() + } } /** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection - * @return {Promise} + * @param {{ids: number[]}} data + * @return {Promise} */ -async function onRunManual(data, requestNo, connection) { +async function onRunManual(data) { let {ids: jobIds} = data jobIds = uniq(jobIds) - // if at least one of the jobs is already being run, reject - // if at least one item is not a number, reject for (const id of jobIds) { - if (typeof id !== 'number') { - connection.send( - new ResponseMessage(requestNo) - .setError(`all ids must be numbers, got ${typeof id}`) - ) - return - } + // if at least one item is not a number, reject + if (typeof id !== 'number') + throw new Error(`all ids must be numbers, got ${typeof id}`) - if (id in jobPromises) { - connection.send( - new ResponseMessage(requestNo) - .setError(`another client is already waiting for job ${id}`) - ) - return - } + // if at least one of the jobs is already being run, reject + if (id in jobPromises) + throw new Error(`another client is already waiting for job ${id}`) } // create a bunch of promises, one per job @@ -239,7 +294,7 @@ async function onRunManual(data, requestNo, connection) { const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds}) // wait till all jobs are done (or failed), then send a response - Promise.allSettled(promises).then(results => { + const P = Promise.allSettled(promises).then(results => { const response = {} for (let i = 0; i < results.length; i++) { @@ -262,10 +317,7 @@ async function onRunManual(data, requestNo, connection) { } } - connection.send( - new ResponseMessage(requestNo) - .setData(response) - ) + return response }) // reject all ignored / non-found jobs @@ -286,33 +338,25 @@ async function onRunManual(data, requestNo, connection) { P.reject(new Error(`job ${id} not found`)) } } + + return P } /** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection + * @param {{targets: string[]}} data */ -function onPause(data, requestNo, connection) { - let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) - return - +async function onPause(data) { + let targets = validateInputTargets(data, worker) worker.pauseTargets(targets) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection + * @param {{targets: string[]}} data */ -function onContinue(data, requestNo, connection) { +async function onContinue(data) { let targets - if ((targets = validateInputTargets(data, requestNo, connection)) === false) + if ((targets = validateInputTargets(data, worker)) === false) return // continue queues @@ -321,137 +365,32 @@ function onContinue(data, requestNo, connection) { // poll just in case worker.poll() - // ok - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) + return 'ok' } /** - * @param {object} data - * @param {number} requestNo - * @param {Connection} connection + * @param {{target: string, concurrency: int}} data */ -function onSetTargetConcurrency(data, requestNo, connection) { - try { - validateObjectSchema(data, [ - // name // type // required - ['concurrency', 'i', true], - ['target', 's', true], - ]) - - if (data.concurrency <= 0) - throw new Error('Invalid concurrency value.') - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return - } - - worker.setTargetConcurrency(data.target, data.concurrency) - connection.send( - new ResponseMessage(requestNo) - .setData('ok') - ) +async function onAddTarget(data) { + validateInputTargetAndConcurrency(data) + worker.addTarget(data.target, data.concurrency) + return 'ok' } /** - * @private - * @param data - * @param requestNo - * @param connection - * @return {null|boolean|string[]} + * @param {{target: string}} data */ -function validateInputTargets(data, requestNo, connection) { - // null means all targets - let targets = null - - if (data.targets !== undefined) { - targets = data.targets - - // validate data - try { - validateTargetsListFormat(targets) - - for (const t of targets) { - if (!worker.hasTarget(t)) - throw new Error(`invalid target '${t}'`) - } - } catch (e) { - connection.send( - new ResponseMessage(requestNo) - .setError(e.message) - ) - return false - } - } - - return targets +async function onRemoveTarget(data) { + validateInputTargetAndConcurrency(data, true) + worker.removeTarget(data.target) + return 'ok' } -function connectToMaster() { - const port = config.get('master_port') - const host = config.get('master_host') - - if (!host || !port) { - logger.debug('connectToMaster: master host or port is not defined') - return - } - - async function connect() { - const connection = new Connection() - await connection.connect(host, port) - - try { - let response = await connection.sendRequest( - new RequestMessage('register-worker', { - targets: worker.getTargets() - }) - ) - logger.debug('connectToMaster: response:', response) - } catch (error) { - logger.error('connectToMaster: error while awaiting response:', error) - } - - connection.on('close', () => { - logger.warn(`connectToMaster: connection closed`) - tryToConnect() - }) - - connection.on('request-message', (message, connection) => { - requestHandler.process(message, connection) - }) - } - - function tryToConnect(now = false) { - setTimeout(() => { - connect().catch(error => { - logger.warn(`connectToMaster: connection failed`, error) - }) - }, now ? 0 : config.get('master_reconnect_timeout') * 1000) - } - - tryToConnect(true) -} - -function usage() { - let s = `${process.argv[1]} OPTIONS - -Options: - --config Path to config. Default: ${DEFAULT_CONFIG_PATH} - --help Show this help. - --version Print version.` - - console.log(s) -} - -async function term() { - if (logger) - logger.info('shutdown') - - await loggerModule.shutdown() - process.exit() -} +/** + * @param {object} data + */ +async function onSetTargetConcurrency(data) { + validateInputTargetAndConcurrency(data) + worker.setTargetConcurrency(data.target, data.concurrency) + return 'ok' +} \ No newline at end of file -- cgit v1.2.3