From 142869948c40900569f339a2177e95a3be3bbdfb Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sun, 28 Feb 2021 16:11:06 +0300 Subject: config refactor --- src/config.js | 42 +++++++++++++++++++++++++++++------------- src/db.js | 10 +++++----- src/jobd-master.js | 18 +++++++----------- src/jobd.js | 29 ++++++++++++----------------- src/server.js | 2 ++ src/worker.js | 20 ++++++++++---------- src/workers-list.js | 6 +++--- 7 files changed, 68 insertions(+), 59 deletions(-) diff --git a/src/config.js b/src/config.js index 9ee6338..a59615e 100644 --- a/src/config.js +++ b/src/config.js @@ -2,10 +2,7 @@ const fs = require('fs') const ini = require('ini') const {isNumeric} = require('./util') -let workerConfig = { - targets: {}, -} -let masterConfig = {} +let config = null function readFile(file) { if (!fs.existsSync(file)) @@ -46,8 +43,9 @@ function processScheme(source, scheme) { } function parseWorkerConfig(file) { - const raw = readFile(file) + config = {} + const raw = readFile(file) const scheme = { host: {required: true}, port: {required: true, type: 'int'}, @@ -72,7 +70,9 @@ function parseWorkerConfig(file) { launcher: {required: true}, max_output_buffer: {default: 1024*1024, type: 'int'}, } - Object.assign(workerConfig, processScheme(raw, scheme)) + Object.assign(config, processScheme(raw, scheme)) + + config.targets = {} // targets for (let target in raw) { @@ -82,19 +82,20 @@ function parseWorkerConfig(file) { if (typeof raw[target] !== 'object') continue - workerConfig.targets[target] = {slots: {}} + config.targets[target] = {slots: {}} for (let slotName in raw[target]) { let slotLimit = parseInt(raw[target][slotName], 10) if (slotLimit < 1) throw new Error(`${target}: slot ${slotName} has invalid limit`) - workerConfig.targets[target].slots[slotName] = slotLimit + config.targets[target].slots[slotName] = slotLimit } } } function parseMasterConfig(file) { - const raw = readFile(file) + config = {} + const raw = readFile(file) const scheme = { host: {required: true}, port: {required: true, type: 'int'}, @@ -107,13 +108,28 @@ function parseMasterConfig(file) { log_level_file: {default: 'warn'}, log_level_console: {default: 'warn'}, } - Object.assign(masterConfig, processScheme(raw, scheme)) + Object.assign(config, processScheme(raw, scheme)) +} + +/** + * @param {string} key + * @return {string|number|object} + */ +function get(key = null) { + if (key === null) + return config + + if (typeof config !== 'object') + throw new Error(`config is not loaded`) + + if (!(key in config)) + throw new Error(`config: ${key} not found`) + + return config[key] } module.exports = { parseWorkerConfig, parseMasterConfig, - - workerConfig, - masterConfig + get, } \ No newline at end of file diff --git a/src/db.js b/src/db.js index 0ffba84..7874a92 100644 --- a/src/db.js +++ b/src/db.js @@ -1,4 +1,4 @@ -const {workerConfig} = require('./config') +const config = require('./config') const {getLogger} = require('./logger') const mysql = require('promise-mysql') @@ -7,10 +7,10 @@ const logger = getLogger('db') async function init() { link = await mysql.createConnection({ - host: workerConfig.mysql_host, - user: workerConfig.mysql_user, - password: workerConfig.mysql_password, - database: workerConfig.mysql_database + host: config.get('mysql_host'), + user: config.get('mysql_user'), + password: config.get('mysql_password'), + database: config.get('mysql_database') }) } diff --git a/src/jobd-master.js b/src/jobd-master.js index 54e2377..6604e05 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -1,11 +1,9 @@ #!/usr/bin/env node const minimist = require('minimist') const loggerModule = require('./logger') -const configModule = require('./config') +const config = require('./config') const {Server, ResponseMessage, RequestMessage} = require('./server') const WorkersList = require('./workers-list') -const {masterConfig} = configModule - /** * @type {Logger} @@ -50,27 +48,25 @@ async function main() { // read config try { - configModule.parseMasterConfig(argv.config) + config.parseMasterConfig(argv.config) } catch (e) { console.error(`config parsing error: ${e.message}`) process.exit(1) } await loggerModule.init({ - file: masterConfig.log_file, - levelFile: masterConfig.log_level_file, - levelConsole: masterConfig.log_level_console, + file: config.get('log_file'), + levelFile: config.get('log_level_file'), + levelConsole: config.get('log_level_console'), }) logger = loggerModule.getLogger('jobd-master') - // console.log(masterConfig) - workers = new WorkersList() // start server server = new Server() server.on('message', onMessage) - server.start(masterConfig.port, masterConfig.host) + server.start(config.get('port'), config.get('host')) logger.info('server started') } @@ -89,7 +85,7 @@ async function onMessage({message, connection}) { if (message.requestType !== 'ping') logger.info('onMessage:', message) - if (masterConfig.password && message.password !== masterConfig.password) { + if (config.get('password') && message.password !== config.get('password')) { connection.send(new ResponseMessage().setError('invalid password')) return connection.close() } diff --git a/src/jobd.js b/src/jobd.js index 457c029..63f8716 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -1,14 +1,11 @@ #!/usr/bin/env node const minimist = require('minimist') const loggerModule = require('./logger') -const configModule = require('./config') +const config = require('./config') const db = require('./db') const {Server, Connection, RequestMessage, ResponseMessage} = require('./server') const {Worker, STATUS_MANUAL} = require('./worker') -const {workerConfig} = configModule - - /** * @type {Worker} */ @@ -57,21 +54,19 @@ async function main() { // read config try { - configModule.parseWorkerConfig(argv.config) + config.parseWorkerConfig(argv.config) } catch (e) { console.error(`config parsing error: ${e.message}`) process.exit(1) } await loggerModule.init({ - file: workerConfig.log_file, - levelFile: workerConfig.log_level_file, - levelConsole: workerConfig.log_level_console, + file: config.get('log_file'), + levelFile: config.get('log_level_file'), + levelConsole: config.get('log_level_console'), }) logger = loggerModule.getLogger('jobd') - // console.log(workerConfig) - // init database try { await db.init() @@ -83,8 +78,8 @@ async function main() { // init queue worker = new Worker() - for (let targetName in workerConfig.targets) { - let slots = workerConfig.targets[targetName].slots + for (let targetName in config.get('targets')) { + let slots = config.get('targets')[targetName].slots // let target = new Target({name: targetName}) // queue.addTarget(target) @@ -105,11 +100,11 @@ async function main() { // start server server = new Server() server.on('message', onMessage) - server.start(workerConfig.port, workerConfig.host) + server.start(config.get('port'), config.get('host')) logger.info('server started') // connect to master - if (workerConfig.master_port && workerConfig.master_host) + if (config.get('master_port') && config.get('master_host')) connectToMaster() } @@ -129,7 +124,7 @@ async function onMessage({message, connection}) { if (message.requestType !== 'ping') logger.info('onMessage:', message) - if (workerConfig.password && message.password !== workerConfig.password) { + if (config.get('password') && message.password !== config.get('password')) { connection.send(new ResponseMessage().setError('invalid password')) return connection.close() } @@ -204,7 +199,7 @@ async function onMessage({message, connection}) { function connectToMaster() { const connection = new Connection() - connection.connect(workerConfig.master_host, workerConfig.master_port) + connection.connect(config.get('master_host'), config.get('master_port')) connection.on('connect', function() { connection.send( @@ -218,7 +213,7 @@ function connectToMaster() { logger.warn(`connectToMaster: connection closed`) setTimeout(() => { connectToMaster() - }, workerConfig.master_reconnect_timeout * 1000) + }, config.get('master_reconnect_timeout') * 1000) }) connection.on('message', (message) => { diff --git a/src/server.js b/src/server.js index b5eca9a..08eadca 100644 --- a/src/server.js +++ b/src/server.js @@ -373,6 +373,8 @@ class Connection extends EventEmitter { if (!(message instanceof Message)) throw new Error('send expects Message, got', message) + // TODO set password! + let json = JSON.stringify(message.getAsObject()) let buf = Buffer.concat([ Buffer.from(json), diff --git a/src/worker.js b/src/worker.js index 663a517..b4beeab 100644 --- a/src/worker.js +++ b/src/worker.js @@ -4,7 +4,7 @@ const db = require('./db') const {timestamp} = require('./util') const {getLogger} = require('./logger') const EventEmitter = require('events') -const {workerConfig} = require('./config') +const config = require('./config') const STATUS_WAITING = 'waiting' const STATUS_MANUAL = 'manual' @@ -140,7 +140,7 @@ class Worker extends EventEmitter { this.getTasks(targets) .then(({rows}) => { let message = `${LOGPREFIX} ${rows} processed` - if (workerConfig.mysql_fetch_limit && rows >= workerConfig.mysql_fetch_limit) { + if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) { // it seems, there are more, so we'll need to perform another query this.setPollTargets(targets) message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})` @@ -223,7 +223,7 @@ class Worker extends EventEmitter { let sqlFields = `id, status, target, slot` let sql if (data.id) { - sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE id=${db.escape(data.id)} FOR UPDATE` + sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE` } else { let targets if (target === null) { @@ -233,9 +233,9 @@ class Worker extends EventEmitter { } else { targets = target } - let sqlLimit = workerConfig.mysql_fetch_limit !== 0 ? ` LIMIT 0, ${workerConfig.mysql_fetch_limit}` : '' + let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : '' let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` - sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE` + sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE` } /** @type {object[]} results */ @@ -282,10 +282,10 @@ class Worker extends EventEmitter { } if (accepted.length) - await db.query(`UPDATE ${workerConfig.mysql_table} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`) + await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`) if (ignored.length) - await db.query(`UPDATE ${workerConfig.mysql_table} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`) + await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`) await db.commit() @@ -340,13 +340,13 @@ class Worker extends EventEmitter { * @param {number} id */ async run(id) { - let command = workerConfig.launcher.replace(/\{id\}/g, id) + let command = config.get('launcher').replace(/\{id\}/g, id) let args = command.split(/ +/) return new Promise((resolve, reject) => { this.logger.info(`run(${id}): launching`, args) let process = child_process.spawn(args[0], args.slice(1), { - maxBuffer: workerConfig.max_output_buffer + maxBuffer: config.get('max_output_buffer') }) let stdoutChunks = [] @@ -425,7 +425,7 @@ class Worker extends EventEmitter { list.push(`${field}=${val}`) } - await db.query(`UPDATE ${workerConfig.mysql_table} SET ${list.join(', ')} WHERE id=?`, [id]) + await db.query(`UPDATE ${config.get('mysql_table')} SET ${list.join(', ')} WHERE id=?`, [id]) } /** diff --git a/src/workers-list.js b/src/workers-list.js index 96a127a..620f711 100644 --- a/src/workers-list.js +++ b/src/workers-list.js @@ -1,5 +1,5 @@ const intersection = require('lodash/intersection') -const {masterConfig} = require('./config') +const config = require('./config') const {getLogger} = require('./logger') const {RequestMessage} = require('./server') const throttle = require('lodash/throttle') @@ -25,7 +25,7 @@ class WorkersList { /** * @type {NodeJS.Timeout} */ - this.pingInterval = setInterval(this.sendPings, masterConfig.ping_interval * 1000) + this.pingInterval = setInterval(this.sendPings, config.get('ping_interval') * 1000) /** * @type {Logger} @@ -100,7 +100,7 @@ class WorkersList { } this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke) } - }, masterConfig.poke_throttle_interval * 1000, {leading: true}) + }, config.get('poke_throttle_interval') * 1000, {leading: true}) /** * @param {Connection} connection -- cgit v1.2.3