aboutsummaryrefslogtreecommitdiff
path: root/src/lib/workers-list.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/workers-list.js')
-rw-r--r--src/lib/workers-list.js159
1 files changed, 157 insertions, 2 deletions
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
index c7b3ab1..41b13e2 100644
--- a/src/lib/workers-list.js
+++ b/src/lib/workers-list.js
@@ -1,8 +1,7 @@
-const intersection = require('lodash/intersection')
+const {intersection, throttle, sample} = require('lodash')
const config = require('./config')
const {getLogger} = require('./logger')
const {RequestMessage, PingMessage} = require('./server')
-const throttle = require('lodash/throttle')
class WorkersList {
@@ -173,6 +172,162 @@ class WorkersList {
}
/**
+ * Send run-manual() requests to workers, aggregate and return results.
+ *
+ * @param {{id: int, target: string}[]} jobs
+ * @return {Promise<{jobs: {}, errors: {}}>}
+ */
+ async runManual(jobs) {
+ this.logger.debug('runManual:', jobs)
+
+ const workers = [...this.workers]
+
+ /**
+ * @type {object.<string, int[]>}
+ */
+ const targetWorkers = {}
+
+ for (let workerIndex = 0; workerIndex < workers.length; workerIndex++) {
+ const worker = workers[workerIndex]
+
+ for (let target of worker.targets) {
+ if (targetWorkers[target] === undefined)
+ targetWorkers[target] = []
+
+ targetWorkers[target].push(workerIndex)
+ }
+ }
+
+ this.logger.trace('runManual: targetWorkers:', targetWorkers)
+
+ /**
+ * List of job IDs with unsupported targets.
+ *
+ * @type {int[]}
+ */
+ const exceptions = []
+
+ /**
+ * @type {object.<int, int[]>}
+ */
+ const callMap = {}
+
+ /**
+ * @type {object.<int, string>}
+ */
+ const jobToTargetMap = {}
+
+ for (const job of jobs) {
+ const {id, target} = job
+
+ jobToTargetMap[id] = target
+
+ // if worker serving this target not found, skip the job
+ if (targetWorkers[target] === undefined) {
+ exceptions.push(id)
+ continue
+ }
+
+ // get random worker index
+ let workerIndex = sample(targetWorkers[target])
+ if (callMap[workerIndex] === undefined)
+ callMap[workerIndex] = []
+
+ callMap[workerIndex].push(id)
+ }
+
+ this.logger.trace('runManual: callMap:', callMap)
+ this.logger.trace('runManual: exceptions:', exceptions)
+
+ /**
+ * @type {Promise[]}
+ */
+ const promises = []
+
+ /**
+ * @type {int[][]}
+ */
+ const jobsByPromise = []
+
+ for (const workerIndex in callMap) {
+ if (!callMap.hasOwnProperty(workerIndex))
+ continue
+
+ let workerJobIds = callMap[workerIndex]
+ let worker = workers[workerIndex]
+ let conn = worker.connection
+
+ let P = conn.sendRequest(
+ new RequestMessage('run-manual', {ids: workerJobIds})
+ )
+
+ promises.push(P)
+ jobsByPromise.push(workerJobIds)
+ }
+
+ this.logger.trace('runManual: jobsByPromise:', jobsByPromise)
+
+ const results = await Promise.allSettled(promises)
+
+ this.logger.trace('runManual: Promise.allSettled results:', results)
+
+ const response = {}
+ const setError = (id, value) => {
+ if (!('errors' in response))
+ response.errors = {}
+
+ if (typeof id === 'object') {
+ Object.assign(response.errors, id)
+ } else {
+ response.errors[id] = value
+ }
+ }
+ const setData = (id, value) => {
+ if (!('jobs' in response))
+ response.jobs = {}
+
+ if (typeof id === 'object') {
+ Object.assign(response.jobs, id)
+ } else {
+ response.jobs[id] = value
+ }
+ }
+
+ for (let i = 0; i < results.length; i++) {
+ let result = results[i]
+ if (result.status === 'fulfilled') {
+ /**
+ * @type {ResponseMessage}
+ */
+ const responseMessage = result.value
+
+ const {jobs, errors} = responseMessage.data
+ this.logger.trace(`[${i}]:`, jobs, errors)
+
+ if (jobs)
+ setData(jobs)
+
+ if (errors)
+ setError(errors)
+
+ } else if (result.status === 'rejected') {
+ for (let jobIds of jobsByPromise[i]) {
+ for (let jobId of jobIds)
+ setError(jobId, result.reason?.message)
+ }
+ }
+ }
+
+ // don't forget about skipped jobs
+ if (exceptions.length) {
+ for (let id of exceptions)
+ setError(id, `worker serving target for ${jobToTargetMap[id]} not found`)
+ }
+
+ return response
+ }
+
+ /**
* @private
*/
sendPings = () => {