From feaa5065f900a9c031ca7d66d80957040e2ee99f Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sun, 28 Feb 2021 16:11:57 +0300 Subject: refactor: moved some files to lib/ --- src/config.js | 135 -------------- src/db.js | 49 ----- src/jobd-master.js | 8 +- src/jobd.js | 10 +- src/lib/config.js | 135 ++++++++++++++ src/lib/db.js | 49 +++++ src/lib/logger.js | 97 ++++++++++ src/lib/server.js | 452 +++++++++++++++++++++++++++++++++++++++++++++ src/lib/util.js | 9 + src/lib/worker.js | 478 ++++++++++++++++++++++++++++++++++++++++++++++++ src/lib/workers-list.js | 145 +++++++++++++++ src/logger.js | 97 ---------- src/server.js | 452 --------------------------------------------- src/util.js | 9 - src/worker.js | 478 ------------------------------------------------ src/workers-list.js | 145 --------------- 16 files changed, 1374 insertions(+), 1374 deletions(-) delete mode 100644 src/config.js delete mode 100644 src/db.js create mode 100644 src/lib/config.js create mode 100644 src/lib/db.js create mode 100644 src/lib/logger.js create mode 100644 src/lib/server.js create mode 100644 src/lib/util.js create mode 100644 src/lib/worker.js create mode 100644 src/lib/workers-list.js delete mode 100644 src/logger.js delete mode 100644 src/server.js delete mode 100644 src/util.js delete mode 100644 src/worker.js delete mode 100644 src/workers-list.js diff --git a/src/config.js b/src/config.js deleted file mode 100644 index a59615e..0000000 --- a/src/config.js +++ /dev/null @@ -1,135 +0,0 @@ -const fs = require('fs') -const ini = require('ini') -const {isNumeric} = require('./util') - -let config = null - -function readFile(file) { - if (!fs.existsSync(file)) - throw new Error(`file ${file} not found`) - - return ini.parse(fs.readFileSync(file, 'utf-8')) -} - -function processScheme(source, scheme) { - const result = {} - - for (let key in scheme) { - let opts = scheme[key] - let ne = !(key in source) || !source[key] - if (opts.required === true && ne) - throw new Error(`'${key}' is not defined`) - - let value = source[key] ?? opts.default ?? null - - switch (opts.type) { - case 'int': - if (!isNumeric(value)) - throw new Error(`'${key}' must be an integer`) - value = parseInt(value, 10) - break - - case 'float': - if (!isNumeric(value)) - throw new Error(`'${key}' must be a float`) - value = parseFloat(value) - break - } - - result[key] = value - } - - return result -} - -function parseWorkerConfig(file) { - config = {} - - const raw = readFile(file) - const scheme = { - host: {required: true}, - port: {required: true, type: 'int'}, - password: {}, - - master_host: {}, - master_port: {type: 'int', default: 0}, - master_reconnect_timeout: {type: 'int', default: 10}, - - log_file: {}, - log_level_file: {default: 'warn'}, - log_level_console: {default: 'warn'}, - - mysql_host: {required: true}, - mysql_port: {required: true, type: 'int'}, - mysql_user: {required: true}, - mysql_password: {required: true}, - mysql_database: {required: true}, - mysql_table: {required: true, default: 'jobs'}, - mysql_fetch_limit: {default: 100, type: 'int'}, - - launcher: {required: true}, - max_output_buffer: {default: 1024*1024, type: 'int'}, - } - Object.assign(config, processScheme(raw, scheme)) - - config.targets = {} - - // targets - for (let target in raw) { - if (target === 'null') - throw new Error('word \'null\' is reserved, please don\'t use it as a target name') - - if (typeof raw[target] !== 'object') - continue - - config.targets[target] = {slots: {}} - for (let slotName in raw[target]) { - let slotLimit = parseInt(raw[target][slotName], 10) - if (slotLimit < 1) - throw new Error(`${target}: slot ${slotName} has invalid limit`) - config.targets[target].slots[slotName] = slotLimit - } - } -} - -function parseMasterConfig(file) { - config = {} - - const raw = readFile(file) - const scheme = { - host: {required: true}, - port: {required: true, type: 'int'}, - password: {}, - - ping_interval: {default: 30, type: 'int'}, - poke_throttle_interval: {default: 0.5, type: 'float'}, - - log_file: {}, - log_level_file: {default: 'warn'}, - log_level_console: {default: 'warn'}, - } - Object.assign(config, processScheme(raw, scheme)) -} - -/** - * @param {string} key - * @return {string|number|object} - */ -function get(key = null) { - if (key === null) - return config - - if (typeof config !== 'object') - throw new Error(`config is not loaded`) - - if (!(key in config)) - throw new Error(`config: ${key} not found`) - - return config[key] -} - -module.exports = { - parseWorkerConfig, - parseMasterConfig, - get, -} \ No newline at end of file diff --git a/src/db.js b/src/db.js deleted file mode 100644 index 7874a92..0000000 --- a/src/db.js +++ /dev/null @@ -1,49 +0,0 @@ -const config = require('./config') -const {getLogger} = require('./logger') -const mysql = require('promise-mysql') - -let link -const logger = getLogger('db') - -async function init() { - link = await mysql.createConnection({ - host: config.get('mysql_host'), - user: config.get('mysql_user'), - password: config.get('mysql_password'), - database: config.get('mysql_database') - }) -} - -function wrap(method, isAsync = true, log = true) { - return isAsync ? async function(...args) { - if (log) - logger.trace(`${method}: `, args) - - try { - return await link[method](...args) - } catch (error) { - logger.error(`db.${method}:`, error, link) - - if ( error.code === 'PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR' - || error.code === 'PROTOCOL_CONNECTION_LOST' - || error.fatal === true) { - // try to reconnect and call it again, once - await init() - return await link[method](...args) - } - } - } : function(...args) { - if (log) - logger.trace(`${method}: `, args) - - return link[method](...args) - } -} - -module.exports = { - init, - query: wrap('query'), - beginTransaction: wrap('beginTransaction'), - commit: wrap('commit'), - escape: wrap('escape', false, false) -} \ No newline at end of file diff --git a/src/jobd-master.js b/src/jobd-master.js index 6604e05..608750d 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -1,9 +1,9 @@ #!/usr/bin/env node const minimist = require('minimist') -const loggerModule = require('./logger') -const config = require('./config') -const {Server, ResponseMessage, RequestMessage} = require('./server') -const WorkersList = require('./workers-list') +const loggerModule = require('./lib/logger') +const config = require('./lib/config') +const {Server, ResponseMessage, RequestMessage} = require('./lib/server') +const WorkersList = require('./lib/workers-list') /** * @type {Logger} diff --git a/src/jobd.js b/src/jobd.js index 63f8716..a175b63 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -1,10 +1,10 @@ #!/usr/bin/env node const minimist = require('minimist') -const loggerModule = require('./logger') -const config = require('./config') -const db = require('./db') -const {Server, Connection, RequestMessage, ResponseMessage} = require('./server') -const {Worker, STATUS_MANUAL} = require('./worker') +const loggerModule = require('./lib/logger') +const config = require('./lib/config') +const db = require('./lib/db') +const {Server, Connection, RequestMessage, ResponseMessage} = require('./lib/server') +const {Worker, STATUS_MANUAL} = require('./lib/worker') /** * @type {Worker} diff --git a/src/lib/config.js b/src/lib/config.js new file mode 100644 index 0000000..a59615e --- /dev/null +++ b/src/lib/config.js @@ -0,0 +1,135 @@ +const fs = require('fs') +const ini = require('ini') +const {isNumeric} = require('./util') + +let config = null + +function readFile(file) { + if (!fs.existsSync(file)) + throw new Error(`file ${file} not found`) + + return ini.parse(fs.readFileSync(file, 'utf-8')) +} + +function processScheme(source, scheme) { + const result = {} + + for (let key in scheme) { + let opts = scheme[key] + let ne = !(key in source) || !source[key] + if (opts.required === true && ne) + throw new Error(`'${key}' is not defined`) + + let value = source[key] ?? opts.default ?? null + + switch (opts.type) { + case 'int': + if (!isNumeric(value)) + throw new Error(`'${key}' must be an integer`) + value = parseInt(value, 10) + break + + case 'float': + if (!isNumeric(value)) + throw new Error(`'${key}' must be a float`) + value = parseFloat(value) + break + } + + result[key] = value + } + + return result +} + +function parseWorkerConfig(file) { + config = {} + + const raw = readFile(file) + const scheme = { + host: {required: true}, + port: {required: true, type: 'int'}, + password: {}, + + master_host: {}, + master_port: {type: 'int', default: 0}, + master_reconnect_timeout: {type: 'int', default: 10}, + + log_file: {}, + log_level_file: {default: 'warn'}, + log_level_console: {default: 'warn'}, + + mysql_host: {required: true}, + mysql_port: {required: true, type: 'int'}, + mysql_user: {required: true}, + mysql_password: {required: true}, + mysql_database: {required: true}, + mysql_table: {required: true, default: 'jobs'}, + mysql_fetch_limit: {default: 100, type: 'int'}, + + launcher: {required: true}, + max_output_buffer: {default: 1024*1024, type: 'int'}, + } + Object.assign(config, processScheme(raw, scheme)) + + config.targets = {} + + // targets + for (let target in raw) { + if (target === 'null') + throw new Error('word \'null\' is reserved, please don\'t use it as a target name') + + if (typeof raw[target] !== 'object') + continue + + config.targets[target] = {slots: {}} + for (let slotName in raw[target]) { + let slotLimit = parseInt(raw[target][slotName], 10) + if (slotLimit < 1) + throw new Error(`${target}: slot ${slotName} has invalid limit`) + config.targets[target].slots[slotName] = slotLimit + } + } +} + +function parseMasterConfig(file) { + config = {} + + const raw = readFile(file) + const scheme = { + host: {required: true}, + port: {required: true, type: 'int'}, + password: {}, + + ping_interval: {default: 30, type: 'int'}, + poke_throttle_interval: {default: 0.5, type: 'float'}, + + log_file: {}, + log_level_file: {default: 'warn'}, + log_level_console: {default: 'warn'}, + } + Object.assign(config, processScheme(raw, scheme)) +} + +/** + * @param {string} key + * @return {string|number|object} + */ +function get(key = null) { + if (key === null) + return config + + if (typeof config !== 'object') + throw new Error(`config is not loaded`) + + if (!(key in config)) + throw new Error(`config: ${key} not found`) + + return config[key] +} + +module.exports = { + parseWorkerConfig, + parseMasterConfig, + get, +} \ No newline at end of file diff --git a/src/lib/db.js b/src/lib/db.js new file mode 100644 index 0000000..7874a92 --- /dev/null +++ b/src/lib/db.js @@ -0,0 +1,49 @@ +const config = require('./config') +const {getLogger} = require('./logger') +const mysql = require('promise-mysql') + +let link +const logger = getLogger('db') + +async function init() { + link = await mysql.createConnection({ + host: config.get('mysql_host'), + user: config.get('mysql_user'), + password: config.get('mysql_password'), + database: config.get('mysql_database') + }) +} + +function wrap(method, isAsync = true, log = true) { + return isAsync ? async function(...args) { + if (log) + logger.trace(`${method}: `, args) + + try { + return await link[method](...args) + } catch (error) { + logger.error(`db.${method}:`, error, link) + + if ( error.code === 'PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR' + || error.code === 'PROTOCOL_CONNECTION_LOST' + || error.fatal === true) { + // try to reconnect and call it again, once + await init() + return await link[method](...args) + } + } + } : function(...args) { + if (log) + logger.trace(`${method}: `, args) + + return link[method](...args) + } +} + +module.exports = { + init, + query: wrap('query'), + beginTransaction: wrap('beginTransaction'), + commit: wrap('commit'), + escape: wrap('escape', false, false) +} \ No newline at end of file diff --git a/src/lib/logger.js b/src/lib/logger.js new file mode 100644 index 0000000..260ba41 --- /dev/null +++ b/src/lib/logger.js @@ -0,0 +1,97 @@ +const log4js = require('log4js') +const fs = require('fs/promises') +const fsConstants = require('fs').constants +const util = require('src/lib/util') + +module.exports = { + /** + * @param {string} file + * @param {string} levelFile + * @param {string} levelConsole + */ + async init({file, levelFile, levelConsole}) { + const categories = { + default: { + appenders: ['stdout-filter'], + level: 'trace' + } + } + + const appenders = { + stdout: { + type: 'stdout', + level: 'trace' + }, + 'stdout-filter': { + type: 'logLevelFilter', + appender: 'stdout', + level: levelConsole + } + } + + if (file) { + let exists + try { + await fs.stat(file) + exists = true + } catch (error) { + exists = false + } + + // if file exists + if (exists) { + // see if it's writable + try { + // this promise fullfills with undefined upon success + await fs.access(file, fsConstants.W_OK) + } catch (error) { + throw new Error(`file '${file}' is not writable:` + error.message) + } + } else { + // try to create an empty file + let fd + try { + fd = await fs.open(file, 'wx') + } catch (error) { + throw new Error(`failed to create file '${file}': ` + error.message) + } finally { + await fd?.close() + } + } + + categories.default.appenders.push('file-filter') + appenders.file = { + type: 'file', + filename: file, + maxLogSize: 1024 * 1024 * 50, + debug: 'debug' + } + appenders['file-filter'] = { + type: 'logLevelFilter', + appender: 'file', + level: levelFile + } + } + + log4js.configure({ + appenders, + categories + }) + }, + + /** + * @return {Logger} + */ + getLogger(...args) { + return log4js.getLogger(...args) + }, + + /** + * @param cb + */ + shutdown(cb) { + log4js.shutdown(cb) + }, + + Logger: log4js.Logger, +} diff --git a/src/lib/server.js b/src/lib/server.js new file mode 100644 index 0000000..08eadca --- /dev/null +++ b/src/lib/server.js @@ -0,0 +1,452 @@ +const net = require('net') +const EventEmitter = require('events') +const {getLogger} = require('./logger') +const isObject = require('lodash/isObject') + +const EOT = 0x04 + +class Message { + + static REQUEST = 0 + static RESPONSE = 1 + + /** + * @param {number} type + */ + constructor(type) { + /** + * @type {number} + */ + this.type = type + } + + getAsObject() { + return [this.type] + } + +} + +class ResponseMessage extends Message { + constructor() { + super(Message.RESPONSE) + + this.error = null + this.data = null + } + + setError(error) { + this.error = error + return this + } + + setData(data) { + this.data = data + return this + } + + getAsObject() { + return [ + ...super.getAsObject(), + [ + this.error, + this.data + ] + ] + } +} + +class RequestMessage extends Message { + /** + * @param {string} type + * @param {any} data + */ + constructor(type, data = null) { + super(Message.REQUEST) + + /** + * @type string + */ + this.requestType = type + + /** + * @type any + */ + this.requestData = data + + /** + * @type {null|string} + */ + this.password = null + } + + getAsObject() { + let request = { + type: this.requestType + } + + if (this.requestData) + request.data = this.requestData + + return [ + ...super.getAsObject(), + request + ] + } + + /** + * @param {string} password + */ + setPassword(password) { + this.password = password + } +} + +class Server extends EventEmitter { + + constructor() { + super() + + /** + * @type {null|module:net.Server} + */ + this.server = null + + /** + * @type {Logger} + */ + this.logger = getLogger('server') + } + + /** + * @param {number} port + * @param {string} host + */ + start(port, host) { + this.server = net.createServer() + + this.server.on('connection', this.onConnection) + this.server.on('error', this.onError) + this.server.on('listening', this.onListening) + + this.server.listen(port, host) + } + + /** + * @param {module:net.Socket} socket + */ + onConnection = (socket) => { + let connection = new Connection() + connection.setSocket(socket) + connection.on('message', (message) => { + this.emit('message', { + message, + connection + }) + }) + + this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`) + } + + onListening = () => { + let addr = this.server.address() + this.logger.info(`server is listening on ${addr.address}:${addr.port}`) + } + + onError = (error) => { + this.logger.error('error: ', error) + } + +} + +class Connection extends EventEmitter { + + constructor() { + super() + + /** + * @type {null|module:net.Socket} + */ + this.socket = null + + /** + * @type {Buffer} + */ + this.data = Buffer.from([]) + + /** + * @type {boolean} + * @private + */ + this._closeEmitted = false + + /** + * @type {null|string} + */ + this.remoteAddress = null + + /** + * @type {null|number} + */ + this.remotePort = null + + /** + * @type {null|number} + */ + this.id = null + + this._setLogger() + } + + /** + * @param {string} host + * @param {number} port + */ + connect(host, port) { + if (this.socket !== null) + throw new Error(`this Connection already has a socket`) + + this.socket = new net.Socket() + this.socket.connect({host, port}) + + this.remoteAddress = host + this.remotePort = port + + this._setId() + this._setLogger() + this._setSocketEvents() + } + + /** + * @param {module:net.Socket} socket + */ + setSocket(socket) { + this.socket = socket + + this.remoteAddress = socket.remoteAddress + this.remotePort = socket.remotePort + + this._setId() + this._setLogger() + this._setSocketEvents() + } + + /** + * @private + */ + _setLogger() { + let addr = this.socket ? this.remoteAddr() : '?' + this.logger = getLogger(``) + } + + /** + * @private + */ + _setId() { + this.id = Math.floor(Math.random() * 10000) + } + + /** + * @private + */ + _setSocketEvents() { + this.socket.on('connect', this.onConnect) + this.socket.on('data', this.onData) + this.socket.on('end', this.onEnd) + this.socket.on('close', this.onClose) + this.socket.on('error', this.onError) + } + + /** + * @param {Buffer} data + * @private + */ + _appendToBuffer(data) { + this.data = Buffer.concat([this.data, data]) + } + + /** + * @return {string} + */ + remoteAddr() { + return this.remoteAddress + ':' + this.remotePort + } + + /** + * @private + */ + _processChunks() { + if (!this.data.length) + return + + this.logger.trace(`processChunks (start):`, this.data) + + /** + * @type {Buffer[]} + */ + let messages = [] + let offset = 0 + let eotPos + do { + eotPos = this.data.indexOf(EOT, offset) + if (eotPos !== -1) { + let message = this.data.slice(offset, eotPos) + messages.push(message) + + this.logger.debug(`processChunks: found new message (${offset}, ${eotPos})`) + offset = eotPos + 1 + } + } while (eotPos !== -1 && offset < this.data.length-1) + + if (offset !== 0) { + this.data = this.data.slice(offset) + this.logger.trace(`processChunks: slicing data from ${offset}`) + } + + this.logger.trace(`processChunks (after parsing):`, this.data) + + for (let message of messages) { + try { + let buf = message.toString('utf-8') + this.logger.debug(buf) + + let json = JSON.parse(buf) + this._emitMessage(json) + } catch (error) { + this.logger.error('failed to parse data as JSON') + this.logger.debug(message) + } + } + } + + /** + * @param {object} json + * @private + */ + _emitMessage(json) { + if (!Array.isArray(json)) { + this.logger.error('malformed message, JSON array expected', json) + return + } + + let type = json.shift() + let message + switch (type) { + case Message.REQUEST: { + let data = json.shift() + if (!data || !isObject(data)) { + this.logger.error('malformed REQUEST message') + return + } + + message = new RequestMessage(data.type, data.data || null) + if (data.password) + message.setPassword(data.password) + break + } + + case Message.RESPONSE: { + let data = json.shift() + if (!data || !Array.isArray(data) || data.length < 2) { + this.logger.error('malformed RESPONSE message') + return + } + + message = new ResponseMessage() + message.setError(data[0]).setData(data[1]) + + break + } + + default: + this.logger.error(`malformed message, unexpected type ${type}`) + return + } + + this.emit('message', message) + } + + /** + * @type {Message} data + * @param message + */ + send(message) { + if (!(message instanceof Message)) + throw new Error('send expects Message, got', message) + + // TODO set password! + + let json = JSON.stringify(message.getAsObject()) + let buf = Buffer.concat([ + Buffer.from(json), + Buffer.from([EOT]) + ]) + + this.logger.debug('send:', json) + this.logger.trace('send:', buf) + + try { + this.socket.write(buf) + } catch (error) { + this.logger.error(`processChunks: failed to write response ${JSON.stringify(message)} to a socket`, error) + } + } + + /** + */ + close() { + try { + this.socket.end() + this.socket.destroy() + this._emitClose() + } catch (error) { + this.logger.error('close:', error) + } + } + + /** + * @private + */ + _emitClose() { + if (this._closeEmitted) + return + + this._closeEmitted = true + this.emit('close') + } + + onConnect = () => { + this.logger.debug('connection established') + this.emit('connect') + } + + onData = (data) => { + this.logger.trace('onData', data) + this._appendToBuffer(data) + this._processChunks() + } + + onEnd = (data) => { + if (data) + this._appendToBuffer(data) + + this._processChunks() + } + + onClose = (hadError) => { + this._emitClose() + this.logger.debug(`socket closed` + (hadError ? ` with error` : '')) + } + + onError = (error) => { + this._emitClose() + this.logger.warn(`socket error:`, error) + } + +} + +module.exports = { + Server, + Connection, + RequestMessage, + ResponseMessage +} \ No newline at end of file diff --git a/src/lib/util.js b/src/lib/util.js new file mode 100644 index 0000000..6de07e5 --- /dev/null +++ b/src/lib/util.js @@ -0,0 +1,9 @@ +module.exports = { + timestamp() { + return parseInt(+(new Date())/1000) + }, + + isNumeric(n) { + return !isNaN(parseFloat(n)) && isFinite(n) + } +} \ No newline at end of file diff --git a/src/lib/worker.js b/src/lib/worker.js new file mode 100644 index 0000000..b4beeab --- /dev/null +++ b/src/lib/worker.js @@ -0,0 +1,478 @@ +const Queue = require('queue') +const child_process = require('child_process') +const db = require('./db') +const {timestamp} = require('./util') +const {getLogger} = require('./logger') +const EventEmitter = require('events') +const config = require('./config') + +const STATUS_WAITING = 'waiting' +const STATUS_MANUAL = 'manual' +const STATUS_ACCEPTED = 'accepted' +const STATUS_IGNORED = 'ignored' +const STATUS_RUNNING = 'running' +const STATUS_DONE = 'done' + +const RESULT_OK = 'ok' +const RESULT_FAIL = 'fail' + +class Worker extends EventEmitter { + + constructor() { + super() + + /** + * @type {object.}>} + */ + this.targets = {} + + /** + * @type {boolean} + */ + this.polling = false + + /** + * @type {boolean} + */ + this.nextpoll = {} + + /** + * @type {Logger} + */ + this.logger = getLogger('Worker') + } + + /** + * @param {string} target + * @param {string} slot + * @param {number} limit + */ + addSlot(target, slot, limit) { + this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) + + if (this.targets[target] === undefined) + this.targets[target] = {slots: {}} + + if (this.targets[target].slots[slot] !== undefined) + throw new Error(`slot ${slot} for target ${target} has already been added`) + + let queue = Queue({ + concurrency: limit, + autostart: true + }) + queue.on('success', this.onJobFinished.bind(this, target, slot)) + queue.on('error', this.onJobFinished.bind(this, target, slot)) + queue.start() + + this.targets[target].slots[slot] = {limit, queue} + } + + /** + * @param {string} target + * @returns {boolean} + */ + hasTarget(target) { + return (target in this.targets) + } + + /** + * Returns status of all queues. + * + * @return {object} + */ + getStatus() { + let status = {targets: {}} + for (const targetName in this.targets) { + let target = this.targets[targetName] + status.targets[targetName] = {} + for (const slotName in target.slots) { + const {queue, limit} = target.slots[slotName] + status.targets[targetName][slotName] = { + concurrency: queue.concurrency, + limit, + length: queue.length, + } + } + } + return status + } + + /** + * @return {string[]} + */ + getTargets() { + return Object.keys(this.targets) + } + + /** + * + */ + poll() { + const LOGPREFIX = `poll():` + + let targets = this.getPollTargets() + if (!targets.length) { + this.poller.warn(`${LOGPREFIX} no targets`) + return + } + + // skip and postpone the poll, if we're in the middle on another poll + // it will be called again from the last .then() at the end of this method + if (this.polling) { + this.logger.debug(`${LOGPREFIX} already polling`) + return + } + + // skip and postpone the poll, if no free slots + // it will be called again from onJobFinished() + if (!this.hasFreeSlots(targets)) { + this.logger.debug(`${LOGPREFIX} no free slots`) + return + } + + // set polling flag + this.polling = true + + // clear postponed polls target list + this.setPollTargets() + + this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`) + this.getTasks(targets) + .then(({rows}) => { + let message = `${LOGPREFIX} ${rows} processed` + if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) { + // it seems, there are more, so we'll need to perform another query + this.setPollTargets(targets) + message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})` + } + this.logger.debug(message) + }) + .catch((error) => { + this.logger.error(`${LOGPREFIX}`, error) + //this.setPollTargets(targets) + }) + .then(() => { + // unset polling flag + this.polling = false + + // perform another poll, if needed + if (this.getPollTargets().length > 0) { + this.logger.debug(`${LOGPREFIX} next poll scheduled, calling poll() again`) + this.poll() + } + }) + } + + /** + * @param {string|string[]|null} target + */ + setPollTargets(target) { + // when called without parameter, remove all targets + if (target === undefined) { + this.nextpoll = {} + return + } + + // just a fix + if (target === 'null') + target = null + + if (Array.isArray(target)) { + target.forEach(t => { + this.nextpoll[t] = true + }) + } else { + if (target === null) + this.nextpoll = {} + this.nextpoll[target] = true + } + } + + /** + * @return {string[]} + */ + getPollTargets() { + if (null in this.nextpoll) + return Object.keys(this.targets) + + return Object.keys(this.nextpoll) + } + + /** + * @param {string} target + * @return {boolean} + */ + hasPollTarget(target) { + return target in this.nextpoll || null in this.nextpoll + } + + /** + * @param {string|null|string[]} target + * @param {string} reqstatus + * @param {object} data + * @returns {Promise<{ignored: number, accepted: number, rows: number}>} + */ + async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) { + const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):` + + // get new jobs in transaction + await db.beginTransaction() + + let error = null + + let sqlFields = `id, status, target, slot` + let sql + if (data.id) { + sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE` + } else { + let targets + if (target === null) { + targets = Object.keys(this.targets) + } else if (!Array.isArray(target)) { + targets = [target] + } else { + targets = target + } + let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : '' + let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` + sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE` + } + + /** @type {object[]} results */ + let results = await db.query(sql) + this.logger.trace(`${LOGPREFIX} query result:`, results) + + /** + * @type {{target: string, slot: string, id: number}[]} + */ + let accepted = [] + + /** + * @type {number[]} + */ + let ignored = [] + + for (let result of results) { + let {id, slot, target, status} = result + id = parseInt(id) + + if (status !== reqstatus) { + error = `status = ${status} != ${reqstatus}` + this.logger.warn(`${LOGPREFIX} ${error}`) + ignored.push(id) + continue + } + + if (!target || this.targets[target] === undefined) { + error = `target '${target}' not found (job id=${id})` + this.logger.error(`${LOGPREFIX} ${error}`) + ignored.push(id) + continue + } + + if (!slot || this.targets[target].slots[slot] === undefined) { + error = `slot '${slot}' of target '${target}' not found (job id=${id})` + this.logger.error(`${LOGPREFIX} ${error}`) + ignored.push(id) + continue + } + + this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`) + accepted.push({target, slot, id}) + } + + if (accepted.length) + await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`) + + if (ignored.length) + await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`) + + await db.commit() + + accepted.forEach(({id, target, slot}) => { + let q = this.targets[target].slots[slot].queue + q.push(async (cb) => { + let data = { + code: null, + signal: null, + stdout: '', + stderr: '' + } + let result = RESULT_OK + + try { + await this.setJobStatus(id, STATUS_RUNNING) + + Object.assign(data, (await this.run(id))) + if (data.code !== 0) + result = RESULT_FAIL + } catch (error) { + this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error) + result = RESULT_FAIL + data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '') + } finally { + this.emit('job-done', { + id, + result, + ...data + }) + + try { + await this.setJobStatus(id, STATUS_DONE, result, data) + } catch (error) { + this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error) + } + + cb() + } + }) + }) + + return { + error, + rows: results.length, + accepted: accepted.length, + ignored: ignored.length, + } + } + + /** + * @param {number} id + */ + async run(id) { + let command = config.get('launcher').replace(/\{id\}/g, id) + let args = command.split(/ +/) + return new Promise((resolve, reject) => { + this.logger.info(`run(${id}): launching`, args) + + let process = child_process.spawn(args[0], args.slice(1), { + maxBuffer: config.get('max_output_buffer') + }) + + let stdoutChunks = [] + let stderrChunks = [] + + process.on('exit', + /** + * @param {null|number} code + * @param {null|string} signal + */ + (code, signal) => { + let stdout = stdoutChunks.join('') + let stderr = stderrChunks.join('') + + stdoutChunks = undefined + stderrChunks = undefined + + resolve({ + code, + signal, + stdout, + stderr + }) + }) + + process.on('error', (error) => { + reject(error) + }) + + process.stdout.on('data', (data) => { + if (data instanceof Buffer) + data = data.toString('utf-8') + stdoutChunks.push(data) + }) + + process.stderr.on('data', (data) => { + if (data instanceof Buffer) + data = data.toString('utf-8') + stderrChunks.push(data) + }) + }) + } + + /** + * @param {number} id + * @param {string} status + * @param {string} result + * @param {object} data + * @return {Promise} + */ + async setJobStatus(id, status, result = RESULT_OK, data = {}) { + let update = { + status, + result + } + switch (status) { + case STATUS_RUNNING: + case STATUS_DONE: + update[status === STATUS_RUNNING ? 'time_started' : 'time_finished'] = timestamp() + break + } + if (data.code !== undefined) + update.return_code = data.code + if (data.signal !== undefined) + update.sig = data.signal + if (data.stderr !== undefined) + update.stderr = data.stderr + if (data.stdout !== undefined) + update.stdout = data.stdout + + let list = [] + for (let field in update) { + let val = update[field] + if (val !== null) + val = db.escape(val) + list.push(`${field}=${val}`) + } + + await db.query(`UPDATE ${config.get('mysql_table')} SET ${list.join(', ')} WHERE id=?`, [id]) + } + + /** + * @param {string[]} inTargets + * @returns {boolean} + */ + hasFreeSlots(inTargets = []) { + const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):` + + this.logger.debug(`${LOGPREFIX} entered`) + + for (const target in this.targets) { + if (!inTargets.includes(target)) + continue + + for (const slot in this.targets[target].slots) { + const {limit, queue} = this.targets[target].slots[slot] + this.logger.debug(LOGPREFIX, limit, queue.length) + if (queue.length < limit) + return true + } + } + + return false + } + + /** + * @param {string} target + * @param {string} slot + */ + onJobFinished = (target, slot) => { + this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) + const {queue, limit} = this.targets[target].slots[slot] + if (queue.length < limit && this.hasPollTarget(target)) { + this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`) + this.poll() + } + } + +} + +module.exports = { + Worker, + STATUS_WAITING, + STATUS_MANUAL, + STATUS_ACCEPTED, + STATUS_IGNORED, + STATUS_RUNNING, + STATUS_DONE, +} \ No newline at end of file diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js new file mode 100644 index 0000000..620f711 --- /dev/null +++ b/src/lib/workers-list.js @@ -0,0 +1,145 @@ +const intersection = require('lodash/intersection') +const config = require('./config') +const {getLogger} = require('./logger') +const {RequestMessage} = require('./server') +const throttle = require('lodash/throttle') + +class WorkersList { + + constructor() { + /** + * @type {{connection: Connection, targets: string[]}[]} + */ + this.workers = [] + + /** + * @type {object.} + */ + this.targetsToPoke = {} + + /** + * @type {object.} + */ + this.targetsWaitingToPoke = {} + + /** + * @type {NodeJS.Timeout} + */ + this.pingInterval = setInterval(this.sendPings, config.get('ping_interval') * 1000) + + /** + * @type {Logger} + */ + this.logger = getLogger('WorkersList') + } + + /** + * @param {Connection} connection + * @param {string[]} targets + */ + add(connection, targets) { + this.logger.info(`add: connection from ${connection.remoteAddr()}, targets ${JSON.stringify(targets)}`) + + this.workers.push({connection, targets}) + connection.on('close', () => { + this.logger.info(`connection from ${connection.remoteAddr()} closed, removing worker`) + this.workers = this.workers.filter(worker => { + return worker.connection !== connection + }) + }) + + let waiting = Object.keys(this.targetsWaitingToPoke) + if (!waiting.length) + return + + let intrs = intersection(waiting, targets) + if (intrs.length) { + this.logger.info('add: found intersection with waiting targets:', intrs, 'going to poke new worker') + this._pokeWorkerConnection(connection, intrs) + for (let target of intrs) + delete this.targetsWaitingToPoke[target] + this.logger.trace(`add: this.targetsWaitingToPoke:`, this.targetsWaitingToPoke) + } + } + + /** + * @param {string[]} targets + */ + poke(targets) { + this.logger.debug('poke:', targets) + if (!Array.isArray(targets)) + throw new Error('targets must be Array') + + for (let t of targets) + this.targetsToPoke[t] = true + + this._pokeWorkers() + } + + /** + * @private + */ + _pokeWorkers = throttle(() => { + const targets = Object.keys(this.targetsToPoke) + this.targetsToPoke = {} + + const found = {} + for (const worker of this.workers) { + const intrs = intersection(worker.targets, targets) + intrs.forEach(t => { + found[t] = true + }) + if (intrs.length > 0) + this._pokeWorkerConnection(worker.connection, targets) + } + + for (let target of targets) { + if (!(target in found)) { + this.logger.debug(`_pokeWorkers: worker responsible for ${target} not found. we'll remember it`) + this.targetsWaitingToPoke[target] = true + } + this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke) + } + }, config.get('poke_throttle_interval') * 1000, {leading: true}) + + /** + * @param {Connection} connection + * @param {string[]} targets + * @private + */ + _pokeWorkerConnection(connection, targets) { + this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets) + connection.send( + new RequestMessage('poll', { + targets + }) + ) + } + + /** + * @return {{targets: string[], remoteAddr: string, remotePort: number}[]} + */ + getInfo() { + return this.workers.map(worker => { + return { + remoteAddr: worker.connection.socket?.remoteAddress, + remotePort: worker.connection.socket?.remotePort, + targets: worker.targets + } + }) + } + + /** + * @private + */ + sendPings = () => { + this.workers + .forEach(w => { + this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`) + w.connection.send(new RequestMessage('ping')) + }) + } + +} + +module.exports = WorkersList \ No newline at end of file diff --git a/src/logger.js b/src/logger.js deleted file mode 100644 index 22dd679..0000000 --- a/src/logger.js +++ /dev/null @@ -1,97 +0,0 @@ -const log4js = require('log4js') -const fs = require('fs/promises') -const fsConstants = require('fs').constants -const util = require('util') - -module.exports = { - /** - * @param {string} file - * @param {string} levelFile - * @param {string} levelConsole - */ - async init({file, levelFile, levelConsole}) { - const categories = { - default: { - appenders: ['stdout-filter'], - level: 'trace' - } - } - - const appenders = { - stdout: { - type: 'stdout', - level: 'trace' - }, - 'stdout-filter': { - type: 'logLevelFilter', - appender: 'stdout', - level: levelConsole - } - } - - if (file) { - let exists - try { - await fs.stat(file) - exists = true - } catch (error) { - exists = false - } - - // if file exists - if (exists) { - // see if it's writable - try { - // this promise fullfills with undefined upon success - await fs.access(file, fsConstants.W_OK) - } catch (error) { - throw new Error(`file '${file}' is not writable:` + error.message) - } - } else { - // try to create an empty file - let fd - try { - fd = await fs.open(file, 'wx') - } catch (error) { - throw new Error(`failed to create file '${file}': ` + error.message) - } finally { - await fd?.close() - } - } - - categories.default.appenders.push('file-filter') - appenders.file = { - type: 'file', - filename: file, - maxLogSize: 1024 * 1024 * 50, - debug: 'debug' - } - appenders['file-filter'] = { - type: 'logLevelFilter', - appender: 'file', - level: levelFile - } - } - - log4js.configure({ - appenders, - categories - }) - }, - - /** - * @return {Logger} - */ - getLogger(...args) { - return log4js.getLogger(...args) - }, - - /** - * @param cb - */ - shutdown(cb) { - log4js.shutdown(cb) - }, - - Logger: log4js.Logger, -} diff --git a/src/server.js b/src/server.js deleted file mode 100644 index 08eadca..0000000 --- a/src/server.js +++ /dev/null @@ -1,452 +0,0 @@ -const net = require('net') -const EventEmitter = require('events') -const {getLogger} = require('./logger') -const isObject = require('lodash/isObject') - -const EOT = 0x04 - -class Message { - - static REQUEST = 0 - static RESPONSE = 1 - - /** - * @param {number} type - */ - constructor(type) { - /** - * @type {number} - */ - this.type = type - } - - getAsObject() { - return [this.type] - } - -} - -class ResponseMessage extends Message { - constructor() { - super(Message.RESPONSE) - - this.error = null - this.data = null - } - - setError(error) { - this.error = error - return this - } - - setData(data) { - this.data = data - return this - } - - getAsObject() { - return [ - ...super.getAsObject(), - [ - this.error, - this.data - ] - ] - } -} - -class RequestMessage extends Message { - /** - * @param {string} type - * @param {any} data - */ - constructor(type, data = null) { - super(Message.REQUEST) - - /** - * @type string - */ - this.requestType = type - - /** - * @type any - */ - this.requestData = data - - /** - * @type {null|string} - */ - this.password = null - } - - getAsObject() { - let request = { - type: this.requestType - } - - if (this.requestData) - request.data = this.requestData - - return [ - ...super.getAsObject(), - request - ] - } - - /** - * @param {string} password - */ - setPassword(password) { - this.password = password - } -} - -class Server extends EventEmitter { - - constructor() { - super() - - /** - * @type {null|module:net.Server} - */ - this.server = null - - /** - * @type {Logger} - */ - this.logger = getLogger('server') - } - - /** - * @param {number} port - * @param {string} host - */ - start(port, host) { - this.server = net.createServer() - - this.server.on('connection', this.onConnection) - this.server.on('error', this.onError) - this.server.on('listening', this.onListening) - - this.server.listen(port, host) - } - - /** - * @param {module:net.Socket} socket - */ - onConnection = (socket) => { - let connection = new Connection() - connection.setSocket(socket) - connection.on('message', (message) => { - this.emit('message', { - message, - connection - }) - }) - - this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`) - } - - onListening = () => { - let addr = this.server.address() - this.logger.info(`server is listening on ${addr.address}:${addr.port}`) - } - - onError = (error) => { - this.logger.error('error: ', error) - } - -} - -class Connection extends EventEmitter { - - constructor() { - super() - - /** - * @type {null|module:net.Socket} - */ - this.socket = null - - /** - * @type {Buffer} - */ - this.data = Buffer.from([]) - - /** - * @type {boolean} - * @private - */ - this._closeEmitted = false - - /** - * @type {null|string} - */ - this.remoteAddress = null - - /** - * @type {null|number} - */ - this.remotePort = null - - /** - * @type {null|number} - */ - this.id = null - - this._setLogger() - } - - /** - * @param {string} host - * @param {number} port - */ - connect(host, port) { - if (this.socket !== null) - throw new Error(`this Connection already has a socket`) - - this.socket = new net.Socket() - this.socket.connect({host, port}) - - this.remoteAddress = host - this.remotePort = port - - this._setId() - this._setLogger() - this._setSocketEvents() - } - - /** - * @param {module:net.Socket} socket - */ - setSocket(socket) { - this.socket = socket - - this.remoteAddress = socket.remoteAddress - this.remotePort = socket.remotePort - - this._setId() - this._setLogger() - this._setSocketEvents() - } - - /** - * @private - */ - _setLogger() { - let addr = this.socket ? this.remoteAddr() : '?' - this.logger = getLogger(``) - } - - /** - * @private - */ - _setId() { - this.id = Math.floor(Math.random() * 10000) - } - - /** - * @private - */ - _setSocketEvents() { - this.socket.on('connect', this.onConnect) - this.socket.on('data', this.onData) - this.socket.on('end', this.onEnd) - this.socket.on('close', this.onClose) - this.socket.on('error', this.onError) - } - - /** - * @param {Buffer} data - * @private - */ - _appendToBuffer(data) { - this.data = Buffer.concat([this.data, data]) - } - - /** - * @return {string} - */ - remoteAddr() { - return this.remoteAddress + ':' + this.remotePort - } - - /** - * @private - */ - _processChunks() { - if (!this.data.length) - return - - this.logger.trace(`processChunks (start):`, this.data) - - /** - * @type {Buffer[]} - */ - let messages = [] - let offset = 0 - let eotPos - do { - eotPos = this.data.indexOf(EOT, offset) - if (eotPos !== -1) { - let message = this.data.slice(offset, eotPos) - messages.push(message) - - this.logger.debug(`processChunks: found new message (${offset}, ${eotPos})`) - offset = eotPos + 1 - } - } while (eotPos !== -1 && offset < this.data.length-1) - - if (offset !== 0) { - this.data = this.data.slice(offset) - this.logger.trace(`processChunks: slicing data from ${offset}`) - } - - this.logger.trace(`processChunks (after parsing):`, this.data) - - for (let message of messages) { - try { - let buf = message.toString('utf-8') - this.logger.debug(buf) - - let json = JSON.parse(buf) - this._emitMessage(json) - } catch (error) { - this.logger.error('failed to parse data as JSON') - this.logger.debug(message) - } - } - } - - /** - * @param {object} json - * @private - */ - _emitMessage(json) { - if (!Array.isArray(json)) { - this.logger.error('malformed message, JSON array expected', json) - return - } - - let type = json.shift() - let message - switch (type) { - case Message.REQUEST: { - let data = json.shift() - if (!data || !isObject(data)) { - this.logger.error('malformed REQUEST message') - return - } - - message = new RequestMessage(data.type, data.data || null) - if (data.password) - message.setPassword(data.password) - break - } - - case Message.RESPONSE: { - let data = json.shift() - if (!data || !Array.isArray(data) || data.length < 2) { - this.logger.error('malformed RESPONSE message') - return - } - - message = new ResponseMessage() - message.setError(data[0]).setData(data[1]) - - break - } - - default: - this.logger.error(`malformed message, unexpected type ${type}`) - return - } - - this.emit('message', message) - } - - /** - * @type {Message} data - * @param message - */ - send(message) { - if (!(message instanceof Message)) - throw new Error('send expects Message, got', message) - - // TODO set password! - - let json = JSON.stringify(message.getAsObject()) - let buf = Buffer.concat([ - Buffer.from(json), - Buffer.from([EOT]) - ]) - - this.logger.debug('send:', json) - this.logger.trace('send:', buf) - - try { - this.socket.write(buf) - } catch (error) { - this.logger.error(`processChunks: failed to write response ${JSON.stringify(message)} to a socket`, error) - } - } - - /** - */ - close() { - try { - this.socket.end() - this.socket.destroy() - this._emitClose() - } catch (error) { - this.logger.error('close:', error) - } - } - - /** - * @private - */ - _emitClose() { - if (this._closeEmitted) - return - - this._closeEmitted = true - this.emit('close') - } - - onConnect = () => { - this.logger.debug('connection established') - this.emit('connect') - } - - onData = (data) => { - this.logger.trace('onData', data) - this._appendToBuffer(data) - this._processChunks() - } - - onEnd = (data) => { - if (data) - this._appendToBuffer(data) - - this._processChunks() - } - - onClose = (hadError) => { - this._emitClose() - this.logger.debug(`socket closed` + (hadError ? ` with error` : '')) - } - - onError = (error) => { - this._emitClose() - this.logger.warn(`socket error:`, error) - } - -} - -module.exports = { - Server, - Connection, - RequestMessage, - ResponseMessage -} \ No newline at end of file diff --git a/src/util.js b/src/util.js deleted file mode 100644 index 6de07e5..0000000 --- a/src/util.js +++ /dev/null @@ -1,9 +0,0 @@ -module.exports = { - timestamp() { - return parseInt(+(new Date())/1000) - }, - - isNumeric(n) { - return !isNaN(parseFloat(n)) && isFinite(n) - } -} \ No newline at end of file diff --git a/src/worker.js b/src/worker.js deleted file mode 100644 index b4beeab..0000000 --- a/src/worker.js +++ /dev/null @@ -1,478 +0,0 @@ -const Queue = require('queue') -const child_process = require('child_process') -const db = require('./db') -const {timestamp} = require('./util') -const {getLogger} = require('./logger') -const EventEmitter = require('events') -const config = require('./config') - -const STATUS_WAITING = 'waiting' -const STATUS_MANUAL = 'manual' -const STATUS_ACCEPTED = 'accepted' -const STATUS_IGNORED = 'ignored' -const STATUS_RUNNING = 'running' -const STATUS_DONE = 'done' - -const RESULT_OK = 'ok' -const RESULT_FAIL = 'fail' - -class Worker extends EventEmitter { - - constructor() { - super() - - /** - * @type {object.}>} - */ - this.targets = {} - - /** - * @type {boolean} - */ - this.polling = false - - /** - * @type {boolean} - */ - this.nextpoll = {} - - /** - * @type {Logger} - */ - this.logger = getLogger('Worker') - } - - /** - * @param {string} target - * @param {string} slot - * @param {number} limit - */ - addSlot(target, slot, limit) { - this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) - - if (this.targets[target] === undefined) - this.targets[target] = {slots: {}} - - if (this.targets[target].slots[slot] !== undefined) - throw new Error(`slot ${slot} for target ${target} has already been added`) - - let queue = Queue({ - concurrency: limit, - autostart: true - }) - queue.on('success', this.onJobFinished.bind(this, target, slot)) - queue.on('error', this.onJobFinished.bind(this, target, slot)) - queue.start() - - this.targets[target].slots[slot] = {limit, queue} - } - - /** - * @param {string} target - * @returns {boolean} - */ - hasTarget(target) { - return (target in this.targets) - } - - /** - * Returns status of all queues. - * - * @return {object} - */ - getStatus() { - let status = {targets: {}} - for (const targetName in this.targets) { - let target = this.targets[targetName] - status.targets[targetName] = {} - for (const slotName in target.slots) { - const {queue, limit} = target.slots[slotName] - status.targets[targetName][slotName] = { - concurrency: queue.concurrency, - limit, - length: queue.length, - } - } - } - return status - } - - /** - * @return {string[]} - */ - getTargets() { - return Object.keys(this.targets) - } - - /** - * - */ - poll() { - const LOGPREFIX = `poll():` - - let targets = this.getPollTargets() - if (!targets.length) { - this.poller.warn(`${LOGPREFIX} no targets`) - return - } - - // skip and postpone the poll, if we're in the middle on another poll - // it will be called again from the last .then() at the end of this method - if (this.polling) { - this.logger.debug(`${LOGPREFIX} already polling`) - return - } - - // skip and postpone the poll, if no free slots - // it will be called again from onJobFinished() - if (!this.hasFreeSlots(targets)) { - this.logger.debug(`${LOGPREFIX} no free slots`) - return - } - - // set polling flag - this.polling = true - - // clear postponed polls target list - this.setPollTargets() - - this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`) - this.getTasks(targets) - .then(({rows}) => { - let message = `${LOGPREFIX} ${rows} processed` - if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) { - // it seems, there are more, so we'll need to perform another query - this.setPollTargets(targets) - message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})` - } - this.logger.debug(message) - }) - .catch((error) => { - this.logger.error(`${LOGPREFIX}`, error) - //this.setPollTargets(targets) - }) - .then(() => { - // unset polling flag - this.polling = false - - // perform another poll, if needed - if (this.getPollTargets().length > 0) { - this.logger.debug(`${LOGPREFIX} next poll scheduled, calling poll() again`) - this.poll() - } - }) - } - - /** - * @param {string|string[]|null} target - */ - setPollTargets(target) { - // when called without parameter, remove all targets - if (target === undefined) { - this.nextpoll = {} - return - } - - // just a fix - if (target === 'null') - target = null - - if (Array.isArray(target)) { - target.forEach(t => { - this.nextpoll[t] = true - }) - } else { - if (target === null) - this.nextpoll = {} - this.nextpoll[target] = true - } - } - - /** - * @return {string[]} - */ - getPollTargets() { - if (null in this.nextpoll) - return Object.keys(this.targets) - - return Object.keys(this.nextpoll) - } - - /** - * @param {string} target - * @return {boolean} - */ - hasPollTarget(target) { - return target in this.nextpoll || null in this.nextpoll - } - - /** - * @param {string|null|string[]} target - * @param {string} reqstatus - * @param {object} data - * @returns {Promise<{ignored: number, accepted: number, rows: number}>} - */ - async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) { - const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):` - - // get new jobs in transaction - await db.beginTransaction() - - let error = null - - let sqlFields = `id, status, target, slot` - let sql - if (data.id) { - sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE` - } else { - let targets - if (target === null) { - targets = Object.keys(this.targets) - } else if (!Array.isArray(target)) { - targets = [target] - } else { - targets = target - } - let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : '' - let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` - sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE` - } - - /** @type {object[]} results */ - let results = await db.query(sql) - this.logger.trace(`${LOGPREFIX} query result:`, results) - - /** - * @type {{target: string, slot: string, id: number}[]} - */ - let accepted = [] - - /** - * @type {number[]} - */ - let ignored = [] - - for (let result of results) { - let {id, slot, target, status} = result - id = parseInt(id) - - if (status !== reqstatus) { - error = `status = ${status} != ${reqstatus}` - this.logger.warn(`${LOGPREFIX} ${error}`) - ignored.push(id) - continue - } - - if (!target || this.targets[target] === undefined) { - error = `target '${target}' not found (job id=${id})` - this.logger.error(`${LOGPREFIX} ${error}`) - ignored.push(id) - continue - } - - if (!slot || this.targets[target].slots[slot] === undefined) { - error = `slot '${slot}' of target '${target}' not found (job id=${id})` - this.logger.error(`${LOGPREFIX} ${error}`) - ignored.push(id) - continue - } - - this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`) - accepted.push({target, slot, id}) - } - - if (accepted.length) - await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`) - - if (ignored.length) - await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`) - - await db.commit() - - accepted.forEach(({id, target, slot}) => { - let q = this.targets[target].slots[slot].queue - q.push(async (cb) => { - let data = { - code: null, - signal: null, - stdout: '', - stderr: '' - } - let result = RESULT_OK - - try { - await this.setJobStatus(id, STATUS_RUNNING) - - Object.assign(data, (await this.run(id))) - if (data.code !== 0) - result = RESULT_FAIL - } catch (error) { - this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error) - result = RESULT_FAIL - data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '') - } finally { - this.emit('job-done', { - id, - result, - ...data - }) - - try { - await this.setJobStatus(id, STATUS_DONE, result, data) - } catch (error) { - this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error) - } - - cb() - } - }) - }) - - return { - error, - rows: results.length, - accepted: accepted.length, - ignored: ignored.length, - } - } - - /** - * @param {number} id - */ - async run(id) { - let command = config.get('launcher').replace(/\{id\}/g, id) - let args = command.split(/ +/) - return new Promise((resolve, reject) => { - this.logger.info(`run(${id}): launching`, args) - - let process = child_process.spawn(args[0], args.slice(1), { - maxBuffer: config.get('max_output_buffer') - }) - - let stdoutChunks = [] - let stderrChunks = [] - - process.on('exit', - /** - * @param {null|number} code - * @param {null|string} signal - */ - (code, signal) => { - let stdout = stdoutChunks.join('') - let stderr = stderrChunks.join('') - - stdoutChunks = undefined - stderrChunks = undefined - - resolve({ - code, - signal, - stdout, - stderr - }) - }) - - process.on('error', (error) => { - reject(error) - }) - - process.stdout.on('data', (data) => { - if (data instanceof Buffer) - data = data.toString('utf-8') - stdoutChunks.push(data) - }) - - process.stderr.on('data', (data) => { - if (data instanceof Buffer) - data = data.toString('utf-8') - stderrChunks.push(data) - }) - }) - } - - /** - * @param {number} id - * @param {string} status - * @param {string} result - * @param {object} data - * @return {Promise} - */ - async setJobStatus(id, status, result = RESULT_OK, data = {}) { - let update = { - status, - result - } - switch (status) { - case STATUS_RUNNING: - case STATUS_DONE: - update[status === STATUS_RUNNING ? 'time_started' : 'time_finished'] = timestamp() - break - } - if (data.code !== undefined) - update.return_code = data.code - if (data.signal !== undefined) - update.sig = data.signal - if (data.stderr !== undefined) - update.stderr = data.stderr - if (data.stdout !== undefined) - update.stdout = data.stdout - - let list = [] - for (let field in update) { - let val = update[field] - if (val !== null) - val = db.escape(val) - list.push(`${field}=${val}`) - } - - await db.query(`UPDATE ${config.get('mysql_table')} SET ${list.join(', ')} WHERE id=?`, [id]) - } - - /** - * @param {string[]} inTargets - * @returns {boolean} - */ - hasFreeSlots(inTargets = []) { - const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):` - - this.logger.debug(`${LOGPREFIX} entered`) - - for (const target in this.targets) { - if (!inTargets.includes(target)) - continue - - for (const slot in this.targets[target].slots) { - const {limit, queue} = this.targets[target].slots[slot] - this.logger.debug(LOGPREFIX, limit, queue.length) - if (queue.length < limit) - return true - } - } - - return false - } - - /** - * @param {string} target - * @param {string} slot - */ - onJobFinished = (target, slot) => { - this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) - const {queue, limit} = this.targets[target].slots[slot] - if (queue.length < limit && this.hasPollTarget(target)) { - this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`) - this.poll() - } - } - -} - -module.exports = { - Worker, - STATUS_WAITING, - STATUS_MANUAL, - STATUS_ACCEPTED, - STATUS_IGNORED, - STATUS_RUNNING, - STATUS_DONE, -} \ No newline at end of file diff --git a/src/workers-list.js b/src/workers-list.js deleted file mode 100644 index 620f711..0000000 --- a/src/workers-list.js +++ /dev/null @@ -1,145 +0,0 @@ -const intersection = require('lodash/intersection') -const config = require('./config') -const {getLogger} = require('./logger') -const {RequestMessage} = require('./server') -const throttle = require('lodash/throttle') - -class WorkersList { - - constructor() { - /** - * @type {{connection: Connection, targets: string[]}[]} - */ - this.workers = [] - - /** - * @type {object.} - */ - this.targetsToPoke = {} - - /** - * @type {object.} - */ - this.targetsWaitingToPoke = {} - - /** - * @type {NodeJS.Timeout} - */ - this.pingInterval = setInterval(this.sendPings, config.get('ping_interval') * 1000) - - /** - * @type {Logger} - */ - this.logger = getLogger('WorkersList') - } - - /** - * @param {Connection} connection - * @param {string[]} targets - */ - add(connection, targets) { - this.logger.info(`add: connection from ${connection.remoteAddr()}, targets ${JSON.stringify(targets)}`) - - this.workers.push({connection, targets}) - connection.on('close', () => { - this.logger.info(`connection from ${connection.remoteAddr()} closed, removing worker`) - this.workers = this.workers.filter(worker => { - return worker.connection !== connection - }) - }) - - let waiting = Object.keys(this.targetsWaitingToPoke) - if (!waiting.length) - return - - let intrs = intersection(waiting, targets) - if (intrs.length) { - this.logger.info('add: found intersection with waiting targets:', intrs, 'going to poke new worker') - this._pokeWorkerConnection(connection, intrs) - for (let target of intrs) - delete this.targetsWaitingToPoke[target] - this.logger.trace(`add: this.targetsWaitingToPoke:`, this.targetsWaitingToPoke) - } - } - - /** - * @param {string[]} targets - */ - poke(targets) { - this.logger.debug('poke:', targets) - if (!Array.isArray(targets)) - throw new Error('targets must be Array') - - for (let t of targets) - this.targetsToPoke[t] = true - - this._pokeWorkers() - } - - /** - * @private - */ - _pokeWorkers = throttle(() => { - const targets = Object.keys(this.targetsToPoke) - this.targetsToPoke = {} - - const found = {} - for (const worker of this.workers) { - const intrs = intersection(worker.targets, targets) - intrs.forEach(t => { - found[t] = true - }) - if (intrs.length > 0) - this._pokeWorkerConnection(worker.connection, targets) - } - - for (let target of targets) { - if (!(target in found)) { - this.logger.debug(`_pokeWorkers: worker responsible for ${target} not found. we'll remember it`) - this.targetsWaitingToPoke[target] = true - } - this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke) - } - }, config.get('poke_throttle_interval') * 1000, {leading: true}) - - /** - * @param {Connection} connection - * @param {string[]} targets - * @private - */ - _pokeWorkerConnection(connection, targets) { - this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets) - connection.send( - new RequestMessage('poll', { - targets - }) - ) - } - - /** - * @return {{targets: string[], remoteAddr: string, remotePort: number}[]} - */ - getInfo() { - return this.workers.map(worker => { - return { - remoteAddr: worker.connection.socket?.remoteAddress, - remotePort: worker.connection.socket?.remotePort, - targets: worker.targets - } - }) - } - - /** - * @private - */ - sendPings = () => { - this.workers - .forEach(w => { - this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`) - w.connection.send(new RequestMessage('ping')) - }) - } - -} - -module.exports = WorkersList \ No newline at end of file -- cgit v1.2.3