aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-09 02:44:59 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-09 02:44:59 +0300
commitf7ca888651e737f18a0ea760326922da9d80e471 (patch)
treef61463affaf09ede1d6c720e618453775dbe065e
parentfc004fe70a89c43068b2a455c02f7a813adcc509 (diff)
improvements, see full commit message
- added new option 'always_allow_localhost' - improve jobctl, allow connecting to password-protected instances without using configuration file
-rw-r--r--jobd-master.conf.example1
-rw-r--r--jobd.conf.example1
-rwxr-xr-xsrc/jobctl.js121
-rw-r--r--src/lib/config.js105
-rw-r--r--src/lib/server.js14
5 files changed, 131 insertions, 111 deletions
diff --git a/jobd-master.conf.example b/jobd-master.conf.example
index e48b3eb..8d6f68a 100644
--- a/jobd-master.conf.example
+++ b/jobd-master.conf.example
@@ -2,6 +2,7 @@
host = 0.0.0.0
port = 7081
;password =
+always_allow_localhost = 0
ping_interval = 30 ; seconds
poke_throttle_interval = 0.5 ; seconds
diff --git a/jobd.conf.example b/jobd.conf.example
index dbb6203..786714d 100644
--- a/jobd.conf.example
+++ b/jobd.conf.example
@@ -2,6 +2,7 @@
host = 0.0.0.0
port = 7080
;password =
+always_allow_localhost = 0
master_host = 127.0.0.1
master_port = 7081
diff --git a/src/jobctl.js b/src/jobctl.js
index 2a1fa30..7948d18 100755
--- a/src/jobctl.js
+++ b/src/jobctl.js
@@ -9,6 +9,7 @@ const fs = require('fs/promises')
const {Connection, RequestMessage} = require('./lib/server')
const {isNumeric} = require('./lib/util')
const columnify = require('columnify')
+const readline = require('readline')
const DEFAULT_CONFIG_PATH = path.join(os.homedir(), '.jobctl.conf')
@@ -102,7 +103,7 @@ async function initApp(appName) {
process.on('SIGTERM', term)
const argv = minimist(process.argv.slice(2), {
- boolean: ['master', 'version', 'help'],
+ boolean: ['master', 'version', 'help', 'password'],
string: ['host', 'port', 'config', 'log-level'],
stopEarly: true,
default: {
@@ -121,18 +122,32 @@ async function initApp(appName) {
// read config
if (await exists(argv.config)) {
try {
- config.parseJobctlConfig(argv.config, {
- master: argv.master,
- log_level: argv['log-level'],
- host: argv.host,
- port: parseInt(argv.port, 10),
- })
+ config.parseJobctlConfig(argv.config)
} catch (e) {
console.error(`config parsing error: ${e.message}`)
process.exit(1)
}
}
+ if (argv.master || config.get('master') === null)
+ config.set('master', argv.master)
+
+ for (let key of ['log-level', 'host', 'port']) {
+ if (key in argv)
+ config.set(key.replace('-', '_'), argv[key])
+ }
+
+ if (config.get('port') === null)
+ config.set('port', config.get('master') ? 7081 : 7080)
+
+ if (config.get('log_level') === null)
+ config.set('log_level', 'warn')
+
+ if (argv.password) {
+ let password = await question('Enter password: ')
+ config.set('password', password)
+ }
+
// init logger
await loggerModule.init({
levelConsole: config.get('log_level'),
@@ -167,9 +182,22 @@ async function initApp(appName) {
return argv['_'] || []
}
+/**
+ * @param {string} name
+ * @param {null|object} data
+ * @return {Promise<object>}
+ */
+async function request(name, data = null) {
+ let req = new RequestMessage(name, data)
+ let response = await connection.sendRequest(req)
+ if (response.error)
+ throw new Error(`Worker error: ${response.error}`)
+ return response.data
+}
+
async function workerListTargets() {
try {
- let response = await connection.sendRequest(new RequestMessage('status'))
+ let response = await request('status')
const rows = []
const columns = [
'target',
@@ -177,12 +205,12 @@ async function workerListTargets() {
'length',
'paused'
]
- for (const target in response.data.targets) {
+ for (const target in response.targets) {
const row = [
target,
- response.data.targets[target].concurrency,
- response.data.targets[target].length,
- response.data.targets[target].paused ? 'yes' : 'no'
+ response.targets[target].concurrency,
+ response.targets[target].length,
+ response.targets[target].paused ? 'yes' : 'no'
]
rows.push(row)
}
@@ -196,12 +224,12 @@ async function workerListTargets() {
async function workerMemoryUsage() {
try {
- let response = await connection.sendRequest(new RequestMessage('status'))
+ let response = await request('status')
const columns = ['what', 'value']
const rows = []
- for (const what in response.data.memoryUsage)
- rows.push([what, response.data.memoryUsage[what]])
- rows.push(['pendingJobPromises', response.data.jobPromisesCount])
+ for (const what in response.memoryUsage)
+ rows.push([what, response.memoryUsage[what]])
+ rows.push(['pendingJobPromises', response.jobPromisesCount])
table(columns, rows)
} catch (error) {
logger.error(error.message)
@@ -232,16 +260,8 @@ async function workerSetTargetConcurrency(argv) {
concurrency = parseInt(concurrency, 10)
try {
- let response = await connection.sendRequest(
- new RequestMessage('set-target-concurrency', {
- target, concurrency
- })
- )
-
- if (response.error)
- throw new Error(`Worker error: ${response.error}`)
-
- console.log(response.data)
+ let response = await request('set-target-concurrency', {target, concurrency})
+ console.log(response)
} catch (error) {
logger.error(error.message)
logger.trace(error)
@@ -254,11 +274,11 @@ async function masterPoke(argv) {
async function masterMemoryUsage() {
try {
- let response = await connection.sendRequest(new RequestMessage('status'))
+ let response = await request('status')
const columns = ['what', 'value']
const rows = []
- for (const what in response.data.memoryUsage)
- rows.push([what, response.data.memoryUsage[what]])
+ for (const what in response.memoryUsage)
+ rows.push([what, response.memoryUsage[what]])
table(columns, rows)
} catch (error) {
logger.error(error.message)
@@ -268,10 +288,10 @@ async function masterMemoryUsage() {
async function masterListWorkers() {
try {
- let response = await connection.sendRequest(new RequestMessage('status', {poll_workers: true}))
+ let response = await request('status', {poll_workers: true})
const columns = ['worker', 'targets', 'concurrency', 'length', 'paused']
const rows = []
- for (const worker of response.data.workers) {
+ for (const worker of response.workers) {
let remoteAddr = `${worker.remoteAddr}:${worker.remotePort}`
let targets = Object.keys(worker.workerStatus.targets)
let concurrencies = targets.map(t => worker.workerStatus.targets[t].concurrency)
@@ -297,14 +317,8 @@ async function sendCommandForTargets(targets, command) {
throw new Error('No targets specified.')
try {
- let response = await connection.sendRequest(
- new RequestMessage(command, {targets})
- )
-
- if (response.error)
- throw new Error(`Worker error: ${response.error}`)
-
- console.log(response.data)
+ let response = await request(command, {targets})
+ //console.log(response)
} catch (error) {
logger.error(error.message)
logger.trace(error)
@@ -346,18 +360,14 @@ Options:
--port Port. Default: 7080 when --master is not used,
7081 otherwise.
--config <path> Path to config. Default: ~/.jobctl.conf
- Required for connecting to password-protected
- instances.
+ --password Ask for a password before launching a command.
--log-level <level> 'error', 'warn', 'info', 'debug' or 'trace'.
Default: warn
--help: Show this help.
--version: Print version.
-Configuration file
- Config file is required for connecting to password-protected jobd instances.
- It can also be used to store hostname, port and log level.
-
- Here's an example of possible ~/.jobctl.conf file:
+Configuration file
+ Example of possible ~/.jobctl.conf file:
;password =
hostname = 1.2.3.4
@@ -427,4 +437,21 @@ function table(columns, rows) {
return `${text.toUpperCase()}\n${repeat()}`
}
}))
-} \ No newline at end of file
+}
+
+/**
+ * @param prompt
+ * @return {Promise<string>}
+ */
+function question(prompt) {
+ const rl = readline.createInterface({
+ input: process.stdin,
+ output: process.stdout
+ })
+ return new Promise((resolve, reject) => {
+ rl.question(prompt, (answer) => {
+ rl.close()
+ resolve(answer)
+ })
+ })
+}
diff --git a/src/lib/config.js b/src/lib/config.js
index 73a6226..289a79f 100644
--- a/src/lib/config.js
+++ b/src/lib/config.js
@@ -2,7 +2,7 @@ const fs = require('fs')
const ini = require('ini')
const {isNumeric} = require('./util')
-let config = null
+let config = {}
function readFile(file) {
if (!fs.existsSync(file))
@@ -22,32 +22,34 @@ function processScheme(source, scheme) {
let value = source[key] ?? opts.default ?? null
- switch (opts.type) {
- case 'int':
- if (!isNumeric(value))
- throw new Error(`'${key}' must be an integer`)
- value = parseInt(value, 10)
- break
-
- case 'float':
- if (!isNumeric(value))
- throw new Error(`'${key}' must be a float`)
- value = parseFloat(value)
- break
-
- case 'object':
- if (typeof value !== 'object')
- throw new Error(`'${key}' must be an object`)
- break
-
- case 'boolean':
- if (value !== null) {
- value = value.trim()
- value = ['true', '1'].includes(value)
- } else {
- value = false
- }
- break
+ if (value !== null) {
+ switch (opts.type) {
+ case 'int':
+ if (!isNumeric(value))
+ throw new Error(`'${key}' must be an integer`)
+ value = parseInt(value, 10)
+ break
+
+ case 'float':
+ if (!isNumeric(value))
+ throw new Error(`'${key}' must be a float`)
+ value = parseFloat(value)
+ break
+
+ case 'object':
+ if (typeof value !== 'object')
+ throw new Error(`'${key}' must be an object`)
+ break
+
+ case 'boolean':
+ if (typeof value === 'string') {
+ value = value.trim()
+ value = ['true', '1'].includes(value)
+ } else {
+ value = !!value
+ }
+ break
+ }
}
result[key] = value
@@ -64,6 +66,7 @@ function parseWorkerConfig(file) {
host: {required: true},
port: {required: true, type: 'int'},
password: {},
+ always_allow_localhost: {type: 'boolean', default: false},
master_host: {},
master_port: {type: 'int', default: 0},
@@ -111,6 +114,7 @@ function parseMasterConfig(file) {
host: {required: true},
port: {required: true, type: 'int'},
password: {},
+ always_allow_localhost: {type: 'boolean', default: false},
ping_interval: {default: 30, type: 'int'},
poke_throttle_interval: {default: 0.5, type: 'float'},
@@ -124,14 +128,8 @@ function parseMasterConfig(file) {
/**
* @param {string} file
- * @param {{
- * master: boolean,
- * log_level: string|undefined,
- * host: string,
- * port: int,
- * }} inputOptions
*/
-function parseJobctlConfig(file, inputOptions) {
+function parseJobctlConfig(file) {
config = {}
const raw = readFile(file)
@@ -141,17 +139,17 @@ function parseJobctlConfig(file, inputOptions) {
log_level: {default: 'warn'},
}))
- if (inputOptions.master)
- config.master = inputOptions.master
+ // if (inputOptions.master)
+ // config.master = inputOptions.master
Object.assign(config, processScheme(raw, {
host: {default: '127.0.0.1'},
- port: {default: config.master ? 7081 : 7080, type: 'int'}
+ port: {/*default: config.master ? 7081 : 7080,*/ type: 'int'}
}))
- for (let key of ['log_level', 'host', 'port']) {
- if (inputOptions[key])
- config[key] = inputOptions[key]
- }
+ // for (let key of ['log_level', 'host', 'port']) {
+ // if (inputOptions[key])
+ // config[key] = inputOptions[key]
+ // }
// console.log('parseJobctlConfig [2]', config)
}
@@ -164,26 +162,27 @@ function get(key = null) {
if (key === null)
return config
- if (typeof config !== 'object')
- throw new Error(`config is not loaded`)
-
- if (!(key in config))
- throw new Error(`config: ${key} not found`)
-
- return config[key]
+ return config[key] || null
}
/**
- * @param {object} opts
+ * @param {object|string} key
+ * @param value
*/
-// function set(opts) {
-// Object.assign(config, opts)
-// }
+function set(key, value) {
+ if (!config)
+ config = {}
+ if (typeof key === 'object') {
+ Object.assign(config, key)
+ } else {
+ config[key] = value
+ }
+}
module.exports = {
parseWorkerConfig,
parseMasterConfig,
parseJobctlConfig,
get,
- // set,
+ set,
} \ No newline at end of file
diff --git a/src/lib/server.js b/src/lib/server.js
index 051b8be..8ed2be9 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -261,13 +261,7 @@ class Connection extends EventEmitter {
* @type {boolean}
* @private
*/
- this._isAuthorized = config.get('password') === ''
-
- /**
- * @type {boolean}
- * @private
- */
- this._textConversationAllowed = false
+ this._isAuthorized = !config.get('password')
/**
* @type {boolean}
@@ -328,10 +322,8 @@ class Connection extends EventEmitter {
this.remoteAddress = socket.remoteAddress
this.remotePort = socket.remotePort
- if (this.remoteAddress === '127.0.0.1') {
+ if (this.remoteAddress === '127.0.0.1' && config.get('always_allow_localhost') === true)
this._isAuthorized = true
- this._textConversationAllowed = true
- }
this._setLogger()
this._setSocketEvents()
@@ -548,7 +540,7 @@ class Connection extends EventEmitter {
// send password once (when talking to jobd-master)
if (!this._isAuthorized) {
- message.setPassword(config.get('password'))
+ message.setPassword(config.get('password') || '')
this._isAuthorized = true
}