diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-03-07 19:41:43 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-03-07 19:41:43 +0300 |
commit | db7e1be9b58ba92556d579cd4b814ae083602bc9 (patch) | |
tree | 36b8a67e7f0c87286616b61d6794ff6e58726f8e /src/lib | |
parent | e19982e9736cebb3f52a146fbc5f0579b70827e9 (diff) |
jobctl
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/config.js | 51 | ||||
-rw-r--r-- | src/lib/logger.js | 20 | ||||
-rw-r--r-- | src/lib/request-handler.js | 2 | ||||
-rw-r--r-- | src/lib/server.js | 30 |
4 files changed, 94 insertions, 9 deletions
diff --git a/src/lib/config.js b/src/lib/config.js index ac711fd..73a6226 100644 --- a/src/lib/config.js +++ b/src/lib/config.js @@ -38,7 +38,15 @@ function processScheme(source, scheme) { case 'object': if (typeof value !== 'object') throw new Error(`'${key}' must be an object`) + break + case 'boolean': + if (value !== null) { + value = value.trim() + value = ['true', '1'].includes(value) + } else { + value = false + } break } @@ -115,6 +123,40 @@ function parseMasterConfig(file) { } /** + * @param {string} file + * @param {{ + * master: boolean, + * log_level: string|undefined, + * host: string, + * port: int, + * }} inputOptions + */ +function parseJobctlConfig(file, inputOptions) { + config = {} + const raw = readFile(file) + + Object.assign(config, processScheme(raw, { + master: {type: 'boolean'}, + password: {}, + log_level: {default: 'warn'}, + })) + + if (inputOptions.master) + config.master = inputOptions.master + Object.assign(config, processScheme(raw, { + host: {default: '127.0.0.1'}, + port: {default: config.master ? 7081 : 7080, type: 'int'} + })) + + for (let key of ['log_level', 'host', 'port']) { + if (inputOptions[key]) + config[key] = inputOptions[key] + } + + // console.log('parseJobctlConfig [2]', config) +} + +/** * @param {string|null} key * @return {string|number|object} */ @@ -131,8 +173,17 @@ function get(key = null) { return config[key] } +/** + * @param {object} opts + */ +// function set(opts) { +// Object.assign(config, opts) +// } + module.exports = { parseWorkerConfig, parseMasterConfig, + parseJobctlConfig, get, + // set, }
\ No newline at end of file diff --git a/src/lib/logger.js b/src/lib/logger.js index 54c9d54..8a44e07 100644 --- a/src/lib/logger.js +++ b/src/lib/logger.js @@ -3,13 +3,16 @@ const fs = require('fs/promises') const fsConstants = require('fs').constants const util = require('./util') +const ALLOWED_LEVELS = ['trace', 'debug', 'info', 'warn', 'error'] + module.exports = { /** * @param {string} file * @param {string} levelFile * @param {string} levelConsole + * @param {boolean} disableTimestamps */ - async init({file, levelFile, levelConsole}) { + async init({file, levelFile, levelConsole, disableTimestamps=false}) { const categories = { default: { appenders: ['stdout-filter'], @@ -17,19 +20,30 @@ module.exports = { } } + if (!ALLOWED_LEVELS.includes(levelConsole)) + throw new Error(`Level ${levelConsole} is not allowed.`) + const appenders = { stdout: { type: 'stdout', - level: 'trace' + level: 'trace', }, 'stdout-filter': { type: 'logLevelFilter', appender: 'stdout', - level: levelConsole + level: levelConsole, } } + if (disableTimestamps) + appenders.stdout.layout = { + type: 'pattern', + pattern: '%[%p [%c]%] %m', + } if (file) { + if (!ALLOWED_LEVELS.includes(levelFile)) + throw new Error(`Level ${levelFile} is not allowed.`) + let exists try { await fs.stat(file) diff --git a/src/lib/request-handler.js b/src/lib/request-handler.js index 4ab06fb..4330b6b 100644 --- a/src/lib/request-handler.js +++ b/src/lib/request-handler.js @@ -1,5 +1,5 @@ const {getLogger} = require('./logger') -const {ResponseMessage} = require('./server') +const {ResponseMessage, Connection} = require('./server') class RequestHandler { diff --git a/src/lib/server.js b/src/lib/server.js index 618ca8c..051b8be 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -285,12 +285,19 @@ class Connection extends EventEmitter { */ this._requestPromises = {} + /** + * @type {Promise} + * @private + */ + this._connectPromise = null + this._setLogger() } /** * @param {string} host * @param {number} port + * @return {Promise} */ connect(host, port) { if (this.socket !== null) @@ -298,14 +305,18 @@ class Connection extends EventEmitter { this._isOutgoing = true + this.logger.trace(`Connecting to ${host}:${port}`) + this.socket = new net.Socket() - this.socket.connect({host, port}) + this.socket.connect(port, host) this.remoteAddress = host this.remotePort = port this._setLogger() this._setSocketEvents() + + return this._connectPromise = createCallablePromise() } /** @@ -616,14 +627,19 @@ class Connection extends EventEmitter { } for (const no in this._requestPromises) { - this._requestPromises[no].reject(new Error('socket is closed')) + this._requestPromises[no].reject(new Error('Socket is closed')) } this._requestPromises = {} } onConnect = () => { - this.logger.debug('connection established') + if (this._connectPromise) { + this._connectPromise.resolve() + this._connectPromise = null + } + + this.logger.debug('Connection established.') this.emit('connect') } @@ -642,12 +658,16 @@ class Connection extends EventEmitter { onClose = (hadError) => { this._handleClose() - this.logger.debug(`socket closed` + (hadError ? ` with error` : '')) + this.logger.debug(`Socket closed` + (hadError ? ` with error` : '')) } onError = (error) => { + if (this._connectPromise) { + this._connectPromise.reject(error) + this._connectPromise = null + } this._handleClose() - this.logger.warn(`socket error:`, error) + this.logger.warn(`Socket error:`, error) } } |