From 12a2dda2b801487ccb10690d19d9f28aed90c57c Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 2 Mar 2021 23:54:12 +0300 Subject: jobd-master: support run-manual(); improve data validation here and there --- src/lib/workers-list.js | 159 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 157 insertions(+), 2 deletions(-) (limited to 'src/lib/workers-list.js') diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index c7b3ab1..41b13e2 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -1,8 +1,7 @@ -const intersection = require('lodash/intersection') +const {intersection, throttle, sample} = require('lodash') const config = require('./config') const {getLogger} = require('./logger') const {RequestMessage, PingMessage} = require('./server') -const throttle = require('lodash/throttle') class WorkersList { @@ -172,6 +171,162 @@ class WorkersList { return info } + /** + * Send run-manual() requests to workers, aggregate and return results. + * + * @param {{id: int, target: string}[]} jobs + * @return {Promise<{jobs: {}, errors: {}}>} + */ + async runManual(jobs) { + this.logger.debug('runManual:', jobs) + + const workers = [...this.workers] + + /** + * @type {object.} + */ + const targetWorkers = {} + + for (let workerIndex = 0; workerIndex < workers.length; workerIndex++) { + const worker = workers[workerIndex] + + for (let target of worker.targets) { + if (targetWorkers[target] === undefined) + targetWorkers[target] = [] + + targetWorkers[target].push(workerIndex) + } + } + + this.logger.trace('runManual: targetWorkers:', targetWorkers) + + /** + * List of job IDs with unsupported targets. + * + * @type {int[]} + */ + const exceptions = [] + + /** + * @type {object.} + */ + const callMap = {} + + /** + * @type {object.} + */ + const jobToTargetMap = {} + + for (const job of jobs) { + const {id, target} = job + + jobToTargetMap[id] = target + + // if worker serving this target not found, skip the job + if (targetWorkers[target] === undefined) { + exceptions.push(id) + continue + } + + // get random worker index + let workerIndex = sample(targetWorkers[target]) + if (callMap[workerIndex] === undefined) + callMap[workerIndex] = [] + + callMap[workerIndex].push(id) + } + + this.logger.trace('runManual: callMap:', callMap) + this.logger.trace('runManual: exceptions:', exceptions) + + /** + * @type {Promise[]} + */ + const promises = [] + + /** + * @type {int[][]} + */ + const jobsByPromise = [] + + for (const workerIndex in callMap) { + if (!callMap.hasOwnProperty(workerIndex)) + continue + + let workerJobIds = callMap[workerIndex] + let worker = workers[workerIndex] + let conn = worker.connection + + let P = conn.sendRequest( + new RequestMessage('run-manual', {ids: workerJobIds}) + ) + + promises.push(P) + jobsByPromise.push(workerJobIds) + } + + this.logger.trace('runManual: jobsByPromise:', jobsByPromise) + + const results = await Promise.allSettled(promises) + + this.logger.trace('runManual: Promise.allSettled results:', results) + + const response = {} + const setError = (id, value) => { + if (!('errors' in response)) + response.errors = {} + + if (typeof id === 'object') { + Object.assign(response.errors, id) + } else { + response.errors[id] = value + } + } + const setData = (id, value) => { + if (!('jobs' in response)) + response.jobs = {} + + if (typeof id === 'object') { + Object.assign(response.jobs, id) + } else { + response.jobs[id] = value + } + } + + for (let i = 0; i < results.length; i++) { + let result = results[i] + if (result.status === 'fulfilled') { + /** + * @type {ResponseMessage} + */ + const responseMessage = result.value + + const {jobs, errors} = responseMessage.data + this.logger.trace(`[${i}]:`, jobs, errors) + + if (jobs) + setData(jobs) + + if (errors) + setError(errors) + + } else if (result.status === 'rejected') { + for (let jobIds of jobsByPromise[i]) { + for (let jobId of jobIds) + setError(jobId, result.reason?.message) + } + } + } + + // don't forget about skipped jobs + if (exceptions.length) { + for (let id of exceptions) + setError(id, `worker serving target for ${jobToTargetMap[id]} not found`) + } + + return response + } + /** * @private */ -- cgit v1.2.3