summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-07 19:41:43 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-07 19:41:43 +0300
commitdb7e1be9b58ba92556d579cd4b814ae083602bc9 (patch)
tree36b8a67e7f0c87286616b61d6794ff6e58726f8e /src
parente19982e9736cebb3f52a146fbc5f0579b70827e9 (diff)
jobctl
Diffstat (limited to 'src')
-rwxr-xr-xsrc/jobctl.js430
-rwxr-xr-xsrc/jobd-master.js18
-rwxr-xr-xsrc/jobd.js69
-rw-r--r--src/lib/config.js51
-rw-r--r--src/lib/logger.js20
-rw-r--r--src/lib/request-handler.js2
-rw-r--r--src/lib/server.js30
7 files changed, 576 insertions, 44 deletions
diff --git a/src/jobctl.js b/src/jobctl.js
new file mode 100755
index 0000000..2a1fa30
--- /dev/null
+++ b/src/jobctl.js
@@ -0,0 +1,430 @@
+#!/usr/bin/env node
+const minimist = require('minimist')
+const loggerModule = require('./lib/logger')
+const config = require('./lib/config')
+const package_json = require('../package.json')
+const os = require('os')
+const path = require('path')
+const fs = require('fs/promises')
+const {Connection, RequestMessage} = require('./lib/server')
+const {isNumeric} = require('./lib/util')
+const columnify = require('columnify')
+
+const DEFAULT_CONFIG_PATH = path.join(os.homedir(), '.jobctl.conf')
+
+const WORKER_COMMANDS = {
+ 'list-targets': workerListTargets,
+ 'memory-usage': workerMemoryUsage,
+ 'poll': workerPoll,
+ 'set-target-concurrency': workerSetTargetConcurrency,
+ 'pause': workerPause,
+ 'continue': workerContinue
+}
+
+const MASTER_COMMANDS = {
+ 'list-workers': masterListWorkers,
+ // 'list-workers-memory-usage': masterListWorkersMemoryUsage,
+ 'memory-usage': masterMemoryUsage,
+ 'poke': masterPoke,
+
+ // we can just reuse worker functions here, as they do the same
+ 'pause': workerPause,
+ 'continue': workerContinue,
+}
+
+/**
+ * @type {Logger}
+ */
+let logger
+
+/**
+ * @type {Connection}
+ */
+let connection
+
+
+main().catch(e => {
+ console.error(e)
+ process.exit(1)
+})
+
+
+async function main() {
+ const argv = await initApp('jobctl')
+ if (!argv.length)
+ usage()
+
+ const isMaster = config.get('master')
+
+ logger.info('Working mode: ' + (isMaster ? 'master' : 'worker'))
+ logger.trace('Command arguments: ', argv)
+
+ let availableCommands = isMaster ? MASTER_COMMANDS : WORKER_COMMANDS
+ let command = argv.shift()
+ if (!(command in availableCommands)) {
+ logger.error(`Unsupported command: '${command}'`)
+ process.exit(1)
+ }
+
+ let host = config.get('host')
+ let port = config.get('port')
+
+ // connect to instance
+ try {
+ connection = new Connection()
+ await connection.connect(host, port)
+
+ logger.info('Successfully connected.')
+ } catch (error) {
+ logger.error('Connection failure:', error)
+ process.exit(1)
+ }
+
+ try {
+ await availableCommands[command](argv)
+ } catch (e) {
+ logger.error(e.message)
+ }
+
+ connection.close()
+
+ // initWorker()
+ // initRequestHandler()
+ // initServer()
+ // connectToMaster()
+}
+
+async function initApp(appName) {
+ if (process.argv.length < 3)
+ usage()
+
+ process.on('SIGINT', term)
+ process.on('SIGTERM', term)
+
+ const argv = minimist(process.argv.slice(2), {
+ boolean: ['master', 'version', 'help'],
+ string: ['host', 'port', 'config', 'log-level'],
+ stopEarly: true,
+ default: {
+ config: DEFAULT_CONFIG_PATH
+ }
+ })
+
+ if (argv.help)
+ usage()
+
+ if (argv.version) {
+ console.log(package_json.version)
+ process.exit(0)
+ }
+
+ // read config
+ if (await exists(argv.config)) {
+ try {
+ config.parseJobctlConfig(argv.config, {
+ master: argv.master,
+ log_level: argv['log-level'],
+ host: argv.host,
+ port: parseInt(argv.port, 10),
+ })
+ } catch (e) {
+ console.error(`config parsing error: ${e.message}`)
+ process.exit(1)
+ }
+ }
+
+ // init logger
+ await loggerModule.init({
+ levelConsole: config.get('log_level'),
+ disableTimestamps: true
+ })
+ logger = loggerModule.getLogger(appName)
+
+ process.title = appName
+
+ /// /// ///
+ /// \\\ \\\
+ /// /// ///
+ /// \\\ \\\
+ /// /// ///
+ /* * * * * */
+ /* */
+ /* ^_^ */
+ /* */
+ /* '_' */
+ /* */
+ /* <_< */
+ /* */
+ /* >_> */
+ /* */
+ /* * * * * */
+ /// /// ///
+ /// \\\ \\\
+ /// /// ///
+ /// \\\ \\\
+ /// /// ///
+
+ return argv['_'] || []
+}
+
+async function workerListTargets() {
+ try {
+ let response = await connection.sendRequest(new RequestMessage('status'))
+ const rows = []
+ const columns = [
+ 'target',
+ 'concurrency',
+ 'length',
+ 'paused'
+ ]
+ for (const target in response.data.targets) {
+ const row = [
+ target,
+ response.data.targets[target].concurrency,
+ response.data.targets[target].length,
+ response.data.targets[target].paused ? 'yes' : 'no'
+ ]
+ rows.push(row)
+ }
+
+ table(columns, rows)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+}
+
+async function workerMemoryUsage() {
+ try {
+ let response = await connection.sendRequest(new RequestMessage('status'))
+ const columns = ['what', 'value']
+ const rows = []
+ for (const what in response.data.memoryUsage)
+ rows.push([what, response.data.memoryUsage[what]])
+ rows.push(['pendingJobPromises', response.data.jobPromisesCount])
+ table(columns, rows)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+}
+
+async function workerPoll(argv) {
+ return await sendCommandForTargets(argv, 'poll')
+}
+
+async function workerPause(argv) {
+ return await sendCommandForTargets(argv, 'pause')
+}
+
+async function workerContinue(argv) {
+ return await sendCommandForTargets(argv, 'continue')
+}
+
+async function workerSetTargetConcurrency(argv) {
+ if (argv.length !== 2)
+ throw new Error('Invalid number of arguments.')
+
+ let [target, concurrency] = argv
+ if (!isNumeric(concurrency))
+ throw new Error(`'concurrency' must be a number.`)
+
+ concurrency = parseInt(concurrency, 10)
+
+ try {
+ let response = await connection.sendRequest(
+ new RequestMessage('set-target-concurrency', {
+ target, concurrency
+ })
+ )
+
+ if (response.error)
+ throw new Error(`Worker error: ${response.error}`)
+
+ console.log(response.data)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+}
+
+async function masterPoke(argv) {
+ return await sendCommandForTargets(argv, 'poke')
+}
+
+async function masterMemoryUsage() {
+ try {
+ let response = await connection.sendRequest(new RequestMessage('status'))
+ const columns = ['what', 'value']
+ const rows = []
+ for (const what in response.data.memoryUsage)
+ rows.push([what, response.data.memoryUsage[what]])
+ table(columns, rows)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+}
+
+async function masterListWorkers() {
+ try {
+ let response = await connection.sendRequest(new RequestMessage('status', {poll_workers: true}))
+ const columns = ['worker', 'targets', 'concurrency', 'length', 'paused']
+ const rows = []
+ for (const worker of response.data.workers) {
+ let remoteAddr = `${worker.remoteAddr}:${worker.remotePort}`
+ let targets = Object.keys(worker.workerStatus.targets)
+ let concurrencies = targets.map(t => worker.workerStatus.targets[t].concurrency)
+ let lengths = targets.map(t => worker.workerStatus.targets[t].length)
+ let pauses = targets.map(t => worker.workerStatus.targets[t].paused ? 'yes' : 'no')
+ rows.push([
+ remoteAddr,
+ targets.join("\n"),
+ concurrencies.join("\n"),
+ lengths.join("\n"),
+ pauses.join("\n")
+ ])
+ }
+ table(columns, rows)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+}
+
+async function sendCommandForTargets(targets, command) {
+ if (!targets.length)
+ throw new Error('No targets specified.')
+
+ try {
+ let response = await connection.sendRequest(
+ new RequestMessage(command, {targets})
+ )
+
+ if (response.error)
+ throw new Error(`Worker error: ${response.error}`)
+
+ console.log(response.data)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+}
+
+
+function usage(exitCode = 0) {
+ let s = `${process.argv[1]} [OPTIONS] COMMAND
+
+Worker commands:
+ list-targets Print list of targets, their length and inner state.
+ memory-usage Print info about memory usage of the worker.
+ poll <...TARGETS> Ask worker to get tasks for specified targets.
+
+ Example:
+ $ jobctl poke t1 t2 t3
+
+ set-target-concurrency <target> <concurrency>
+ Set concurrency of the target.
+
+ pause <...TARGETS> Pause specified or all targets.
+ continue <...TARGETS> Pause specified or all targets.
+
+Master commands:
+ list-workers Print list of connected workers and their state.
+ memory-usage Print info about memory usage.
+ poke <...TARGETS> Poke specified targets.
+ pause <...TARGETS> Send pause() to all workers serving specified targets.
+ If no targets specified, just sends pause() to all
+ connected workers.
+ continue <...TARGETS> Send continue() to all workers serving specified
+ targets. If no targets specified, just sends pause()
+ to all connected workers.
+
+Options:
+ --master Connect to jobd-master instance.
+ --host Address of jobd or jobd-master instance.
+ --port Port. Default: 7080 when --master is not used,
+ 7081 otherwise.
+ --config <path> Path to config. Default: ~/.jobctl.conf
+ Required for connecting to password-protected
+ instances.
+ --log-level <level> 'error', 'warn', 'info', 'debug' or 'trace'.
+ Default: warn
+ --help: Show this help.
+ --version: Print version.
+
+Configuration file
+ Config file is required for connecting to password-protected jobd instances.
+ It can also be used to store hostname, port and log level.
+
+ Here's an example of possible ~/.jobctl.conf file:
+
+ ;password =
+ hostname = 1.2.3.4
+ port = 7080
+ log_level = warn
+ master = true
+`
+
+ console.log(s)
+ process.exit(exitCode)
+}
+
+function term() {
+ if (logger)
+ logger.info('shutdown')
+
+ loggerModule.shutdown(function() {
+ process.exit()
+ })
+}
+
+async function exists(file) {
+ let exists
+ try {
+ await fs.stat(file)
+ exists = true
+ } catch (error) {
+ exists = false
+ }
+ return exists
+}
+
+function table(columns, rows) {
+ const maxColumnSize = {}
+ for (const c of columns)
+ maxColumnSize[c] = c.length
+
+ rows = rows.map(values => {
+ if (!Array.isArray(values))
+ throw new Error('row must be array, got', values)
+
+ let row = {}
+ for (let i = 0; i < columns.length; i++) {
+ let value = String(values[i])
+ row[columns[i]] = value
+
+ let width
+ if (value.indexOf('\n') !== -1) {
+ width = Math.max(...value.split('\n').map(s => s.length))
+ } else {
+ width = value.length
+ }
+
+ if (width > maxColumnSize[columns[i]])
+ maxColumnSize[columns[i]] = width
+ }
+
+ return row
+ })
+
+ console.log(columnify(rows, {
+ columns,
+ preserveNewLines: true,
+ columnSplitter: ' | ',
+ headingTransform: (text) => {
+ const repeat = () => '-'.repeat(maxColumnSize[text])
+ return `${text.toUpperCase()}\n${repeat()}`
+ }
+ }))
+} \ No newline at end of file
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 5839b1a..c926dc0 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -8,6 +8,8 @@ const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-va
const RequestHandler = require('./lib/request-handler')
const package_json = require('../package.json')
+const DEFAULT_CONFIG_PATH = "/etc/jobd-master.conf"
+
/**
* @type {Logger}
*/
@@ -51,7 +53,12 @@ async function initApp(appName) {
process.on('SIGINT', term)
process.on('SIGTERM', term)
- const argv = minimist(process.argv.slice(2))
+ const argv = minimist(process.argv.slice(2), {
+ boolean: ['help', 'version'],
+ default: {
+ config: DEFAULT_CONFIG_PATH
+ }
+ })
if (argv.help) {
usage()
@@ -63,9 +70,6 @@ async function initApp(appName) {
process.exit(0)
}
- if (!argv.config)
- throw new Error('--config option is required')
-
// read config
try {
config.parseMasterConfig(argv.config)
@@ -295,9 +299,9 @@ function usage() {
let s = `${process.argv[1]} OPTIONS
Options:
- --config <path>
- --help
- --version`
+ --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH}
+ --help Show this help.
+ --version Print version.`
console.log(s)
}
diff --git a/src/jobd.js b/src/jobd.js
index de8807f..0d8af32 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -17,11 +17,12 @@ const {
Worker,
STATUS_MANUAL,
JOB_NOTFOUND,
- JOB_ACCEPTED,
JOB_IGNORED
} = require('./lib/worker')
const package_json = require('../package.json')
+const DEFAULT_CONFIG_PATH = "/etc/jobd.conf"
+
/**
* @type {Worker}
*/
@@ -72,7 +73,12 @@ async function initApp(appName) {
process.on('SIGINT', term)
process.on('SIGTERM', term)
- const argv = minimist(process.argv.slice(2))
+ const argv = minimist(process.argv.slice(2), {
+ boolean: ['help', 'version'],
+ default: {
+ config: DEFAULT_CONFIG_PATH
+ }
+ })
if (argv.help) {
usage()
@@ -84,9 +90,6 @@ async function initApp(appName) {
process.exit(0)
}
- if (!argv.config)
- throw new Error('--config option is required')
-
// read config
try {
config.parseWorkerConfig(argv.config)
@@ -337,6 +340,9 @@ function onSetTargetConcurrency(data, requestNo, connection) {
['concurrency', 'i', true],
['target', 's', true],
])
+
+ if (data.concurrency <= 0)
+ throw new Error('Invalid concurrency value.')
} catch (e) {
connection.send(
new ResponseMessage(requestNo)
@@ -395,42 +401,49 @@ function connectToMaster() {
return
}
- const connection = new Connection()
- connection.connect(host, port)
+ async function connect() {
+ const connection = new Connection()
+ await connection.connect(host, port)
- connection.on('connect', function() {
- connection.sendRequest(
- new RequestMessage('register-worker', {
- targets: worker.getTargets()
- })
- )
- .then(response => {
+ try {
+ let response = await connection.sendRequest(
+ new RequestMessage('register-worker', {
+ targets: worker.getTargets()
+ })
+ )
logger.debug('connectToMaster: response:', response)
- })
- .catch(error => {
+ } catch (error) {
logger.error('connectToMaster: error while awaiting response:', error)
+ }
+
+ connection.on('close', () => {
+ logger.warn(`connectToMaster: connection closed`)
+ tryToConnect()
})
- })
- connection.on('close', () => {
- logger.warn(`connectToMaster: connection closed`)
+ connection.on('request-message', (message, connection) => {
+ requestHandler.process(message, connection)
+ })
+ }
+
+ function tryToConnect(now = false) {
setTimeout(() => {
- connectToMaster()
- }, config.get('master_reconnect_timeout') * 1000)
- })
+ connect().catch(error => {
+ logger.warn(`connectToMaster: connection failed`, error)
+ })
+ }, now ? 0 : config.get('master_reconnect_timeout') * 1000)
+ }
- connection.on('request-message', (message, connection) => {
- requestHandler.process(message, connection)
- })
+ tryToConnect(true)
}
function usage() {
let s = `${process.argv[1]} OPTIONS
Options:
- --config <path>
- --help
- --version`
+ --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH}
+ --help Show this help.
+ --version Print version.`
console.log(s)
}
diff --git a/src/lib/config.js b/src/lib/config.js
index ac711fd..73a6226 100644
--- a/src/lib/config.js
+++ b/src/lib/config.js
@@ -38,7 +38,15 @@ function processScheme(source, scheme) {
case 'object':
if (typeof value !== 'object')
throw new Error(`'${key}' must be an object`)
+ break
+ case 'boolean':
+ if (value !== null) {
+ value = value.trim()
+ value = ['true', '1'].includes(value)
+ } else {
+ value = false
+ }
break
}
@@ -115,6 +123,40 @@ function parseMasterConfig(file) {
}
/**
+ * @param {string} file
+ * @param {{
+ * master: boolean,
+ * log_level: string|undefined,
+ * host: string,
+ * port: int,
+ * }} inputOptions
+ */
+function parseJobctlConfig(file, inputOptions) {
+ config = {}
+ const raw = readFile(file)
+
+ Object.assign(config, processScheme(raw, {
+ master: {type: 'boolean'},
+ password: {},
+ log_level: {default: 'warn'},
+ }))
+
+ if (inputOptions.master)
+ config.master = inputOptions.master
+ Object.assign(config, processScheme(raw, {
+ host: {default: '127.0.0.1'},
+ port: {default: config.master ? 7081 : 7080, type: 'int'}
+ }))
+
+ for (let key of ['log_level', 'host', 'port']) {
+ if (inputOptions[key])
+ config[key] = inputOptions[key]
+ }
+
+ // console.log('parseJobctlConfig [2]', config)
+}
+
+/**
* @param {string|null} key
* @return {string|number|object}
*/
@@ -131,8 +173,17 @@ function get(key = null) {
return config[key]
}
+/**
+ * @param {object} opts
+ */
+// function set(opts) {
+// Object.assign(config, opts)
+// }
+
module.exports = {
parseWorkerConfig,
parseMasterConfig,
+ parseJobctlConfig,
get,
+ // set,
} \ No newline at end of file
diff --git a/src/lib/logger.js b/src/lib/logger.js
index 54c9d54..8a44e07 100644
--- a/src/lib/logger.js
+++ b/src/lib/logger.js
@@ -3,13 +3,16 @@ const fs = require('fs/promises')
const fsConstants = require('fs').constants
const util = require('./util')
+const ALLOWED_LEVELS = ['trace', 'debug', 'info', 'warn', 'error']
+
module.exports = {
/**
* @param {string} file
* @param {string} levelFile
* @param {string} levelConsole
+ * @param {boolean} disableTimestamps
*/
- async init({file, levelFile, levelConsole}) {
+ async init({file, levelFile, levelConsole, disableTimestamps=false}) {
const categories = {
default: {
appenders: ['stdout-filter'],
@@ -17,19 +20,30 @@ module.exports = {
}
}
+ if (!ALLOWED_LEVELS.includes(levelConsole))
+ throw new Error(`Level ${levelConsole} is not allowed.`)
+
const appenders = {
stdout: {
type: 'stdout',
- level: 'trace'
+ level: 'trace',
},
'stdout-filter': {
type: 'logLevelFilter',
appender: 'stdout',
- level: levelConsole
+ level: levelConsole,
}
}
+ if (disableTimestamps)
+ appenders.stdout.layout = {
+ type: 'pattern',
+ pattern: '%[%p [%c]%] %m',
+ }
if (file) {
+ if (!ALLOWED_LEVELS.includes(levelFile))
+ throw new Error(`Level ${levelFile} is not allowed.`)
+
let exists
try {
await fs.stat(file)
diff --git a/src/lib/request-handler.js b/src/lib/request-handler.js
index 4ab06fb..4330b6b 100644
--- a/src/lib/request-handler.js
+++ b/src/lib/request-handler.js
@@ -1,5 +1,5 @@
const {getLogger} = require('./logger')
-const {ResponseMessage} = require('./server')
+const {ResponseMessage, Connection} = require('./server')
class RequestHandler {
diff --git a/src/lib/server.js b/src/lib/server.js
index 618ca8c..051b8be 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -285,12 +285,19 @@ class Connection extends EventEmitter {
*/
this._requestPromises = {}
+ /**
+ * @type {Promise}
+ * @private
+ */
+ this._connectPromise = null
+
this._setLogger()
}
/**
* @param {string} host
* @param {number} port
+ * @return {Promise}
*/
connect(host, port) {
if (this.socket !== null)
@@ -298,14 +305,18 @@ class Connection extends EventEmitter {
this._isOutgoing = true
+ this.logger.trace(`Connecting to ${host}:${port}`)
+
this.socket = new net.Socket()
- this.socket.connect({host, port})
+ this.socket.connect(port, host)
this.remoteAddress = host
this.remotePort = port
this._setLogger()
this._setSocketEvents()
+
+ return this._connectPromise = createCallablePromise()
}
/**
@@ -616,14 +627,19 @@ class Connection extends EventEmitter {
}
for (const no in this._requestPromises) {
- this._requestPromises[no].reject(new Error('socket is closed'))
+ this._requestPromises[no].reject(new Error('Socket is closed'))
}
this._requestPromises = {}
}
onConnect = () => {
- this.logger.debug('connection established')
+ if (this._connectPromise) {
+ this._connectPromise.resolve()
+ this._connectPromise = null
+ }
+
+ this.logger.debug('Connection established.')
this.emit('connect')
}
@@ -642,12 +658,16 @@ class Connection extends EventEmitter {
onClose = (hadError) => {
this._handleClose()
- this.logger.debug(`socket closed` + (hadError ? ` with error` : ''))
+ this.logger.debug(`Socket closed` + (hadError ? ` with error` : ''))
}
onError = (error) => {
+ if (this._connectPromise) {
+ this._connectPromise.reject(error)
+ this._connectPromise = null
+ }
this._handleClose()
- this.logger.warn(`socket error:`, error)
+ this.logger.warn(`Socket error:`, error)
}
}