aboutsummaryrefslogtreecommitdiff
path: root/src/lib/workers-list.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/workers-list.js')
-rw-r--r--src/lib/workers-list.js88
1 files changed, 65 insertions, 23 deletions
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
index c779ec2..4fc5c53 100644
--- a/src/lib/workers-list.js
+++ b/src/lib/workers-list.js
@@ -3,6 +3,18 @@ const config = require('./config')
const {getLogger} = require('./logger')
const {RequestMessage, PingMessage} = require('./server')
+const MANUAL_CALL_TYPE_RUN = 0
+const MANUAL_CALL_TYPE_SIGNALS = 1
+
+function validateManualCallType(type) {
+ if (![
+ MANUAL_CALL_TYPE_RUN,
+ MANUAL_CALL_TYPE_SIGNALS
+ ].includes(type)) {
+ throw new Error('invalid manual call type')
+ }
+}
+
class WorkersList {
constructor() {
@@ -190,8 +202,9 @@ class WorkersList {
* @param {{id: int, target: string}[]} jobs
* @return {Promise<{jobs: {}, errors: {}}>}
*/
- async runManual(jobs) {
- this.logger.debug('runManual:', jobs)
+ async _runManualCall(callType, jobs) {
+ validateManualCallType(callType)
+ this.logger.debug(`runManualCall[${callType}]:`, jobs)
const workers = [...this.workers]
@@ -211,7 +224,7 @@ class WorkersList {
}
}
- this.logger.trace('runManual: targetWorkers:', targetWorkers)
+ this.logger.trace(`runManualCall[${callType}]: targetWorkers:`, targetWorkers)
/**
* List of job IDs with unsupported targets.
@@ -219,10 +232,6 @@ class WorkersList {
* @type {int[]}
*/
const exceptions = []
-
- /**
- * @type {object.<int, int[]>}
- */
const callMap = {}
/**
@@ -246,11 +255,11 @@ class WorkersList {
if (callMap[workerIndex] === undefined)
callMap[workerIndex] = []
- callMap[workerIndex].push(id)
+ callMap[workerIndex].push(job)
}
- this.logger.trace('runManual: callMap:', callMap)
- this.logger.trace('runManual: exceptions:', exceptions)
+ this.logger.trace(`runManualCall[${callType}]: callMap:`, callMap)
+ this.logger.trace(`runManualCall[${callType}]: exceptions:`, exceptions)
/**
* @type {Promise[]}
@@ -266,23 +275,38 @@ class WorkersList {
if (!callMap.hasOwnProperty(workerIndex))
continue
- let workerJobIds = callMap[workerIndex]
+ let workerJobsData = callMap[workerIndex]
let worker = workers[workerIndex]
let conn = worker.connection
- let P = conn.sendRequest(
- new RequestMessage('run-manual', {ids: workerJobIds})
- )
+ let P
+ switch (callType) {
+ case MANUAL_CALL_TYPE_RUN:
+ P = conn.sendRequest(
+ new RequestMessage('run-manual', {ids: workerJobsData.map(j => j.id)})
+ )
+ break
+
+ case MANUAL_CALL_TYPE_SIGNALS:
+ const data = {}
+ for (let jobData of workerJobsData)
+ data[jobData.id] = jobData.signal
+
+ P = conn.sendRequest(
+ new RequestMessage('send-signal', {jobs: data})
+ )
+ break
+ }
promises.push(P)
- jobsByPromise.push(workerJobIds)
+ jobsByPromise.push(workerJobsData.map(j => j.id))
}
- this.logger.trace('runManual: jobsByPromise:', jobsByPromise)
+ this.logger.trace(`runManualCall[${callType}]: jobsByPromise:`, jobsByPromise)
const results = await Promise.allSettled(promises)
- this.logger.trace('runManual: Promise.allSettled results:', results)
+ this.logger.trace(`runManualCall[${callType}]: Promise.allSettled results:`, results)
const response = {}
const setError = (id, value) => {
@@ -314,14 +338,24 @@ class WorkersList {
*/
const responseMessage = result.value
- const {jobs, errors} = responseMessage.data
- this.logger.trace(`[${i}]:`, jobs, errors)
+ switch (callType) {
+ case MANUAL_CALL_TYPE_RUN:
+ const {jobs, errors} = responseMessage.data
+ this.logger.trace(`[${i}]:`, jobs, errors)
- if (jobs)
- setData(jobs)
+ if (jobs)
+ setData(jobs)
+
+ if (errors)
+ setError(errors)
+
+ break
+
+ case MANUAL_CALL_TYPE_SIGNALS:
+ Object.assign(response, responseMessage.data)
+ break
+ }
- if (errors)
- setError(errors)
} else if (result.status === 'rejected') {
for (let jobIds of jobsByPromise[i]) {
@@ -340,6 +374,14 @@ class WorkersList {
return response
}
+ async runManual(jobs) {
+ return await this._runManualCall(MANUAL_CALL_TYPE_RUN, jobs)
+ }
+
+ async sendSignals(jobs) {
+ return await this._runManualCall(MANUAL_CALL_TYPE_SIGNALS, jobs)
+ }
+
/**
* @param {null|string[]} targets
*/