summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-02-24 03:59:25 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-02-26 17:35:21 +0300
commit5e7d34458a6e60487393caa4f320ab1cfc1cf8e5 (patch)
tree11df96e6e105558e8bdcd7a1f61f720dd68150af /src
initial commit
Diffstat (limited to 'src')
-rw-r--r--src/config.js119
-rw-r--r--src/db.js49
-rwxr-xr-xsrc/jobd-master.js153
-rwxr-xr-xsrc/jobd.js244
-rw-r--r--src/logger.js97
-rw-r--r--src/server.js450
-rw-r--r--src/util.js9
-rw-r--r--src/worker.js472
-rw-r--r--src/workers-list.js145
9 files changed, 1738 insertions, 0 deletions
diff --git a/src/config.js b/src/config.js
new file mode 100644
index 0000000..9ee6338
--- /dev/null
+++ b/src/config.js
@@ -0,0 +1,119 @@
+const fs = require('fs')
+const ini = require('ini')
+const {isNumeric} = require('./util')
+
+let workerConfig = {
+ targets: {},
+}
+let masterConfig = {}
+
+function readFile(file) {
+ if (!fs.existsSync(file))
+ throw new Error(`file ${file} not found`)
+
+ return ini.parse(fs.readFileSync(file, 'utf-8'))
+}
+
+function processScheme(source, scheme) {
+ const result = {}
+
+ for (let key in scheme) {
+ let opts = scheme[key]
+ let ne = !(key in source) || !source[key]
+ if (opts.required === true && ne)
+ throw new Error(`'${key}' is not defined`)
+
+ let value = source[key] ?? opts.default ?? null
+
+ switch (opts.type) {
+ case 'int':
+ if (!isNumeric(value))
+ throw new Error(`'${key}' must be an integer`)
+ value = parseInt(value, 10)
+ break
+
+ case 'float':
+ if (!isNumeric(value))
+ throw new Error(`'${key}' must be a float`)
+ value = parseFloat(value)
+ break
+ }
+
+ result[key] = value
+ }
+
+ return result
+}
+
+function parseWorkerConfig(file) {
+ const raw = readFile(file)
+
+ const scheme = {
+ host: {required: true},
+ port: {required: true, type: 'int'},
+ password: {},
+
+ master_host: {},
+ master_port: {type: 'int', default: 0},
+ master_reconnect_timeout: {type: 'int', default: 10},
+
+ log_file: {},
+ log_level_file: {default: 'warn'},
+ log_level_console: {default: 'warn'},
+
+ mysql_host: {required: true},
+ mysql_port: {required: true, type: 'int'},
+ mysql_user: {required: true},
+ mysql_password: {required: true},
+ mysql_database: {required: true},
+ mysql_table: {required: true, default: 'jobs'},
+ mysql_fetch_limit: {default: 100, type: 'int'},
+
+ launcher: {required: true},
+ max_output_buffer: {default: 1024*1024, type: 'int'},
+ }
+ Object.assign(workerConfig, processScheme(raw, scheme))
+
+ // targets
+ for (let target in raw) {
+ if (target === 'null')
+ throw new Error('word \'null\' is reserved, please don\'t use it as a target name')
+
+ if (typeof raw[target] !== 'object')
+ continue
+
+ workerConfig.targets[target] = {slots: {}}
+ for (let slotName in raw[target]) {
+ let slotLimit = parseInt(raw[target][slotName], 10)
+ if (slotLimit < 1)
+ throw new Error(`${target}: slot ${slotName} has invalid limit`)
+ workerConfig.targets[target].slots[slotName] = slotLimit
+ }
+ }
+}
+
+function parseMasterConfig(file) {
+ const raw = readFile(file)
+
+ const scheme = {
+ host: {required: true},
+ port: {required: true, type: 'int'},
+ password: {},
+
+ ping_interval: {default: 30, type: 'int'},
+ poke_throttle_interval: {default: 0.5, type: 'float'},
+
+ log_file: {},
+ log_level_file: {default: 'warn'},
+ log_level_console: {default: 'warn'},
+ }
+ Object.assign(masterConfig, processScheme(raw, scheme))
+}
+
+module.exports = {
+ parseWorkerConfig,
+ parseMasterConfig,
+
+ workerConfig,
+ masterConfig
+} \ No newline at end of file
diff --git a/src/db.js b/src/db.js
new file mode 100644
index 0000000..0ffba84
--- /dev/null
+++ b/src/db.js
@@ -0,0 +1,49 @@
+const {workerConfig} = require('./config')
+const {getLogger} = require('./logger')
+const mysql = require('promise-mysql')
+
+let link
+const logger = getLogger('db')
+
+async function init() {
+ link = await mysql.createConnection({
+ host: workerConfig.mysql_host,
+ user: workerConfig.mysql_user,
+ password: workerConfig.mysql_password,
+ database: workerConfig.mysql_database
+ })
+}
+
+function wrap(method, isAsync = true, log = true) {
+ return isAsync ? async function(...args) {
+ if (log)
+ logger.trace(`${method}: `, args)
+
+ try {
+ return await link[method](...args)
+ } catch (error) {
+ logger.error(`db.${method}:`, error, link)
+
+ if ( error.code === 'PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR'
+ || error.code === 'PROTOCOL_CONNECTION_LOST'
+ || error.fatal === true) {
+ // try to reconnect and call it again, once
+ await init()
+ return await link[method](...args)
+ }
+ }
+ } : function(...args) {
+ if (log)
+ logger.trace(`${method}: `, args)
+
+ return link[method](...args)
+ }
+}
+
+module.exports = {
+ init,
+ query: wrap('query'),
+ beginTransaction: wrap('beginTransaction'),
+ commit: wrap('commit'),
+ escape: wrap('escape', false, false)
+} \ No newline at end of file
diff --git a/src/jobd-master.js b/src/jobd-master.js
new file mode 100755
index 0000000..0c33942
--- /dev/null
+++ b/src/jobd-master.js
@@ -0,0 +1,153 @@
+#!/usr/bin/env node
+const minimist = require('minimist')
+const loggerModule = require('./logger')
+const configModule = require('./config')
+const {Server, ResponseMessage, RequestMessage} = require('./server')
+const WorkersList = require('./workers-list')
+const {masterConfig} = configModule
+
+
+/**
+ * @type {Logger}
+ */
+let logger
+
+/**
+ * @type {Server}
+ */
+let server
+
+/**
+ * @type WorkersList
+ */
+let workers
+
+
+main().catch(e => {
+ console.error(e)
+ process.exit(1)
+})
+
+
+async function main() {
+ if (process.argv.length < 3) {
+ usage()
+ process.exit(0)
+ }
+
+ process.on('SIGINT', term)
+ process.on('SIGTERM', term)
+
+ const argv = minimist(process.argv.slice(2))
+ if (!argv.config)
+ throw new Error('--config option is required')
+
+ // read config
+ try {
+ configModule.parseMasterConfig(argv.config)
+ } catch (e) {
+ console.error(`config parsing error: ${e.message}`)
+ process.exit(1)
+ }
+
+ await loggerModule.init({
+ file: masterConfig.log_file,
+ levelFile: masterConfig.log_level_file,
+ levelConsole: masterConfig.log_level_console,
+ })
+ logger = loggerModule.getLogger('jobd-master')
+
+ // console.log(masterConfig)
+
+ workers = new WorkersList()
+
+ // start server
+ server = new Server()
+ server.on('message', onMessage)
+ server.start(masterConfig.port, masterConfig.host)
+ logger.info('server started')
+}
+
+/**
+ * @param {RequestMessage|ResponseMessage} message
+ * @param {Connection} connection
+ * @return {Promise<*>}
+ */
+async function onMessage({message, connection}) {
+ try {
+ if (!(message instanceof RequestMessage)) {
+ logger.debug('ignoring message', message)
+ return
+ }
+
+ if (message.requestType !== 'ping')
+ logger.info('onMessage:', message)
+
+ if (masterConfig.password && message.password !== masterConfig.password) {
+ connection.send(new ResponseMessage().setError('invalid password'))
+ return connection.close()
+ }
+
+ switch (message.requestType) {
+ case 'ping':
+ connection.send(new ResponseMessage().setError('pong'))
+ break
+
+ case 'register-worker': {
+ const targets = message.requestData?.targets || []
+ if (!targets.length) {
+ connection.send(new ResponseMessage().setError(`targets are empty`))
+ break
+ }
+
+ workers.add(connection, targets)
+ connection.send(new ResponseMessage().setData('ok'))
+ break
+ }
+
+ case 'poke': {
+ const targets = message.requestData?.targets || []
+ if (!targets.length) {
+ connection.send(new ResponseMessage().setError(`targets are empty`))
+ break
+ }
+
+ workers.poke(targets)
+ connection.send(new ResponseMessage().setData('ok'))
+ break
+ }
+
+ case 'status':
+ const info = workers.getInfo()
+ connection.send(new ResponseMessage().setData({
+ workers: info,
+ memoryUsage: process.memoryUsage()
+ }))
+ break
+
+ default:
+ connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`))
+ break
+ }
+ } catch (error) {
+ logger.error(`error while handling message:`, message, error)
+ connection.send(new ResponseMessage().setError('server error: ' + error?.message))
+ }
+}
+
+function usage() {
+ let s = `${process.argv[1]} OPTIONS
+
+Options:
+ --config <path>`
+ console.log(s)
+}
+
+function term() {
+ if (logger)
+ logger.info('shutdown')
+
+ loggerModule.shutdown(function() {
+ process.exit()
+ })
+}
diff --git a/src/jobd.js b/src/jobd.js
new file mode 100755
index 0000000..6567d06
--- /dev/null
+++ b/src/jobd.js
@@ -0,0 +1,244 @@
+#!/usr/bin/env node
+const minimist = require('minimist')
+const loggerModule = require('./logger')
+const configModule = require('./config')
+const db = require('./db')
+const {Server, Connection, RequestMessage, ResponseMessage} = require('./server')
+const {Worker, STATUS_MANUAL} = require('./worker')
+
+const {workerConfig} = configModule
+
+
+/**
+ * @type {Worker}
+ */
+let worker
+
+/**
+ * @type {Logger}
+ */
+let logger
+
+/**
+ * @type {Server}
+ */
+let server
+
+/**
+ * @type {object.<string, Connection>}
+ */
+let jobDoneAwaiters = {}
+
+
+main().catch(e => {
+ console.error(e)
+ process.exit(1)
+})
+
+
+async function main() {
+ if (process.argv.length < 3) {
+ usage()
+ process.exit(0)
+ }
+
+ process.on('SIGINT', term)
+ process.on('SIGTERM', term)
+
+ const argv = minimist(process.argv.slice(2))
+ if (!argv.config)
+ throw new Error('--config option is required')
+
+ // read config
+ try {
+ configModule.parseWorkerConfig(argv.config)
+ } catch (e) {
+ console.error(`config parsing error: ${e.message}`)
+ process.exit(1)
+ }
+
+ await loggerModule.init({
+ file: workerConfig.log_file,
+ levelFile: workerConfig.log_level_file,
+ levelConsole: workerConfig.log_level_console,
+ })
+ logger = loggerModule.getLogger('jobd')
+
+ // console.log(workerConfig)
+
+ // init database
+ try {
+ await db.init()
+ } catch (error) {
+ logger.error('failed to connect to MySQL', error)
+ process.exit(1)
+ }
+ logger.info('db initialized')
+
+ // init queue
+ worker = new Worker()
+ for (let targetName in workerConfig.targets) {
+ let slots = workerConfig.targets[targetName].slots
+ // let target = new Target({name: targetName})
+ // queue.addTarget(target)
+
+ for (let slotName in slots) {
+ let slotLimit = slots[slotName]
+ worker.addSlot(targetName, slotName, slotLimit)
+ }
+ }
+ worker.on('job-done', (data) => {
+ if (jobDoneAwaiters[data.id] !== undefined) {
+ jobDoneAwaiters[data.id].send(new ResponseMessage().setData(data))
+ jobDoneAwaiters[data.id].close()
+ delete jobDoneAwaiters[data.id]
+ }
+ })
+ logger.info('queue initialized')
+
+ // start server
+ server = new Server()
+ server.on('message', onMessage)
+ server.start(workerConfig.port, workerConfig.host)
+ logger.info('server started')
+
+ // connect to master
+ if (workerConfig.master_port && workerConfig.master_host)
+ connectToMaster()
+}
+
+
+/**
+ * @param {RequestMessage|ResponseMessage} message
+ * @param {Connection} connection
+ * @return {Promise<*>}
+ */
+async function onMessage({message, connection}) {
+ try {
+ if (!(message instanceof RequestMessage)) {
+ logger.debug('ignoring message', message)
+ return
+ }
+
+ if (message.requestType !== 'ping')
+ logger.info('onMessage:', message)
+
+ if (workerConfig.password && message.password !== workerConfig.password) {
+ connection.send(new ResponseMessage().setError('invalid password'))
+ return connection.close()
+ }
+
+ switch (message.requestType) {
+ case 'ping':
+ connection.send(new ResponseMessage().setData('pong'))
+ break
+
+ case 'poll':
+ const targets = message.requestData?.targets || []
+ if (!targets.length) {
+ connection.send(new ResponseMessage().setError('empty targets'))
+ break
+ }
+
+ for (const t of targets) {
+ if (!worker.hasTarget(t)) {
+ connection.send(new ResponseMessage().setError(`invalid target '${t}'`))
+ break
+ }
+ }
+
+ worker.setPollTargets(targets)
+ worker.poll()
+
+ connection.send(new ResponseMessage().setData('ok'));
+ break
+
+ case 'status':
+ const qs = worker.getStatus()
+ connection.send(
+ new ResponseMessage().setData({
+ queue: qs,
+ jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length,
+ memoryUsage: process.memoryUsage()
+ })
+ )
+ break
+
+ case 'run-manual':
+ const {id} = message.requestData
+ if (id in jobDoneAwaiters) {
+ connection.send(new ResponseMessage().setError('another client is already waiting this job'))
+ break
+ }
+
+ jobDoneAwaiters[id] = connection
+
+ const {accepted} = await worker.getTasks(null, STATUS_MANUAL, {id})
+ if (!accepted) {
+ delete jobDoneAwaiters[id]
+ connection.send(new ResponseMessage().setError('failed to run task')) // would be nice to provide some error...
+ }
+
+ break
+
+ default:
+ connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`))
+ break
+ }
+ } catch (error) {
+ logger.error(`error while handling message:`, message, error)
+ connection.send(new ResponseMessage().setError('server error: ' + error?.message))
+ }
+}
+
+
+function connectToMaster() {
+ const connection = new Connection()
+ connection.connect(workerConfig.master_host, workerConfig.master_port)
+
+ connection.on('connect', function() {
+ connection.send(
+ new RequestMessage('register-worker', {
+ targets: worker.getTargets()
+ })
+ )
+ })
+
+ connection.on('close', () => {
+ logger.warn(`connectToMaster: connection closed`)
+ setTimeout(() => {
+ connectToMaster()
+ }, workerConfig.master_reconnect_timeout * 1000)
+ })
+
+ connection.on('message', (message) => {
+ if (!(message instanceof RequestMessage)) {
+ logger.debug('message from master is not a request, hmm... skipping', message)
+ return
+ }
+
+ onMessage({message, connection})
+ .catch((error) => {
+ logger.error('connectToMaster: onMessage:', error)
+ })
+ })
+}
+
+
+function usage() {
+ let s = `${process.argv[1]} OPTIONS
+
+Options:
+ --config <path>`
+ console.log(s)
+}
+
+
+function term() {
+ if (logger)
+ logger.info('shutdown')
+
+ loggerModule.shutdown(function() {
+ process.exit()
+ })
+} \ No newline at end of file
diff --git a/src/logger.js b/src/logger.js
new file mode 100644
index 0000000..22dd679
--- /dev/null
+++ b/src/logger.js
@@ -0,0 +1,97 @@
+const log4js = require('log4js')
+const fs = require('fs/promises')
+const fsConstants = require('fs').constants
+const util = require('util')
+
+module.exports = {
+ /**
+ * @param {string} file
+ * @param {string} levelFile
+ * @param {string} levelConsole
+ */
+ async init({file, levelFile, levelConsole}) {
+ const categories = {
+ default: {
+ appenders: ['stdout-filter'],
+ level: 'trace'
+ }
+ }
+
+ const appenders = {
+ stdout: {
+ type: 'stdout',
+ level: 'trace'
+ },
+ 'stdout-filter': {
+ type: 'logLevelFilter',
+ appender: 'stdout',
+ level: levelConsole
+ }
+ }
+
+ if (file) {
+ let exists
+ try {
+ await fs.stat(file)
+ exists = true
+ } catch (error) {
+ exists = false
+ }
+
+ // if file exists
+ if (exists) {
+ // see if it's writable
+ try {
+ // this promise fullfills with undefined upon success
+ await fs.access(file, fsConstants.W_OK)
+ } catch (error) {
+ throw new Error(`file '${file}' is not writable:` + error.message)
+ }
+ } else {
+ // try to create an empty file
+ let fd
+ try {
+ fd = await fs.open(file, 'wx')
+ } catch (error) {
+ throw new Error(`failed to create file '${file}': ` + error.message)
+ } finally {
+ await fd?.close()
+ }
+ }
+
+ categories.default.appenders.push('file-filter')
+ appenders.file = {
+ type: 'file',
+ filename: file,
+ maxLogSize: 1024 * 1024 * 50,
+ debug: 'debug'
+ }
+ appenders['file-filter'] = {
+ type: 'logLevelFilter',
+ appender: 'file',
+ level: levelFile
+ }
+ }
+
+ log4js.configure({
+ appenders,
+ categories
+ })
+ },
+
+ /**
+ * @return {Logger}
+ */
+ getLogger(...args) {
+ return log4js.getLogger(...args)
+ },
+
+ /**
+ * @param cb
+ */
+ shutdown(cb) {
+ log4js.shutdown(cb)
+ },
+
+ Logger: log4js.Logger,
+}
diff --git a/src/server.js b/src/server.js
new file mode 100644
index 0000000..b5eca9a
--- /dev/null
+++ b/src/server.js
@@ -0,0 +1,450 @@
+const net = require('net')
+const EventEmitter = require('events')
+const {getLogger} = require('./logger')
+const isObject = require('lodash/isObject')
+
+const EOT = 0x04
+
+class Message {
+
+ static REQUEST = 0
+ static RESPONSE = 1
+
+ /**
+ * @param {number} type
+ */
+ constructor(type) {
+ /**
+ * @type {number}
+ */
+ this.type = type
+ }
+
+ getAsObject() {
+ return [this.type]
+ }
+
+}
+
+class ResponseMessage extends Message {
+ constructor() {
+ super(Message.RESPONSE)
+
+ this.error = null
+ this.data = null
+ }
+
+ setError(error) {
+ this.error = error
+ return this
+ }
+
+ setData(data) {
+ this.data = data
+ return this
+ }
+
+ getAsObject() {
+ return [
+ ...super.getAsObject(),
+ [
+ this.error,
+ this.data
+ ]
+ ]
+ }
+}
+
+class RequestMessage extends Message {
+ /**
+ * @param {string} type
+ * @param {any} data
+ */
+ constructor(type, data = null) {
+ super(Message.REQUEST)
+
+ /**
+ * @type string
+ */
+ this.requestType = type
+
+ /**
+ * @type any
+ */
+ this.requestData = data
+
+ /**
+ * @type {null|string}
+ */
+ this.password = null
+ }
+
+ getAsObject() {
+ let request = {
+ type: this.requestType
+ }
+
+ if (this.requestData)
+ request.data = this.requestData
+
+ return [
+ ...super.getAsObject(),
+ request
+ ]
+ }
+
+ /**
+ * @param {string} password
+ */
+ setPassword(password) {
+ this.password = password
+ }
+}
+
+class Server extends EventEmitter {
+
+ constructor() {
+ super()
+
+ /**
+ * @type {null|module:net.Server}
+ */
+ this.server = null
+
+ /**
+ * @type {Logger}
+ */
+ this.logger = getLogger('server')
+ }
+
+ /**
+ * @param {number} port
+ * @param {string} host
+ */
+ start(port, host) {
+ this.server = net.createServer()
+
+ this.server.on('connection', this.onConnection)
+ this.server.on('error', this.onError)
+ this.server.on('listening', this.onListening)
+
+ this.server.listen(port, host)
+ }
+
+ /**
+ * @param {module:net.Socket} socket
+ */
+ onConnection = (socket) => {
+ let connection = new Connection()
+ connection.setSocket(socket)
+ connection.on('message', (message) => {
+ this.emit('message', {
+ message,
+ connection
+ })
+ })
+
+ this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`)
+ }
+
+ onListening = () => {
+ let addr = this.server.address()
+ this.logger.info(`server is listening on ${addr.address}:${addr.port}`)
+ }
+
+ onError = (error) => {
+ this.logger.error('error: ', error)
+ }
+
+}
+
+class Connection extends EventEmitter {
+
+ constructor() {
+ super()
+
+ /**
+ * @type {null|module:net.Socket}
+ */
+ this.socket = null
+
+ /**
+ * @type {Buffer}
+ */
+ this.data = Buffer.from([])
+
+ /**
+ * @type {boolean}
+ * @private
+ */
+ this._closeEmitted = false
+
+ /**
+ * @type {null|string}
+ */
+ this.remoteAddress = null
+
+ /**
+ * @type {null|number}
+ */
+ this.remotePort = null
+
+ /**
+ * @type {null|number}
+ */
+ this.id = null
+
+ this._setLogger()
+ }
+
+ /**
+ * @param {string} host
+ * @param {number} port
+ */
+ connect(host, port) {
+ if (this.socket !== null)
+ throw new Error(`this Connection already has a socket`)
+
+ this.socket = new net.Socket()
+ this.socket.connect({host, port})
+
+ this.remoteAddress = host
+ this.remotePort = port
+
+ this._setId()
+ this._setLogger()
+ this._setSocketEvents()
+ }
+
+ /**
+ * @param {module:net.Socket} socket
+ */
+ setSocket(socket) {
+ this.socket = socket
+
+ this.remoteAddress = socket.remoteAddress
+ this.remotePort = socket.remotePort
+
+ this._setId()
+ this._setLogger()
+ this._setSocketEvents()
+ }
+
+ /**
+ * @private
+ */
+ _setLogger() {
+ let addr = this.socket ? this.remoteAddr() : '?'
+ this.logger = getLogger(`<Connection ${this.id} ${addr}>`)
+ }
+
+ /**
+ * @private
+ */
+ _setId() {
+ this.id = Math.floor(Math.random() * 10000)
+ }
+
+ /**
+ * @private
+ */
+ _setSocketEvents() {
+ this.socket.on('connect', this.onConnect)
+ this.socket.on('data', this.onData)
+ this.socket.on('end', this.onEnd)
+ this.socket.on('close', this.onClose)
+ this.socket.on('error', this.onError)
+ }
+
+ /**
+ * @param {Buffer} data
+ * @private
+ */
+ _appendToBuffer(data) {
+ this.data = Buffer.concat([this.data, data])
+ }
+
+ /**
+ * @return {string}
+ */
+ remoteAddr() {
+ return this.remoteAddress + ':' + this.remotePort
+ }
+
+ /**
+ * @private
+ */
+ _processChunks() {
+ if (!this.data.length)
+ return
+
+ this.logger.trace(`processChunks (start):`, this.data)
+
+ /**
+ * @type {Buffer[]}
+ */
+ let messages = []
+ let offset = 0
+ let eotPos
+ do {
+ eotPos = this.data.indexOf(EOT, offset)
+ if (eotPos !== -1) {
+ let message = this.data.slice(offset, eotPos)
+ messages.push(message)
+
+ this.logger.debug(`processChunks: found new message (${offset}, ${eotPos})`)
+ offset = eotPos + 1
+ }
+ } while (eotPos !== -1 && offset < this.data.length-1)
+
+ if (offset !== 0) {
+ this.data = this.data.slice(offset)
+ this.logger.trace(`processChunks: slicing data from ${offset}`)
+ }
+
+ this.logger.trace(`processChunks (after parsing):`, this.data)
+
+ for (let message of messages) {
+ try {
+ let buf = message.toString('utf-8')
+ this.logger.debug(buf)
+
+ let json = JSON.parse(buf)
+ this._emitMessage(json)
+ } catch (error) {
+ this.logger.error('failed to parse data as JSON')
+ this.logger.debug(message)
+ }
+ }
+ }
+
+ /**
+ * @param {object} json
+ * @private
+ */
+ _emitMessage(json) {
+ if (!Array.isArray(json)) {
+ this.logger.error('malformed message, JSON array expected', json)
+ return
+ }
+
+ let type = json.shift()
+ let message
+ switch (type) {
+ case Message.REQUEST: {
+ let data = json.shift()
+ if (!data || !isObject(data)) {
+ this.logger.error('malformed REQUEST message')
+ return
+ }
+
+ message = new RequestMessage(data.type, data.data || null)
+ if (data.password)
+ message.setPassword(data.password)
+ break
+ }
+
+ case Message.RESPONSE: {
+ let data = json.shift()
+ if (!data || !Array.isArray(data) || data.length < 2) {
+ this.logger.error('malformed RESPONSE message')
+ return
+ }
+
+ message = new ResponseMessage()
+ message.setError(data[0]).setData(data[1])
+
+ break
+ }
+
+ default:
+ this.logger.error(`malformed message, unexpected type ${type}`)
+ return
+ }
+
+ this.emit('message', message)
+ }
+
+ /**
+ * @type {Message} data
+ * @param message
+ */
+ send(message) {
+ if (!(message instanceof Message))
+ throw new Error('send expects Message, got', message)
+
+ let json = JSON.stringify(message.getAsObject())
+ let buf = Buffer.concat([
+ Buffer.from(json),
+ Buffer.from([EOT])
+ ])
+
+ this.logger.debug('send:', json)
+ this.logger.trace('send:', buf)
+
+ try {
+ this.socket.write(buf)
+ } catch (error) {
+ this.logger.error(`processChunks: failed to write response ${JSON.stringify(message)} to a socket`, error)
+ }
+ }
+
+ /**
+ */
+ close() {
+ try {
+ this.socket.end()
+ this.socket.destroy()
+ this._emitClose()
+ } catch (error) {
+ this.logger.error('close:', error)
+ }
+ }
+
+ /**
+ * @private
+ */
+ _emitClose() {
+ if (this._closeEmitted)
+ return
+
+ this._closeEmitted = true
+ this.emit('close')
+ }
+
+ onConnect = () => {
+ this.logger.debug('connection established')
+ this.emit('connect')
+ }
+
+ onData = (data) => {
+ this.logger.trace('onData', data)
+ this._appendToBuffer(data)
+ this._processChunks()
+ }
+
+ onEnd = (data) => {
+ if (data)
+ this._appendToBuffer(data)
+
+ this._processChunks()
+ }
+
+ onClose = (hadError) => {
+ this._emitClose()
+ this.logger.debug(`socket closed` + (hadError ? ` with error` : ''))
+ }
+
+ onError = (error) => {
+ this._emitClose()
+ this.logger.warn(`socket error:`, error)
+ }
+
+}
+
+module.exports = {
+ Server,
+ Connection,
+ RequestMessage,
+ ResponseMessage
+} \ No newline at end of file
diff --git a/src/util.js b/src/util.js
new file mode 100644
index 0000000..6de07e5
--- /dev/null
+++ b/src/util.js
@@ -0,0 +1,9 @@
+module.exports = {
+ timestamp() {
+ return parseInt(+(new Date())/1000)
+ },
+
+ isNumeric(n) {
+ return !isNaN(parseFloat(n)) && isFinite(n)
+ }
+} \ No newline at end of file
diff --git a/src/worker.js b/src/worker.js
new file mode 100644
index 0000000..3151b40
--- /dev/null
+++ b/src/worker.js
@@ -0,0 +1,472 @@
+const Queue = require('queue')
+const child_process = require('child_process')
+const db = require('./db')
+const {timestamp} = require('./util')
+const {getLogger} = require('./logger')
+const EventEmitter = require('events')
+const {workerConfig} = require('./config')
+
+const STATUS_WAITING = 'waiting'
+const STATUS_MANUAL = 'manual'
+const STATUS_ACCEPTED = 'accepted'
+const STATUS_IGNORED = 'ignored'
+const STATUS_RUNNING = 'running'
+const STATUS_DONE = 'done'
+
+const RESULT_OK = 'ok'
+const RESULT_FAIL = 'fail'
+
+class Worker extends EventEmitter {
+
+ constructor() {
+ super()
+
+ /**
+ * @type {object.<string, {slots: object.<string, {limit: number, queue: Queue}>}>}
+ */
+ this.targets = {}
+
+ /**
+ * @type {boolean}
+ */
+ this.polling = false
+
+ /**
+ * @type {boolean}
+ */
+ this.nextpoll = {}
+
+ /**
+ * @type {Logger}
+ */
+ this.logger = getLogger('Worker')
+ }
+
+ /**
+ * @param {string} target
+ * @param {string} slot
+ * @param {number} limit
+ */
+ addSlot(target, slot, limit) {
+ this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`)
+
+ if (this.targets[target] === undefined)
+ this.targets[target] = {slots: {}}
+
+ if (this.targets[target].slots[slot] !== undefined)
+ throw new Error(`slot ${slot} for target ${target} has already been added`)
+
+ let queue = Queue({
+ concurrency: limit,
+ autostart: true
+ })
+ queue.on('success', this.onJobFinished.bind(this, target, slot))
+ queue.on('error', this.onJobFinished.bind(this, target, slot))
+ queue.start()
+
+ this.targets[target].slots[slot] = {limit, queue}
+ }
+
+ /**
+ * @param {string} target
+ * @returns {boolean}
+ */
+ hasTarget(target) {
+ return (target in this.targets)
+ }
+
+ /**
+ * Returns status of all queues.
+ *
+ * @return {object}
+ */
+ getStatus() {
+ let status = {targets: {}}
+ for (const targetName in this.targets) {
+ let target = this.targets[targetName]
+ status.targets[targetName] = {}
+ for (const slotName in target.slots) {
+ const {queue, limit} = target.slots[slotName]
+ status.targets[targetName][slotName] = {
+ concurrency: queue.concurrency,
+ limit,
+ length: queue.length,
+ }
+ }
+ }
+ return status
+ }
+
+ /**
+ * @return {string[]}
+ */
+ getTargets() {
+ return Object.keys(this.targets)
+ }
+
+ /**
+ *
+ */
+ poll() {
+ const LOGPREFIX = `poll():`
+
+ let targets = this.getPollTargets()
+ if (!targets.length) {
+ this.poller.warn(`${LOGPREFIX} no targets`)
+ return
+ }
+
+ // skip and postpone the poll, if we're in the middle on another poll
+ // it will be called again from the last .then() at the end of this method
+ if (this.polling) {
+ this.logger.debug(`${LOGPREFIX} already polling`)
+ return
+ }
+
+ // skip and postpone the poll, if no free slots
+ // it will be called again from onJobFinished()
+ if (!this.hasFreeSlots(targets)) {
+ this.logger.debug(`${LOGPREFIX} no free slots`)
+ return
+ }
+
+ // set polling flag
+ this.polling = true
+
+ // clear postponed polls target list
+ this.setPollTargets()
+
+ this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`)
+ this.getTasks(targets)
+ .then(({rows}) => {
+ let message = `${LOGPREFIX} ${rows} processed`
+ if (workerConfig.mysql_fetch_limit && rows >= workerConfig.mysql_fetch_limit) {
+ // it seems, there are more, so we'll need to perform another query
+ this.setPollTargets(targets)
+ message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})`
+ }
+ this.logger.debug(message)
+ })
+ .catch((error) => {
+ this.logger.error(`${LOGPREFIX}`, error)
+ //this.setPollTargets(targets)
+ })
+ .then(() => {
+ // unset polling flag
+ this.polling = false
+
+ // perform another poll, if needed
+ if (this.getPollTargets().length > 0) {
+ this.logger.debug(`${LOGPREFIX} next poll scheduled, calling poll() again`)
+ this.poll()
+ }
+ })
+ }
+
+ /**
+ * @param {string|string[]|null} target
+ */
+ setPollTargets(target) {
+ // when called without parameter, remove all targets
+ if (target === undefined) {
+ this.nextpoll = {}
+ return
+ }
+
+ // just a fix
+ if (target === 'null')
+ target = null
+
+ if (Array.isArray(target)) {
+ target.forEach(t => {
+ this.nextpoll[t] = true
+ })
+ } else {
+ if (target === null)
+ this.nextpoll = {}
+ this.nextpoll[target] = true
+ }
+ }
+
+ /**
+ * @return {string[]}
+ */
+ getPollTargets() {
+ if (null in this.nextpoll)
+ return Object.keys(this.targets)
+
+ return Object.keys(this.nextpoll)
+ }
+
+ /**
+ * @param {string} target
+ * @return {boolean}
+ */
+ hasPollTarget(target) {
+ return target in this.nextpoll || null in this.nextpoll
+ }
+
+ /**
+ * @param {string|null|string[]} target
+ * @param {string} reqstatus
+ * @param {object} data
+ * @returns {Promise<{ignored: number, accepted: number, rows: number}>}
+ */
+ async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) {
+ const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):`
+
+ // get new jobs in transaction
+ await db.beginTransaction()
+
+ let sqlFields = `id, status, target, slot`
+ let sql
+ if (data.id) {
+ sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE id=${db.escape(data.id)} FOR UPDATE`
+ } else {
+ let targets
+ if (target === null) {
+ targets = Object.keys(this.targets)
+ } else if (!Array.isArray(target)) {
+ targets = [target]
+ } else {
+ targets = target
+ }
+ let sqlLimit = workerConfig.mysql_fetch_limit !== 0 ? ` LIMIT 0, ${workerConfig.mysql_fetch_limit}` : ''
+ let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)`
+ sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE`
+ }
+
+ /** @type {object[]} results */
+ let results = await db.query(sql)
+ this.logger.trace(`${LOGPREFIX} query result:`, results)
+
+ /**
+ * @type {{target: string, slot: string, id: number}[]}
+ */
+ let accepted = []
+
+ /**
+ * @type {number[]}
+ */
+ let ignored = []
+
+ for (let result of results) {
+ let {id, slot, target, status} = result
+ id = parseInt(id)
+
+ if (status !== reqstatus) {
+ this.logger.warn(`${LOGPREFIX} status = ${status} != ${reqstatus}`)
+ ignored.push(id)
+ continue
+ }
+
+ if (!target || this.targets[target] === undefined) {
+ this.logger.error(`${LOGPREFIX} target '${target}' not found (job id=${id})`)
+ ignored.push(id)
+ continue
+ }
+
+ if (!slot || this.targets[target].slots[slot] === undefined) {
+ this.logger.error(`${LOGPREFIX} slot '${slot}' of target '${target}' not found (job id=${id})`)
+ ignored.push(id)
+ continue
+ }
+
+ this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`)
+ accepted.push({target, slot, id})
+ }
+
+ if (accepted.length)
+ await db.query(`UPDATE ${workerConfig.mysql_table} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`)
+
+ if (ignored.length)
+ await db.query(`UPDATE ${workerConfig.mysql_table} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`)
+
+ await db.commit()
+
+ accepted.forEach(({id, target, slot}) => {
+ let q = this.targets[target].slots[slot].queue
+ q.push(async (cb) => {
+ let data = {
+ code: null,
+ signal: null,
+ stdout: '',
+ stderr: ''
+ }
+ let result = RESULT_OK
+
+ try {
+ await this.setJobStatus(id, STATUS_RUNNING)
+
+ Object.assign(data, (await this.run(id)))
+ if (data.code !== 0)
+ result = RESULT_FAIL
+ } catch (error) {
+ this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error)
+ result = RESULT_FAIL
+ data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '')
+ } finally {
+ this.emit('job-done', {
+ id,
+ result,
+ ...data
+ })
+
+ try {
+ await this.setJobStatus(id, STATUS_DONE, result, data)
+ } catch (error) {
+ this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error)
+ }
+
+ cb()
+ }
+ })
+ })
+
+ return {
+ rows: results.length,
+ accepted: accepted.length,
+ ignored: ignored.length,
+ }
+ }
+
+ /**
+ * @param {number} id
+ */
+ async run(id) {
+ let command = workerConfig.launcher.replace(/\{id\}/g, id)
+ let args = command.split(/ +/)
+ return new Promise((resolve, reject) => {
+ this.logger.info(`run(${id}): launching`, args)
+
+ let process = child_process.spawn(args[0], args.slice(1), {
+ maxBuffer: workerConfig.max_output_buffer
+ })
+
+ let stdoutChunks = []
+ let stderrChunks = []
+
+ process.on('exit',
+ /**
+ * @param {null|number} code
+ * @param {null|string} signal
+ */
+ (code, signal) => {
+ let stdout = stdoutChunks.join('')
+ let stderr = stderrChunks.join('')
+
+ stdoutChunks = undefined
+ stderrChunks = undefined
+
+ resolve({
+ code,
+ signal,
+ stdout,
+ stderr
+ })
+ })
+
+ process.on('error', (error) => {
+ reject(error)
+ })
+
+ process.stdout.on('data', (data) => {
+ if (data instanceof Buffer)
+ data = data.toString('utf-8')
+ stdoutChunks.push(data)
+ })
+
+ process.stderr.on('data', (data) => {
+ if (data instanceof Buffer)
+ data = data.toString('utf-8')
+ stderrChunks.push(data)
+ })
+ })
+ }
+
+ /**
+ * @param {number} id
+ * @param {string} status
+ * @param {string} result
+ * @param {object} data
+ * @return {Promise<void>}
+ */
+ async setJobStatus(id, status, result = RESULT_OK, data = {}) {
+ let update = {
+ status,
+ result
+ }
+ switch (status) {
+ case STATUS_RUNNING:
+ case STATUS_DONE:
+ update[status === STATUS_RUNNING ? 'time_started' : 'time_finished'] = timestamp()
+ break
+ }
+ if (data.code !== undefined)
+ update.return_code = data.code
+ if (data.signal !== undefined)
+ update.sig = data.signal
+ if (data.stderr !== undefined)
+ update.stderr = data.stderr
+ if (data.stdout !== undefined)
+ update.stdout = data.stdout
+
+ let list = []
+ for (let field in update) {
+ let val = update[field]
+ if (val !== null)
+ val = db.escape(val)
+ list.push(`${field}=${val}`)
+ }
+
+ await db.query(`UPDATE ${workerConfig.mysql_table} SET ${list.join(', ')} WHERE id=?`, [id])
+ }
+
+ /**
+ * @param {string[]} inTargets
+ * @returns {boolean}
+ */
+ hasFreeSlots(inTargets = []) {
+ const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):`
+
+ this.logger.debug(`${LOGPREFIX} entered`)
+
+ for (const target in this.targets) {
+ if (!inTargets.includes(target))
+ continue
+
+ for (const slot in this.targets[target].slots) {
+ const {limit, queue} = this.targets[target].slots[slot]
+ this.logger.debug(LOGPREFIX, limit, queue.length)
+ if (queue.length < limit)
+ return true
+ }
+ }
+
+ return false
+ }
+
+ /**
+ * @param {string} target
+ * @param {string} slot
+ */
+ onJobFinished = (target, slot) => {
+ this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`)
+ const {queue, limit} = this.targets[target].slots[slot]
+ if (queue.length < limit && this.hasPollTarget(target)) {
+ this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`)
+ this.poll()
+ }
+ }
+
+}
+
+module.exports = {
+ Worker,
+ STATUS_WAITING,
+ STATUS_MANUAL,
+ STATUS_ACCEPTED,
+ STATUS_IGNORED,
+ STATUS_RUNNING,
+ STATUS_DONE,
+} \ No newline at end of file
diff --git a/src/workers-list.js b/src/workers-list.js
new file mode 100644
index 0000000..96a127a
--- /dev/null
+++ b/src/workers-list.js
@@ -0,0 +1,145 @@
+const intersection = require('lodash/intersection')
+const {masterConfig} = require('./config')
+const {getLogger} = require('./logger')
+const {RequestMessage} = require('./server')
+const throttle = require('lodash/throttle')
+
+class WorkersList {
+
+ constructor() {
+ /**
+ * @type {{connection: Connection, targets: string[]}[]}
+ */
+ this.workers = []
+
+ /**
+ * @type {object.<string, boolean>}
+ */
+ this.targetsToPoke = {}
+
+ /**
+ * @type {object.<string, boolean>}
+ */
+ this.targetsWaitingToPoke = {}
+
+ /**
+ * @type {NodeJS.Timeout}
+ */
+ this.pingInterval = setInterval(this.sendPings, masterConfig.ping_interval * 1000)
+
+ /**
+ * @type {Logger}
+ */
+ this.logger = getLogger('WorkersList')
+ }
+
+ /**
+ * @param {Connection} connection
+ * @param {string[]} targets
+ */
+ add(connection, targets) {
+ this.logger.info(`add: connection from ${connection.remoteAddr()}, targets ${JSON.stringify(targets)}`)
+
+ this.workers.push({connection, targets})
+ connection.on('close', () => {
+ this.logger.info(`connection from ${connection.remoteAddr()} closed, removing worker`)
+ this.workers = this.workers.filter(worker => {
+ return worker.connection !== connection
+ })
+ })
+
+ let waiting = Object.keys(this.targetsWaitingToPoke)
+ if (!waiting.length)
+ return
+
+ let intrs = intersection(waiting, targets)
+ if (intrs.length) {
+ this.logger.info('add: found intersection with waiting targets:', intrs, 'going to poke new worker')
+ this._pokeWorkerConnection(connection, intrs)
+ for (let target of intrs)
+ delete this.targetsWaitingToPoke[target]
+ this.logger.trace(`add: this.targetsWaitingToPoke:`, this.targetsWaitingToPoke)
+ }
+ }
+
+ /**
+ * @param {string[]} targets
+ */
+ poke(targets) {
+ this.logger.debug('poke:', targets)
+ if (!Array.isArray(targets))
+ throw new Error('targets must be Array')
+
+ for (let t of targets)
+ this.targetsToPoke[t] = true
+
+ this._pokeWorkers()
+ }
+
+ /**
+ * @private
+ */
+ _pokeWorkers = throttle(() => {
+ const targets = Object.keys(this.targetsToPoke)
+ this.targetsToPoke = {}
+
+ const found = {}
+ for (const worker of this.workers) {
+ const intrs = intersection(worker.targets, targets)
+ intrs.forEach(t => {
+ found[t] = true
+ })
+ if (intrs.length > 0)
+ this._pokeWorkerConnection(worker.connection, targets)
+ }
+
+ for (let target of targets) {
+ if (!(target in found)) {
+ this.logger.debug(`_pokeWorkers: worker responsible for ${target} not found. we'll remember it`)
+ this.targetsWaitingToPoke[target] = true
+ }
+ this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke)
+ }
+ }, masterConfig.poke_throttle_interval * 1000, {leading: true})
+
+ /**
+ * @param {Connection} connection
+ * @param {string[]} targets
+ * @private
+ */
+ _pokeWorkerConnection(connection, targets) {
+ this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets)
+ connection.send(
+ new RequestMessage('poll', {
+ targets
+ })
+ )
+ }
+
+ /**
+ * @return {{targets: string[], remoteAddr: string, remotePort: number}[]}
+ */
+ getInfo() {
+ return this.workers.map(worker => {
+ return {
+ remoteAddr: worker.connection.socket?.remoteAddress,
+ remotePort: worker.connection.socket?.remotePort,
+ targets: worker.targets
+ }
+ })
+ }
+
+ /**
+ * @private
+ */
+ sendPings = () => {
+ this.workers
+ .forEach(w => {
+ this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`)
+ w.connection.send(new RequestMessage('ping'))
+ })
+ }
+
+}
+
+module.exports = WorkersList \ No newline at end of file