aboutsummaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/worker.js25
-rw-r--r--src/lib/workers-list.js88
2 files changed, 90 insertions, 23 deletions
diff --git a/src/lib/worker.js b/src/lib/worker.js
index 3a4bb83..673f0eb 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -44,6 +44,11 @@ class Worker extends EventEmitter {
* @type {Logger}
*/
this.logger = getLogger('Worker')
+
+ /**
+ * @type {{}}
+ */
+ this.runningProcesses = {}
}
/**
@@ -480,6 +485,7 @@ class Worker extends EventEmitter {
cwd,
env
})
+ this.runningProcesses[id] = process
let stdoutChunks = []
let stderrChunks = []
@@ -490,6 +496,8 @@ class Worker extends EventEmitter {
* @param {null|string} signal
*/
(code, signal) => {
+ delete this.runningProcesses[id]
+
let stdout = stdoutChunks.join('')
let stderr = stderrChunks.join('')
@@ -505,6 +513,7 @@ class Worker extends EventEmitter {
})
process.on('error', (error) => {
+ delete this.runningProcesses[id]
reject(error)
})
@@ -601,6 +610,22 @@ class Worker extends EventEmitter {
}
}
+ /**
+ * @param {number} id
+ * @param {number} signal
+ * @return {boolean}
+ */
+ killJobProcess(id, signal) {
+ if (this.runningProcesses[id] !== undefined) {
+ try {
+ return this.runningProcesses[id].kill(signal)
+ } catch (error) {
+ this.logger.error(`killJobProcess(${id}, ${signal})`, error)
+ }
+ }
+ return false
+ }
+
}
module.exports = {
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
index c779ec2..4fc5c53 100644
--- a/src/lib/workers-list.js
+++ b/src/lib/workers-list.js
@@ -3,6 +3,18 @@ const config = require('./config')
const {getLogger} = require('./logger')
const {RequestMessage, PingMessage} = require('./server')
+const MANUAL_CALL_TYPE_RUN = 0
+const MANUAL_CALL_TYPE_SIGNALS = 1
+
+function validateManualCallType(type) {
+ if (![
+ MANUAL_CALL_TYPE_RUN,
+ MANUAL_CALL_TYPE_SIGNALS
+ ].includes(type)) {
+ throw new Error('invalid manual call type')
+ }
+}
+
class WorkersList {
constructor() {
@@ -190,8 +202,9 @@ class WorkersList {
* @param {{id: int, target: string}[]} jobs
* @return {Promise<{jobs: {}, errors: {}}>}
*/
- async runManual(jobs) {
- this.logger.debug('runManual:', jobs)
+ async _runManualCall(callType, jobs) {
+ validateManualCallType(callType)
+ this.logger.debug(`runManualCall[${callType}]:`, jobs)
const workers = [...this.workers]
@@ -211,7 +224,7 @@ class WorkersList {
}
}
- this.logger.trace('runManual: targetWorkers:', targetWorkers)
+ this.logger.trace(`runManualCall[${callType}]: targetWorkers:`, targetWorkers)
/**
* List of job IDs with unsupported targets.
@@ -219,10 +232,6 @@ class WorkersList {
* @type {int[]}
*/
const exceptions = []
-
- /**
- * @type {object.<int, int[]>}
- */
const callMap = {}
/**
@@ -246,11 +255,11 @@ class WorkersList {
if (callMap[workerIndex] === undefined)
callMap[workerIndex] = []
- callMap[workerIndex].push(id)
+ callMap[workerIndex].push(job)
}
- this.logger.trace('runManual: callMap:', callMap)
- this.logger.trace('runManual: exceptions:', exceptions)
+ this.logger.trace(`runManualCall[${callType}]: callMap:`, callMap)
+ this.logger.trace(`runManualCall[${callType}]: exceptions:`, exceptions)
/**
* @type {Promise[]}
@@ -266,23 +275,38 @@ class WorkersList {
if (!callMap.hasOwnProperty(workerIndex))
continue
- let workerJobIds = callMap[workerIndex]
+ let workerJobsData = callMap[workerIndex]
let worker = workers[workerIndex]
let conn = worker.connection
- let P = conn.sendRequest(
- new RequestMessage('run-manual', {ids: workerJobIds})
- )
+ let P
+ switch (callType) {
+ case MANUAL_CALL_TYPE_RUN:
+ P = conn.sendRequest(
+ new RequestMessage('run-manual', {ids: workerJobsData.map(j => j.id)})
+ )
+ break
+
+ case MANUAL_CALL_TYPE_SIGNALS:
+ const data = {}
+ for (let jobData of workerJobsData)
+ data[jobData.id] = jobData.signal
+
+ P = conn.sendRequest(
+ new RequestMessage('send-signal', {jobs: data})
+ )
+ break
+ }
promises.push(P)
- jobsByPromise.push(workerJobIds)
+ jobsByPromise.push(workerJobsData.map(j => j.id))
}
- this.logger.trace('runManual: jobsByPromise:', jobsByPromise)
+ this.logger.trace(`runManualCall[${callType}]: jobsByPromise:`, jobsByPromise)
const results = await Promise.allSettled(promises)
- this.logger.trace('runManual: Promise.allSettled results:', results)
+ this.logger.trace(`runManualCall[${callType}]: Promise.allSettled results:`, results)
const response = {}
const setError = (id, value) => {
@@ -314,14 +338,24 @@ class WorkersList {
*/
const responseMessage = result.value
- const {jobs, errors} = responseMessage.data
- this.logger.trace(`[${i}]:`, jobs, errors)
+ switch (callType) {
+ case MANUAL_CALL_TYPE_RUN:
+ const {jobs, errors} = responseMessage.data
+ this.logger.trace(`[${i}]:`, jobs, errors)
- if (jobs)
- setData(jobs)
+ if (jobs)
+ setData(jobs)
+
+ if (errors)
+ setError(errors)
+
+ break
+
+ case MANUAL_CALL_TYPE_SIGNALS:
+ Object.assign(response, responseMessage.data)
+ break
+ }
- if (errors)
- setError(errors)
} else if (result.status === 'rejected') {
for (let jobIds of jobsByPromise[i]) {
@@ -340,6 +374,14 @@ class WorkersList {
return response
}
+ async runManual(jobs) {
+ return await this._runManualCall(MANUAL_CALL_TYPE_RUN, jobs)
+ }
+
+ async sendSignals(jobs) {
+ return await this._runManualCall(MANUAL_CALL_TYPE_SIGNALS, jobs)
+ }
+
/**
* @param {null|string[]} targets
*/