diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-02-24 03:59:25 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-02-26 17:35:21 +0300 |
commit | 5e7d34458a6e60487393caa4f320ab1cfc1cf8e5 (patch) | |
tree | 11df96e6e105558e8bdcd7a1f61f720dd68150af /src/jobd.js |
initial commit
Diffstat (limited to 'src/jobd.js')
-rwxr-xr-x | src/jobd.js | 244 |
1 files changed, 244 insertions, 0 deletions
diff --git a/src/jobd.js b/src/jobd.js new file mode 100755 index 0000000..6567d06 --- /dev/null +++ b/src/jobd.js @@ -0,0 +1,244 @@ +#!/usr/bin/env node +const minimist = require('minimist') +const loggerModule = require('./logger') +const configModule = require('./config') +const db = require('./db') +const {Server, Connection, RequestMessage, ResponseMessage} = require('./server') +const {Worker, STATUS_MANUAL} = require('./worker') + +const {workerConfig} = configModule + + +/** + * @type {Worker} + */ +let worker + +/** + * @type {Logger} + */ +let logger + +/** + * @type {Server} + */ +let server + +/** + * @type {object.<string, Connection>} + */ +let jobDoneAwaiters = {} + + +main().catch(e => { + console.error(e) + process.exit(1) +}) + + +async function main() { + if (process.argv.length < 3) { + usage() + process.exit(0) + } + + process.on('SIGINT', term) + process.on('SIGTERM', term) + + const argv = minimist(process.argv.slice(2)) + if (!argv.config) + throw new Error('--config option is required') + + // read config + try { + configModule.parseWorkerConfig(argv.config) + } catch (e) { + console.error(`config parsing error: ${e.message}`) + process.exit(1) + } + + await loggerModule.init({ + file: workerConfig.log_file, + levelFile: workerConfig.log_level_file, + levelConsole: workerConfig.log_level_console, + }) + logger = loggerModule.getLogger('jobd') + + // console.log(workerConfig) + + // init database + try { + await db.init() + } catch (error) { + logger.error('failed to connect to MySQL', error) + process.exit(1) + } + logger.info('db initialized') + + // init queue + worker = new Worker() + for (let targetName in workerConfig.targets) { + let slots = workerConfig.targets[targetName].slots + // let target = new Target({name: targetName}) + // queue.addTarget(target) + + for (let slotName in slots) { + let slotLimit = slots[slotName] + worker.addSlot(targetName, slotName, slotLimit) + } + } + worker.on('job-done', (data) => { + if (jobDoneAwaiters[data.id] !== undefined) { + jobDoneAwaiters[data.id].send(new ResponseMessage().setData(data)) + jobDoneAwaiters[data.id].close() + delete jobDoneAwaiters[data.id] + } + }) + logger.info('queue initialized') + + // start server + server = new Server() + server.on('message', onMessage) + server.start(workerConfig.port, workerConfig.host) + logger.info('server started') + + // connect to master + if (workerConfig.master_port && workerConfig.master_host) + connectToMaster() +} + + +/** + * @param {RequestMessage|ResponseMessage} message + * @param {Connection} connection + * @return {Promise<*>} + */ +async function onMessage({message, connection}) { + try { + if (!(message instanceof RequestMessage)) { + logger.debug('ignoring message', message) + return + } + + if (message.requestType !== 'ping') + logger.info('onMessage:', message) + + if (workerConfig.password && message.password !== workerConfig.password) { + connection.send(new ResponseMessage().setError('invalid password')) + return connection.close() + } + + switch (message.requestType) { + case 'ping': + connection.send(new ResponseMessage().setData('pong')) + break + + case 'poll': + const targets = message.requestData?.targets || [] + if (!targets.length) { + connection.send(new ResponseMessage().setError('empty targets')) + break + } + + for (const t of targets) { + if (!worker.hasTarget(t)) { + connection.send(new ResponseMessage().setError(`invalid target '${t}'`)) + break + } + } + + worker.setPollTargets(targets) + worker.poll() + + connection.send(new ResponseMessage().setData('ok')); + break + + case 'status': + const qs = worker.getStatus() + connection.send( + new ResponseMessage().setData({ + queue: qs, + jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length, + memoryUsage: process.memoryUsage() + }) + ) + break + + case 'run-manual': + const {id} = message.requestData + if (id in jobDoneAwaiters) { + connection.send(new ResponseMessage().setError('another client is already waiting this job')) + break + } + + jobDoneAwaiters[id] = connection + + const {accepted} = await worker.getTasks(null, STATUS_MANUAL, {id}) + if (!accepted) { + delete jobDoneAwaiters[id] + connection.send(new ResponseMessage().setError('failed to run task')) // would be nice to provide some error... + } + + break + + default: + connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`)) + break + } + } catch (error) { + logger.error(`error while handling message:`, message, error) + connection.send(new ResponseMessage().setError('server error: ' + error?.message)) + } +} + + +function connectToMaster() { + const connection = new Connection() + connection.connect(workerConfig.master_host, workerConfig.master_port) + + connection.on('connect', function() { + connection.send( + new RequestMessage('register-worker', { + targets: worker.getTargets() + }) + ) + }) + + connection.on('close', () => { + logger.warn(`connectToMaster: connection closed`) + setTimeout(() => { + connectToMaster() + }, workerConfig.master_reconnect_timeout * 1000) + }) + + connection.on('message', (message) => { + if (!(message instanceof RequestMessage)) { + logger.debug('message from master is not a request, hmm... skipping', message) + return + } + + onMessage({message, connection}) + .catch((error) => { + logger.error('connectToMaster: onMessage:', error) + }) + }) +} + + +function usage() { + let s = `${process.argv[1]} OPTIONS + +Options: + --config <path>` + console.log(s) +} + + +function term() { + if (logger) + logger.info('shutdown') + + loggerModule.shutdown(function() { + process.exit() + }) +}
\ No newline at end of file |