From db7e1be9b58ba92556d579cd4b814ae083602bc9 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sun, 7 Mar 2021 19:41:43 +0300 Subject: jobctl --- src/jobctl.js | 430 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 430 insertions(+) create mode 100755 src/jobctl.js (limited to 'src/jobctl.js') diff --git a/src/jobctl.js b/src/jobctl.js new file mode 100755 index 0000000..2a1fa30 --- /dev/null +++ b/src/jobctl.js @@ -0,0 +1,430 @@ +#!/usr/bin/env node +const minimist = require('minimist') +const loggerModule = require('./lib/logger') +const config = require('./lib/config') +const package_json = require('../package.json') +const os = require('os') +const path = require('path') +const fs = require('fs/promises') +const {Connection, RequestMessage} = require('./lib/server') +const {isNumeric} = require('./lib/util') +const columnify = require('columnify') + +const DEFAULT_CONFIG_PATH = path.join(os.homedir(), '.jobctl.conf') + +const WORKER_COMMANDS = { + 'list-targets': workerListTargets, + 'memory-usage': workerMemoryUsage, + 'poll': workerPoll, + 'set-target-concurrency': workerSetTargetConcurrency, + 'pause': workerPause, + 'continue': workerContinue +} + +const MASTER_COMMANDS = { + 'list-workers': masterListWorkers, + // 'list-workers-memory-usage': masterListWorkersMemoryUsage, + 'memory-usage': masterMemoryUsage, + 'poke': masterPoke, + + // we can just reuse worker functions here, as they do the same + 'pause': workerPause, + 'continue': workerContinue, +} + +/** + * @type {Logger} + */ +let logger + +/** + * @type {Connection} + */ +let connection + + +main().catch(e => { + console.error(e) + process.exit(1) +}) + + +async function main() { + const argv = await initApp('jobctl') + if (!argv.length) + usage() + + const isMaster = config.get('master') + + logger.info('Working mode: ' + (isMaster ? 'master' : 'worker')) + logger.trace('Command arguments: ', argv) + + let availableCommands = isMaster ? MASTER_COMMANDS : WORKER_COMMANDS + let command = argv.shift() + if (!(command in availableCommands)) { + logger.error(`Unsupported command: '${command}'`) + process.exit(1) + } + + let host = config.get('host') + let port = config.get('port') + + // connect to instance + try { + connection = new Connection() + await connection.connect(host, port) + + logger.info('Successfully connected.') + } catch (error) { + logger.error('Connection failure:', error) + process.exit(1) + } + + try { + await availableCommands[command](argv) + } catch (e) { + logger.error(e.message) + } + + connection.close() + + // initWorker() + // initRequestHandler() + // initServer() + // connectToMaster() +} + +async function initApp(appName) { + if (process.argv.length < 3) + usage() + + process.on('SIGINT', term) + process.on('SIGTERM', term) + + const argv = minimist(process.argv.slice(2), { + boolean: ['master', 'version', 'help'], + string: ['host', 'port', 'config', 'log-level'], + stopEarly: true, + default: { + config: DEFAULT_CONFIG_PATH + } + }) + + if (argv.help) + usage() + + if (argv.version) { + console.log(package_json.version) + process.exit(0) + } + + // read config + if (await exists(argv.config)) { + try { + config.parseJobctlConfig(argv.config, { + master: argv.master, + log_level: argv['log-level'], + host: argv.host, + port: parseInt(argv.port, 10), + }) + } catch (e) { + console.error(`config parsing error: ${e.message}`) + process.exit(1) + } + } + + // init logger + await loggerModule.init({ + levelConsole: config.get('log_level'), + disableTimestamps: true + }) + logger = loggerModule.getLogger(appName) + + process.title = appName + + /// /// /// + /// \\\ \\\ + /// /// /// + /// \\\ \\\ + /// /// /// + /* * * * * */ + /* */ + /* ^_^ */ + /* */ + /* '_' */ + /* */ + /* <_< */ + /* */ + /* >_> */ + /* */ + /* * * * * */ + /// /// /// + /// \\\ \\\ + /// /// /// + /// \\\ \\\ + /// /// /// + + return argv['_'] || [] +} + +async function workerListTargets() { + try { + let response = await connection.sendRequest(new RequestMessage('status')) + const rows = [] + const columns = [ + 'target', + 'concurrency', + 'length', + 'paused' + ] + for (const target in response.data.targets) { + const row = [ + target, + response.data.targets[target].concurrency, + response.data.targets[target].length, + response.data.targets[target].paused ? 'yes' : 'no' + ] + rows.push(row) + } + + table(columns, rows) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + +async function workerMemoryUsage() { + try { + let response = await connection.sendRequest(new RequestMessage('status')) + const columns = ['what', 'value'] + const rows = [] + for (const what in response.data.memoryUsage) + rows.push([what, response.data.memoryUsage[what]]) + rows.push(['pendingJobPromises', response.data.jobPromisesCount]) + table(columns, rows) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + +async function workerPoll(argv) { + return await sendCommandForTargets(argv, 'poll') +} + +async function workerPause(argv) { + return await sendCommandForTargets(argv, 'pause') +} + +async function workerContinue(argv) { + return await sendCommandForTargets(argv, 'continue') +} + +async function workerSetTargetConcurrency(argv) { + if (argv.length !== 2) + throw new Error('Invalid number of arguments.') + + let [target, concurrency] = argv + if (!isNumeric(concurrency)) + throw new Error(`'concurrency' must be a number.`) + + concurrency = parseInt(concurrency, 10) + + try { + let response = await connection.sendRequest( + new RequestMessage('set-target-concurrency', { + target, concurrency + }) + ) + + if (response.error) + throw new Error(`Worker error: ${response.error}`) + + console.log(response.data) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + +async function masterPoke(argv) { + return await sendCommandForTargets(argv, 'poke') +} + +async function masterMemoryUsage() { + try { + let response = await connection.sendRequest(new RequestMessage('status')) + const columns = ['what', 'value'] + const rows = [] + for (const what in response.data.memoryUsage) + rows.push([what, response.data.memoryUsage[what]]) + table(columns, rows) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + +async function masterListWorkers() { + try { + let response = await connection.sendRequest(new RequestMessage('status', {poll_workers: true})) + const columns = ['worker', 'targets', 'concurrency', 'length', 'paused'] + const rows = [] + for (const worker of response.data.workers) { + let remoteAddr = `${worker.remoteAddr}:${worker.remotePort}` + let targets = Object.keys(worker.workerStatus.targets) + let concurrencies = targets.map(t => worker.workerStatus.targets[t].concurrency) + let lengths = targets.map(t => worker.workerStatus.targets[t].length) + let pauses = targets.map(t => worker.workerStatus.targets[t].paused ? 'yes' : 'no') + rows.push([ + remoteAddr, + targets.join("\n"), + concurrencies.join("\n"), + lengths.join("\n"), + pauses.join("\n") + ]) + } + table(columns, rows) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + +async function sendCommandForTargets(targets, command) { + if (!targets.length) + throw new Error('No targets specified.') + + try { + let response = await connection.sendRequest( + new RequestMessage(command, {targets}) + ) + + if (response.error) + throw new Error(`Worker error: ${response.error}`) + + console.log(response.data) + } catch (error) { + logger.error(error.message) + logger.trace(error) + } +} + + +function usage(exitCode = 0) { + let s = `${process.argv[1]} [OPTIONS] COMMAND + +Worker commands: + list-targets Print list of targets, their length and inner state. + memory-usage Print info about memory usage of the worker. + poll <...TARGETS> Ask worker to get tasks for specified targets. + + Example: + $ jobctl poke t1 t2 t3 + + set-target-concurrency + Set concurrency of the target. + + pause <...TARGETS> Pause specified or all targets. + continue <...TARGETS> Pause specified or all targets. + +Master commands: + list-workers Print list of connected workers and their state. + memory-usage Print info about memory usage. + poke <...TARGETS> Poke specified targets. + pause <...TARGETS> Send pause() to all workers serving specified targets. + If no targets specified, just sends pause() to all + connected workers. + continue <...TARGETS> Send continue() to all workers serving specified + targets. If no targets specified, just sends pause() + to all connected workers. + +Options: + --master Connect to jobd-master instance. + --host Address of jobd or jobd-master instance. + --port Port. Default: 7080 when --master is not used, + 7081 otherwise. + --config Path to config. Default: ~/.jobctl.conf + Required for connecting to password-protected + instances. + --log-level 'error', 'warn', 'info', 'debug' or 'trace'. + Default: warn + --help: Show this help. + --version: Print version. + +Configuration file + Config file is required for connecting to password-protected jobd instances. + It can also be used to store hostname, port and log level. + + Here's an example of possible ~/.jobctl.conf file: + + ;password = + hostname = 1.2.3.4 + port = 7080 + log_level = warn + master = true +` + + console.log(s) + process.exit(exitCode) +} + +function term() { + if (logger) + logger.info('shutdown') + + loggerModule.shutdown(function() { + process.exit() + }) +} + +async function exists(file) { + let exists + try { + await fs.stat(file) + exists = true + } catch (error) { + exists = false + } + return exists +} + +function table(columns, rows) { + const maxColumnSize = {} + for (const c of columns) + maxColumnSize[c] = c.length + + rows = rows.map(values => { + if (!Array.isArray(values)) + throw new Error('row must be array, got', values) + + let row = {} + for (let i = 0; i < columns.length; i++) { + let value = String(values[i]) + row[columns[i]] = value + + let width + if (value.indexOf('\n') !== -1) { + width = Math.max(...value.split('\n').map(s => s.length)) + } else { + width = value.length + } + + if (width > maxColumnSize[columns[i]]) + maxColumnSize[columns[i]] = width + } + + return row + }) + + console.log(columnify(rows, { + columns, + preserveNewLines: true, + columnSplitter: ' | ', + headingTransform: (text) => { + const repeat = () => '-'.repeat(maxColumnSize[text]) + return `${text.toUpperCase()}\n${repeat()}` + } + })) +} \ No newline at end of file -- cgit v1.2.3