diff options
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | README.md | 44 | ||||
-rw-r--r-- | jobd-master.conf.example | 12 | ||||
-rw-r--r-- | jobd.conf.example | 38 | ||||
-rw-r--r-- | package.json | 34 | ||||
-rw-r--r-- | src/config.js | 119 | ||||
-rw-r--r-- | src/db.js | 49 | ||||
-rwxr-xr-x | src/jobd-master.js | 153 | ||||
-rwxr-xr-x | src/jobd.js | 244 | ||||
-rw-r--r-- | src/logger.js | 97 | ||||
-rw-r--r-- | src/server.js | 450 | ||||
-rw-r--r-- | src/util.js | 9 | ||||
-rw-r--r-- | src/worker.js | 472 | ||||
-rw-r--r-- | src/workers-list.js | 145 |
14 files changed, 1868 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7a1537b --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea +node_modules diff --git a/README.md b/README.md new file mode 100644 index 0000000..7354f91 --- /dev/null +++ b/README.md @@ -0,0 +1,44 @@ +# jobd + +**jobd** is a simple job queue daemon written in Node.js. It uses a MySQL +table as its storage. + + +## Installation + +To be written + + +## Usage + +To be written + + +## MySQL setup + +Table schema. You can add additional fields if you need them. + +``` +CREATE TABLE `jobs` ( + `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT, + `target` char(16) NOT NULL, + `slot` char(16) DEFAULT NULL, + `time_created` int(10) UNSIGNED NOT NULL, + `time_started` int(10) UNSIGNED NOT NULL DEFAULT 0, + `time_finished` int(10) UNSIGNED NOT NULL DEFAULT 0, + `status` enum('waiting','manual','accepted','running','done','ignored') NOT NULL DEFAULT 'waiting', + `result` enum('ok','fail') DEFAULT NULL, + `return_code` tinyint(3) UNSIGNED DEFAULT NULL, + `sig` char(10) DEFAULT NULL, + `stdout` mediumtext DEFAULT NULL, + `stderr` mediumtext DEFAULT NULL, + PRIMARY KEY (`id`), + KEY `status_target_idx` (`status`, `target`, `id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +``` + +You can change `target` and `slot` to `ENUM` columns as an optimization. + +## License + +BSD-2-Clause
\ No newline at end of file diff --git a/jobd-master.conf.example b/jobd-master.conf.example new file mode 100644 index 0000000..48819d8 --- /dev/null +++ b/jobd-master.conf.example @@ -0,0 +1,12 @@ +; server settings +host = 0.0.0.0 +port = 13597 +;password = + +ping_interval = 30 ; seconds +poke_throttle_interval = 0.5 ; seconds + +; logging +log_file = /tmp/jobd-master.log +log_level_file = info +log_level_console = debug diff --git a/jobd.conf.example b/jobd.conf.example new file mode 100644 index 0000000..0659018 --- /dev/null +++ b/jobd.conf.example @@ -0,0 +1,38 @@ +; server settings +host = 0.0.0.0 +port = 13596 +;password = + +master_host = 127.0.0.1 +master_port = 13597 +master_reconnect_timeout = 10 + +; log +log_file = /tmp/jobd.log +log_level_file = info +log_level_console = debug + +; mysql settings +mysql_host = 10.211.55.6 +mysql_port = 3306 +mysql_user = jobd +mysql_password = password +mysql_database = jobd +mysql_table = jobs +mysql_fetch_limit = 10 + +; launcher command template +launcher = php /Users/ch1p/jobd-launcher.php --id {id} +max_output_buffer = 16777216 + +; +; targets +; + +[server1] +low = 5 +normal = 5 +high = 5 + +[global] +normal = 3
\ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..a1b4220 --- /dev/null +++ b/package.json @@ -0,0 +1,34 @@ +{ + "name": "jobd", + "version": "1.0.0", + "description": "job queue daemon", + "main": "src/jobd", + "homepage": "https://github.com/gch1p/jobd#readme", + "bugs": { + "url" : "https://github.com/gch1p/jobd/issues", + "email": "me@ch1p.io" + }, + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "bin": { + "jobd": "./src/jobd.js", + "jobd-master": "./src/jobd-master.js" + }, + "keywords": [], + "author": "Evgeny Zinoviev", + "license": "BSD-2-Clause", + "os": [ + "darwin", + "linux" + ], + "dependencies": { + "ini": "^2.0.0", + "lodash": "^4.17.21", + "log4js": "^6.3.0", + "minimist": "^1.2.5", + "mysql": "^2.18.1", + "promise-mysql": "^5.0.2", + "queue": "^6.0.2" + } +} diff --git a/src/config.js b/src/config.js new file mode 100644 index 0000000..9ee6338 --- /dev/null +++ b/src/config.js @@ -0,0 +1,119 @@ +const fs = require('fs') +const ini = require('ini') +const {isNumeric} = require('./util') + +let workerConfig = { + targets: {}, +} +let masterConfig = {} + +function readFile(file) { + if (!fs.existsSync(file)) + throw new Error(`file ${file} not found`) + + return ini.parse(fs.readFileSync(file, 'utf-8')) +} + +function processScheme(source, scheme) { + const result = {} + + for (let key in scheme) { + let opts = scheme[key] + let ne = !(key in source) || !source[key] + if (opts.required === true && ne) + throw new Error(`'${key}' is not defined`) + + let value = source[key] ?? opts.default ?? 
null + + switch (opts.type) { + case 'int': + if (!isNumeric(value)) + throw new Error(`'${key}' must be an integer`) + value = parseInt(value, 10) + break + + case 'float': + if (!isNumeric(value)) + throw new Error(`'${key}' must be a float`) + value = parseFloat(value) + break + } + + result[key] = value + } + + return result +} + +function parseWorkerConfig(file) { + const raw = readFile(file) + + const scheme = { + host: {required: true}, + port: {required: true, type: 'int'}, + password: {}, + + master_host: {}, + master_port: {type: 'int', default: 0}, + master_reconnect_timeout: {type: 'int', default: 10}, + + log_file: {}, + log_level_file: {default: 'warn'}, + log_level_console: {default: 'warn'}, + + mysql_host: {required: true}, + mysql_port: {required: true, type: 'int'}, + mysql_user: {required: true}, + mysql_password: {required: true}, + mysql_database: {required: true}, + mysql_table: {required: true, default: 'jobs'}, + mysql_fetch_limit: {default: 100, type: 'int'}, + + launcher: {required: true}, + max_output_buffer: {default: 1024*1024, type: 'int'}, + } + Object.assign(workerConfig, processScheme(raw, scheme)) + + // targets + for (let target in raw) { + if (target === 'null') + throw new Error('word \'null\' is reserved, please don\'t use it as a target name') + + if (typeof raw[target] !== 'object') + continue + + workerConfig.targets[target] = {slots: {}} + for (let slotName in raw[target]) { + let slotLimit = parseInt(raw[target][slotName], 10) + if (slotLimit < 1) + throw new Error(`${target}: slot ${slotName} has invalid limit`) + workerConfig.targets[target].slots[slotName] = slotLimit + } + } +} + +function parseMasterConfig(file) { + const raw = readFile(file) + + const scheme = { + host: {required: true}, + port: {required: true, type: 'int'}, + password: {}, + + ping_interval: {default: 30, type: 'int'}, + poke_throttle_interval: {default: 0.5, type: 'float'}, + + log_file: {}, + log_level_file: {default: 'warn'}, + 
log_level_console: {default: 'warn'}, + } + Object.assign(masterConfig, processScheme(raw, scheme)) +} + +module.exports = { + parseWorkerConfig, + parseMasterConfig, + + workerConfig, + masterConfig +}
\ No newline at end of file diff --git a/src/db.js b/src/db.js new file mode 100644 index 0000000..0ffba84 --- /dev/null +++ b/src/db.js @@ -0,0 +1,49 @@ +const {workerConfig} = require('./config') +const {getLogger} = require('./logger') +const mysql = require('promise-mysql') + +let link +const logger = getLogger('db') + +async function init() { + link = await mysql.createConnection({ + host: workerConfig.mysql_host, + user: workerConfig.mysql_user, + password: workerConfig.mysql_password, + database: workerConfig.mysql_database + }) +} + +function wrap(method, isAsync = true, log = true) { + return isAsync ? async function(...args) { + if (log) + logger.trace(`${method}: `, args) + + try { + return await link[method](...args) + } catch (error) { + logger.error(`db.${method}:`, error, link) + + if ( error.code === 'PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR' + || error.code === 'PROTOCOL_CONNECTION_LOST' + || error.fatal === true) { + // try to reconnect and call it again, once + await init() + return await link[method](...args) + } + } + } : function(...args) { + if (log) + logger.trace(`${method}: `, args) + + return link[method](...args) + } +} + +module.exports = { + init, + query: wrap('query'), + beginTransaction: wrap('beginTransaction'), + commit: wrap('commit'), + escape: wrap('escape', false, false) +}
\ No newline at end of file diff --git a/src/jobd-master.js b/src/jobd-master.js new file mode 100755 index 0000000..0c33942 --- /dev/null +++ b/src/jobd-master.js @@ -0,0 +1,153 @@ +#!/usr/bin/env node +const minimist = require('minimist') +const loggerModule = require('./logger') +const configModule = require('./config') +const {Server, ResponseMessage, RequestMessage} = require('./server') +const WorkersList = require('./workers-list') +const {masterConfig} = configModule + + +/** + * @type {Logger} + */ +let logger + +/** + * @type {Server} + */ +let server + +/** + * @type WorkersList + */ +let workers + + +main().catch(e => { + console.error(e) + process.exit(1) +}) + + +async function main() { + if (process.argv.length < 3) { + usage() + process.exit(0) + } + + process.on('SIGINT', term) + process.on('SIGTERM', term) + + const argv = minimist(process.argv.slice(2)) + if (!argv.config) + throw new Error('--config option is required') + + // read config + try { + configModule.parseMasterConfig(argv.config) + } catch (e) { + console.error(`config parsing error: ${e.message}`) + process.exit(1) + } + + await loggerModule.init({ + file: masterConfig.log_file, + levelFile: masterConfig.log_level_file, + levelConsole: masterConfig.log_level_console, + }) + logger = loggerModule.getLogger('jobd-master') + + // console.log(masterConfig) + + workers = new WorkersList() + + // start server + server = new Server() + server.on('message', onMessage) + server.start(masterConfig.port, masterConfig.host) + logger.info('server started') +} + +/** + * @param {RequestMessage|ResponseMessage} message + * @param {Connection} connection + * @return {Promise<*>} + */ +async function onMessage({message, connection}) { + try { + if (!(message instanceof RequestMessage)) { + logger.debug('ignoring message', message) + return + } + + if (message.requestType !== 'ping') + logger.info('onMessage:', message) + + if (masterConfig.password && message.password !== masterConfig.password) { + 
connection.send(new ResponseMessage().setError('invalid password')) + return connection.close() + } + + switch (message.requestType) { + case 'ping': + connection.send(new ResponseMessage().setError('pong')) + break + + case 'register-worker': { + const targets = message.requestData?.targets || [] + if (!targets.length) { + connection.send(new ResponseMessage().setError(`targets are empty`)) + break + } + + workers.add(connection, targets) + connection.send(new ResponseMessage().setData('ok')) + break + } + + case 'poke': { + const targets = message.requestData?.targets || [] + if (!targets.length) { + connection.send(new ResponseMessage().setError(`targets are empty`)) + break + } + + workers.poke(targets) + connection.send(new ResponseMessage().setData('ok')) + break + } + + case 'status': + const info = workers.getInfo() + connection.send(new ResponseMessage().setData({ + workers: info, + memoryUsage: process.memoryUsage() + })) + break + + default: + connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`)) + break + } + } catch (error) { + logger.error(`error while handling message:`, message, error) + connection.send(new ResponseMessage().setError('server error: ' + error?.message)) + } +} + +function usage() { + let s = `${process.argv[1]} OPTIONS + +Options: + --config <path>` + console.log(s) +} + +function term() { + if (logger) + logger.info('shutdown') + + loggerModule.shutdown(function() { + process.exit() + }) +} diff --git a/src/jobd.js b/src/jobd.js new file mode 100755 index 0000000..6567d06 --- /dev/null +++ b/src/jobd.js @@ -0,0 +1,244 @@ +#!/usr/bin/env node +const minimist = require('minimist') +const loggerModule = require('./logger') +const configModule = require('./config') +const db = require('./db') +const {Server, Connection, RequestMessage, ResponseMessage} = require('./server') +const {Worker, STATUS_MANUAL} = require('./worker') + +const {workerConfig} = configModule + + +/** + * @type {Worker} + 
*/ +let worker + +/** + * @type {Logger} + */ +let logger + +/** + * @type {Server} + */ +let server + +/** + * @type {object.<string, Connection>} + */ +let jobDoneAwaiters = {} + + +main().catch(e => { + console.error(e) + process.exit(1) +}) + + +async function main() { + if (process.argv.length < 3) { + usage() + process.exit(0) + } + + process.on('SIGINT', term) + process.on('SIGTERM', term) + + const argv = minimist(process.argv.slice(2)) + if (!argv.config) + throw new Error('--config option is required') + + // read config + try { + configModule.parseWorkerConfig(argv.config) + } catch (e) { + console.error(`config parsing error: ${e.message}`) + process.exit(1) + } + + await loggerModule.init({ + file: workerConfig.log_file, + levelFile: workerConfig.log_level_file, + levelConsole: workerConfig.log_level_console, + }) + logger = loggerModule.getLogger('jobd') + + // console.log(workerConfig) + + // init database + try { + await db.init() + } catch (error) { + logger.error('failed to connect to MySQL', error) + process.exit(1) + } + logger.info('db initialized') + + // init queue + worker = new Worker() + for (let targetName in workerConfig.targets) { + let slots = workerConfig.targets[targetName].slots + // let target = new Target({name: targetName}) + // queue.addTarget(target) + + for (let slotName in slots) { + let slotLimit = slots[slotName] + worker.addSlot(targetName, slotName, slotLimit) + } + } + worker.on('job-done', (data) => { + if (jobDoneAwaiters[data.id] !== undefined) { + jobDoneAwaiters[data.id].send(new ResponseMessage().setData(data)) + jobDoneAwaiters[data.id].close() + delete jobDoneAwaiters[data.id] + } + }) + logger.info('queue initialized') + + // start server + server = new Server() + server.on('message', onMessage) + server.start(workerConfig.port, workerConfig.host) + logger.info('server started') + + // connect to master + if (workerConfig.master_port && workerConfig.master_host) + connectToMaster() +} + + +/** + * @param 
{RequestMessage|ResponseMessage} message + * @param {Connection} connection + * @return {Promise<*>} + */ +async function onMessage({message, connection}) { + try { + if (!(message instanceof RequestMessage)) { + logger.debug('ignoring message', message) + return + } + + if (message.requestType !== 'ping') + logger.info('onMessage:', message) + + if (workerConfig.password && message.password !== workerConfig.password) { + connection.send(new ResponseMessage().setError('invalid password')) + return connection.close() + } + + switch (message.requestType) { + case 'ping': + connection.send(new ResponseMessage().setData('pong')) + break + + case 'poll': + const targets = message.requestData?.targets || [] + if (!targets.length) { + connection.send(new ResponseMessage().setError('empty targets')) + break + } + + for (const t of targets) { + if (!worker.hasTarget(t)) { + connection.send(new ResponseMessage().setError(`invalid target '${t}'`)) + break + } + } + + worker.setPollTargets(targets) + worker.poll() + + connection.send(new ResponseMessage().setData('ok')); + break + + case 'status': + const qs = worker.getStatus() + connection.send( + new ResponseMessage().setData({ + queue: qs, + jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length, + memoryUsage: process.memoryUsage() + }) + ) + break + + case 'run-manual': + const {id} = message.requestData + if (id in jobDoneAwaiters) { + connection.send(new ResponseMessage().setError('another client is already waiting this job')) + break + } + + jobDoneAwaiters[id] = connection + + const {accepted} = await worker.getTasks(null, STATUS_MANUAL, {id}) + if (!accepted) { + delete jobDoneAwaiters[id] + connection.send(new ResponseMessage().setError('failed to run task')) // would be nice to provide some error... 
+ } + + break + + default: + connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`)) + break + } + } catch (error) { + logger.error(`error while handling message:`, message, error) + connection.send(new ResponseMessage().setError('server error: ' + error?.message)) + } +} + + +function connectToMaster() { + const connection = new Connection() + connection.connect(workerConfig.master_host, workerConfig.master_port) + + connection.on('connect', function() { + connection.send( + new RequestMessage('register-worker', { + targets: worker.getTargets() + }) + ) + }) + + connection.on('close', () => { + logger.warn(`connectToMaster: connection closed`) + setTimeout(() => { + connectToMaster() + }, workerConfig.master_reconnect_timeout * 1000) + }) + + connection.on('message', (message) => { + if (!(message instanceof RequestMessage)) { + logger.debug('message from master is not a request, hmm... skipping', message) + return + } + + onMessage({message, connection}) + .catch((error) => { + logger.error('connectToMaster: onMessage:', error) + }) + }) +} + + +function usage() { + let s = `${process.argv[1]} OPTIONS + +Options: + --config <path>` + console.log(s) +} + + +function term() { + if (logger) + logger.info('shutdown') + + loggerModule.shutdown(function() { + process.exit() + }) +}
\ No newline at end of file diff --git a/src/logger.js b/src/logger.js new file mode 100644 index 0000000..22dd679 --- /dev/null +++ b/src/logger.js @@ -0,0 +1,97 @@ +const log4js = require('log4js') +const fs = require('fs/promises') +const fsConstants = require('fs').constants +const util = require('util') + +module.exports = { + /** + * @param {string} file + * @param {string} levelFile + * @param {string} levelConsole + */ + async init({file, levelFile, levelConsole}) { + const categories = { + default: { + appenders: ['stdout-filter'], + level: 'trace' + } + } + + const appenders = { + stdout: { + type: 'stdout', + level: 'trace' + }, + 'stdout-filter': { + type: 'logLevelFilter', + appender: 'stdout', + level: levelConsole + } + } + + if (file) { + let exists + try { + await fs.stat(file) + exists = true + } catch (error) { + exists = false + } + + // if file exists + if (exists) { + // see if it's writable + try { + // this promise fullfills with undefined upon success + await fs.access(file, fsConstants.W_OK) + } catch (error) { + throw new Error(`file '${file}' is not writable:` + error.message) + } + } else { + // try to create an empty file + let fd + try { + fd = await fs.open(file, 'wx') + } catch (error) { + throw new Error(`failed to create file '${file}': ` + error.message) + } finally { + await fd?.close() + } + } + + categories.default.appenders.push('file-filter') + appenders.file = { + type: 'file', + filename: file, + maxLogSize: 1024 * 1024 * 50, + debug: 'debug' + } + appenders['file-filter'] = { + type: 'logLevelFilter', + appender: 'file', + level: levelFile + } + } + + log4js.configure({ + appenders, + categories + }) + }, + + /** + * @return {Logger} + */ + getLogger(...args) { + return log4js.getLogger(...args) + }, + + /** + * @param cb + */ + shutdown(cb) { + log4js.shutdown(cb) + }, + + Logger: log4js.Logger, +} diff --git a/src/server.js b/src/server.js new file mode 100644 index 0000000..b5eca9a --- /dev/null +++ b/src/server.js @@ 
-0,0 +1,450 @@ +const net = require('net') +const EventEmitter = require('events') +const {getLogger} = require('./logger') +const isObject = require('lodash/isObject') + +const EOT = 0x04 + +class Message { + + static REQUEST = 0 + static RESPONSE = 1 + + /** + * @param {number} type + */ + constructor(type) { + /** + * @type {number} + */ + this.type = type + } + + getAsObject() { + return [this.type] + } + +} + +class ResponseMessage extends Message { + constructor() { + super(Message.RESPONSE) + + this.error = null + this.data = null + } + + setError(error) { + this.error = error + return this + } + + setData(data) { + this.data = data + return this + } + + getAsObject() { + return [ + ...super.getAsObject(), + [ + this.error, + this.data + ] + ] + } +} + +class RequestMessage extends Message { + /** + * @param {string} type + * @param {any} data + */ + constructor(type, data = null) { + super(Message.REQUEST) + + /** + * @type string + */ + this.requestType = type + + /** + * @type any + */ + this.requestData = data + + /** + * @type {null|string} + */ + this.password = null + } + + getAsObject() { + let request = { + type: this.requestType + } + + if (this.requestData) + request.data = this.requestData + + return [ + ...super.getAsObject(), + request + ] + } + + /** + * @param {string} password + */ + setPassword(password) { + this.password = password + } +} + +class Server extends EventEmitter { + + constructor() { + super() + + /** + * @type {null|module:net.Server} + */ + this.server = null + + /** + * @type {Logger} + */ + this.logger = getLogger('server') + } + + /** + * @param {number} port + * @param {string} host + */ + start(port, host) { + this.server = net.createServer() + + this.server.on('connection', this.onConnection) + this.server.on('error', this.onError) + this.server.on('listening', this.onListening) + + this.server.listen(port, host) + } + + /** + * @param {module:net.Socket} socket + */ + onConnection = (socket) => { + let connection = 
new Connection() + connection.setSocket(socket) + connection.on('message', (message) => { + this.emit('message', { + message, + connection + }) + }) + + this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`) + } + + onListening = () => { + let addr = this.server.address() + this.logger.info(`server is listening on ${addr.address}:${addr.port}`) + } + + onError = (error) => { + this.logger.error('error: ', error) + } + +} + +class Connection extends EventEmitter { + + constructor() { + super() + + /** + * @type {null|module:net.Socket} + */ + this.socket = null + + /** + * @type {Buffer} + */ + this.data = Buffer.from([]) + + /** + * @type {boolean} + * @private + */ + this._closeEmitted = false + + /** + * @type {null|string} + */ + this.remoteAddress = null + + /** + * @type {null|number} + */ + this.remotePort = null + + /** + * @type {null|number} + */ + this.id = null + + this._setLogger() + } + + /** + * @param {string} host + * @param {number} port + */ + connect(host, port) { + if (this.socket !== null) + throw new Error(`this Connection already has a socket`) + + this.socket = new net.Socket() + this.socket.connect({host, port}) + + this.remoteAddress = host + this.remotePort = port + + this._setId() + this._setLogger() + this._setSocketEvents() + } + + /** + * @param {module:net.Socket} socket + */ + setSocket(socket) { + this.socket = socket + + this.remoteAddress = socket.remoteAddress + this.remotePort = socket.remotePort + + this._setId() + this._setLogger() + this._setSocketEvents() + } + + /** + * @private + */ + _setLogger() { + let addr = this.socket ? this.remoteAddr() : '?' 
+ this.logger = getLogger(`<Connection ${this.id} ${addr}>`) + } + + /** + * @private + */ + _setId() { + this.id = Math.floor(Math.random() * 10000) + } + + /** + * @private + */ + _setSocketEvents() { + this.socket.on('connect', this.onConnect) + this.socket.on('data', this.onData) + this.socket.on('end', this.onEnd) + this.socket.on('close', this.onClose) + this.socket.on('error', this.onError) + } + + /** + * @param {Buffer} data + * @private + */ + _appendToBuffer(data) { + this.data = Buffer.concat([this.data, data]) + } + + /** + * @return {string} + */ + remoteAddr() { + return this.remoteAddress + ':' + this.remotePort + } + + /** + * @private + */ + _processChunks() { + if (!this.data.length) + return + + this.logger.trace(`processChunks (start):`, this.data) + + /** + * @type {Buffer[]} + */ + let messages = [] + let offset = 0 + let eotPos + do { + eotPos = this.data.indexOf(EOT, offset) + if (eotPos !== -1) { + let message = this.data.slice(offset, eotPos) + messages.push(message) + + this.logger.debug(`processChunks: found new message (${offset}, ${eotPos})`) + offset = eotPos + 1 + } + } while (eotPos !== -1 && offset < this.data.length-1) + + if (offset !== 0) { + this.data = this.data.slice(offset) + this.logger.trace(`processChunks: slicing data from ${offset}`) + } + + this.logger.trace(`processChunks (after parsing):`, this.data) + + for (let message of messages) { + try { + let buf = message.toString('utf-8') + this.logger.debug(buf) + + let json = JSON.parse(buf) + this._emitMessage(json) + } catch (error) { + this.logger.error('failed to parse data as JSON') + this.logger.debug(message) + } + } + } + + /** + * @param {object} json + * @private + */ + _emitMessage(json) { + if (!Array.isArray(json)) { + this.logger.error('malformed message, JSON array expected', json) + return + } + + let type = json.shift() + let message + switch (type) { + case Message.REQUEST: { + let data = json.shift() + if (!data || !isObject(data)) { + 
this.logger.error('malformed REQUEST message') + return + } + + message = new RequestMessage(data.type, data.data || null) + if (data.password) + message.setPassword(data.password) + break + } + + case Message.RESPONSE: { + let data = json.shift() + if (!data || !Array.isArray(data) || data.length < 2) { + this.logger.error('malformed RESPONSE message') + return + } + + message = new ResponseMessage() + message.setError(data[0]).setData(data[1]) + + break + } + + default: + this.logger.error(`malformed message, unexpected type ${type}`) + return + } + + this.emit('message', message) + } + + /** + * @type {Message} data + * @param message + */ + send(message) { + if (!(message instanceof Message)) + throw new Error('send expects Message, got', message) + + let json = JSON.stringify(message.getAsObject()) + let buf = Buffer.concat([ + Buffer.from(json), + Buffer.from([EOT]) + ]) + + this.logger.debug('send:', json) + this.logger.trace('send:', buf) + + try { + this.socket.write(buf) + } catch (error) { + this.logger.error(`processChunks: failed to write response ${JSON.stringify(message)} to a socket`, error) + } + } + + /** + */ + close() { + try { + this.socket.end() + this.socket.destroy() + this._emitClose() + } catch (error) { + this.logger.error('close:', error) + } + } + + /** + * @private + */ + _emitClose() { + if (this._closeEmitted) + return + + this._closeEmitted = true + this.emit('close') + } + + onConnect = () => { + this.logger.debug('connection established') + this.emit('connect') + } + + onData = (data) => { + this.logger.trace('onData', data) + this._appendToBuffer(data) + this._processChunks() + } + + onEnd = (data) => { + if (data) + this._appendToBuffer(data) + + this._processChunks() + } + + onClose = (hadError) => { + this._emitClose() + this.logger.debug(`socket closed` + (hadError ? 
` with error` : '')) + } + + onError = (error) => { + this._emitClose() + this.logger.warn(`socket error:`, error) + } + +} + +module.exports = { + Server, + Connection, + RequestMessage, + ResponseMessage +}
\ No newline at end of file diff --git a/src/util.js b/src/util.js new file mode 100644 index 0000000..6de07e5 --- /dev/null +++ b/src/util.js @@ -0,0 +1,9 @@ +module.exports = { + timestamp() { + return parseInt(+(new Date())/1000) + }, + + isNumeric(n) { + return !isNaN(parseFloat(n)) && isFinite(n) + } +}
\ No newline at end of file diff --git a/src/worker.js b/src/worker.js new file mode 100644 index 0000000..3151b40 --- /dev/null +++ b/src/worker.js @@ -0,0 +1,472 @@ +const Queue = require('queue') +const child_process = require('child_process') +const db = require('./db') +const {timestamp} = require('./util') +const {getLogger} = require('./logger') +const EventEmitter = require('events') +const {workerConfig} = require('./config') + +const STATUS_WAITING = 'waiting' +const STATUS_MANUAL = 'manual' +const STATUS_ACCEPTED = 'accepted' +const STATUS_IGNORED = 'ignored' +const STATUS_RUNNING = 'running' +const STATUS_DONE = 'done' + +const RESULT_OK = 'ok' +const RESULT_FAIL = 'fail' + +class Worker extends EventEmitter { + + constructor() { + super() + + /** + * @type {object.<string, {slots: object.<string, {limit: number, queue: Queue}>}>} + */ + this.targets = {} + + /** + * @type {boolean} + */ + this.polling = false + + /** + * @type {boolean} + */ + this.nextpoll = {} + + /** + * @type {Logger} + */ + this.logger = getLogger('Worker') + } + + /** + * @param {string} target + * @param {string} slot + * @param {number} limit + */ + addSlot(target, slot, limit) { + this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) + + if (this.targets[target] === undefined) + this.targets[target] = {slots: {}} + + if (this.targets[target].slots[slot] !== undefined) + throw new Error(`slot ${slot} for target ${target} has already been added`) + + let queue = Queue({ + concurrency: limit, + autostart: true + }) + queue.on('success', this.onJobFinished.bind(this, target, slot)) + queue.on('error', this.onJobFinished.bind(this, target, slot)) + queue.start() + + this.targets[target].slots[slot] = {limit, queue} + } + + /** + * @param {string} target + * @returns {boolean} + */ + hasTarget(target) { + return (target in this.targets) + } + + /** + * Returns status of all queues. 
+ * + * @return {object} + */ + getStatus() { + let status = {targets: {}} + for (const targetName in this.targets) { + let target = this.targets[targetName] + status.targets[targetName] = {} + for (const slotName in target.slots) { + const {queue, limit} = target.slots[slotName] + status.targets[targetName][slotName] = { + concurrency: queue.concurrency, + limit, + length: queue.length, + } + } + } + return status + } + + /** + * @return {string[]} + */ + getTargets() { + return Object.keys(this.targets) + } + + /** + * + */ + poll() { + const LOGPREFIX = `poll():` + + let targets = this.getPollTargets() + if (!targets.length) { + this.poller.warn(`${LOGPREFIX} no targets`) + return + } + + // skip and postpone the poll, if we're in the middle on another poll + // it will be called again from the last .then() at the end of this method + if (this.polling) { + this.logger.debug(`${LOGPREFIX} already polling`) + return + } + + // skip and postpone the poll, if no free slots + // it will be called again from onJobFinished() + if (!this.hasFreeSlots(targets)) { + this.logger.debug(`${LOGPREFIX} no free slots`) + return + } + + // set polling flag + this.polling = true + + // clear postponed polls target list + this.setPollTargets() + + this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`) + this.getTasks(targets) + .then(({rows}) => { + let message = `${LOGPREFIX} ${rows} processed` + if (workerConfig.mysql_fetch_limit && rows >= workerConfig.mysql_fetch_limit) { + // it seems, there are more, so we'll need to perform another query + this.setPollTargets(targets) + message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})` + } + this.logger.debug(message) + }) + .catch((error) => { + this.logger.error(`${LOGPREFIX}`, error) + //this.setPollTargets(targets) + }) + .then(() => { + // unset polling flag + this.polling = false + + // perform another poll, if needed + if (this.getPollTargets().length > 0) { + 
this.logger.debug(`${LOGPREFIX} next poll scheduled, calling poll() again`) + this.poll() + } + }) + } + + /** + * @param {string|string[]|null} target + */ + setPollTargets(target) { + // when called without parameter, remove all targets + if (target === undefined) { + this.nextpoll = {} + return + } + + // just a fix + if (target === 'null') + target = null + + if (Array.isArray(target)) { + target.forEach(t => { + this.nextpoll[t] = true + }) + } else { + if (target === null) + this.nextpoll = {} + this.nextpoll[target] = true + } + } + + /** + * @return {string[]} + */ + getPollTargets() { + if (null in this.nextpoll) + return Object.keys(this.targets) + + return Object.keys(this.nextpoll) + } + + /** + * @param {string} target + * @return {boolean} + */ + hasPollTarget(target) { + return target in this.nextpoll || null in this.nextpoll + } + + /** + * @param {string|null|string[]} target + * @param {string} reqstatus + * @param {object} data + * @returns {Promise<{ignored: number, accepted: number, rows: number}>} + */ + async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) { + const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):` + + // get new jobs in transaction + await db.beginTransaction() + + let sqlFields = `id, status, target, slot` + let sql + if (data.id) { + sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE id=${db.escape(data.id)} FOR UPDATE` + } else { + let targets + if (target === null) { + targets = Object.keys(this.targets) + } else if (!Array.isArray(target)) { + targets = [target] + } else { + targets = target + } + let sqlLimit = workerConfig.mysql_fetch_limit !== 0 ? 
` LIMIT 0, ${workerConfig.mysql_fetch_limit}` : '' + let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` + sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE` + } + + /** @type {object[]} results */ + let results = await db.query(sql) + this.logger.trace(`${LOGPREFIX} query result:`, results) + + /** + * @type {{target: string, slot: string, id: number}[]} + */ + let accepted = [] + + /** + * @type {number[]} + */ + let ignored = [] + + for (let result of results) { + let {id, slot, target, status} = result + id = parseInt(id) + + if (status !== reqstatus) { + this.logger.warn(`${LOGPREFIX} status = ${status} != ${reqstatus}`) + ignored.push(id) + continue + } + + if (!target || this.targets[target] === undefined) { + this.logger.error(`${LOGPREFIX} target '${target}' not found (job id=${id})`) + ignored.push(id) + continue + } + + if (!slot || this.targets[target].slots[slot] === undefined) { + this.logger.error(`${LOGPREFIX} slot '${slot}' of target '${target}' not found (job id=${id})`) + ignored.push(id) + continue + } + + this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`) + accepted.push({target, slot, id}) + } + + if (accepted.length) + await db.query(`UPDATE ${workerConfig.mysql_table} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`) + + if (ignored.length) + await db.query(`UPDATE ${workerConfig.mysql_table} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`) + + await db.commit() + + accepted.forEach(({id, target, slot}) => { + let q = this.targets[target].slots[slot].queue + q.push(async (cb) => { + let data = { + code: null, + signal: null, + stdout: '', + stderr: '' + } + let result = RESULT_OK + + try { + await this.setJobStatus(id, STATUS_RUNNING) + + Object.assign(data, (await this.run(id))) + if (data.code !== 0) + result = RESULT_FAIL + } catch (error) { + 
this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error) + result = RESULT_FAIL + data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '') + } finally { + this.emit('job-done', { + id, + result, + ...data + }) + + try { + await this.setJobStatus(id, STATUS_DONE, result, data) + } catch (error) { + this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error) + } + + cb() + } + }) + }) + + return { + rows: results.length, + accepted: accepted.length, + ignored: ignored.length, + } + } + + /** + * @param {number} id + */ + async run(id) { + let command = workerConfig.launcher.replace(/\{id\}/g, id) + let args = command.split(/ +/) + return new Promise((resolve, reject) => { + this.logger.info(`run(${id}): launching`, args) + + let process = child_process.spawn(args[0], args.slice(1), { + maxBuffer: workerConfig.max_output_buffer + }) + + let stdoutChunks = [] + let stderrChunks = [] + + process.on('exit', + /** + * @param {null|number} code + * @param {null|string} signal + */ + (code, signal) => { + let stdout = stdoutChunks.join('') + let stderr = stderrChunks.join('') + + stdoutChunks = undefined + stderrChunks = undefined + + resolve({ + code, + signal, + stdout, + stderr + }) + }) + + process.on('error', (error) => { + reject(error) + }) + + process.stdout.on('data', (data) => { + if (data instanceof Buffer) + data = data.toString('utf-8') + stdoutChunks.push(data) + }) + + process.stderr.on('data', (data) => { + if (data instanceof Buffer) + data = data.toString('utf-8') + stderrChunks.push(data) + }) + }) + } + + /** + * @param {number} id + * @param {string} status + * @param {string} result + * @param {object} data + * @return {Promise<void>} + */ + async setJobStatus(id, status, result = RESULT_OK, data = {}) { + let update = { + status, + result + } + switch (status) { + case STATUS_RUNNING: + case STATUS_DONE: + update[status === STATUS_RUNNING ? 
'time_started' : 'time_finished'] = timestamp() + break + } + if (data.code !== undefined) + update.return_code = data.code + if (data.signal !== undefined) + update.sig = data.signal + if (data.stderr !== undefined) + update.stderr = data.stderr + if (data.stdout !== undefined) + update.stdout = data.stdout + + let list = [] + for (let field in update) { + let val = update[field] + if (val !== null) + val = db.escape(val) + list.push(`${field}=${val}`) + } + + await db.query(`UPDATE ${workerConfig.mysql_table} SET ${list.join(', ')} WHERE id=?`, [id]) + } + + /** + * @param {string[]} inTargets + * @returns {boolean} + */ + hasFreeSlots(inTargets = []) { + const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):` + + this.logger.debug(`${LOGPREFIX} entered`) + + for (const target in this.targets) { + if (!inTargets.includes(target)) + continue + + for (const slot in this.targets[target].slots) { + const {limit, queue} = this.targets[target].slots[slot] + this.logger.debug(LOGPREFIX, limit, queue.length) + if (queue.length < limit) + return true + } + } + + return false + } + + /** + * @param {string} target + * @param {string} slot + */ + onJobFinished = (target, slot) => { + this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) + const {queue, limit} = this.targets[target].slots[slot] + if (queue.length < limit && this.hasPollTarget(target)) { + this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`) + this.poll() + } + } + +} + +module.exports = { + Worker, + STATUS_WAITING, + STATUS_MANUAL, + STATUS_ACCEPTED, + STATUS_IGNORED, + STATUS_RUNNING, + STATUS_DONE, +}
// diff --git a/src/workers-list.js b/src/workers-list.js (new file mode 100644)

const intersection = require('lodash/intersection')
const {masterConfig} = require('./config')
const {getLogger} = require('./logger')
const {RequestMessage} = require('./server')
const throttle = require('lodash/throttle')

/**
 * Keeps track of connected workers and the targets each of them serves,
 * forwards 'poll' requests ("pokes") to the workers responsible for given
 * targets, and periodically pings every worker.
 */
class WorkersList {

    constructor() {
        /**
         * @type {{connection: Connection, targets: string[]}[]}
         */
        this.workers = []

        /**
         * Targets collected by poke() until the throttled _pokeWorkers() runs.
         *
         * @type {object.<string, boolean>}
         */
        this.targetsToPoke = {}

        /**
         * Targets that were poked while no connected worker served them.
         *
         * @type {object.<string, boolean>}
         */
        this.targetsWaitingToPoke = {}

        /**
         * @type {NodeJS.Timeout}
         */
        this.pingInterval = setInterval(this.sendPings, masterConfig.ping_interval * 1000)

        /**
         * @type {Logger}
         */
        this.logger = getLogger('WorkersList')
    }

    /**
     * Registers a worker connection. The worker is removed again when the
     * connection emits 'close'. If some of its targets were poked while no
     * worker served them, the new worker is poked for those right away.
     *
     * @param {Connection} connection
     * @param {string[]} targets
     */
    add(connection, targets) {
        this.logger.info(`add: connection from ${connection.remoteAddr()}, targets ${JSON.stringify(targets)}`)

        this.workers.push({connection, targets})
        connection.on('close', () => {
            this.logger.info(`connection from ${connection.remoteAddr()} closed, removing worker`)
            this.workers = this.workers.filter(worker => worker.connection !== connection)
        })

        const waiting = Object.keys(this.targetsWaitingToPoke)
        if (!waiting.length)
            return

        const intrs = intersection(waiting, targets)
        if (!intrs.length)
            return

        this.logger.info('add: found intersection with waiting targets:', intrs, 'going to poke new worker')
        this._pokeWorkerConnection(connection, intrs)
        for (const target of intrs)
            delete this.targetsWaitingToPoke[target]
        this.logger.trace(`add: this.targetsWaitingToPoke:`, this.targetsWaitingToPoke)
    }

    /**
     * Schedules a poke for the given targets; the actual sending is throttled.
     *
     * @param {string[]} targets
     * @throws {Error} when targets is not an array
     */
    poke(targets) {
        this.logger.debug('poke:', targets)
        if (!Array.isArray(targets))
            throw new Error('targets must be Array')

        for (const t of targets)
            this.targetsToPoke[t] = true

        this._pokeWorkers()
    }

    /**
     * Sends the collected targets to the workers that serve them and
     * remembers the targets for which no worker is connected.
     *
     * @private
     */
    _pokeWorkers = throttle(() => {
        const targets = Object.keys(this.targetsToPoke)
        this.targetsToPoke = {}

        const found = {}
        for (const worker of this.workers) {
            const intrs = intersection(worker.targets, targets)
            if (!intrs.length)
                continue

            for (const t of intrs)
                found[t] = true

            // poke the worker only for the targets it serves, as add() does;
            // previously the whole pending list was sent to every matching worker
            this._pokeWorkerConnection(worker.connection, intrs)
        }

        for (const target of targets) {
            if (!(target in found)) {
                this.logger.debug(`_pokeWorkers: worker responsible for ${target} not found. we'll remember it`)
                this.targetsWaitingToPoke[target] = true
            }
        }

        // logged once per run instead of once per target
        if (targets.length)
            this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke)
    }, masterConfig.poke_throttle_interval * 1000, {leading: true})

    /**
     * Sends a 'poll' request with the given targets over the connection.
     *
     * @param {Connection} connection
     * @param {string[]} targets
     * @private
     */
    _pokeWorkerConnection(connection, targets) {
        this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets)
        connection.send(
            new RequestMessage('poll', {
                targets
            })
        )
    }

    /**
     * Remote address, remote port and targets of every registered worker.
     *
     * @return {{targets: string[], remoteAddr: string, remotePort: number}[]}
     */
    getInfo() {
        return this.workers.map(({connection, targets}) => ({
            remoteAddr: connection.socket?.remoteAddress,
            remotePort: connection.socket?.remotePort,
            targets
        }))
    }

    /**
     * Sends a 'ping' request to every registered worker.
     *
     * @private
     */
    sendPings = () => {
        for (const w of this.workers) {
            this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`)
            w.connection.send(new RequestMessage('ping'))
        }
    }

}

module.exports = WorkersList
\ No newline at end of file |