diff options
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/worker.js | 25 | ||||
-rw-r--r-- | src/lib/workers-list.js | 88 |
2 files changed, 90 insertions, 23 deletions
diff --git a/src/lib/worker.js b/src/lib/worker.js index 3a4bb83..673f0eb 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -44,6 +44,11 @@ class Worker extends EventEmitter { * @type {Logger} */ this.logger = getLogger('Worker') + + /** + * @type {{}} + */ + this.runningProcesses = {} } /** @@ -480,6 +485,7 @@ class Worker extends EventEmitter { cwd, env }) + this.runningProcesses[id] = process let stdoutChunks = [] let stderrChunks = [] @@ -490,6 +496,8 @@ class Worker extends EventEmitter { * @param {null|string} signal */ (code, signal) => { + delete this.runningProcesses[id] + let stdout = stdoutChunks.join('') let stderr = stderrChunks.join('') @@ -505,6 +513,7 @@ class Worker extends EventEmitter { }) process.on('error', (error) => { + delete this.runningProcesses[id] reject(error) }) @@ -601,6 +610,22 @@ class Worker extends EventEmitter { } } + /** + * @param {number} id + * @param {number} signal + * @return {boolean} + */ + killJobProcess(id, signal) { + if (this.runningProcesses[id] !== undefined) { + try { + return this.runningProcesses[id].kill(signal) + } catch (error) { + this.logger.error(`killJobProcess(${id}, ${signal})`, error) + } + } + return false + } + } module.exports = { diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index c779ec2..4fc5c53 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -3,6 +3,18 @@ const config = require('./config') const {getLogger} = require('./logger') const {RequestMessage, PingMessage} = require('./server') +const MANUAL_CALL_TYPE_RUN = 0 +const MANUAL_CALL_TYPE_SIGNALS = 1 + +function validateManualCallType(type) { + if (![ + MANUAL_CALL_TYPE_RUN, + MANUAL_CALL_TYPE_SIGNALS + ].includes(type)) { + throw new Error('invalid manual call type') + } +} + class WorkersList { constructor() { @@ -190,8 +202,9 @@ class WorkersList { * @param {{id: int, target: string}[]} jobs * @return {Promise<{jobs: {}, errors: {}}>} */ - async runManual(jobs) { - this.logger.debug('runManual:', jobs) + async _runManualCall(callType, jobs) { + validateManualCallType(callType) + this.logger.debug(`runManualCall[${callType}]:`, jobs) const workers = [...this.workers] @@ -211,7 +224,7 @@ class WorkersList { } } - this.logger.trace('runManual: targetWorkers:', targetWorkers) + this.logger.trace(`runManualCall[${callType}]: targetWorkers:`, targetWorkers) /** * List of job IDs with unsupported targets. @@ -219,10 +232,6 @@ class WorkersList { * @type {int[]} */ const exceptions = [] - - /** - * @type {object.<int, int[]>} - */ const callMap = {} /** @@ -246,11 +255,11 @@ class WorkersList { if (callMap[workerIndex] === undefined) callMap[workerIndex] = [] - callMap[workerIndex].push(id) + callMap[workerIndex].push(job) } - this.logger.trace('runManual: callMap:', callMap) - this.logger.trace('runManual: exceptions:', exceptions) + this.logger.trace(`runManualCall[${callType}]: callMap:`, callMap) + this.logger.trace(`runManualCall[${callType}]: exceptions:`, exceptions) /** * @type {Promise[]} @@ -266,23 +275,38 @@ class WorkersList { if (!callMap.hasOwnProperty(workerIndex)) continue - let workerJobIds = callMap[workerIndex] + let workerJobsData = callMap[workerIndex] let worker = workers[workerIndex] let conn = worker.connection - let P = conn.sendRequest( - new RequestMessage('run-manual', {ids: workerJobIds}) - ) + let P + switch (callType) { + case MANUAL_CALL_TYPE_RUN: + P = conn.sendRequest( + new RequestMessage('run-manual', {ids: workerJobsData.map(j => j.id)}) + ) + break + + case MANUAL_CALL_TYPE_SIGNALS: + const data = {} + for (let jobData of workerJobsData) + data[jobData.id] = jobData.signal + + P = conn.sendRequest( + new RequestMessage('send-signal', {jobs: data}) + ) + break + } promises.push(P) - jobsByPromise.push(workerJobIds) + jobsByPromise.push(workerJobsData.map(j => j.id)) } - this.logger.trace('runManual: jobsByPromise:', jobsByPromise) + this.logger.trace(`runManualCall[${callType}]: jobsByPromise:`, jobsByPromise) const results = await Promise.allSettled(promises) - this.logger.trace('runManual: Promise.allSettled results:', results) + this.logger.trace(`runManualCall[${callType}]: Promise.allSettled results:`, results) const response = {} const setError = (id, value) => { @@ -314,14 +338,24 @@ class WorkersList { */ const responseMessage = result.value - const {jobs, errors} = responseMessage.data - this.logger.trace(`[${i}]:`, jobs, errors) + switch (callType) { + case MANUAL_CALL_TYPE_RUN: + const {jobs, errors} = responseMessage.data + this.logger.trace(`[${i}]:`, jobs, errors) - if (jobs) - setData(jobs) + if (jobs) + setData(jobs) + + if (errors) + setError(errors) + + break + + case MANUAL_CALL_TYPE_SIGNALS: + Object.assign(response, responseMessage.data) + break + } - if (errors) - setError(errors) } else if (result.status === 'rejected') { for (let jobIds of jobsByPromise[i]) { @@ -340,6 +374,14 @@ class WorkersList { return response } + async runManual(jobs) { + return await this._runManualCall(MANUAL_CALL_TYPE_RUN, jobs) + } + + async sendSignals(jobs) { + return await this._runManualCall(MANUAL_CALL_TYPE_SIGNALS, jobs) + } + /** * @param {null|string[]} targets */ |