aboutsummaryrefslogtreecommitdiff
path: root/src/jobd-master.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/jobd-master.js')
-rwxr-xr-xsrc/jobd-master.js64
1 files changed, 36 insertions, 28 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 608750d..d6bc09a 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -65,7 +65,9 @@ async function main() {
// start server
server = new Server()
- server.on('message', onMessage)
+ server.on('new-connection', (connection) => {
+ connection.on('request-message', onRequestMessage)
+ })
server.start(config.get('port'), config.get('host'))
logger.info('server started')
}
@@ -75,65 +77,71 @@ async function main() {
* @param {Connection} connection
* @return {Promise<*>}
*/
-async function onMessage({message, connection}) {
+async function onRequestMessage(message, connection) {
try {
- if (!(message instanceof RequestMessage)) {
- logger.debug('ignoring message', message)
- return
- }
-
- if (message.requestType !== 'ping')
- logger.info('onMessage:', message)
-
- if (config.get('password') && message.password !== config.get('password')) {
- connection.send(new ResponseMessage().setError('invalid password'))
- return connection.close()
- }
+ logger.info('onMessage:', message)
switch (message.requestType) {
- case 'ping':
- connection.send(new ResponseMessage().setError('pong'))
- break
-
case 'register-worker': {
const targets = message.requestData?.targets || []
if (!targets.length) {
- connection.send(new ResponseMessage().setError(`targets are empty`))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`targets are empty`)
+ )
break
}
workers.add(connection, targets)
- connection.send(new ResponseMessage().setData('ok'))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData('ok')
+ )
break
}
case 'poke': {
const targets = message.requestData?.targets || []
if (!targets.length) {
- connection.send(new ResponseMessage().setError(`targets are empty`))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`targets are empty`)
+ )
break
}
workers.poke(targets)
- connection.send(new ResponseMessage().setData('ok'))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData('ok')
+ )
break
}
case 'status':
const info = workers.getInfo()
- connection.send(new ResponseMessage().setData({
- workers: info,
- memoryUsage: process.memoryUsage()
- }))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData({
+ workers: info,
+ memoryUsage: process.memoryUsage()
+ })
+ )
break
default:
- connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`unknown request type: '${message.requestType}'`)
+ )
break
}
} catch (error) {
logger.error(`error while handling message:`, message, error)
- connection.send(new ResponseMessage().setError('server error: ' + error?.message))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError('server error: ' + error?.message)
+ )
}
}