aboutsummaryrefslogtreecommitdiff
path: root/src/jobd.js
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-01 02:05:37 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-01 02:05:37 +0300
commit8783de4fe5945a7ea22995c25a3e546c0426c7f6 (patch)
tree17711b548483b57a5b5b2b81594ad2a19fc6eeab /src/jobd.js
parent57f541a6ea6722a2902e7cc7774aefca883fe8de (diff)
update protocol
Diffstat (limited to 'src/jobd.js')
-rwxr-xr-xsrc/jobd.js107
1 files changed, 61 insertions, 46 deletions
diff --git a/src/jobd.js b/src/jobd.js
index a175b63..0df4e70 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -22,7 +22,7 @@ let logger
let server
/**
- * @type {object.<string, Connection>}
+ * @type {object.<string, {connection: Connection, requestNo: number}>}
*/
let jobDoneAwaiters = {}
@@ -90,8 +90,14 @@ async function main() {
}
worker.on('job-done', (data) => {
if (jobDoneAwaiters[data.id] !== undefined) {
- jobDoneAwaiters[data.id].send(new ResponseMessage().setData(data))
- jobDoneAwaiters[data.id].close()
+ const {connection, requestNo} = jobDoneAwaiters[data.id]
+
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData(data)
+ )
+ connection.close()
+
delete jobDoneAwaiters[data.id]
}
})
@@ -99,7 +105,9 @@ async function main() {
// start server
server = new Server()
- server.on('message', onMessage)
+ server.on('new-connection', (connection) => {
+ connection.on('request-message', onRequestMessage)
+ })
server.start(config.get('port'), config.get('host'))
logger.info('server started')
@@ -114,36 +122,27 @@ async function main() {
* @param {Connection} connection
* @return {Promise<*>}
*/
-async function onMessage({message, connection}) {
+async function onRequestMessage(message, connection) {
try {
- if (!(message instanceof RequestMessage)) {
- logger.debug('ignoring message', message)
- return
- }
-
- if (message.requestType !== 'ping')
- logger.info('onMessage:', message)
-
- if (config.get('password') && message.password !== config.get('password')) {
- connection.send(new ResponseMessage().setError('invalid password'))
- return connection.close()
- }
+ logger.info('onMessage:', message)
switch (message.requestType) {
- case 'ping':
- connection.send(new ResponseMessage().setData('pong'))
- break
-
case 'poll':
const targets = message.requestData?.targets || []
if (!targets.length) {
- connection.send(new ResponseMessage().setError('empty targets'))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError('empty targets')
+ )
break
}
for (const t of targets) {
if (!worker.hasTarget(t)) {
- connection.send(new ResponseMessage().setError(`invalid target '${t}'`))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`invalid target '${t}'`)
+ )
break
}
}
@@ -151,28 +150,38 @@ async function onMessage({message, connection}) {
worker.setPollTargets(targets)
worker.poll()
- connection.send(new ResponseMessage().setData('ok'));
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData('ok')
+ )
break
case 'status':
const qs = worker.getStatus()
connection.send(
- new ResponseMessage().setData({
- queue: qs,
- jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length,
- memoryUsage: process.memoryUsage()
- })
+ new ResponseMessage(message.requestNo)
+ .setData({
+ queue: qs,
+ jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length,
+ memoryUsage: process.memoryUsage()
+ })
)
break
case 'run-manual':
const {id} = message.requestData
if (id in jobDoneAwaiters) {
- connection.send(new ResponseMessage().setError('another client is already waiting this job'))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError('another client is already waiting this job')
+ )
break
}
- jobDoneAwaiters[id] = connection
+ jobDoneAwaiters[id] = {
+ connection,
+ requestNo: message.requestNo
+ }
const {accepted, error} = await worker.getTasks(null, STATUS_MANUAL, {id})
if (!accepted) {
@@ -181,18 +190,28 @@ async function onMessage({message, connection}) {
let message = 'failed to run task'
if (typeof error === 'string')
message += `: ${error}`
- connection.send(new ResponseMessage().setError(message))
+
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(message)
+ )
}
break
default:
- connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`unknown request type: '${message.requestType}'`)
+ )
break
}
} catch (error) {
logger.error(`error while handling message:`, message, error)
- connection.send(new ResponseMessage().setError('server error: ' + error?.message))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError('server error: ' + error?.message)
+ )
}
}
@@ -202,11 +221,17 @@ function connectToMaster() {
connection.connect(config.get('master_host'), config.get('master_port'))
connection.on('connect', function() {
- connection.send(
+ connection.sendRequest(
new RequestMessage('register-worker', {
targets: worker.getTargets()
})
)
+ .then(response => {
+ logger.debug('connectToMaster: response:', response)
+ })
+ .catch(error => {
+ logger.error('connectToMaster: error while awaiting response:', error)
+ })
})
connection.on('close', () => {
@@ -216,17 +241,7 @@ function connectToMaster() {
}, config.get('master_reconnect_timeout') * 1000)
})
- connection.on('message', (message) => {
- if (!(message instanceof RequestMessage)) {
- logger.debug('message from master is not a request, hmm... skipping', message)
- return
- }
-
- onMessage({message, connection})
- .catch((error) => {
- logger.error('connectToMaster: onMessage:', error)
- })
- })
+ connection.on('request-message', onRequestMessage)
}