path: root/src/jobctl.js
diff options
Diffstat (limited to 'src/jobctl.js')
1 files changed, 430 insertions, 0 deletions
diff --git a/src/jobctl.js b/src/jobctl.js
new file mode 100755
index 0000000..2a1fa30
--- /dev/null
+++ b/src/jobctl.js
@@ -0,0 +1,430 @@
+#!/usr/bin/env node
+const minimist = require('minimist')
+const loggerModule = require('./lib/logger')
+const config = require('./lib/config')
+const package_json = require('../package.json')
+const os = require('os')
+const path = require('path')
+const fs = require('fs/promises')
+const {Connection, RequestMessage} = require('./lib/server')
+const {isNumeric} = require('./lib/util')
+const columnify = require('columnify')
+const DEFAULT_CONFIG_PATH = path.join(os.homedir(), '.jobctl.conf')
+ 'list-targets': workerListTargets,
+ 'memory-usage': workerMemoryUsage,
+ 'poll': workerPoll,
+ 'set-target-concurrency': workerSetTargetConcurrency,
+ 'pause': workerPause,
+ 'continue': workerContinue
+ 'list-workers': masterListWorkers,
+ // 'list-workers-memory-usage': masterListWorkersMemoryUsage,
+ 'memory-usage': masterMemoryUsage,
+ 'poke': masterPoke,
+ // we can just reuse worker functions here, as they do the same
+ 'pause': workerPause,
+ 'continue': workerContinue,
+ * @type {Logger}
+ */
+let logger
+ * @type {Connection}
+ */
+let connection
+main().catch(e => {
+ console.error(e)
+ process.exit(1)
+async function main() {
+ const argv = await initApp('jobctl')
+ if (!argv.length)
+ usage()
+ const isMaster = config.get('master')
+ logger.info('Working mode: ' + (isMaster ? 'master' : 'worker'))
+ logger.trace('Command arguments: ', argv)
+ let availableCommands = isMaster ? MASTER_COMMANDS : WORKER_COMMANDS
+ let command = argv.shift()
+ if (!(command in availableCommands)) {
+ logger.error(`Unsupported command: '${command}'`)
+ process.exit(1)
+ }
+ let host = config.get('host')
+ let port = config.get('port')
+ // connect to instance
+ try {
+ connection = new Connection()
+ await connection.connect(host, port)
+ logger.info('Successfully connected.')
+ } catch (error) {
+ logger.error('Connection failure:', error)
+ process.exit(1)
+ }
+ try {
+ await availableCommands[command](argv)
+ } catch (e) {
+ logger.error(e.message)
+ }
+ connection.close()
+ // initWorker()
+ // initRequestHandler()
+ // initServer()
+ // connectToMaster()
+async function initApp(appName) {
+ if (process.argv.length < 3)
+ usage()
+ process.on('SIGINT', term)
+ process.on('SIGTERM', term)
+ const argv = minimist(process.argv.slice(2), {
+ boolean: ['master', 'version', 'help'],
+ string: ['host', 'port', 'config', 'log-level'],
+ stopEarly: true,
+ default: {
+ }
+ })
+ if (argv.help)
+ usage()
+ if (argv.version) {
+ console.log(package_json.version)
+ process.exit(0)
+ }
+ // read config
+ if (await exists(argv.config)) {
+ try {
+ config.parseJobctlConfig(argv.config, {
+ master: argv.master,
+ log_level: argv['log-level'],
+ host: argv.host,
+ port: parseInt(argv.port, 10),
+ })
+ } catch (e) {
+ console.error(`config parsing error: ${e.message}`)
+ process.exit(1)
+ }
+ }
+ // init logger
+ await loggerModule.init({
+ levelConsole: config.get('log_level'),
+ disableTimestamps: true
+ })
+ logger = loggerModule.getLogger(appName)
+ process.title = appName
+ /// /// ///
+ /// \\\ \\\
+ /// /// ///
+ /// \\\ \\\
+ /// /// ///
+ /* * * * * */
+ /* */
+ /* ^_^ */
+ /* */
+ /* '_' */
+ /* */
+ /* <_< */
+ /* */
+ /* >_> */
+ /* */
+ /* * * * * */
+ /// /// ///
+ /// \\\ \\\
+ /// /// ///
+ /// \\\ \\\
+ /// /// ///
+ return argv['_'] || []
+async function workerListTargets() {
+ try {
+ let response = await connection.sendRequest(new RequestMessage('status'))
+ const rows = []
+ const columns = [
+ 'target',
+ 'concurrency',
+ 'length',
+ 'paused'
+ ]
+ for (const target in response.data.targets) {
+ const row = [
+ target,
+ response.data.targets[target].concurrency,
+ response.data.targets[target].length,
+ response.data.targets[target].paused ? 'yes' : 'no'
+ ]
+ rows.push(row)
+ }
+ table(columns, rows)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+async function workerMemoryUsage() {
+ try {
+ let response = await connection.sendRequest(new RequestMessage('status'))
+ const columns = ['what', 'value']
+ const rows = []
+ for (const what in response.data.memoryUsage)
+ rows.push([what, response.data.memoryUsage[what]])
+ rows.push(['pendingJobPromises', response.data.jobPromisesCount])
+ table(columns, rows)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+async function workerPoll(argv) {
+ return await sendCommandForTargets(argv, 'poll')
+async function workerPause(argv) {
+ return await sendCommandForTargets(argv, 'pause')
+async function workerContinue(argv) {
+ return await sendCommandForTargets(argv, 'continue')
+async function workerSetTargetConcurrency(argv) {
+ if (argv.length !== 2)
+ throw new Error('Invalid number of arguments.')
+ let [target, concurrency] = argv
+ if (!isNumeric(concurrency))
+ throw new Error(`'concurrency' must be a number.`)
+ concurrency = parseInt(concurrency, 10)
+ try {
+ let response = await connection.sendRequest(
+ new RequestMessage('set-target-concurrency', {
+ target, concurrency
+ })
+ )
+ if (response.error)
+ throw new Error(`Worker error: ${response.error}`)
+ console.log(response.data)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+async function masterPoke(argv) {
+ return await sendCommandForTargets(argv, 'poke')
+async function masterMemoryUsage() {
+ try {
+ let response = await connection.sendRequest(new RequestMessage('status'))
+ const columns = ['what', 'value']
+ const rows = []
+ for (const what in response.data.memoryUsage)
+ rows.push([what, response.data.memoryUsage[what]])
+ table(columns, rows)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+async function masterListWorkers() {
+ try {
+ let response = await connection.sendRequest(new RequestMessage('status', {poll_workers: true}))
+ const columns = ['worker', 'targets', 'concurrency', 'length', 'paused']
+ const rows = []
+ for (const worker of response.data.workers) {
+ let remoteAddr = `${worker.remoteAddr}:${worker.remotePort}`
+ let targets = Object.keys(worker.workerStatus.targets)
+ let concurrencies = targets.map(t => worker.workerStatus.targets[t].concurrency)
+ let lengths = targets.map(t => worker.workerStatus.targets[t].length)
+ let pauses = targets.map(t => worker.workerStatus.targets[t].paused ? 'yes' : 'no')
+ rows.push([
+ remoteAddr,
+ targets.join("\n"),
+ concurrencies.join("\n"),
+ lengths.join("\n"),
+ pauses.join("\n")
+ ])
+ }
+ table(columns, rows)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+async function sendCommandForTargets(targets, command) {
+ if (!targets.length)
+ throw new Error('No targets specified.')
+ try {
+ let response = await connection.sendRequest(
+ new RequestMessage(command, {targets})
+ )
+ if (response.error)
+ throw new Error(`Worker error: ${response.error}`)
+ console.log(response.data)
+ } catch (error) {
+ logger.error(error.message)
+ logger.trace(error)
+ }
+function usage(exitCode = 0) {
+ let s = `${process.argv[1]} [OPTIONS] COMMAND
+Worker commands:
+ list-targets Print list of targets, their length and inner state.
+ memory-usage Print info about memory usage of the worker.
+ poll <...TARGETS> Ask worker to get tasks for specified targets.
+ Example:
+ $ jobctl poke t1 t2 t3
+ set-target-concurrency <target> <concurrency>
+ Set concurrency of the target.
+ pause <...TARGETS> Pause specified or all targets.
+ continue <...TARGETS> Pause specified or all targets.
+Master commands:
+ list-workers Print list of connected workers and their state.
+ memory-usage Print info about memory usage.
+ poke <...TARGETS> Poke specified targets.
+ pause <...TARGETS> Send pause() to all workers serving specified targets.
+ If no targets specified, just sends pause() to all
+ connected workers.
+ continue <...TARGETS> Send continue() to all workers serving specified
+ targets. If no targets specified, just sends pause()
+ to all connected workers.
+ --master Connect to jobd-master instance.
+ --host Address of jobd or jobd-master instance.
+ --port Port. Default: 7080 when --master is not used,
+ 7081 otherwise.
+ --config <path> Path to config. Default: ~/.jobctl.conf
+ Required for connecting to password-protected
+ instances.
+ --log-level <level> 'error', 'warn', 'info', 'debug' or 'trace'.
+ Default: warn
+ --help: Show this help.
+ --version: Print version.
+Configuration file
+ Config file is required for connecting to password-protected jobd instances.
+ It can also be used to store hostname, port and log level.
+ Here's an example of possible ~/.jobctl.conf file:
+ ;password =
+ hostname =
+ port = 7080
+ log_level = warn
+ master = true
+ console.log(s)
+ process.exit(exitCode)
+function term() {
+ if (logger)
+ logger.info('shutdown')
+ loggerModule.shutdown(function() {
+ process.exit()
+ })
+async function exists(file) {
+ let exists
+ try {
+ await fs.stat(file)
+ exists = true
+ } catch (error) {
+ exists = false
+ }
+ return exists
+function table(columns, rows) {
+ const maxColumnSize = {}
+ for (const c of columns)
+ maxColumnSize[c] = c.length
+ rows = rows.map(values => {
+ if (!Array.isArray(values))
+ throw new Error('row must be array, got', values)
+ let row = {}
+ for (let i = 0; i < columns.length; i++) {
+ let value = String(values[i])
+ row[columns[i]] = value
+ let width
+ if (value.indexOf('\n') !== -1) {
+ width = Math.max(...value.split('\n').map(s => s.length))
+ } else {
+ width = value.length
+ }
+ if (width > maxColumnSize[columns[i]])
+ maxColumnSize[columns[i]] = width
+ }
+ return row
+ })
+ console.log(columnify(rows, {
+ columns,
+ preserveNewLines: true,
+ columnSplitter: ' | ',
+ headingTransform: (text) => {
+ const repeat = () => '-'.repeat(maxColumnSize[text])
+ return `${text.toUpperCase()}\n${repeat()}`
+ }
+ }))
+} \ No newline at end of file