From 2be11422804aaf5a088e0c5ffdba53fe1df37365 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 2 Mar 2021 21:06:51 +0300 Subject: run-manual: support multiple jobs --- src/jobd.js | 139 +++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 104 insertions(+), 35 deletions(-) (limited to 'src/jobd.js') diff --git a/src/jobd.js b/src/jobd.js index 6f3c85e..e341f3d 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -3,8 +3,21 @@ const minimist = require('minimist') const loggerModule = require('./lib/logger') const config = require('./lib/config') const db = require('./lib/db') -const {Server, Connection, RequestMessage, ResponseMessage} = require('./lib/server') -const {Worker, STATUS_MANUAL} = require('./lib/worker') +const {uniq} = require('lodash') +const {createCallablePromise} = require('./lib/util') +const { + Server, + Connection, + RequestMessage, + ResponseMessage +} = require('./lib/server') +const { + Worker, + STATUS_MANUAL, + JOB_NOTFOUND, + JOB_ACCEPTED, + JOB_IGNORED +} = require('./lib/worker') const package_json = require('../package.json') /** @@ -23,9 +36,9 @@ let logger let server /** - * @type {object.} + * @type {object.} */ -let jobDoneAwaiters = {} +let jobPromises = {} main().catch(e => { @@ -95,16 +108,14 @@ async function main() { } } worker.on('job-done', (data) => { - if (jobDoneAwaiters[data.id] !== undefined) { - const {connection, requestNo} = jobDoneAwaiters[data.id] - - connection.send( - new ResponseMessage(requestNo) - .setData(data) - ) - connection.close() - - delete jobDoneAwaiters[data.id] + if (jobPromises[data.id] !== undefined) { + const P = jobPromises[data.id] + delete jobPromises[data.id] + + logger.trace(`job-done: resolving promise of job ${data.id}`) + P.resolve(data) + } else { + logger.warn(`job-done: jobPromises[${data.id}] is undefined`) } }) logger.info('queue initialized') @@ -133,7 +144,7 @@ async function onRequestMessage(message, connection) { logger.info('onMessage:', message) switch (message.requestType) { - case 'poll': + case 'poll': { const targets = message.requestData?.targets || [] if (!targets.length) { connection.send( @@ -161,49 +172,107 @@ async function onRequestMessage(message, connection) { .setData('ok') ) break + } - case 'status': + case 'status': { const qs = worker.getStatus() connection.send( new ResponseMessage(message.requestNo) .setData({ queue: qs, - jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length, + jobDoneAwaitersCount: Object.keys(jobPromises).length, memoryUsage: process.memoryUsage() }) ) break + } - case 'run-manual': - const {id} = message.requestData - if (id in jobDoneAwaiters) { - connection.send( - new ResponseMessage(message.requestNo) - .setError('another client is already waiting this job') - ) - break + case 'run-manual': { + let {ids: jobIds} = message.requestData + jobIds = uniq(jobIds) + + // if at least one of the jobs is already being run, reject + // if at least one item is not a number, reject + for (const id of jobIds) { + if (typeof id !== 'number') { + connection.send( + new ResponseMessage(message.requestNo) + .setError(`all ids must be numbers, got ${typeof id}`) + ) + return + } + + if (id in jobPromises) { + connection.send( + new ResponseMessage(message.requestNo) + .setError(`another client is already waiting for job ${id}`) + ) + return + } } - jobDoneAwaiters[id] = { - connection, - requestNo: message.requestNo + // create a bunch of promises, one per job + let promises = [] + for (const id of jobIds) { + const P = createCallablePromise() + jobPromises[id] = P + promises.push(P) } - const {accepted, error} = await worker.getTasks(null, STATUS_MANUAL, {id}) - if (!accepted) { - delete jobDoneAwaiters[id] + // get jobs from database and enqueue for execution + const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds}) + + // wait till all jobs are done (or failed), then send a response + Promise.allSettled(promises).then(results => { + const response = {} + + for (let i = 0; i < results.length; i++) { + let jobId = jobIds[i] + let result = results[i] + + if (result.status === 'fulfilled') { + if (!('jobs' in response)) + response.jobs = {} + + if (result.value?.id !== undefined) + delete result.value.id + + response.jobs[jobId] = result.value + } else if (result.status === 'rejected') { + if (!('errors' in response)) + response.errors = {} - let message = 'failed to run task' - if (typeof error === 'string') - message += `: ${error}` + response.errors[jobId] = result.reason?.message + } + } connection.send( new ResponseMessage(message.requestNo) - .setError(message) + .setData(response) ) + }) + + // reject all ignored / non-found jobs + for (const [id, value] of results.entries()) { + if (!(id in jobPromises)) { + this.logger.error(`run-manual: ${id} not found in jobPromises`) + continue + } + + if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) { + const P = jobPromises[id] + delete jobPromises[id] + + if (value.result === JOB_IGNORED) + P.reject(new Error(value.reason)) + + else if (value.result === JOB_NOTFOUND) + P.reject(new Error(`job ${id} not found`)) + } } break + } default: connection.send( -- cgit v1.2.3