aboutsummaryrefslogtreecommitdiff
path: root/src/jobd.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/jobd.js')
-rwxr-xr-xsrc/jobd.js139
1 files changed, 104 insertions, 35 deletions
diff --git a/src/jobd.js b/src/jobd.js
index 6f3c85e..e341f3d 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -3,8 +3,21 @@ const minimist = require('minimist')
const loggerModule = require('./lib/logger')
const config = require('./lib/config')
const db = require('./lib/db')
-const {Server, Connection, RequestMessage, ResponseMessage} = require('./lib/server')
-const {Worker, STATUS_MANUAL} = require('./lib/worker')
+const {uniq} = require('lodash')
+const {createCallablePromise} = require('./lib/util')
+const {
+ Server,
+ Connection,
+ RequestMessage,
+ ResponseMessage
+} = require('./lib/server')
+const {
+ Worker,
+ STATUS_MANUAL,
+ JOB_NOTFOUND,
+ JOB_ACCEPTED,
+ JOB_IGNORED
+} = require('./lib/worker')
const package_json = require('../package.json')
/**
@@ -23,9 +36,9 @@ let logger
let server
/**
- * @type {object.<string, {connection: Connection, requestNo: number}>}
+ * @type {object.<string, Promise>}
*/
-let jobDoneAwaiters = {}
+let jobPromises = {}
main().catch(e => {
@@ -95,16 +108,14 @@ async function main() {
}
}
worker.on('job-done', (data) => {
- if (jobDoneAwaiters[data.id] !== undefined) {
- const {connection, requestNo} = jobDoneAwaiters[data.id]
-
- connection.send(
- new ResponseMessage(requestNo)
- .setData(data)
- )
- connection.close()
-
- delete jobDoneAwaiters[data.id]
+ if (jobPromises[data.id] !== undefined) {
+ const P = jobPromises[data.id]
+ delete jobPromises[data.id]
+
+ logger.trace(`job-done: resolving promise of job ${data.id}`)
+ P.resolve(data)
+ } else {
+ logger.warn(`job-done: jobPromises[${data.id}] is undefined`)
}
})
logger.info('queue initialized')
@@ -133,7 +144,7 @@ async function onRequestMessage(message, connection) {
logger.info('onMessage:', message)
switch (message.requestType) {
- case 'poll':
+ case 'poll': {
const targets = message.requestData?.targets || []
if (!targets.length) {
connection.send(
@@ -161,49 +172,107 @@ async function onRequestMessage(message, connection) {
.setData('ok')
)
break
+ }
- case 'status':
+ case 'status': {
const qs = worker.getStatus()
connection.send(
new ResponseMessage(message.requestNo)
.setData({
queue: qs,
- jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length,
+ jobDoneAwaitersCount: Object.keys(jobPromises).length,
memoryUsage: process.memoryUsage()
})
)
break
+ }
- case 'run-manual':
- const {id} = message.requestData
- if (id in jobDoneAwaiters) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError('another client is already waiting this job')
- )
- break
+ case 'run-manual': {
+ let {ids: jobIds} = message.requestData
+ jobIds = uniq(jobIds)
+
+ // if at least one of the jobs is already being run, reject
+ // if at least one item is not a number, reject
+ for (const id of jobIds) {
+ if (typeof id !== 'number') {
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`all ids must be numbers, got ${typeof id}`)
+ )
+ return
+ }
+
+ if (id in jobPromises) {
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`another client is already waiting for job ${id}`)
+ )
+ return
+ }
}
- jobDoneAwaiters[id] = {
- connection,
- requestNo: message.requestNo
+ // create a bunch of promises, one per job
+ let promises = []
+ for (const id of jobIds) {
+ const P = createCallablePromise()
+ jobPromises[id] = P
+ promises.push(P)
}
- const {accepted, error} = await worker.getTasks(null, STATUS_MANUAL, {id})
- if (!accepted) {
- delete jobDoneAwaiters[id]
+ // get jobs from database and enqueue for execution
+ const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds})
+
+ // wait till all jobs are done (or failed), then send a response
+ Promise.allSettled(promises).then(results => {
+ const response = {}
+
+ for (let i = 0; i < results.length; i++) {
+ let jobId = jobIds[i]
+ let result = results[i]
+
+ if (result.status === 'fulfilled') {
+ if (!('jobs' in response))
+ response.jobs = {}
+
+ if (result.value?.id !== undefined)
+ delete result.value.id
+
+ response.jobs[jobId] = result.value
+ } else if (result.status === 'rejected') {
+ if (!('errors' in response))
+ response.errors = {}
- let message = 'failed to run task'
- if (typeof error === 'string')
- message += `: ${error}`
+ response.errors[jobId] = result.reason?.message
+ }
+ }
connection.send(
new ResponseMessage(message.requestNo)
- .setError(message)
+ .setData(response)
)
+ })
+
+ // reject all ignored / non-found jobs
+ for (const [id, value] of results.entries()) {
+ if (!(id in jobPromises)) {
+ this.logger.error(`run-manual: ${id} not found in jobPromises`)
+ continue
+ }
+
+ if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) {
+ const P = jobPromises[id]
+ delete jobPromises[id]
+
+ if (value.result === JOB_IGNORED)
+ P.reject(new Error(value.reason))
+
+ else if (value.result === JOB_NOTFOUND)
+ P.reject(new Error(`job ${id} not found`))
+ }
}
break
+ }
default:
connection.send(