From a6bdd77f06f4d6e6b7876017d4c29bb41da8545f Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Mon, 3 Apr 2023 13:54:30 +0300 Subject: signals support --- src/lib/workers-list.js | 88 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 65 insertions(+), 23 deletions(-) (limited to 'src/lib/workers-list.js') diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index c779ec2..4fc5c53 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -3,6 +3,18 @@ const config = require('./config') const {getLogger} = require('./logger') const {RequestMessage, PingMessage} = require('./server') +const MANUAL_CALL_TYPE_RUN = 0 +const MANUAL_CALL_TYPE_SIGNALS = 1 + +function validateManualCallType(type) { + if (![ + MANUAL_CALL_TYPE_RUN, + MANUAL_CALL_TYPE_SIGNALS + ].includes(type)) { + throw new Error('invalid manual call type') + } +} + class WorkersList { constructor() { @@ -190,8 +202,9 @@ class WorkersList { * @param {{id: int, target: string}[]} jobs * @return {Promise<{jobs: {}, errors: {}}>} */ - async runManual(jobs) { - this.logger.debug('runManual:', jobs) + async _runManualCall(callType, jobs) { + validateManualCallType(callType) + this.logger.debug(`runManualCall[${callType}]:`, jobs) const workers = [...this.workers] @@ -211,7 +224,7 @@ class WorkersList { } } - this.logger.trace('runManual: targetWorkers:', targetWorkers) + this.logger.trace(`runManualCall[${callType}]: targetWorkers:`, targetWorkers) /** * List of job IDs with unsupported targets. @@ -219,10 +232,6 @@ class WorkersList { * @type {int[]} */ const exceptions = [] - - /** - * @type {object.} - */ const callMap = {} /** @@ -246,11 +255,11 @@ class WorkersList { if (callMap[workerIndex] === undefined) callMap[workerIndex] = [] - callMap[workerIndex].push(id) + callMap[workerIndex].push(job) } - this.logger.trace('runManual: callMap:', callMap) - this.logger.trace('runManual: exceptions:', exceptions) + this.logger.trace(`runManualCall[${callType}]: callMap:`, callMap) + this.logger.trace(`runManualCall[${callType}]: exceptions:`, exceptions) /** * @type {Promise[]} @@ -266,23 +275,38 @@ class WorkersList { if (!callMap.hasOwnProperty(workerIndex)) continue - let workerJobIds = callMap[workerIndex] + let workerJobsData = callMap[workerIndex] let worker = workers[workerIndex] let conn = worker.connection - let P = conn.sendRequest( - new RequestMessage('run-manual', {ids: workerJobIds}) - ) + let P + switch (callType) { + case MANUAL_CALL_TYPE_RUN: + P = conn.sendRequest( + new RequestMessage('run-manual', {ids: workerJobsData.map(j => j.id)}) + ) + break + + case MANUAL_CALL_TYPE_SIGNALS: + const data = {} + for (let jobData of workerJobsData) + data[jobData.id] = jobData.signal + + P = conn.sendRequest( + new RequestMessage('send-signal', {jobs: data}) + ) + break + } promises.push(P) - jobsByPromise.push(workerJobIds) + jobsByPromise.push(workerJobsData.map(j => j.id)) } - this.logger.trace('runManual: jobsByPromise:', jobsByPromise) + this.logger.trace(`runManualCall[${callType}]: jobsByPromise:`, jobsByPromise) const results = await Promise.allSettled(promises) - this.logger.trace('runManual: Promise.allSettled results:', results) + this.logger.trace(`runManualCall[${callType}]: Promise.allSettled results:`, results) const response = {} const setError = (id, value) => { @@ -314,14 +338,24 @@ class WorkersList { */ const responseMessage = result.value - const {jobs, errors} = responseMessage.data - this.logger.trace(`[${i}]:`, jobs, errors) + switch (callType) { + case MANUAL_CALL_TYPE_RUN: + const {jobs, errors} = responseMessage.data + this.logger.trace(`[${i}]:`, jobs, errors) - if (jobs) - setData(jobs) + if (jobs) + setData(jobs) + + if (errors) + setError(errors) + + break + + case MANUAL_CALL_TYPE_SIGNALS: + Object.assign(response, responseMessage.data) + break + } - if (errors) - setError(errors) } else if (result.status === 'rejected') { for (let jobIds of jobsByPromise[i]) { @@ -340,6 +374,14 @@ class WorkersList { return response } + async runManual(jobs) { + return await this._runManualCall(MANUAL_CALL_TYPE_RUN, jobs) + } + + async sendSignals(jobs) { + return await this._runManualCall(MANUAL_CALL_TYPE_SIGNALS, jobs) + } + /** * @param {null|string[]} targets */ -- cgit v1.2.3