diff options
Diffstat (limited to 'src/jobd.js')
-rwxr-xr-x | src/jobd.js | 107 |
1 files changed, 61 insertions, 46 deletions
diff --git a/src/jobd.js b/src/jobd.js index a175b63..0df4e70 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -22,7 +22,7 @@ let logger let server /** - * @type {object.<string, Connection>} + * @type {object.<string, {connection: Connection, requestNo: number}>} */ let jobDoneAwaiters = {} @@ -90,8 +90,14 @@ async function main() { } worker.on('job-done', (data) => { if (jobDoneAwaiters[data.id] !== undefined) { - jobDoneAwaiters[data.id].send(new ResponseMessage().setData(data)) - jobDoneAwaiters[data.id].close() + const {connection, requestNo} = jobDoneAwaiters[data.id] + + connection.send( + new ResponseMessage(requestNo) + .setData(data) + ) + connection.close() + delete jobDoneAwaiters[data.id] } }) @@ -99,7 +105,9 @@ async function main() { // start server server = new Server() - server.on('message', onMessage) + server.on('new-connection', (connection) => { + connection.on('request-message', onRequestMessage) + }) server.start(config.get('port'), config.get('host')) logger.info('server started') @@ -114,36 +122,27 @@ async function main() { * @param {Connection} connection * @return {Promise<*>} */ -async function onMessage({message, connection}) { +async function onRequestMessage(message, connection) { try { - if (!(message instanceof RequestMessage)) { - logger.debug('ignoring message', message) - return - } - - if (message.requestType !== 'ping') - logger.info('onMessage:', message) - - if (config.get('password') && message.password !== config.get('password')) { - connection.send(new ResponseMessage().setError('invalid password')) - return connection.close() - } + logger.info('onMessage:', message) switch (message.requestType) { - case 'ping': - connection.send(new ResponseMessage().setData('pong')) - break - case 'poll': const targets = message.requestData?.targets || [] if (!targets.length) { - connection.send(new ResponseMessage().setError('empty targets')) + connection.send( + new ResponseMessage(message.requestNo) + .setError('empty targets') + ) break } for (const t of targets) { if (!worker.hasTarget(t)) { - connection.send(new ResponseMessage().setError(`invalid target '${t}'`)) + connection.send( + new ResponseMessage(message.requestNo) + .setError(`invalid target '${t}'`) + ) break } } @@ -151,28 +150,38 @@ async function onMessage({message, connection}) { worker.setPollTargets(targets) worker.poll() - connection.send(new ResponseMessage().setData('ok')); + connection.send( + new ResponseMessage(message.requestNo) + .setData('ok') + ) break case 'status': const qs = worker.getStatus() connection.send( - new ResponseMessage().setData({ - queue: qs, - jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length, - memoryUsage: process.memoryUsage() - }) + new ResponseMessage(message.requestNo) + .setData({ + queue: qs, + jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length, + memoryUsage: process.memoryUsage() + }) ) break case 'run-manual': const {id} = message.requestData if (id in jobDoneAwaiters) { - connection.send(new ResponseMessage().setError('another client is already waiting this job')) + connection.send( + new ResponseMessage(message.requestNo) + .setError('another client is already waiting this job') + ) break } - jobDoneAwaiters[id] = connection + jobDoneAwaiters[id] = { + connection, + requestNo: message.requestNo + } const {accepted, error} = await worker.getTasks(null, STATUS_MANUAL, {id}) if (!accepted) { @@ -181,18 +190,28 @@ async function onMessage({message, connection}) { let message = 'failed to run task' if (typeof error === 'string') message += `: ${error}` - connection.send(new ResponseMessage().setError(message)) + + connection.send( + new ResponseMessage(message.requestNo) + .setError(message) + ) } break default: - connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`)) + connection.send( + new ResponseMessage(message.requestNo) + .setError(`unknown request type: '${message.requestType}'`) + ) break } } catch (error) { logger.error(`error while handling message:`, message, error) - connection.send(new ResponseMessage().setError('server error: ' + error?.message)) + connection.send( + new ResponseMessage(message.requestNo) + .setError('server error: ' + error?.message) + ) } } @@ -202,11 +221,17 @@ function connectToMaster() { connection.connect(config.get('master_host'), config.get('master_port')) connection.on('connect', function() { - connection.send( + connection.sendRequest( new RequestMessage('register-worker', { targets: worker.getTargets() }) ) + .then(response => { + logger.debug('connectToMaster: response:', response) + }) + .catch(error => { + logger.error('connectToMaster: error while awaiting response:', error) + }) }) connection.on('close', () => { @@ -216,17 +241,7 @@ function connectToMaster() { }, config.get('master_reconnect_timeout') * 1000) }) - connection.on('message', (message) => { - if (!(message instanceof RequestMessage)) { - logger.debug('message from master is not a request, hmm... skipping', message) - return - } - - onMessage({message, connection}) - .catch((error) => { - logger.error('connectToMaster: onMessage:', error) - }) - }) + connection.on('request-message', onRequestMessage) } |