summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2023-04-03 13:54:30 +0300
committerEvgeny Zinoviev <me@ch1p.io>2023-04-13 02:19:43 +0300
commita6bdd77f06f4d6e6b7876017d4c29bb41da8545f (patch)
treea8117d611a48e6e49b6b96f1d5b99338ebfba4e1 /src
parent0775fb2439a71a9bea3bb6e603c24ddd908a12a3 (diff)
signals supportHEADmaster
Diffstat (limited to 'src')
-rwxr-xr-xsrc/jobd-master.js23
-rwxr-xr-xsrc/jobd.js18
-rw-r--r--src/lib/worker.js25
-rw-r--r--src/lib/workers-list.js88
4 files changed, 130 insertions, 24 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 34e03ba..3bdc15a 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -114,6 +114,7 @@ function initRequestHandler() {
requestHandler.set('run-manual', onRunManual)
requestHandler.set('pause', onPause)
requestHandler.set('continue', onContinue)
+ requestHandler.set('send-signal', onSendSignal)
}
function usage() {
@@ -223,3 +224,25 @@ function onContinue(data, requestNo, connection) {
return 'ok'
}
+
+/**
+ * @param {object} data
+ * @return {Promise<*>}
+ */
+async function onSendSignal(data) {
+ const {jobs} = data
+
+ if (!Array.isArray(jobs))
+ throw new Error('jobs must be array')
+
+ for (let job of jobs) {
+ validateObjectSchema(job, [
+ // name // type // required
+ ['id', 'i', true],
+ ['signal', 'i', true],
+ ['target', 's', true],
+ ])
+ }
+
+ return await workers.sendSignals(jobs)
+} \ No newline at end of file
diff --git a/src/jobd.js b/src/jobd.js
index 7c63607..5dd0d6d 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -109,7 +109,10 @@ async function initApp(appName) {
})
logger = loggerModule.getLogger(appName)
- process.title = appName
+ let processTitle = `${appName}`
+ if (config.get('name'))
+ processTitle += ` ${config.get('name')}`
+ process.title = processTitle
}
function initWorker() {
@@ -141,6 +144,7 @@ function initRequestHandler() {
requestHandler.set('poll', onPollRequest)
requestHandler.set('status', onStatus)
requestHandler.set('run-manual', onRunManual)
+ requestHandler.set('send-signal', onSendSignal)
requestHandler.set('pause', onPause)
requestHandler.set('continue', onContinue)
requestHandler.set('add-target', onAddTarget)
@@ -345,6 +349,18 @@ async function onRunManual(data) {
return P
}
+async function onSendSignal(data) {
+ const {jobs: jobToSignalMap} = data
+ const results = {}
+ for (const id in jobToSignalMap) {
+ if (!jobToSignalMap.hasOwnProperty(id))
+ continue
+ const signal = jobToSignalMap[id]
+ results[id] = worker.killJobProcess(id, signal)
+ }
+ return results
+}
+
/**
* @param {{targets: string[]}} data
*/
diff --git a/src/lib/worker.js b/src/lib/worker.js
index 3a4bb83..673f0eb 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -44,6 +44,11 @@ class Worker extends EventEmitter {
* @type {Logger}
*/
this.logger = getLogger('Worker')
+
+ /**
+ * @type {{}}
+ */
+ this.runningProcesses = {}
}
/**
@@ -480,6 +485,7 @@ class Worker extends EventEmitter {
cwd,
env
})
+ this.runningProcesses[id] = process
let stdoutChunks = []
let stderrChunks = []
@@ -490,6 +496,8 @@ class Worker extends EventEmitter {
* @param {null|string} signal
*/
(code, signal) => {
+ delete this.runningProcesses[id]
+
let stdout = stdoutChunks.join('')
let stderr = stderrChunks.join('')
@@ -505,6 +513,7 @@ class Worker extends EventEmitter {
})
process.on('error', (error) => {
+ delete this.runningProcesses[id]
reject(error)
})
@@ -601,6 +610,22 @@ class Worker extends EventEmitter {
}
}
+ /**
+ * @param {number} id
+ * @param {number} signal
+ * @return {boolean}
+ */
+ killJobProcess(id, signal) {
+ if (this.runningProcesses[id] !== undefined) {
+ try {
+ return this.runningProcesses[id].kill(signal)
+ } catch (error) {
+ this.logger.error(`killJobProcess(${id}, ${signal})`, error)
+ }
+ }
+ return false
+ }
+
}
module.exports = {
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
index c779ec2..4fc5c53 100644
--- a/src/lib/workers-list.js
+++ b/src/lib/workers-list.js
@@ -3,6 +3,18 @@ const config = require('./config')
const {getLogger} = require('./logger')
const {RequestMessage, PingMessage} = require('./server')
+const MANUAL_CALL_TYPE_RUN = 0
+const MANUAL_CALL_TYPE_SIGNALS = 1
+
+function validateManualCallType(type) {
+ if (![
+ MANUAL_CALL_TYPE_RUN,
+ MANUAL_CALL_TYPE_SIGNALS
+ ].includes(type)) {
+ throw new Error('invalid manual call type')
+ }
+}
+
class WorkersList {
constructor() {
@@ -190,8 +202,9 @@ class WorkersList {
* @param {{id: int, target: string}[]} jobs
* @return {Promise<{jobs: {}, errors: {}}>}
*/
- async runManual(jobs) {
- this.logger.debug('runManual:', jobs)
+ async _runManualCall(callType, jobs) {
+ validateManualCallType(callType)
+ this.logger.debug(`runManualCall[${callType}]:`, jobs)
const workers = [...this.workers]
@@ -211,7 +224,7 @@ class WorkersList {
}
}
- this.logger.trace('runManual: targetWorkers:', targetWorkers)
+ this.logger.trace(`runManualCall[${callType}]: targetWorkers:`, targetWorkers)
/**
* List of job IDs with unsupported targets.
@@ -219,10 +232,6 @@ class WorkersList {
* @type {int[]}
*/
const exceptions = []
-
- /**
- * @type {object.<int, int[]>}
- */
const callMap = {}
/**
@@ -246,11 +255,11 @@ class WorkersList {
if (callMap[workerIndex] === undefined)
callMap[workerIndex] = []
- callMap[workerIndex].push(id)
+ callMap[workerIndex].push(job)
}
- this.logger.trace('runManual: callMap:', callMap)
- this.logger.trace('runManual: exceptions:', exceptions)
+ this.logger.trace(`runManualCall[${callType}]: callMap:`, callMap)
+ this.logger.trace(`runManualCall[${callType}]: exceptions:`, exceptions)
/**
* @type {Promise[]}
@@ -266,23 +275,38 @@ class WorkersList {
if (!callMap.hasOwnProperty(workerIndex))
continue
- let workerJobIds = callMap[workerIndex]
+ let workerJobsData = callMap[workerIndex]
let worker = workers[workerIndex]
let conn = worker.connection
- let P = conn.sendRequest(
- new RequestMessage('run-manual', {ids: workerJobIds})
- )
+ let P
+ switch (callType) {
+ case MANUAL_CALL_TYPE_RUN:
+ P = conn.sendRequest(
+ new RequestMessage('run-manual', {ids: workerJobsData.map(j => j.id)})
+ )
+ break
+
+ case MANUAL_CALL_TYPE_SIGNALS:
+ const data = {}
+ for (let jobData of workerJobsData)
+ data[jobData.id] = jobData.signal
+
+ P = conn.sendRequest(
+ new RequestMessage('send-signal', {jobs: data})
+ )
+ break
+ }
promises.push(P)
- jobsByPromise.push(workerJobIds)
+ jobsByPromise.push(workerJobsData.map(j => j.id))
}
- this.logger.trace('runManual: jobsByPromise:', jobsByPromise)
+ this.logger.trace(`runManualCall[${callType}]: jobsByPromise:`, jobsByPromise)
const results = await Promise.allSettled(promises)
- this.logger.trace('runManual: Promise.allSettled results:', results)
+ this.logger.trace(`runManualCall[${callType}]: Promise.allSettled results:`, results)
const response = {}
const setError = (id, value) => {
@@ -314,14 +338,24 @@ class WorkersList {
*/
const responseMessage = result.value
- const {jobs, errors} = responseMessage.data
- this.logger.trace(`[${i}]:`, jobs, errors)
+ switch (callType) {
+ case MANUAL_CALL_TYPE_RUN:
+ const {jobs, errors} = responseMessage.data
+ this.logger.trace(`[${i}]:`, jobs, errors)
- if (jobs)
- setData(jobs)
+ if (jobs)
+ setData(jobs)
+
+ if (errors)
+ setError(errors)
+
+ break
+
+ case MANUAL_CALL_TYPE_SIGNALS:
+ Object.assign(response, responseMessage.data)
+ break
+ }
- if (errors)
- setError(errors)
} else if (result.status === 'rejected') {
for (let jobIds of jobsByPromise[i]) {
@@ -340,6 +374,14 @@ class WorkersList {
return response
}
+ async runManual(jobs) {
+ return await this._runManualCall(MANUAL_CALL_TYPE_RUN, jobs)
+ }
+
+ async sendSignals(jobs) {
+ return await this._runManualCall(MANUAL_CALL_TYPE_SIGNALS, jobs)
+ }
+
/**
* @param {null|string[]} targets
*/