summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-07 19:41:43 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-07 19:41:43 +0300
commitdb7e1be9b58ba92556d579cd4b814ae083602bc9 (patch)
tree36b8a67e7f0c87286616b61d6794ff6e58726f8e /src/lib
parente19982e9736cebb3f52a146fbc5f0579b70827e9 (diff)
jobctl
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/config.js51
-rw-r--r--src/lib/logger.js20
-rw-r--r--src/lib/request-handler.js2
-rw-r--r--src/lib/server.js30
4 files changed, 94 insertions, 9 deletions
diff --git a/src/lib/config.js b/src/lib/config.js
index ac711fd..73a6226 100644
--- a/src/lib/config.js
+++ b/src/lib/config.js
@@ -38,7 +38,15 @@ function processScheme(source, scheme) {
case 'object':
if (typeof value !== 'object')
throw new Error(`'${key}' must be an object`)
+ break
+ case 'boolean':
+ if (value !== null) {
+ value = value.trim()
+ value = ['true', '1'].includes(value)
+ } else {
+ value = false
+ }
break
}
@@ -115,6 +123,40 @@ function parseMasterConfig(file) {
}
/**
+ * @param {string} file
+ * @param {{
+ * master: boolean,
+ * log_level: string|undefined,
+ * host: string,
+ * port: int,
+ * }} inputOptions
+ */
+function parseJobctlConfig(file, inputOptions) {
+ config = {}
+ const raw = readFile(file)
+
+ Object.assign(config, processScheme(raw, {
+ master: {type: 'boolean'},
+ password: {},
+ log_level: {default: 'warn'},
+ }))
+
+ if (inputOptions.master)
+ config.master = inputOptions.master
+ Object.assign(config, processScheme(raw, {
+ host: {default: '127.0.0.1'},
+ port: {default: config.master ? 7081 : 7080, type: 'int'}
+ }))
+
+ for (let key of ['log_level', 'host', 'port']) {
+ if (inputOptions[key])
+ config[key] = inputOptions[key]
+ }
+
+ // console.log('parseJobctlConfig [2]', config)
+}
+
+/**
* @param {string|null} key
* @return {string|number|object}
*/
@@ -131,8 +173,17 @@ function get(key = null) {
return config[key]
}
+/**
+ * @param {object} opts
+ */
+// function set(opts) {
+// Object.assign(config, opts)
+// }
+
module.exports = {
parseWorkerConfig,
parseMasterConfig,
+ parseJobctlConfig,
get,
+ // set,
} \ No newline at end of file
diff --git a/src/lib/logger.js b/src/lib/logger.js
index 54c9d54..8a44e07 100644
--- a/src/lib/logger.js
+++ b/src/lib/logger.js
@@ -3,13 +3,16 @@ const fs = require('fs/promises')
const fsConstants = require('fs').constants
const util = require('./util')
+const ALLOWED_LEVELS = ['trace', 'debug', 'info', 'warn', 'error']
+
module.exports = {
/**
* @param {string} file
* @param {string} levelFile
* @param {string} levelConsole
+ * @param {boolean} disableTimestamps
*/
- async init({file, levelFile, levelConsole}) {
+ async init({file, levelFile, levelConsole, disableTimestamps=false}) {
const categories = {
default: {
appenders: ['stdout-filter'],
@@ -17,19 +20,30 @@ module.exports = {
}
}
+ if (!ALLOWED_LEVELS.includes(levelConsole))
+ throw new Error(`Level ${levelConsole} is not allowed.`)
+
const appenders = {
stdout: {
type: 'stdout',
- level: 'trace'
+ level: 'trace',
},
'stdout-filter': {
type: 'logLevelFilter',
appender: 'stdout',
- level: levelConsole
+ level: levelConsole,
}
}
+ if (disableTimestamps)
+ appenders.stdout.layout = {
+ type: 'pattern',
+ pattern: '%[%p [%c]%] %m',
+ }
if (file) {
+ if (!ALLOWED_LEVELS.includes(levelFile))
+ throw new Error(`Level ${levelFile} is not allowed.`)
+
let exists
try {
await fs.stat(file)
diff --git a/src/lib/request-handler.js b/src/lib/request-handler.js
index 4ab06fb..4330b6b 100644
--- a/src/lib/request-handler.js
+++ b/src/lib/request-handler.js
@@ -1,5 +1,5 @@
const {getLogger} = require('./logger')
-const {ResponseMessage} = require('./server')
+const {ResponseMessage, Connection} = require('./server')
class RequestHandler {
diff --git a/src/lib/server.js b/src/lib/server.js
index 618ca8c..051b8be 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -285,12 +285,19 @@ class Connection extends EventEmitter {
*/
this._requestPromises = {}
+ /**
+ * @type {Promise}
+ * @private
+ */
+ this._connectPromise = null
+
this._setLogger()
}
/**
* @param {string} host
* @param {number} port
+ * @return {Promise}
*/
connect(host, port) {
if (this.socket !== null)
@@ -298,14 +305,18 @@ class Connection extends EventEmitter {
this._isOutgoing = true
+ this.logger.trace(`Connecting to ${host}:${port}`)
+
this.socket = new net.Socket()
- this.socket.connect({host, port})
+ this.socket.connect(port, host)
this.remoteAddress = host
this.remotePort = port
this._setLogger()
this._setSocketEvents()
+
+ return this._connectPromise = createCallablePromise()
}
/**
@@ -616,14 +627,19 @@ class Connection extends EventEmitter {
}
for (const no in this._requestPromises) {
- this._requestPromises[no].reject(new Error('socket is closed'))
+ this._requestPromises[no].reject(new Error('Socket is closed'))
}
this._requestPromises = {}
}
onConnect = () => {
- this.logger.debug('connection established')
+ if (this._connectPromise) {
+ this._connectPromise.resolve()
+ this._connectPromise = null
+ }
+
+ this.logger.debug('Connection established.')
this.emit('connect')
}
@@ -642,12 +658,16 @@ class Connection extends EventEmitter {
onClose = (hadError) => {
this._handleClose()
- this.logger.debug(`socket closed` + (hadError ? ` with error` : ''))
+ this.logger.debug(`Socket closed` + (hadError ? ` with error` : ''))
}
onError = (error) => {
+ if (this._connectPromise) {
+ this._connectPromise.reject(error)
+ this._connectPromise = null
+ }
this._handleClose()
- this.logger.warn(`socket error:`, error)
+ this.logger.warn(`Socket error:`, error)
}
}