aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-02-28 16:11:06 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-02-28 16:11:06 +0300
commit142869948c40900569f339a2177e95a3be3bbdfb (patch)
tree88011b401ce178767744bf6665d580c83d4e6ad3
parent203fe6221b07784bd595cadd7fc7516020c683cb (diff)
config refactor
-rw-r--r--src/config.js42
-rw-r--r--src/db.js10
-rwxr-xr-xsrc/jobd-master.js18
-rwxr-xr-xsrc/jobd.js29
-rw-r--r--src/server.js2
-rw-r--r--src/worker.js20
-rw-r--r--src/workers-list.js6
7 files changed, 68 insertions, 59 deletions
diff --git a/src/config.js b/src/config.js
index 9ee6338..a59615e 100644
--- a/src/config.js
+++ b/src/config.js
@@ -2,10 +2,7 @@ const fs = require('fs')
const ini = require('ini')
const {isNumeric} = require('./util')
-let workerConfig = {
- targets: {},
-}
-let masterConfig = {}
+let config = null
function readFile(file) {
if (!fs.existsSync(file))
@@ -46,8 +43,9 @@ function processScheme(source, scheme) {
}
function parseWorkerConfig(file) {
- const raw = readFile(file)
+ config = {}
+ const raw = readFile(file)
const scheme = {
host: {required: true},
port: {required: true, type: 'int'},
@@ -72,7 +70,9 @@ function parseWorkerConfig(file) {
launcher: {required: true},
max_output_buffer: {default: 1024*1024, type: 'int'},
}
- Object.assign(workerConfig, processScheme(raw, scheme))
+ Object.assign(config, processScheme(raw, scheme))
+
+ config.targets = {}
// targets
for (let target in raw) {
@@ -82,19 +82,20 @@ function parseWorkerConfig(file) {
if (typeof raw[target] !== 'object')
continue
- workerConfig.targets[target] = {slots: {}}
+ config.targets[target] = {slots: {}}
for (let slotName in raw[target]) {
let slotLimit = parseInt(raw[target][slotName], 10)
if (slotLimit < 1)
throw new Error(`${target}: slot ${slotName} has invalid limit`)
- workerConfig.targets[target].slots[slotName] = slotLimit
+ config.targets[target].slots[slotName] = slotLimit
}
}
}
function parseMasterConfig(file) {
- const raw = readFile(file)
+ config = {}
+ const raw = readFile(file)
const scheme = {
host: {required: true},
port: {required: true, type: 'int'},
@@ -107,13 +108,28 @@ function parseMasterConfig(file) {
log_level_file: {default: 'warn'},
log_level_console: {default: 'warn'},
}
- Object.assign(masterConfig, processScheme(raw, scheme))
+ Object.assign(config, processScheme(raw, scheme))
+}
+
+/**
+ * @param {string} key
+ * @return {string|number|object}
+ */
+function get(key = null) {
+ if (key === null)
+ return config
+
+ if (typeof config !== 'object')
+ throw new Error(`config is not loaded`)
+
+ if (!(key in config))
+ throw new Error(`config: ${key} not found`)
+
+ return config[key]
}
module.exports = {
parseWorkerConfig,
parseMasterConfig,
-
- workerConfig,
- masterConfig
+ get,
} \ No newline at end of file
diff --git a/src/db.js b/src/db.js
index 0ffba84..7874a92 100644
--- a/src/db.js
+++ b/src/db.js
@@ -1,4 +1,4 @@
-const {workerConfig} = require('./config')
+const config = require('./config')
const {getLogger} = require('./logger')
const mysql = require('promise-mysql')
@@ -7,10 +7,10 @@ const logger = getLogger('db')
async function init() {
link = await mysql.createConnection({
- host: workerConfig.mysql_host,
- user: workerConfig.mysql_user,
- password: workerConfig.mysql_password,
- database: workerConfig.mysql_database
+ host: config.get('mysql_host'),
+ user: config.get('mysql_user'),
+ password: config.get('mysql_password'),
+ database: config.get('mysql_database')
})
}
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 54e2377..6604e05 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -1,11 +1,9 @@
#!/usr/bin/env node
const minimist = require('minimist')
const loggerModule = require('./logger')
-const configModule = require('./config')
+const config = require('./config')
const {Server, ResponseMessage, RequestMessage} = require('./server')
const WorkersList = require('./workers-list')
-const {masterConfig} = configModule
-
/**
* @type {Logger}
@@ -50,27 +48,25 @@ async function main() {
// read config
try {
- configModule.parseMasterConfig(argv.config)
+ config.parseMasterConfig(argv.config)
} catch (e) {
console.error(`config parsing error: ${e.message}`)
process.exit(1)
}
await loggerModule.init({
- file: masterConfig.log_file,
- levelFile: masterConfig.log_level_file,
- levelConsole: masterConfig.log_level_console,
+ file: config.get('log_file'),
+ levelFile: config.get('log_level_file'),
+ levelConsole: config.get('log_level_console'),
})
logger = loggerModule.getLogger('jobd-master')
- // console.log(masterConfig)
-
workers = new WorkersList()
// start server
server = new Server()
server.on('message', onMessage)
- server.start(masterConfig.port, masterConfig.host)
+ server.start(config.get('port'), config.get('host'))
logger.info('server started')
}
@@ -89,7 +85,7 @@ async function onMessage({message, connection}) {
if (message.requestType !== 'ping')
logger.info('onMessage:', message)
- if (masterConfig.password && message.password !== masterConfig.password) {
+ if (config.get('password') && message.password !== config.get('password')) {
connection.send(new ResponseMessage().setError('invalid password'))
return connection.close()
}
diff --git a/src/jobd.js b/src/jobd.js
index 457c029..63f8716 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -1,14 +1,11 @@
#!/usr/bin/env node
const minimist = require('minimist')
const loggerModule = require('./logger')
-const configModule = require('./config')
+const config = require('./config')
const db = require('./db')
const {Server, Connection, RequestMessage, ResponseMessage} = require('./server')
const {Worker, STATUS_MANUAL} = require('./worker')
-const {workerConfig} = configModule
-
-
/**
* @type {Worker}
*/
@@ -57,21 +54,19 @@ async function main() {
// read config
try {
- configModule.parseWorkerConfig(argv.config)
+ config.parseWorkerConfig(argv.config)
} catch (e) {
console.error(`config parsing error: ${e.message}`)
process.exit(1)
}
await loggerModule.init({
- file: workerConfig.log_file,
- levelFile: workerConfig.log_level_file,
- levelConsole: workerConfig.log_level_console,
+ file: config.get('log_file'),
+ levelFile: config.get('log_level_file'),
+ levelConsole: config.get('log_level_console'),
})
logger = loggerModule.getLogger('jobd')
- // console.log(workerConfig)
-
// init database
try {
await db.init()
@@ -83,8 +78,8 @@ async function main() {
// init queue
worker = new Worker()
- for (let targetName in workerConfig.targets) {
- let slots = workerConfig.targets[targetName].slots
+ for (let targetName in config.get('targets')) {
+ let slots = config.get('targets')[targetName].slots
// let target = new Target({name: targetName})
// queue.addTarget(target)
@@ -105,11 +100,11 @@ async function main() {
// start server
server = new Server()
server.on('message', onMessage)
- server.start(workerConfig.port, workerConfig.host)
+ server.start(config.get('port'), config.get('host'))
logger.info('server started')
// connect to master
- if (workerConfig.master_port && workerConfig.master_host)
+ if (config.get('master_port') && config.get('master_host'))
connectToMaster()
}
@@ -129,7 +124,7 @@ async function onMessage({message, connection}) {
if (message.requestType !== 'ping')
logger.info('onMessage:', message)
- if (workerConfig.password && message.password !== workerConfig.password) {
+ if (config.get('password') && message.password !== config.get('password')) {
connection.send(new ResponseMessage().setError('invalid password'))
return connection.close()
}
@@ -204,7 +199,7 @@ async function onMessage({message, connection}) {
function connectToMaster() {
const connection = new Connection()
- connection.connect(workerConfig.master_host, workerConfig.master_port)
+ connection.connect(config.get('master_host'), config.get('master_port'))
connection.on('connect', function() {
connection.send(
@@ -218,7 +213,7 @@ function connectToMaster() {
logger.warn(`connectToMaster: connection closed`)
setTimeout(() => {
connectToMaster()
- }, workerConfig.master_reconnect_timeout * 1000)
+ }, config.get('master_reconnect_timeout') * 1000)
})
connection.on('message', (message) => {
diff --git a/src/server.js b/src/server.js
index b5eca9a..08eadca 100644
--- a/src/server.js
+++ b/src/server.js
@@ -373,6 +373,8 @@ class Connection extends EventEmitter {
if (!(message instanceof Message))
throw new Error('send expects Message, got', message)
+ // TODO set password!
+
let json = JSON.stringify(message.getAsObject())
let buf = Buffer.concat([
Buffer.from(json),
diff --git a/src/worker.js b/src/worker.js
index 663a517..b4beeab 100644
--- a/src/worker.js
+++ b/src/worker.js
@@ -4,7 +4,7 @@ const db = require('./db')
const {timestamp} = require('./util')
const {getLogger} = require('./logger')
const EventEmitter = require('events')
-const {workerConfig} = require('./config')
+const config = require('./config')
const STATUS_WAITING = 'waiting'
const STATUS_MANUAL = 'manual'
@@ -140,7 +140,7 @@ class Worker extends EventEmitter {
this.getTasks(targets)
.then(({rows}) => {
let message = `${LOGPREFIX} ${rows} processed`
- if (workerConfig.mysql_fetch_limit && rows >= workerConfig.mysql_fetch_limit) {
+ if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) {
// it seems, there are more, so we'll need to perform another query
this.setPollTargets(targets)
message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})`
@@ -223,7 +223,7 @@ class Worker extends EventEmitter {
let sqlFields = `id, status, target, slot`
let sql
if (data.id) {
- sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE id=${db.escape(data.id)} FOR UPDATE`
+ sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE`
} else {
let targets
if (target === null) {
@@ -233,9 +233,9 @@ class Worker extends EventEmitter {
} else {
targets = target
}
- let sqlLimit = workerConfig.mysql_fetch_limit !== 0 ? ` LIMIT 0, ${workerConfig.mysql_fetch_limit}` : ''
+ let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : ''
let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)`
- sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE`
+ sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE`
}
/** @type {object[]} results */
@@ -282,10 +282,10 @@ class Worker extends EventEmitter {
}
if (accepted.length)
- await db.query(`UPDATE ${workerConfig.mysql_table} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`)
+ await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`)
if (ignored.length)
- await db.query(`UPDATE ${workerConfig.mysql_table} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`)
+ await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`)
await db.commit()
@@ -340,13 +340,13 @@ class Worker extends EventEmitter {
* @param {number} id
*/
async run(id) {
- let command = workerConfig.launcher.replace(/\{id\}/g, id)
+ let command = config.get('launcher').replace(/\{id\}/g, id)
let args = command.split(/ +/)
return new Promise((resolve, reject) => {
this.logger.info(`run(${id}): launching`, args)
let process = child_process.spawn(args[0], args.slice(1), {
- maxBuffer: workerConfig.max_output_buffer
+ maxBuffer: config.get('max_output_buffer')
})
let stdoutChunks = []
@@ -425,7 +425,7 @@ class Worker extends EventEmitter {
list.push(`${field}=${val}`)
}
- await db.query(`UPDATE ${workerConfig.mysql_table} SET ${list.join(', ')} WHERE id=?`, [id])
+ await db.query(`UPDATE ${config.get('mysql_table')} SET ${list.join(', ')} WHERE id=?`, [id])
}
/**
diff --git a/src/workers-list.js b/src/workers-list.js
index 96a127a..620f711 100644
--- a/src/workers-list.js
+++ b/src/workers-list.js
@@ -1,5 +1,5 @@
const intersection = require('lodash/intersection')
-const {masterConfig} = require('./config')
+const config = require('./config')
const {getLogger} = require('./logger')
const {RequestMessage} = require('./server')
const throttle = require('lodash/throttle')
@@ -25,7 +25,7 @@ class WorkersList {
/**
* @type {NodeJS.Timeout}
*/
- this.pingInterval = setInterval(this.sendPings, masterConfig.ping_interval * 1000)
+ this.pingInterval = setInterval(this.sendPings, config.get('ping_interval') * 1000)
/**
* @type {Logger}
@@ -100,7 +100,7 @@ class WorkersList {
}
this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke)
}
- }, masterConfig.poke_throttle_interval * 1000, {leading: true})
+ }, config.get('poke_throttle_interval') * 1000, {leading: true})
/**
* @param {Connection} connection