From feaa5065f900a9c031ca7d66d80957040e2ee99f Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sun, 28 Feb 2021 16:11:57 +0300 Subject: refactor: moved some files to lib/ --- src/lib/config.js | 135 ++++++++++++++ src/lib/db.js | 49 +++++ src/lib/logger.js | 97 ++++++++++ src/lib/server.js | 452 +++++++++++++++++++++++++++++++++++++++++++++ src/lib/util.js | 9 + src/lib/worker.js | 478 ++++++++++++++++++++++++++++++++++++++++++++++++ src/lib/workers-list.js | 145 +++++++++++++++ 7 files changed, 1365 insertions(+) create mode 100644 src/lib/config.js create mode 100644 src/lib/db.js create mode 100644 src/lib/logger.js create mode 100644 src/lib/server.js create mode 100644 src/lib/util.js create mode 100644 src/lib/worker.js create mode 100644 src/lib/workers-list.js (limited to 'src/lib') diff --git a/src/lib/config.js b/src/lib/config.js new file mode 100644 index 0000000..a59615e --- /dev/null +++ b/src/lib/config.js @@ -0,0 +1,135 @@ +const fs = require('fs') +const ini = require('ini') +const {isNumeric} = require('./util') + +let config = null + +function readFile(file) { + if (!fs.existsSync(file)) + throw new Error(`file ${file} not found`) + + return ini.parse(fs.readFileSync(file, 'utf-8')) +} + +function processScheme(source, scheme) { + const result = {} + + for (let key in scheme) { + let opts = scheme[key] + let ne = !(key in source) || !source[key] + if (opts.required === true && ne) + throw new Error(`'${key}' is not defined`) + + let value = source[key] ?? opts.default ?? null + + switch (opts.type) { + case 'int': + if (!isNumeric(value)) + throw new Error(`'${key}' must be an integer`) + value = parseInt(value, 10) + break + + case 'float': + if (!isNumeric(value)) + throw new Error(`'${key}' must be a float`) + value = parseFloat(value) + break + } + + result[key] = value + } + + return result +} + +function parseWorkerConfig(file) { + config = {} + + const raw = readFile(file) + const scheme = { + host: {required: true}, + port: {required: true, type: 'int'}, + password: {}, + + master_host: {}, + master_port: {type: 'int', default: 0}, + master_reconnect_timeout: {type: 'int', default: 10}, + + log_file: {}, + log_level_file: {default: 'warn'}, + log_level_console: {default: 'warn'}, + + mysql_host: {required: true}, + mysql_port: {required: true, type: 'int'}, + mysql_user: {required: true}, + mysql_password: {required: true}, + mysql_database: {required: true}, + mysql_table: {required: true, default: 'jobs'}, + mysql_fetch_limit: {default: 100, type: 'int'}, + + launcher: {required: true}, + max_output_buffer: {default: 1024*1024, type: 'int'}, + } + Object.assign(config, processScheme(raw, scheme)) + + config.targets = {} + + // targets + for (let target in raw) { + if (target === 'null') + throw new Error('word \'null\' is reserved, please don\'t use it as a target name') + + if (typeof raw[target] !== 'object') + continue + + config.targets[target] = {slots: {}} + for (let slotName in raw[target]) { + let slotLimit = parseInt(raw[target][slotName], 10) + if (slotLimit < 1) + throw new Error(`${target}: slot ${slotName} has invalid limit`) + config.targets[target].slots[slotName] = slotLimit + } + } +} + +function parseMasterConfig(file) { + config = {} + + const raw = readFile(file) + const scheme = { + host: {required: true}, + port: {required: true, type: 'int'}, + password: {}, + + ping_interval: {default: 30, type: 'int'}, + poke_throttle_interval: {default: 0.5, type: 'float'}, + + log_file: {}, + log_level_file: {default: 'warn'}, + log_level_console: {default: 'warn'}, + } + Object.assign(config, processScheme(raw, scheme)) +} + +/** + * @param {string} key + * @return {string|number|object} + */ +function get(key = null) { + if (key === null) + return config + + if (typeof config !== 'object') + throw new Error(`config is not loaded`) + + if (!(key in config)) + throw new Error(`config: ${key} not found`) + + return config[key] +} + +module.exports = { + parseWorkerConfig, + parseMasterConfig, + get, +} \ No newline at end of file diff --git a/src/lib/db.js b/src/lib/db.js new file mode 100644 index 0000000..7874a92 --- /dev/null +++ b/src/lib/db.js @@ -0,0 +1,49 @@ +const config = require('./config') +const {getLogger} = require('./logger') +const mysql = require('promise-mysql') + +let link +const logger = getLogger('db') + +async function init() { + link = await mysql.createConnection({ + host: config.get('mysql_host'), + user: config.get('mysql_user'), + password: config.get('mysql_password'), + database: config.get('mysql_database') + }) +} + +function wrap(method, isAsync = true, log = true) { + return isAsync ? async function(...args) { + if (log) + logger.trace(`${method}: `, args) + + try { + return await link[method](...args) + } catch (error) { + logger.error(`db.${method}:`, error, link) + + if ( error.code === 'PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR' + || error.code === 'PROTOCOL_CONNECTION_LOST' + || error.fatal === true) { + // try to reconnect and call it again, once + await init() + return await link[method](...args) + } + } + } : function(...args) { + if (log) + logger.trace(`${method}: `, args) + + return link[method](...args) + } +} + +module.exports = { + init, + query: wrap('query'), + beginTransaction: wrap('beginTransaction'), + commit: wrap('commit'), + escape: wrap('escape', false, false) +} \ No newline at end of file diff --git a/src/lib/logger.js b/src/lib/logger.js new file mode 100644 index 0000000..260ba41 --- /dev/null +++ b/src/lib/logger.js @@ -0,0 +1,97 @@ +const log4js = require('log4js') +const fs = require('fs/promises') +const fsConstants = require('fs').constants +const util = require('src/lib/util') + +module.exports = { + /** + * @param {string} file + * @param {string} levelFile + * @param {string} levelConsole + */ + async init({file, levelFile, levelConsole}) { + const categories = { + default: { + appenders: ['stdout-filter'], + level: 'trace' + } + } + + const appenders = { + stdout: { + type: 'stdout', + level: 'trace' + }, + 'stdout-filter': { + type: 'logLevelFilter', + appender: 'stdout', + level: levelConsole + } + } + + if (file) { + let exists + try { + await fs.stat(file) + exists = true + } catch (error) { + exists = false + } + + // if file exists + if (exists) { + // see if it's writable + try { + // this promise fullfills with undefined upon success + await fs.access(file, fsConstants.W_OK) + } catch (error) { + throw new Error(`file '${file}' is not writable:` + error.message) + } + } else { + // try to create an empty file + let fd + try { + fd = await fs.open(file, 'wx') + } catch (error) { + throw new Error(`failed to create file '${file}': ` + error.message) + } finally { + await fd?.close() + } + } + + categories.default.appenders.push('file-filter') + appenders.file = { + type: 'file', + filename: file, + maxLogSize: 1024 * 1024 * 50, + debug: 'debug' + } + appenders['file-filter'] = { + type: 'logLevelFilter', + appender: 'file', + level: levelFile + } + } + + log4js.configure({ + appenders, + categories + }) + }, + + /** + * @return {Logger} + */ + getLogger(...args) { + return log4js.getLogger(...args) + }, + + /** + * @param cb + */ + shutdown(cb) { + log4js.shutdown(cb) + }, + + Logger: log4js.Logger, +} diff --git a/src/lib/server.js b/src/lib/server.js new file mode 100644 index 0000000..08eadca --- /dev/null +++ b/src/lib/server.js @@ -0,0 +1,452 @@ +const net = require('net') +const EventEmitter = require('events') +const {getLogger} = require('./logger') +const isObject = require('lodash/isObject') + +const EOT = 0x04 + +class Message { + + static REQUEST = 0 + static RESPONSE = 1 + + /** + * @param {number} type + */ + constructor(type) { + /** + * @type {number} + */ + this.type = type + } + + getAsObject() { + return [this.type] + } + +} + +class ResponseMessage extends Message { + constructor() { + super(Message.RESPONSE) + + this.error = null + this.data = null + } + + setError(error) { + this.error = error + return this + } + + setData(data) { + this.data = data + return this + } + + getAsObject() { + return [ + ...super.getAsObject(), + [ + this.error, + this.data + ] + ] + } +} + +class RequestMessage extends Message { + /** + * @param {string} type + * @param {any} data + */ + constructor(type, data = null) { + super(Message.REQUEST) + + /** + * @type string + */ + this.requestType = type + + /** + * @type any + */ + this.requestData = data + + /** + * @type {null|string} + */ + this.password = null + } + + getAsObject() { + let request = { + type: this.requestType + } + + if (this.requestData) + request.data = this.requestData + + return [ + ...super.getAsObject(), + request + ] + } + + /** + * @param {string} password + */ + setPassword(password) { + this.password = password + } +} + +class Server extends EventEmitter { + + constructor() { + super() + + /** + * @type {null|module:net.Server} + */ + this.server = null + + /** + * @type {Logger} + */ + this.logger = getLogger('server') + } + + /** + * @param {number} port + * @param {string} host + */ + start(port, host) { + this.server = net.createServer() + + this.server.on('connection', this.onConnection) + this.server.on('error', this.onError) + this.server.on('listening', this.onListening) + + this.server.listen(port, host) + } + + /** + * @param {module:net.Socket} socket + */ + onConnection = (socket) => { + let connection = new Connection() + connection.setSocket(socket) + connection.on('message', (message) => { + this.emit('message', { + message, + connection + }) + }) + + this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`) + } + + onListening = () => { + let addr = this.server.address() + this.logger.info(`server is listening on ${addr.address}:${addr.port}`) + } + + onError = (error) => { + this.logger.error('error: ', error) + } + +} + +class Connection extends EventEmitter { + + constructor() { + super() + + /** + * @type {null|module:net.Socket} + */ + this.socket = null + + /** + * @type {Buffer} + */ + this.data = Buffer.from([]) + + /** + * @type {boolean} + * @private + */ + this._closeEmitted = false + + /** + * @type {null|string} + */ + this.remoteAddress = null + + /** + * @type {null|number} + */ + this.remotePort = null + + /** + * @type {null|number} + */ + this.id = null + + this._setLogger() + } + + /** + * @param {string} host + * @param {number} port + */ + connect(host, port) { + if (this.socket !== null) + throw new Error(`this Connection already has a socket`) + + this.socket = new net.Socket() + this.socket.connect({host, port}) + + this.remoteAddress = host + this.remotePort = port + + this._setId() + this._setLogger() + this._setSocketEvents() + } + + /** + * @param {module:net.Socket} socket + */ + setSocket(socket) { + this.socket = socket + + this.remoteAddress = socket.remoteAddress + this.remotePort = socket.remotePort + + this._setId() + this._setLogger() + this._setSocketEvents() + } + + /** + * @private + */ + _setLogger() { + let addr = this.socket ? this.remoteAddr() : '?' + this.logger = getLogger(``) + } + + /** + * @private + */ + _setId() { + this.id = Math.floor(Math.random() * 10000) + } + + /** + * @private + */ + _setSocketEvents() { + this.socket.on('connect', this.onConnect) + this.socket.on('data', this.onData) + this.socket.on('end', this.onEnd) + this.socket.on('close', this.onClose) + this.socket.on('error', this.onError) + } + + /** + * @param {Buffer} data + * @private + */ + _appendToBuffer(data) { + this.data = Buffer.concat([this.data, data]) + } + + /** + * @return {string} + */ + remoteAddr() { + return this.remoteAddress + ':' + this.remotePort + } + + /** + * @private + */ + _processChunks() { + if (!this.data.length) + return + + this.logger.trace(`processChunks (start):`, this.data) + + /** + * @type {Buffer[]} + */ + let messages = [] + let offset = 0 + let eotPos + do { + eotPos = this.data.indexOf(EOT, offset) + if (eotPos !== -1) { + let message = this.data.slice(offset, eotPos) + messages.push(message) + + this.logger.debug(`processChunks: found new message (${offset}, ${eotPos})`) + offset = eotPos + 1 + } + } while (eotPos !== -1 && offset < this.data.length-1) + + if (offset !== 0) { + this.data = this.data.slice(offset) + this.logger.trace(`processChunks: slicing data from ${offset}`) + } + + this.logger.trace(`processChunks (after parsing):`, this.data) + + for (let message of messages) { + try { + let buf = message.toString('utf-8') + this.logger.debug(buf) + + let json = JSON.parse(buf) + this._emitMessage(json) + } catch (error) { + this.logger.error('failed to parse data as JSON') + this.logger.debug(message) + } + } + } + + /** + * @param {object} json + * @private + */ + _emitMessage(json) { + if (!Array.isArray(json)) { + this.logger.error('malformed message, JSON array expected', json) + return + } + + let type = json.shift() + let message + switch (type) { + case Message.REQUEST: { + let data = json.shift() + if (!data || !isObject(data)) { + this.logger.error('malformed REQUEST message') + return + } + + message = new RequestMessage(data.type, data.data || null) + if (data.password) + message.setPassword(data.password) + break + } + + case Message.RESPONSE: { + let data = json.shift() + if (!data || !Array.isArray(data) || data.length < 2) { + this.logger.error('malformed RESPONSE message') + return + } + + message = new ResponseMessage() + message.setError(data[0]).setData(data[1]) + + break + } + + default: + this.logger.error(`malformed message, unexpected type ${type}`) + return + } + + this.emit('message', message) + } + + /** + * @type {Message} data + * @param message + */ + send(message) { + if (!(message instanceof Message)) + throw new Error('send expects Message, got', message) + + // TODO set password! + + let json = JSON.stringify(message.getAsObject()) + let buf = Buffer.concat([ + Buffer.from(json), + Buffer.from([EOT]) + ]) + + this.logger.debug('send:', json) + this.logger.trace('send:', buf) + + try { + this.socket.write(buf) + } catch (error) { + this.logger.error(`processChunks: failed to write response ${JSON.stringify(message)} to a socket`, error) + } + } + + /** + */ + close() { + try { + this.socket.end() + this.socket.destroy() + this._emitClose() + } catch (error) { + this.logger.error('close:', error) + } + } + + /** + * @private + */ + _emitClose() { + if (this._closeEmitted) + return + + this._closeEmitted = true + this.emit('close') + } + + onConnect = () => { + this.logger.debug('connection established') + this.emit('connect') + } + + onData = (data) => { + this.logger.trace('onData', data) + this._appendToBuffer(data) + this._processChunks() + } + + onEnd = (data) => { + if (data) + this._appendToBuffer(data) + + this._processChunks() + } + + onClose = (hadError) => { + this._emitClose() + this.logger.debug(`socket closed` + (hadError ? ` with error` : '')) + } + + onError = (error) => { + this._emitClose() + this.logger.warn(`socket error:`, error) + } + +} + +module.exports = { + Server, + Connection, + RequestMessage, + ResponseMessage +} \ No newline at end of file diff --git a/src/lib/util.js b/src/lib/util.js new file mode 100644 index 0000000..6de07e5 --- /dev/null +++ b/src/lib/util.js @@ -0,0 +1,9 @@ +module.exports = { + timestamp() { + return parseInt(+(new Date())/1000) + }, + + isNumeric(n) { + return !isNaN(parseFloat(n)) && isFinite(n) + } +} \ No newline at end of file diff --git a/src/lib/worker.js b/src/lib/worker.js new file mode 100644 index 0000000..b4beeab --- /dev/null +++ b/src/lib/worker.js @@ -0,0 +1,478 @@ +const Queue = require('queue') +const child_process = require('child_process') +const db = require('./db') +const {timestamp} = require('./util') +const {getLogger} = require('./logger') +const EventEmitter = require('events') +const config = require('./config') + +const STATUS_WAITING = 'waiting' +const STATUS_MANUAL = 'manual' +const STATUS_ACCEPTED = 'accepted' +const STATUS_IGNORED = 'ignored' +const STATUS_RUNNING = 'running' +const STATUS_DONE = 'done' + +const RESULT_OK = 'ok' +const RESULT_FAIL = 'fail' + +class Worker extends EventEmitter { + + constructor() { + super() + + /** + * @type {object.}>} + */ + this.targets = {} + + /** + * @type {boolean} + */ + this.polling = false + + /** + * @type {boolean} + */ + this.nextpoll = {} + + /** + * @type {Logger} + */ + this.logger = getLogger('Worker') + } + + /** + * @param {string} target + * @param {string} slot + * @param {number} limit + */ + addSlot(target, slot, limit) { + this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) + + if (this.targets[target] === undefined) + this.targets[target] = {slots: {}} + + if (this.targets[target].slots[slot] !== undefined) + throw new Error(`slot ${slot} for target ${target} has already been added`) + + let queue = Queue({ + concurrency: limit, + autostart: true + }) + queue.on('success', this.onJobFinished.bind(this, target, slot)) + queue.on('error', this.onJobFinished.bind(this, target, slot)) + queue.start() + + this.targets[target].slots[slot] = {limit, queue} + } + + /** + * @param {string} target + * @returns {boolean} + */ + hasTarget(target) { + return (target in this.targets) + } + + /** + * Returns status of all queues. + * + * @return {object} + */ + getStatus() { + let status = {targets: {}} + for (const targetName in this.targets) { + let target = this.targets[targetName] + status.targets[targetName] = {} + for (const slotName in target.slots) { + const {queue, limit} = target.slots[slotName] + status.targets[targetName][slotName] = { + concurrency: queue.concurrency, + limit, + length: queue.length, + } + } + } + return status + } + + /** + * @return {string[]} + */ + getTargets() { + return Object.keys(this.targets) + } + + /** + * + */ + poll() { + const LOGPREFIX = `poll():` + + let targets = this.getPollTargets() + if (!targets.length) { + this.poller.warn(`${LOGPREFIX} no targets`) + return + } + + // skip and postpone the poll, if we're in the middle on another poll + // it will be called again from the last .then() at the end of this method + if (this.polling) { + this.logger.debug(`${LOGPREFIX} already polling`) + return + } + + // skip and postpone the poll, if no free slots + // it will be called again from onJobFinished() + if (!this.hasFreeSlots(targets)) { + this.logger.debug(`${LOGPREFIX} no free slots`) + return + } + + // set polling flag + this.polling = true + + // clear postponed polls target list + this.setPollTargets() + + this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`) + this.getTasks(targets) + .then(({rows}) => { + let message = `${LOGPREFIX} ${rows} processed` + if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) { + // it seems, there are more, so we'll need to perform another query + this.setPollTargets(targets) + message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})` + } + this.logger.debug(message) + }) + .catch((error) => { + this.logger.error(`${LOGPREFIX}`, error) + //this.setPollTargets(targets) + }) + .then(() => { + // unset polling flag + this.polling = false + + // perform another poll, if needed + if (this.getPollTargets().length > 0) { + this.logger.debug(`${LOGPREFIX} next poll scheduled, calling poll() again`) + this.poll() + } + }) + } + + /** + * @param {string|string[]|null} target + */ + setPollTargets(target) { + // when called without parameter, remove all targets + if (target === undefined) { + this.nextpoll = {} + return + } + + // just a fix + if (target === 'null') + target = null + + if (Array.isArray(target)) { + target.forEach(t => { + this.nextpoll[t] = true + }) + } else { + if (target === null) + this.nextpoll = {} + this.nextpoll[target] = true + } + } + + /** + * @return {string[]} + */ + getPollTargets() { + if (null in this.nextpoll) + return Object.keys(this.targets) + + return Object.keys(this.nextpoll) + } + + /** + * @param {string} target + * @return {boolean} + */ + hasPollTarget(target) { + return target in this.nextpoll || null in this.nextpoll + } + + /** + * @param {string|null|string[]} target + * @param {string} reqstatus + * @param {object} data + * @returns {Promise<{ignored: number, accepted: number, rows: number}>} + */ + async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) { + const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):` + + // get new jobs in transaction + await db.beginTransaction() + + let error = null + + let sqlFields = `id, status, target, slot` + let sql + if (data.id) { + sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE` + } else { + let targets + if (target === null) { + targets = Object.keys(this.targets) + } else if (!Array.isArray(target)) { + targets = [target] + } else { + targets = target + } + let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : '' + let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` + sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE` + } + + /** @type {object[]} results */ + let results = await db.query(sql) + this.logger.trace(`${LOGPREFIX} query result:`, results) + + /** + * @type {{target: string, slot: string, id: number}[]} + */ + let accepted = [] + + /** + * @type {number[]} + */ + let ignored = [] + + for (let result of results) { + let {id, slot, target, status} = result + id = parseInt(id) + + if (status !== reqstatus) { + error = `status = ${status} != ${reqstatus}` + this.logger.warn(`${LOGPREFIX} ${error}`) + ignored.push(id) + continue + } + + if (!target || this.targets[target] === undefined) { + error = `target '${target}' not found (job id=${id})` + this.logger.error(`${LOGPREFIX} ${error}`) + ignored.push(id) + continue + } + + if (!slot || this.targets[target].slots[slot] === undefined) { + error = `slot '${slot}' of target '${target}' not found (job id=${id})` + this.logger.error(`${LOGPREFIX} ${error}`) + ignored.push(id) + continue + } + + this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`) + accepted.push({target, slot, id}) + } + + if (accepted.length) + await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`) + + if (ignored.length) + await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`) + + await db.commit() + + accepted.forEach(({id, target, slot}) => { + let q = this.targets[target].slots[slot].queue + q.push(async (cb) => { + let data = { + code: null, + signal: null, + stdout: '', + stderr: '' + } + let result = RESULT_OK + + try { + await this.setJobStatus(id, STATUS_RUNNING) + + Object.assign(data, (await this.run(id))) + if (data.code !== 0) + result = RESULT_FAIL + } catch (error) { + this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error) + result = RESULT_FAIL + data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '') + } finally { + this.emit('job-done', { + id, + result, + ...data + }) + + try { + await this.setJobStatus(id, STATUS_DONE, result, data) + } catch (error) { + this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error) + } + + cb() + } + }) + }) + + return { + error, + rows: results.length, + accepted: accepted.length, + ignored: ignored.length, + } + } + + /** + * @param {number} id + */ + async run(id) { + let command = config.get('launcher').replace(/\{id\}/g, id) + let args = command.split(/ +/) + return new Promise((resolve, reject) => { + this.logger.info(`run(${id}): launching`, args) + + let process = child_process.spawn(args[0], args.slice(1), { + maxBuffer: config.get('max_output_buffer') + }) + + let stdoutChunks = [] + let stderrChunks = [] + + process.on('exit', + /** + * @param {null|number} code + * @param {null|string} signal + */ + (code, signal) => { + let stdout = stdoutChunks.join('') + let stderr = stderrChunks.join('') + + stdoutChunks = undefined + stderrChunks = undefined + + resolve({ + code, + signal, + stdout, + stderr + }) + }) + + process.on('error', (error) => { + reject(error) + }) + + process.stdout.on('data', (data) => { + if (data instanceof Buffer) + data = data.toString('utf-8') + stdoutChunks.push(data) + }) + + process.stderr.on('data', (data) => { + if (data instanceof Buffer) + data = data.toString('utf-8') + stderrChunks.push(data) + }) + }) + } + + /** + * @param {number} id + * @param {string} status + * @param {string} result + * @param {object} data + * @return {Promise} + */ + async setJobStatus(id, status, result = RESULT_OK, data = {}) { + let update = { + status, + result + } + switch (status) { + case STATUS_RUNNING: + case STATUS_DONE: + update[status === STATUS_RUNNING ? 'time_started' : 'time_finished'] = timestamp() + break + } + if (data.code !== undefined) + update.return_code = data.code + if (data.signal !== undefined) + update.sig = data.signal + if (data.stderr !== undefined) + update.stderr = data.stderr + if (data.stdout !== undefined) + update.stdout = data.stdout + + let list = [] + for (let field in update) { + let val = update[field] + if (val !== null) + val = db.escape(val) + list.push(`${field}=${val}`) + } + + await db.query(`UPDATE ${config.get('mysql_table')} SET ${list.join(', ')} WHERE id=?`, [id]) + } + + /** + * @param {string[]} inTargets + * @returns {boolean} + */ + hasFreeSlots(inTargets = []) { + const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):` + + this.logger.debug(`${LOGPREFIX} entered`) + + for (const target in this.targets) { + if (!inTargets.includes(target)) + continue + + for (const slot in this.targets[target].slots) { + const {limit, queue} = this.targets[target].slots[slot] + this.logger.debug(LOGPREFIX, limit, queue.length) + if (queue.length < limit) + return true + } + } + + return false + } + + /** + * @param {string} target + * @param {string} slot + */ + onJobFinished = (target, slot) => { + this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) + const {queue, limit} = this.targets[target].slots[slot] + if (queue.length < limit && this.hasPollTarget(target)) { + this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`) + this.poll() + } + } + +} + +module.exports = { + Worker, + STATUS_WAITING, + STATUS_MANUAL, + STATUS_ACCEPTED, + STATUS_IGNORED, + STATUS_RUNNING, + STATUS_DONE, +} \ No newline at end of file diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js new file mode 100644 index 0000000..620f711 --- /dev/null +++ b/src/lib/workers-list.js @@ -0,0 +1,145 @@ +const intersection = require('lodash/intersection') +const config = require('./config') +const {getLogger} = require('./logger') +const {RequestMessage} = require('./server') +const throttle = require('lodash/throttle') + +class WorkersList { + + constructor() { + /** + * @type {{connection: Connection, targets: string[]}[]} + */ + this.workers = [] + + /** + * @type {object.} + */ + this.targetsToPoke = {} + + /** + * @type {object.} + */ + this.targetsWaitingToPoke = {} + + /** + * @type {NodeJS.Timeout} + */ + this.pingInterval = setInterval(this.sendPings, config.get('ping_interval') * 1000) + + /** + * @type {Logger} + */ + this.logger = getLogger('WorkersList') + } + + /** + * @param {Connection} connection + * @param {string[]} targets + */ + add(connection, targets) { + this.logger.info(`add: connection from ${connection.remoteAddr()}, targets ${JSON.stringify(targets)}`) + + this.workers.push({connection, targets}) + connection.on('close', () => { + this.logger.info(`connection from ${connection.remoteAddr()} closed, removing worker`) + this.workers = this.workers.filter(worker => { + return worker.connection !== connection + }) + }) + + let waiting = Object.keys(this.targetsWaitingToPoke) + if (!waiting.length) + return + + let intrs = intersection(waiting, targets) + if (intrs.length) { + this.logger.info('add: found intersection with waiting targets:', intrs, 'going to poke new worker') + this._pokeWorkerConnection(connection, intrs) + for (let target of intrs) + delete this.targetsWaitingToPoke[target] + this.logger.trace(`add: this.targetsWaitingToPoke:`, this.targetsWaitingToPoke) + } + } + + /** + * @param {string[]} targets + */ + poke(targets) { + this.logger.debug('poke:', targets) + if (!Array.isArray(targets)) + throw new Error('targets must be Array') + + for (let t of targets) + this.targetsToPoke[t] = true + + this._pokeWorkers() + } + + /** + * @private + */ + _pokeWorkers = throttle(() => { + const targets = Object.keys(this.targetsToPoke) + this.targetsToPoke = {} + + const found = {} + for (const worker of this.workers) { + const intrs = intersection(worker.targets, targets) + intrs.forEach(t => { + found[t] = true + }) + if (intrs.length > 0) + this._pokeWorkerConnection(worker.connection, targets) + } + + for (let target of targets) { + if (!(target in found)) { + this.logger.debug(`_pokeWorkers: worker responsible for ${target} not found. we'll remember it`) + this.targetsWaitingToPoke[target] = true + } + this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke) + } + }, config.get('poke_throttle_interval') * 1000, {leading: true}) + + /** + * @param {Connection} connection + * @param {string[]} targets + * @private + */ + _pokeWorkerConnection(connection, targets) { + this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets) + connection.send( + new RequestMessage('poll', { + targets + }) + ) + } + + /** + * @return {{targets: string[], remoteAddr: string, remotePort: number}[]} + */ + getInfo() { + return this.workers.map(worker => { + return { + remoteAddr: worker.connection.socket?.remoteAddress, + remotePort: worker.connection.socket?.remotePort, + targets: worker.targets + } + }) + } + + /** + * @private + */ + sendPings = () => { + this.workers + .forEach(w => { + this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`) + w.connection.send(new RequestMessage('ping')) + }) + } + +} + +module.exports = WorkersList \ No newline at end of file -- cgit v1.2.3