diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/jobd-master.js | 23 | ||||
-rwxr-xr-x | src/jobd.js | 18 | ||||
-rw-r--r-- | src/lib/worker.js | 25 | ||||
-rw-r--r-- | src/lib/workers-list.js | 88 |
4 files changed, 130 insertions, 24 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js index 34e03ba..3bdc15a 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -114,6 +114,7 @@ function initRequestHandler() { requestHandler.set('run-manual', onRunManual) requestHandler.set('pause', onPause) requestHandler.set('continue', onContinue) + requestHandler.set('send-signal', onSendSignal) } function usage() { @@ -223,3 +224,25 @@ function onContinue(data, requestNo, connection) { return 'ok' } + +/** + * @param {object} data + * @return {Promise<*>} + */ +async function onSendSignal(data) { + const {jobs} = data + + if (!Array.isArray(jobs)) + throw new Error('jobs must be array') + + for (let job of jobs) { + validateObjectSchema(job, [ + // name // type // required + ['id', 'i', true], + ['signal', 'i', true], + ['target', 's', true], + ]) + } + + return await workers.sendSignals(jobs) +}
\ No newline at end of file diff --git a/src/jobd.js b/src/jobd.js index 7c63607..5dd0d6d 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -109,7 +109,10 @@ async function initApp(appName) { }) logger = loggerModule.getLogger(appName) - process.title = appName + let processTitle = `${appName}` + if (config.get('name')) + processTitle += ` ${config.get('name')}` + process.title = processTitle } function initWorker() { @@ -141,6 +144,7 @@ function initRequestHandler() { requestHandler.set('poll', onPollRequest) requestHandler.set('status', onStatus) requestHandler.set('run-manual', onRunManual) + requestHandler.set('send-signal', onSendSignal) requestHandler.set('pause', onPause) requestHandler.set('continue', onContinue) requestHandler.set('add-target', onAddTarget) @@ -345,6 +349,18 @@ async function onRunManual(data) { return P } +async function onSendSignal(data) { + const {jobs: jobToSignalMap} = data + const results = {} + for (const id in jobToSignalMap) { + if (!jobToSignalMap.hasOwnProperty(id)) + continue + const signal = jobToSignalMap[id] + results[id] = worker.killJobProcess(id, signal) + } + return results +} + /** * @param {{targets: string[]}} data */ diff --git a/src/lib/worker.js b/src/lib/worker.js index 3a4bb83..673f0eb 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -44,6 +44,11 @@ class Worker extends EventEmitter { * @type {Logger} */ this.logger = getLogger('Worker') + + /** + * @type {{}} + */ + this.runningProcesses = {} } /** @@ -480,6 +485,7 @@ class Worker extends EventEmitter { cwd, env }) + this.runningProcesses[id] = process let stdoutChunks = [] let stderrChunks = [] @@ -490,6 +496,8 @@ class Worker extends EventEmitter { * @param {null|string} signal */ (code, signal) => { + delete this.runningProcesses[id] + let stdout = stdoutChunks.join('') let stderr = stderrChunks.join('') @@ -505,6 +513,7 @@ class Worker extends EventEmitter { }) process.on('error', (error) => { + delete this.runningProcesses[id] reject(error) }) @@ -601,6 +610,22 @@ class Worker extends EventEmitter { } } + /** + * @param {number} id + * @param {number} signal + * @return {boolean} + */ + killJobProcess(id, signal) { + if (this.runningProcesses[id] !== undefined) { + try { + return this.runningProcesses[id].kill(signal) + } catch (error) { + this.logger.error(`killJobProcess(${id}, ${signal})`, error) + } + } + return false + } + } module.exports = { diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index c779ec2..4fc5c53 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -3,6 +3,18 @@ const config = require('./config') const {getLogger} = require('./logger') const {RequestMessage, PingMessage} = require('./server') +const MANUAL_CALL_TYPE_RUN = 0 +const MANUAL_CALL_TYPE_SIGNALS = 1 + +function validateManualCallType(type) { + if (![ + MANUAL_CALL_TYPE_RUN, + MANUAL_CALL_TYPE_SIGNALS + ].includes(type)) { + throw new Error('invalid manual call type') + } +} + class WorkersList { constructor() { @@ -190,8 +202,9 @@ class WorkersList { * @param {{id: int, target: string}[]} jobs * @return {Promise<{jobs: {}, errors: {}}>} */ - async runManual(jobs) { - this.logger.debug('runManual:', jobs) + async _runManualCall(callType, jobs) { + validateManualCallType(callType) + this.logger.debug(`runManualCall[${callType}]:`, jobs) const workers = [...this.workers] @@ -211,7 +224,7 @@ class WorkersList { } } - this.logger.trace('runManual: targetWorkers:', targetWorkers) + this.logger.trace(`runManualCall[${callType}]: targetWorkers:`, targetWorkers) /** * List of job IDs with unsupported targets. @@ -219,10 +232,6 @@ class WorkersList { * @type {int[]} */ const exceptions = [] - - /** - * @type {object.<int, int[]>} - */ const callMap = {} /** @@ -246,11 +255,11 @@ class WorkersList { if (callMap[workerIndex] === undefined) callMap[workerIndex] = [] - callMap[workerIndex].push(id) + callMap[workerIndex].push(job) } - this.logger.trace('runManual: callMap:', callMap) - this.logger.trace('runManual: exceptions:', exceptions) + this.logger.trace(`runManualCall[${callType}]: callMap:`, callMap) + this.logger.trace(`runManualCall[${callType}]: exceptions:`, exceptions) /** * @type {Promise[]} @@ -266,23 +275,38 @@ class WorkersList { if (!callMap.hasOwnProperty(workerIndex)) continue - let workerJobIds = callMap[workerIndex] + let workerJobsData = callMap[workerIndex] let worker = workers[workerIndex] let conn = worker.connection - let P = conn.sendRequest( - new RequestMessage('run-manual', {ids: workerJobIds}) - ) + let P + switch (callType) { + case MANUAL_CALL_TYPE_RUN: + P = conn.sendRequest( + new RequestMessage('run-manual', {ids: workerJobsData.map(j => j.id)}) + ) + break + + case MANUAL_CALL_TYPE_SIGNALS: + const data = {} + for (let jobData of workerJobsData) + data[jobData.id] = jobData.signal + + P = conn.sendRequest( + new RequestMessage('send-signal', {jobs: data}) + ) + break + } promises.push(P) - jobsByPromise.push(workerJobIds) + jobsByPromise.push(workerJobsData.map(j => j.id)) } - this.logger.trace('runManual: jobsByPromise:', jobsByPromise) + this.logger.trace(`runManualCall[${callType}]: jobsByPromise:`, jobsByPromise) const results = await Promise.allSettled(promises) - this.logger.trace('runManual: Promise.allSettled results:', results) + this.logger.trace(`runManualCall[${callType}]: Promise.allSettled results:`, results) const response = {} const setError = (id, value) => { @@ -314,14 +338,24 @@ class WorkersList { */ const responseMessage = result.value - const {jobs, errors} = responseMessage.data - this.logger.trace(`[${i}]:`, jobs, errors) + switch (callType) { + case MANUAL_CALL_TYPE_RUN: + const {jobs, errors} = responseMessage.data + this.logger.trace(`[${i}]:`, jobs, errors) - if (jobs) - setData(jobs) + if (jobs) + setData(jobs) + + if (errors) + setError(errors) + + break + + case MANUAL_CALL_TYPE_SIGNALS: + Object.assign(response, responseMessage.data) + break + } - if (errors) - setError(errors) } else if (result.status === 'rejected') { for (let jobIds of jobsByPromise[i]) { @@ -340,6 +374,14 @@ class WorkersList { return response } + async runManual(jobs) { + return await this._runManualCall(MANUAL_CALL_TYPE_RUN, jobs) + } + + async sendSignals(jobs) { + return await this._runManualCall(MANUAL_CALL_TYPE_SIGNALS, jobs) + } + /** * @param {null|string[]} targets */ |