From f7ca888651e737f18a0ea760326922da9d80e471 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 9 Mar 2021 02:44:59 +0300 Subject: improvements, see full commit message - added new option 'always_allow_localhost' - improve jobctl, allow connecting to password-protected instances without using configuration file --- jobd-master.conf.example | 1 + jobd.conf.example | 1 + src/jobctl.js | 121 +++++++++++++++++++++++++++++------------------ src/lib/config.js | 105 ++++++++++++++++++++-------------------- src/lib/server.js | 14 ++---- 5 files changed, 131 insertions(+), 111 deletions(-) diff --git a/jobd-master.conf.example b/jobd-master.conf.example index e48b3eb..8d6f68a 100644 --- a/jobd-master.conf.example +++ b/jobd-master.conf.example @@ -2,6 +2,7 @@ host = 0.0.0.0 port = 7081 ;password = +always_allow_localhost = 0 ping_interval = 30 ; seconds poke_throttle_interval = 0.5 ; seconds diff --git a/jobd.conf.example b/jobd.conf.example index dbb6203..786714d 100644 --- a/jobd.conf.example +++ b/jobd.conf.example @@ -2,6 +2,7 @@ host = 0.0.0.0 port = 7080 ;password = +always_allow_localhost = 0 master_host = 127.0.0.1 master_port = 7081 diff --git a/src/jobctl.js b/src/jobctl.js index 2a1fa30..7948d18 100755 --- a/src/jobctl.js +++ b/src/jobctl.js @@ -9,6 +9,7 @@ const fs = require('fs/promises') const {Connection, RequestMessage} = require('./lib/server') const {isNumeric} = require('./lib/util') const columnify = require('columnify') +const readline = require('readline') const DEFAULT_CONFIG_PATH = path.join(os.homedir(), '.jobctl.conf') @@ -102,7 +103,7 @@ async function initApp(appName) { process.on('SIGTERM', term) const argv = minimist(process.argv.slice(2), { - boolean: ['master', 'version', 'help'], + boolean: ['master', 'version', 'help', 'password'], string: ['host', 'port', 'config', 'log-level'], stopEarly: true, default: { @@ -121,18 +122,32 @@ async function initApp(appName) { // read config if (await exists(argv.config)) { try { - config.parseJobctlConfig(argv.config, { - master: argv.master, - log_level: argv['log-level'], - host: argv.host, - port: parseInt(argv.port, 10), - }) + config.parseJobctlConfig(argv.config) } catch (e) { console.error(`config parsing error: ${e.message}`) process.exit(1) } } + if (argv.master || config.get('master') === null) + config.set('master', argv.master) + + for (let key of ['log-level', 'host', 'port']) { + if (key in argv) + config.set(key.replace('-', '_'), argv[key]) + } + + if (config.get('port') === null) + config.set('port', config.get('master') ? 7081 : 7080) + + if (config.get('log_level') === null) + config.set('log_level', 'warn') + + if (argv.password) { + let password = await question('Enter password: ') + config.set('password', password) + } + // init logger await loggerModule.init({ levelConsole: config.get('log_level'), @@ -167,9 +182,22 @@ async function initApp(appName) { return argv['_'] || [] } +/** + * @param {string} name + * @param {null|object} data + * @return {Promise} + */ +async function request(name, data = null) { + let req = new RequestMessage(name, data) + let response = await connection.sendRequest(req) + if (response.error) + throw new Error(`Worker error: ${response.error}`) + return response.data +} + async function workerListTargets() { try { - let response = await connection.sendRequest(new RequestMessage('status')) + let response = await request('status') const rows = [] const columns = [ 'target', @@ -177,12 +205,12 @@ async function workerListTargets() { 'length', 'paused' ] - for (const target in response.data.targets) { + for (const target in response.targets) { const row = [ target, - response.data.targets[target].concurrency, - response.data.targets[target].length, - response.data.targets[target].paused ? 'yes' : 'no' + response.targets[target].concurrency, + response.targets[target].length, + response.targets[target].paused ? 'yes' : 'no' ] rows.push(row) } @@ -196,12 +224,12 @@ async function workerListTargets() { async function workerMemoryUsage() { try { - let response = await connection.sendRequest(new RequestMessage('status')) + let response = await request('status') const columns = ['what', 'value'] const rows = [] - for (const what in response.data.memoryUsage) - rows.push([what, response.data.memoryUsage[what]]) - rows.push(['pendingJobPromises', response.data.jobPromisesCount]) + for (const what in response.memoryUsage) + rows.push([what, response.memoryUsage[what]]) + rows.push(['pendingJobPromises', response.jobPromisesCount]) table(columns, rows) } catch (error) { logger.error(error.message) @@ -232,16 +260,8 @@ async function workerSetTargetConcurrency(argv) { concurrency = parseInt(concurrency, 10) try { - let response = await connection.sendRequest( - new RequestMessage('set-target-concurrency', { - target, concurrency - }) - ) - - if (response.error) - throw new Error(`Worker error: ${response.error}`) - - console.log(response.data) + let response = await request('set-target-concurrency', {target, concurrency}) + console.log(response) } catch (error) { logger.error(error.message) logger.trace(error) @@ -254,11 +274,11 @@ async function masterPoke(argv) { async function masterMemoryUsage() { try { - let response = await connection.sendRequest(new RequestMessage('status')) + let response = await request('status') const columns = ['what', 'value'] const rows = [] - for (const what in response.data.memoryUsage) - rows.push([what, response.data.memoryUsage[what]]) + for (const what in response.memoryUsage) + rows.push([what, response.memoryUsage[what]]) table(columns, rows) } catch (error) { logger.error(error.message) @@ -268,10 +288,10 @@ async function masterMemoryUsage() { async function masterListWorkers() { try { - let response = await connection.sendRequest(new RequestMessage('status', {poll_workers: true})) + let response = await request('status', {poll_workers: true}) const columns = ['worker', 'targets', 'concurrency', 'length', 'paused'] const rows = [] - for (const worker of response.data.workers) { + for (const worker of response.workers) { let remoteAddr = `${worker.remoteAddr}:${worker.remotePort}` let targets = Object.keys(worker.workerStatus.targets) let concurrencies = targets.map(t => worker.workerStatus.targets[t].concurrency) @@ -297,14 +317,8 @@ async function sendCommandForTargets(targets, command) { throw new Error('No targets specified.') try { - let response = await connection.sendRequest( - new RequestMessage(command, {targets}) - ) - - if (response.error) - throw new Error(`Worker error: ${response.error}`) - - console.log(response.data) + let response = await request(command, {targets}) + //console.log(response) } catch (error) { logger.error(error.message) logger.trace(error) @@ -346,18 +360,14 @@ Options: --port Port. Default: 7080 when --master is not used, 7081 otherwise. --config Path to config. Default: ~/.jobctl.conf - Required for connecting to password-protected - instances. + --password Ask for a password before launching a command. --log-level 'error', 'warn', 'info', 'debug' or 'trace'. Default: warn --help: Show this help. --version: Print version. -Configuration file - Config file is required for connecting to password-protected jobd instances. - It can also be used to store hostname, port and log level. - - Here's an example of possible ~/.jobctl.conf file: +Configuration file + Example of possible ~/.jobctl.conf file: ;password = hostname = 1.2.3.4 @@ -427,4 +437,21 @@ function table(columns, rows) { return `${text.toUpperCase()}\n${repeat()}` } })) -} \ No newline at end of file +} + +/** + * @param prompt + * @return {Promise} + */ +function question(prompt) { + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout + }) + return new Promise((resolve, reject) => { + rl.question(prompt, (answer) => { + rl.close() + resolve(answer) + }) + }) +} diff --git a/src/lib/config.js b/src/lib/config.js index 73a6226..289a79f 100644 --- a/src/lib/config.js +++ b/src/lib/config.js @@ -2,7 +2,7 @@ const fs = require('fs') const ini = require('ini') const {isNumeric} = require('./util') -let config = null +let config = {} function readFile(file) { if (!fs.existsSync(file)) @@ -22,32 +22,34 @@ function processScheme(source, scheme) { let value = source[key] ?? opts.default ?? null - switch (opts.type) { - case 'int': - if (!isNumeric(value)) - throw new Error(`'${key}' must be an integer`) - value = parseInt(value, 10) - break - - case 'float': - if (!isNumeric(value)) - throw new Error(`'${key}' must be a float`) - value = parseFloat(value) - break - - case 'object': - if (typeof value !== 'object') - throw new Error(`'${key}' must be an object`) - break - - case 'boolean': - if (value !== null) { - value = value.trim() - value = ['true', '1'].includes(value) - } else { - value = false - } - break + if (value !== null) { + switch (opts.type) { + case 'int': + if (!isNumeric(value)) + throw new Error(`'${key}' must be an integer`) + value = parseInt(value, 10) + break + + case 'float': + if (!isNumeric(value)) + throw new Error(`'${key}' must be a float`) + value = parseFloat(value) + break + + case 'object': + if (typeof value !== 'object') + throw new Error(`'${key}' must be an object`) + break + + case 'boolean': + if (typeof value === 'string') { + value = value.trim() + value = ['true', '1'].includes(value) + } else { + value = !!value + } + break + } } result[key] = value @@ -64,6 +66,7 @@ function parseWorkerConfig(file) { host: {required: true}, port: {required: true, type: 'int'}, password: {}, + always_allow_localhost: {type: 'boolean', default: false}, master_host: {}, master_port: {type: 'int', default: 0}, @@ -111,6 +114,7 @@ function parseMasterConfig(file) { host: {required: true}, port: {required: true, type: 'int'}, password: {}, + always_allow_localhost: {type: 'boolean', default: false}, ping_interval: {default: 30, type: 'int'}, poke_throttle_interval: {default: 0.5, type: 'float'}, @@ -124,14 +128,8 @@ function parseMasterConfig(file) { /** * @param {string} file - * @param {{ - * master: boolean, - * log_level: string|undefined, - * host: string, - * port: int, - * }} inputOptions */ -function parseJobctlConfig(file, inputOptions) { +function parseJobctlConfig(file) { config = {} const raw = readFile(file) @@ -141,17 +139,17 @@ function parseJobctlConfig(file, inputOptions) { log_level: {default: 'warn'}, })) - if (inputOptions.master) - config.master = inputOptions.master + // if (inputOptions.master) + // config.master = inputOptions.master Object.assign(config, processScheme(raw, { host: {default: '127.0.0.1'}, - port: {default: config.master ? 7081 : 7080, type: 'int'} + port: {/*default: config.master ? 7081 : 7080,*/ type: 'int'} })) - for (let key of ['log_level', 'host', 'port']) { - if (inputOptions[key]) - config[key] = inputOptions[key] - } + // for (let key of ['log_level', 'host', 'port']) { + // if (inputOptions[key]) + // config[key] = inputOptions[key] + // } // console.log('parseJobctlConfig [2]', config) } @@ -164,26 +162,27 @@ function get(key = null) { if (key === null) return config - if (typeof config !== 'object') - throw new Error(`config is not loaded`) - - if (!(key in config)) - throw new Error(`config: ${key} not found`) - - return config[key] + return config[key] || null } /** - * @param {object} opts + * @param {object|string} key + * @param value */ -// function set(opts) { -// Object.assign(config, opts) -// } +function set(key, value) { + if (!config) + config = {} + if (typeof key === 'object') { + Object.assign(config, key) + } else { + config[key] = value + } +} module.exports = { parseWorkerConfig, parseMasterConfig, parseJobctlConfig, get, - // set, + set, } \ No newline at end of file diff --git a/src/lib/server.js b/src/lib/server.js index 051b8be..8ed2be9 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -261,13 +261,7 @@ class Connection extends EventEmitter { * @type {boolean} * @private */ - this._isAuthorized = config.get('password') === '' - - /** - * @type {boolean} - * @private - */ - this._textConversationAllowed = false + this._isAuthorized = !config.get('password') /** * @type {boolean} @@ -328,10 +322,8 @@ class Connection extends EventEmitter { this.remoteAddress = socket.remoteAddress this.remotePort = socket.remotePort - if (this.remoteAddress === '127.0.0.1') { + if (this.remoteAddress === '127.0.0.1' && config.get('always_allow_localhost') === true) this._isAuthorized = true - this._textConversationAllowed = true - } this._setLogger() this._setSocketEvents() @@ -548,7 +540,7 @@ class Connection extends EventEmitter { // send password once (when talking to jobd-master) if (!this._isAuthorized) { - message.setPassword(config.get('password')) + message.setPassword(config.get('password') || '') this._isAuthorized = true } -- cgit v1.2.3