From db7e1be9b58ba92556d579cd4b814ae083602bc9 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sun, 7 Mar 2021 19:41:43 +0300 Subject: jobctl --- src/jobctl.js | 430 +++++++++++++++++++++++++++++++++++++++++++++ src/jobd-master.js | 18 +- src/jobd.js | 69 +++++--- src/lib/config.js | 51 ++++++ src/lib/logger.js | 20 ++- src/lib/request-handler.js | 2 +- src/lib/server.js | 30 +++- 7 files changed, 576 insertions(+), 44 deletions(-) create mode 100755 src/jobctl.js (limited to 'src') diff --git a/src/jobctl.js b/src/jobctl.js new file mode 100755 index 0000000..2a1fa30 --- /dev/null +++ b/src/jobctl.js @@ -0,0 +1,430 @@ +#!/usr/bin/env node +const minimist = require('minimist') +const loggerModule = require('./lib/logger') +const config = require('./lib/config') +const package_json = require('../package.json') +const os = require('os') +const path = require('path') +const fs = require('fs/promises') +const {Connection, RequestMessage} = require('./lib/server') +const {isNumeric} = require('./lib/util') +const columnify = require('columnify') + +const DEFAULT_CONFIG_PATH = path.join(os.homedir(), '.jobctl.conf') + +const WORKER_COMMANDS = { + 'list-targets': workerListTargets, + 'memory-usage': workerMemoryUsage, + 'poll': workerPoll, + 'set-target-concurrency': workerSetTargetConcurrency, + 'pause': workerPause, + 'continue': workerContinue +} + +const MASTER_COMMANDS = { + 'list-workers': masterListWorkers, + // 'list-workers-memory-usage': masterListWorkersMemoryUsage, + 'memory-usage': masterMemoryUsage, + 'poke': masterPoke, + + // we can just reuse worker functions here, as they do the same + 'pause': workerPause, + 'continue': workerContinue, +} + +/** + * @type {Logger} + */ +let logger + +/** + * @type {Connection} + */ +let connection + + +main().catch(e => { + console.error(e) + process.exit(1) +}) + + +async function main() { + const argv = await initApp('jobctl') + if (!argv.length) + usage() + + const isMaster = config.get('master') + + logger.info('Working mode: ' + (isMaster ? 'master' : 'worker')) + logger.trace('Command arguments: ', argv) + + let availableCommands = isMaster ? MASTER_COMMANDS : WORKER_COMMANDS + let command = argv.shift() + if (!(command in availableCommands)) { + logger.error(`Unsupported command: '${command}'`) + process.exit(1) + } + + let host = config.get('host') + let port = config.get('port') + + // connect to instance + try { + connection = new Connection() + await connection.connect(host, port) + + logger.info('Successfully connected.') + } catch (error) { + logger.error('Connection failure:', error) + process.exit(1) + } + + try { + await availableCommands[command](argv) + } catch (e) { + logger.error(e.message) + } + + connection.close() + + // initWorker() + // initRequestHandler() + // initServer() + // connectToMaster() +} + +async function initApp(appName) { + if (process.argv.length < 3) + usage() + + process.on('SIGINT', term) + process.on('SIGTERM', term) + + const argv = minimist(process.argv.slice(2), { + boolean: ['master', 'version', 'help'], + string: ['host', 'port', 'config', 'log-level'], + stopEarly: true, + default: { + config: DEFAULT_CONFIG_PATH + } + }) + + if (argv.help) + usage() + + if (argv.version) { + console.log(package_json.version) + process.exit(0) + } + + // read config + if (await exists(argv.config)) { + try { + config.parseJobctlConfig(argv.config, { + master: argv.master, + log_level: argv['log-level'], + host: argv.host, + port: parseInt(argv.port, 10), + }) + } catch (e) { + console.error(`config parsing error: ${e.message}`) + process.exit(1) + } + } + + // init logger + await loggerModule.init({ + levelConsole: config.get('log_level'), + disableTimestamps: true + }) + logger = loggerModule.getLogger(appName) + + process.title = appName + + /// /// /// + /// \\\ \\\ + /// /// /// + /// \\\ \\\ + /// /// /// + /* * * * * */ + /* */ + /* ^_^ */ + /* */ + /* '_' */ + /* */ + /* <_< */ + /* */ + /* >_> */ + /* */ + /* * * * * */ + /// /// /// + /// \\\ \\\ + /// /// /// + /// \\\ \\\ + /// /// /// + + return argv['_'] || [] +} + +async function workerListTargets() { + try { + let response = await connection.sendRequest(new RequestMessage('status')) + const rows = [] + const columns = [ + 'target', + 'concurrency', + 'length', + 'paused' + ] + for (const target in response.data.targets) { + const row = [ + target, + response.data.targets[target].concurrency, + response.data.targets[target].length, + response.data.targets[target].paused ? 'yes' : 'no' + ] + rows.push(row) + } + + table(columns, rows) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + +async function workerMemoryUsage() { + try { + let response = await connection.sendRequest(new RequestMessage('status')) + const columns = ['what', 'value'] + const rows = [] + for (const what in response.data.memoryUsage) + rows.push([what, response.data.memoryUsage[what]]) + rows.push(['pendingJobPromises', response.data.jobPromisesCount]) + table(columns, rows) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + +async function workerPoll(argv) { + return await sendCommandForTargets(argv, 'poll') +} + +async function workerPause(argv) { + return await sendCommandForTargets(argv, 'pause') +} + +async function workerContinue(argv) { + return await sendCommandForTargets(argv, 'continue') +} + +async function workerSetTargetConcurrency(argv) { + if (argv.length !== 2) + throw new Error('Invalid number of arguments.') + + let [target, concurrency] = argv + if (!isNumeric(concurrency)) + throw new Error(`'concurrency' must be a number.`) + + concurrency = parseInt(concurrency, 10) + + try { + let response = await connection.sendRequest( + new RequestMessage('set-target-concurrency', { + target, concurrency + }) + ) + + if (response.error) + throw new Error(`Worker error: ${response.error}`) + + console.log(response.data) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + +async function masterPoke(argv) { + return await sendCommandForTargets(argv, 'poke') +} + +async function masterMemoryUsage() { + try { + let response = await connection.sendRequest(new RequestMessage('status')) + const columns = ['what', 'value'] + const rows = [] + for (const what in response.data.memoryUsage) + rows.push([what, response.data.memoryUsage[what]]) + table(columns, rows) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + +async function masterListWorkers() { + try { + let response = await connection.sendRequest(new RequestMessage('status', {poll_workers: true})) + const columns = ['worker', 'targets', 'concurrency', 'length', 'paused'] + const rows = [] + for (const worker of response.data.workers) { + let remoteAddr = `${worker.remoteAddr}:${worker.remotePort}` + let targets = Object.keys(worker.workerStatus.targets) + let concurrencies = targets.map(t => worker.workerStatus.targets[t].concurrency) + let lengths = targets.map(t => worker.workerStatus.targets[t].length) + let pauses = targets.map(t => worker.workerStatus.targets[t].paused ? 'yes' : 'no') + rows.push([ + remoteAddr, + targets.join("\n"), + concurrencies.join("\n"), + lengths.join("\n"), + pauses.join("\n") + ]) + } + table(columns, rows) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + +async function sendCommandForTargets(targets, command) { + if (!targets.length) + throw new Error('No targets specified.') + + try { + let response = await connection.sendRequest( + new RequestMessage(command, {targets}) + ) + + if (response.error) + throw new Error(`Worker error: ${response.error}`) + + console.log(response.data) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + + +function usage(exitCode = 0) { + let s = `${process.argv[1]} [OPTIONS] COMMAND + +Worker commands: + list-targets Print list of targets, their length and inner state. + memory-usage Print info about memory usage of the worker. + poll <...TARGETS> Ask worker to get tasks for specified targets. + + Example: + $ jobctl poke t1 t2 t3 + + set-target-concurrency + Set concurrency of the target. + + pause <...TARGETS> Pause specified or all targets. + continue <...TARGETS> Pause specified or all targets. + +Master commands: + list-workers Print list of connected workers and their state. + memory-usage Print info about memory usage. + poke <...TARGETS> Poke specified targets. + pause <...TARGETS> Send pause() to all workers serving specified targets. + If no targets specified, just sends pause() to all + connected workers. + continue <...TARGETS> Send continue() to all workers serving specified + targets. If no targets specified, just sends pause() + to all connected workers. + +Options: + --master Connect to jobd-master instance. + --host Address of jobd or jobd-master instance. + --port Port. Default: 7080 when --master is not used, + 7081 otherwise. + --config Path to config. Default: ~/.jobctl.conf + Required for connecting to password-protected + instances. + --log-level 'error', 'warn', 'info', 'debug' or 'trace'. + Default: warn + --help: Show this help. + --version: Print version. + +Configuration file + Config file is required for connecting to password-protected jobd instances. + It can also be used to store hostname, port and log level. + + Here's an example of possible ~/.jobctl.conf file: + + ;password = + hostname = 1.2.3.4 + port = 7080 + log_level = warn + master = true +` + + console.log(s) + process.exit(exitCode) +} + +function term() { + if (logger) + logger.info('shutdown') + + loggerModule.shutdown(function() { + process.exit() + }) +} + +async function exists(file) { + let exists + try { + await fs.stat(file) + exists = true + } catch (error) { + exists = false + } + return exists +} + +function table(columns, rows) { + const maxColumnSize = {} + for (const c of columns) + maxColumnSize[c] = c.length + + rows = rows.map(values => { + if (!Array.isArray(values)) + throw new Error('row must be array, got', values) + + let row = {} + for (let i = 0; i < columns.length; i++) { + let value = String(values[i]) + row[columns[i]] = value + + let width + if (value.indexOf('\n') !== -1) { + width = Math.max(...value.split('\n').map(s => s.length)) + } else { + width = value.length + } + + if (width > maxColumnSize[columns[i]]) + maxColumnSize[columns[i]] = width + } + + return row + }) + + console.log(columnify(rows, { + columns, + preserveNewLines: true, + columnSplitter: ' | ', + headingTransform: (text) => { + const repeat = () => '-'.repeat(maxColumnSize[text]) + return `${text.toUpperCase()}\n${repeat()}` + } + })) +} \ No newline at end of file diff --git a/src/jobd-master.js b/src/jobd-master.js index 5839b1a..c926dc0 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -8,6 +8,8 @@ const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-va const RequestHandler = require('./lib/request-handler') const package_json = require('../package.json') +const DEFAULT_CONFIG_PATH = "/etc/jobd-master.conf" + /** * @type {Logger} */ @@ -51,7 +53,12 @@ async function initApp(appName) { process.on('SIGINT', term) process.on('SIGTERM', term) - const argv = minimist(process.argv.slice(2)) + const argv = minimist(process.argv.slice(2), { + boolean: ['help', 'version'], + default: { + config: DEFAULT_CONFIG_PATH + } + }) if (argv.help) { usage() @@ -63,9 +70,6 @@ async function initApp(appName) { process.exit(0) } - if (!argv.config) - throw new Error('--config option is required') - // read config try { config.parseMasterConfig(argv.config) @@ -295,9 +299,9 @@ function usage() { let s = `${process.argv[1]} OPTIONS Options: - --config - --help - --version` + --config Path to config. Default: ${DEFAULT_CONFIG_PATH} + --help Show this help. + --version Print version.` console.log(s) } diff --git a/src/jobd.js b/src/jobd.js index de8807f..0d8af32 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -17,11 +17,12 @@ const { Worker, STATUS_MANUAL, JOB_NOTFOUND, - JOB_ACCEPTED, JOB_IGNORED } = require('./lib/worker') const package_json = require('../package.json') +const DEFAULT_CONFIG_PATH = "/etc/jobd.conf" + /** * @type {Worker} */ @@ -72,7 +73,12 @@ async function initApp(appName) { process.on('SIGINT', term) process.on('SIGTERM', term) - const argv = minimist(process.argv.slice(2)) + const argv = minimist(process.argv.slice(2), { + boolean: ['help', 'version'], + default: { + config: DEFAULT_CONFIG_PATH + } + }) if (argv.help) { usage() @@ -84,9 +90,6 @@ async function initApp(appName) { process.exit(0) } - if (!argv.config) - throw new Error('--config option is required') - // read config try { config.parseWorkerConfig(argv.config) @@ -337,6 +340,9 @@ function onSetTargetConcurrency(data, requestNo, connection) { ['concurrency', 'i', true], ['target', 's', true], ]) + + if (data.concurrency <= 0) + throw new Error('Invalid concurrency value.') } catch (e) { connection.send( new ResponseMessage(requestNo) @@ -395,42 +401,49 @@ function connectToMaster() { return } - const connection = new Connection() - connection.connect(host, port) + async function connect() { + const connection = new Connection() + await connection.connect(host, port) - connection.on('connect', function() { - connection.sendRequest( - new RequestMessage('register-worker', { - targets: worker.getTargets() - }) - ) - .then(response => { + try { + let response = await connection.sendRequest( + new RequestMessage('register-worker', { + targets: worker.getTargets() + }) + ) logger.debug('connectToMaster: response:', response) - }) - .catch(error => { + } catch (error) { logger.error('connectToMaster: error while awaiting response:', error) + } + + connection.on('close', () => { + logger.warn(`connectToMaster: connection closed`) + tryToConnect() }) - }) - connection.on('close', () => { - logger.warn(`connectToMaster: connection closed`) + connection.on('request-message', (message, connection) => { + requestHandler.process(message, connection) + }) + } + + function tryToConnect(now = false) { setTimeout(() => { - connectToMaster() - }, config.get('master_reconnect_timeout') * 1000) - }) + connect().catch(error => { + logger.warn(`connectToMaster: connection failed`, error) + }) + }, now ? 0 : config.get('master_reconnect_timeout') * 1000) + } - connection.on('request-message', (message, connection) => { - requestHandler.process(message, connection) - }) + tryToConnect(true) } function usage() { let s = `${process.argv[1]} OPTIONS Options: - --config - --help - --version` + --config Path to config. Default: ${DEFAULT_CONFIG_PATH} + --help Show this help. + --version Print version.` console.log(s) } diff --git a/src/lib/config.js b/src/lib/config.js index ac711fd..73a6226 100644 --- a/src/lib/config.js +++ b/src/lib/config.js @@ -38,7 +38,15 @@ function processScheme(source, scheme) { case 'object': if (typeof value !== 'object') throw new Error(`'${key}' must be an object`) + break + case 'boolean': + if (value !== null) { + value = value.trim() + value = ['true', '1'].includes(value) + } else { + value = false + } break } @@ -114,6 +122,40 @@ function parseMasterConfig(file) { Object.assign(config, processScheme(raw, scheme)) } +/** + * @param {string} file + * @param {{ + * master: boolean, + * log_level: string|undefined, + * host: string, + * port: int, + * }} inputOptions + */ +function parseJobctlConfig(file, inputOptions) { + config = {} + const raw = readFile(file) + + Object.assign(config, processScheme(raw, { + master: {type: 'boolean'}, + password: {}, + log_level: {default: 'warn'}, + })) + + if (inputOptions.master) + config.master = inputOptions.master + Object.assign(config, processScheme(raw, { + host: {default: '127.0.0.1'}, + port: {default: config.master ? 7081 : 7080, type: 'int'} + })) + + for (let key of ['log_level', 'host', 'port']) { + if (inputOptions[key]) + config[key] = inputOptions[key] + } + + // console.log('parseJobctlConfig [2]', config) +} + /** * @param {string|null} key * @return {string|number|object} @@ -131,8 +173,17 @@ function get(key = null) { return config[key] } +/** + * @param {object} opts + */ +// function set(opts) { +// Object.assign(config, opts) +// } + module.exports = { parseWorkerConfig, parseMasterConfig, + parseJobctlConfig, get, + // set, } \ No newline at end of file diff --git a/src/lib/logger.js b/src/lib/logger.js index 54c9d54..8a44e07 100644 --- a/src/lib/logger.js +++ b/src/lib/logger.js @@ -3,13 +3,16 @@ const fs = require('fs/promises') const fsConstants = require('fs').constants const util = require('./util') +const ALLOWED_LEVELS = ['trace', 'debug', 'info', 'warn', 'error'] + module.exports = { /** * @param {string} file * @param {string} levelFile * @param {string} levelConsole + * @param {boolean} disableTimestamps */ - async init({file, levelFile, levelConsole}) { + async init({file, levelFile, levelConsole, disableTimestamps=false}) { const categories = { default: { appenders: ['stdout-filter'], @@ -17,19 +20,30 @@ module.exports = { } } + if (!ALLOWED_LEVELS.includes(levelConsole)) + throw new Error(`Level ${levelConsole} is not allowed.`) + const appenders = { stdout: { type: 'stdout', - level: 'trace' + level: 'trace', }, 'stdout-filter': { type: 'logLevelFilter', appender: 'stdout', - level: levelConsole + level: levelConsole, } } + if (disableTimestamps) + appenders.stdout.layout = { + type: 'pattern', + pattern: '%[%p [%c]%] %m', + } if (file) { + if (!ALLOWED_LEVELS.includes(levelFile)) + throw new Error(`Level ${levelFile} is not allowed.`) + let exists try { await fs.stat(file) diff --git a/src/lib/request-handler.js b/src/lib/request-handler.js index 4ab06fb..4330b6b 100644 --- a/src/lib/request-handler.js +++ b/src/lib/request-handler.js @@ -1,5 +1,5 @@ const {getLogger} = require('./logger') -const {ResponseMessage} = require('./server') +const {ResponseMessage, Connection} = require('./server') class RequestHandler { diff --git a/src/lib/server.js b/src/lib/server.js index 618ca8c..051b8be 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -285,12 +285,19 @@ class Connection extends EventEmitter { */ this._requestPromises = {} + /** + * @type {Promise} + * @private + */ + this._connectPromise = null + this._setLogger() } /** * @param {string} host * @param {number} port + * @return {Promise} */ connect(host, port) { if (this.socket !== null) @@ -298,14 +305,18 @@ class Connection extends EventEmitter { this._isOutgoing = true + this.logger.trace(`Connecting to ${host}:${port}`) + this.socket = new net.Socket() - this.socket.connect({host, port}) + this.socket.connect(port, host) this.remoteAddress = host this.remotePort = port this._setLogger() this._setSocketEvents() + + return this._connectPromise = createCallablePromise() } /** @@ -616,14 +627,19 @@ class Connection extends EventEmitter { } for (const no in this._requestPromises) { - this._requestPromises[no].reject(new Error('socket is closed')) + this._requestPromises[no].reject(new Error('Socket is closed')) } this._requestPromises = {} } onConnect = () => { - this.logger.debug('connection established') + if (this._connectPromise) { + this._connectPromise.resolve() + this._connectPromise = null + } + + this.logger.debug('Connection established.') this.emit('connect') } @@ -642,12 +658,16 @@ class Connection extends EventEmitter { onClose = (hadError) => { this._handleClose() - this.logger.debug(`socket closed` + (hadError ? ` with error` : '')) + this.logger.debug(`Socket closed` + (hadError ? ` with error` : '')) } onError = (error) => { + if (this._connectPromise) { + this._connectPromise.reject(error) + this._connectPromise = null + } this._handleClose() - this.logger.warn(`socket error:`, error) + this.logger.warn(`Socket error:`, error) } } -- cgit v1.2.3