From a6bdd77f06f4d6e6b7876017d4c29bb41da8545f Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Mon, 3 Apr 2023 13:54:30 +0300 Subject: signals support --- README.md | 13 +++- jobd.conf.example | 2 +- package-lock.json | 162 +++++++++++++++++++++++------------------------- package.json | 8 +-- src/jobd-master.js | 23 +++++++ src/jobd.js | 18 +++++- src/lib/worker.js | 25 ++++++++ src/lib/workers-list.js | 88 +++++++++++++++++++------- 8 files changed, 225 insertions(+), 114 deletions(-) diff --git a/README.md b/README.md index 9e0d835..5da68ba 100644 --- a/README.md +++ b/README.md @@ -609,6 +609,12 @@ Here is the list of supported requests, using `type(arguments)` notation. An object whose keys represent failed job IDs and whose values are error messages. +* ##### `send-signal(jobs: object)` + Send signals to jobs which are still executing and return results. + + Response [data](#data-array--object--string--int) type: **object** with job IDs as keys and + kill status (boolean where true means that signal is successfully delivered) as values. + #### jobd-master requests * ##### `register-worker(targets: string[], name: string)` @@ -636,6 +642,10 @@ Here is the list of supported requests, using `type(arguments)` notation. Send [`run-manual()`](#run-manualids-int) requests to registered jobd instances serving specified targets, aggregate and return results. +* ##### `send-signal(jobs: {id: int, signal: int, target: string}[])` + Send [`send-signal()`](#send-signal-jobs) requests to registered jobd instances + serving specified targets, aggregate and return results. + ### Response Message `DATA` is a JSON object with following keys: @@ -665,7 +675,8 @@ Example (w/o trailing `EOT`): ## TODO -- graceful shutdown +- graceful shutdown of jobd +- support signals in jobctl ## License diff --git a/jobd.conf.example b/jobd.conf.example index 57e0701..9746385 100644 --- a/jobd.conf.example +++ b/jobd.conf.example @@ -14,7 +14,7 @@ log_level_file = info log_level_console = debug ; mysql settings -mysql_host = 10.211.55.6 +mysql_host = 127.0.0.1 mysql_port = 3306 mysql_user = jobd mysql_password = password diff --git a/package-lock.json b/package-lock.json index de731e3..ba9f699 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,11 +1,12 @@ { "name": "jobd", - "version": "1.11.0", + "version": "1.12.0", "lockfileVersion": 2, "requires": true, "packages": { "": { - "version": "1.11.0", + "name": "jobd", + "version": "1.12.0", "license": "BSD-2-Clause", "os": [ "darwin", @@ -15,8 +16,8 @@ "columnify": "^1.5.4", "ini": "^2.0.0", "lodash": "^4.17.21", - "log4js": "^6.3.0", - "minimist": "^1.2.5", + "log4js": "^6.4.0", + "minimist": "^1.2.8", "mysql": "^2.18.1", "promise-mysql": "^5.0.2", "queue": "^6.0.2" @@ -92,22 +93,27 @@ "integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac=" }, "node_modules/date-format": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz", - "integrity": "sha512-eyTcpKOcamdhWJXj56DpQMo1ylSQpcGtGKXcU0Tb97+K56/CF5amAqqqNj0+KvA0iw2ynxtHWFsPDSClCxe48w==", + "version": "4.0.14", + "resolved": "https://registry.npmjs.org/date-format/-/date-format-4.0.14.tgz", + "integrity": "sha512-39BOQLs9ZjKh0/patS9nrT8wc3ioX3/eA/zgbKNopnF2wCqJEoxywwwElATYvRsXdnOxA/OQeQoFZ3rFjVajhg==", "engines": { "node": ">=4.0" } }, "node_modules/debug": { - "version": "4.3.1", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz", - "integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==", + "version": "4.3.4", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", "dependencies": { "ms": "2.1.2" }, "engines": { "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } } }, "node_modules/defaults": { @@ -119,9 +125,9 @@ } }, "node_modules/flatted": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/flatted/-/flatted-2.0.2.tgz", - "integrity": "sha512-r5wGx7YeOwNWNlCA0wQ86zKyDLMQr+/RB8xy74M4hTphfmjlijTSSXGuH8rnvKZnfT9i+75zmd8jcKdMR4O6jA==" + "version": "3.2.7", + "resolved": "https://registry.npmjs.org/flatted/-/flatted-3.2.7.tgz", + "integrity": "sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ==" }, "node_modules/fs-extra": { "version": "8.1.0", @@ -137,9 +143,9 @@ } }, "node_modules/graceful-fs": { - "version": "4.2.6", - "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz", - "integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ==" + "version": "4.2.11", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz", + "integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==" }, "node_modules/inherits": { "version": "2.0.4", @@ -162,8 +168,8 @@ "node_modules/jsonfile": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-4.0.0.tgz", - "integrity": "sha1-h3Gq4HmbZAdrdmQPygWPnBDjPss=", - "dependencies": { + "integrity": "sha512-m6F1R3z8jjlf2imQHS2Qez5sjKWQzbuuhuJ/FKYFRZvPE3PuHcSMVZzfsLhGVOkfd20obL5SWEBew5ShlquNxg==", + "optionalDependencies": { "graceful-fs": "^4.1.6" } }, @@ -173,24 +179,27 @@ "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" }, "node_modules/log4js": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/log4js/-/log4js-6.3.0.tgz", - "integrity": "sha512-Mc8jNuSFImQUIateBFwdOQcmC6Q5maU0VVvdC2R6XMb66/VnT+7WS4D/0EeNMZu1YODmJe5NIn2XftCzEocUgw==", + "version": "6.9.1", + "resolved": "https://registry.npmjs.org/log4js/-/log4js-6.9.1.tgz", + "integrity": "sha512-1somDdy9sChrr9/f4UlzhdaGfDR2c/SaD2a4T7qEkG4jTS57/B3qmnjLYePwQ8cqWnUHZI0iAKxMBpCZICiZ2g==", "dependencies": { - "date-format": "^3.0.0", - "debug": "^4.1.1", - "flatted": "^2.0.1", - "rfdc": "^1.1.4", - "streamroller": "^2.2.4" + "date-format": "^4.0.14", + "debug": "^4.3.4", + "flatted": "^3.2.7", + "rfdc": "^1.3.0", + "streamroller": "^3.1.5" }, "engines": { "node": ">=8.0" } }, "node_modules/minimist": { - "version": "1.2.5", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz", - "integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw==" + "version": "1.2.8", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.8.tgz", + "integrity": "sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==", + "funding": { + "url": "https://github.com/sponsors/ljharb" + } }, "node_modules/ms": { "version": "2.1.2", @@ -250,9 +259,9 @@ } }, "node_modules/rfdc": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/rfdc/-/rfdc-1.2.0.tgz", - "integrity": "sha512-ijLyszTMmUrXvjSooucVQwimGUk84eRcmCuLV8Xghe3UO85mjUtRAHRyoMM6XtyqbECaXuBWx18La3523sXINA==" + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/rfdc/-/rfdc-1.3.0.tgz", + "integrity": "sha512-V2hovdzFbOi77/WajaSMXk2OLm+xNIeQdMMuB7icj7bk6zi2F8GGAxigcnDFpJHbNyNcgyJDiP+8nOrY5cZGrA==" }, "node_modules/safe-buffer": { "version": "5.1.2", @@ -268,26 +277,18 @@ } }, "node_modules/streamroller": { - "version": "2.2.4", - "resolved": "https://registry.npmjs.org/streamroller/-/streamroller-2.2.4.tgz", - "integrity": "sha512-OG79qm3AujAM9ImoqgWEY1xG4HX+Lw+yY6qZj9R1K2mhF5bEmQ849wvrb+4vt4jLMLzwXttJlQbOdPOQVRv7DQ==", + "version": "3.1.5", + "resolved": "https://registry.npmjs.org/streamroller/-/streamroller-3.1.5.tgz", + "integrity": "sha512-KFxaM7XT+irxvdqSP1LGLgNWbYN7ay5owZ3r/8t77p+EtSUAfUgtl7be3xtqtOmGUl9K9YPO2ca8133RlTjvKw==", "dependencies": { - "date-format": "^2.1.0", - "debug": "^4.1.1", + "date-format": "^4.0.14", + "debug": "^4.3.4", "fs-extra": "^8.1.0" }, "engines": { "node": ">=8.0" } }, - "node_modules/streamroller/node_modules/date-format": { - "version": "2.1.0", - "resolved": "https://registry.npmjs.org/date-format/-/date-format-2.1.0.tgz", - "integrity": "sha512-bYQuGLeFxhkxNOF3rcMtiZxvCBAquGzZm6oWA1oZ0g2THUzivaRhv8uOhdr19LmoobSOLoIAxeUK2RdbM8IFTA==", - "engines": { - "node": ">=4.0" - } - }, "node_modules/string_decoder": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", @@ -383,14 +384,14 @@ "integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac=" }, "date-format": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/date-format/-/date-format-3.0.0.tgz", - "integrity": "sha512-eyTcpKOcamdhWJXj56DpQMo1ylSQpcGtGKXcU0Tb97+K56/CF5amAqqqNj0+KvA0iw2ynxtHWFsPDSClCxe48w==" + "version": "4.0.14", + "resolved": "https://registry.npmjs.org/date-format/-/date-format-4.0.14.tgz", + "integrity": "sha512-39BOQLs9ZjKh0/patS9nrT8wc3ioX3/eA/zgbKNopnF2wCqJEoxywwwElATYvRsXdnOxA/OQeQoFZ3rFjVajhg==" }, "debug": { - "version": "4.3.1", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz", - "integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==", + "version": "4.3.4", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", "requires": { "ms": "2.1.2" } @@ -404,9 +405,9 @@ } }, "flatted": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/flatted/-/flatted-2.0.2.tgz", - "integrity": "sha512-r5wGx7YeOwNWNlCA0wQ86zKyDLMQr+/RB8xy74M4hTphfmjlijTSSXGuH8rnvKZnfT9i+75zmd8jcKdMR4O6jA==" + "version": "3.2.7", + "resolved": "https://registry.npmjs.org/flatted/-/flatted-3.2.7.tgz", + "integrity": "sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ==" }, "fs-extra": { "version": "8.1.0", @@ -419,9 +420,9 @@ } }, "graceful-fs": { - "version": "4.2.6", - "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz", - "integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ==" + "version": "4.2.11", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz", + "integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==" }, "inherits": { "version": "2.0.4", @@ -441,7 +442,7 @@ "jsonfile": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-4.0.0.tgz", - "integrity": "sha1-h3Gq4HmbZAdrdmQPygWPnBDjPss=", + "integrity": "sha512-m6F1R3z8jjlf2imQHS2Qez5sjKWQzbuuhuJ/FKYFRZvPE3PuHcSMVZzfsLhGVOkfd20obL5SWEBew5ShlquNxg==", "requires": { "graceful-fs": "^4.1.6" } @@ -452,21 +453,21 @@ "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" }, "log4js": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/log4js/-/log4js-6.3.0.tgz", - "integrity": "sha512-Mc8jNuSFImQUIateBFwdOQcmC6Q5maU0VVvdC2R6XMb66/VnT+7WS4D/0EeNMZu1YODmJe5NIn2XftCzEocUgw==", + "version": "6.9.1", + "resolved": "https://registry.npmjs.org/log4js/-/log4js-6.9.1.tgz", + "integrity": "sha512-1somDdy9sChrr9/f4UlzhdaGfDR2c/SaD2a4T7qEkG4jTS57/B3qmnjLYePwQ8cqWnUHZI0iAKxMBpCZICiZ2g==", "requires": { - "date-format": "^3.0.0", - "debug": "^4.1.1", - "flatted": "^2.0.1", - "rfdc": "^1.1.4", - "streamroller": "^2.2.4" + "date-format": "^4.0.14", + "debug": "^4.3.4", + "flatted": "^3.2.7", + "rfdc": "^1.3.0", + "streamroller": "^3.1.5" } }, "minimist": { - "version": "1.2.5", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz", - "integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw==" + "version": "1.2.8", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.8.tgz", + "integrity": "sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==" }, "ms": { "version": "2.1.2", @@ -523,9 +524,9 @@ } }, "rfdc": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/rfdc/-/rfdc-1.2.0.tgz", - "integrity": "sha512-ijLyszTMmUrXvjSooucVQwimGUk84eRcmCuLV8Xghe3UO85mjUtRAHRyoMM6XtyqbECaXuBWx18La3523sXINA==" + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/rfdc/-/rfdc-1.3.0.tgz", + "integrity": "sha512-V2hovdzFbOi77/WajaSMXk2OLm+xNIeQdMMuB7icj7bk6zi2F8GGAxigcnDFpJHbNyNcgyJDiP+8nOrY5cZGrA==" }, "safe-buffer": { "version": "5.1.2", @@ -538,20 +539,13 @@ "integrity": "sha1-R1OT/56RR5rqYtyvDKPRSYOn+0A=" }, "streamroller": { - "version": "2.2.4", - "resolved": "https://registry.npmjs.org/streamroller/-/streamroller-2.2.4.tgz", - "integrity": "sha512-OG79qm3AujAM9ImoqgWEY1xG4HX+Lw+yY6qZj9R1K2mhF5bEmQ849wvrb+4vt4jLMLzwXttJlQbOdPOQVRv7DQ==", + "version": "3.1.5", + "resolved": "https://registry.npmjs.org/streamroller/-/streamroller-3.1.5.tgz", + "integrity": "sha512-KFxaM7XT+irxvdqSP1LGLgNWbYN7ay5owZ3r/8t77p+EtSUAfUgtl7be3xtqtOmGUl9K9YPO2ca8133RlTjvKw==", "requires": { - "date-format": "^2.1.0", - "debug": "^4.1.1", + "date-format": "^4.0.14", + "debug": "^4.3.4", "fs-extra": "^8.1.0" - }, - "dependencies": { - "date-format": { - "version": "2.1.0", - "resolved": "https://registry.npmjs.org/date-format/-/date-format-2.1.0.tgz", - "integrity": "sha512-bYQuGLeFxhkxNOF3rcMtiZxvCBAquGzZm6oWA1oZ0g2THUzivaRhv8uOhdr19LmoobSOLoIAxeUK2RdbM8IFTA==" - } } }, "string_decoder": { diff --git a/package.json b/package.json index 16ea6b6..1c2f2d1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "jobd", - "version": "1.11.0", + "version": "1.12.0", "description": "job queue daemon", "main": "src/jobd", "homepage": "https://github.com/gch1p/jobd#readme", @@ -18,7 +18,7 @@ }, "keywords": [], "author": "Evgeny Zinoviev", - "license": "BSD-2-Clause", + "license": "MIT", "os": [ "darwin", "linux" @@ -30,8 +30,8 @@ "columnify": "^1.5.4", "ini": "^2.0.0", "lodash": "^4.17.21", - "log4js": "^6.3.0", - "minimist": "^1.2.5", + "log4js": "^6.4.0", + "minimist": "^1.2.8", "mysql": "^2.18.1", "promise-mysql": "^5.0.2", "queue": "^6.0.2" diff --git a/src/jobd-master.js b/src/jobd-master.js index 34e03ba..3bdc15a 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -114,6 +114,7 @@ function initRequestHandler() { requestHandler.set('run-manual', onRunManual) requestHandler.set('pause', onPause) requestHandler.set('continue', onContinue) + requestHandler.set('send-signal', onSendSignal) } function usage() { @@ -223,3 +224,25 @@ function onContinue(data, requestNo, connection) { return 'ok' } + +/** + * @param {object} data + * @return {Promise<*>} + */ +async function onSendSignal(data) { + const {jobs} = data + + if (!Array.isArray(jobs)) + throw new Error('jobs must be array') + + for (let job of jobs) { + validateObjectSchema(job, [ + // name // type // required + ['id', 'i', true], + ['signal', 'i', true], + ['target', 's', true], + ]) + } + + return await workers.sendSignals(jobs) +} \ No newline at end of file diff --git a/src/jobd.js b/src/jobd.js index 7c63607..5dd0d6d 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -109,7 +109,10 @@ async function initApp(appName) { }) logger = loggerModule.getLogger(appName) - process.title = appName + let processTitle = `${appName}` + if (config.get('name')) + processTitle += ` ${config.get('name')}` + process.title = processTitle } function initWorker() { @@ -141,6 +144,7 @@ function initRequestHandler() { requestHandler.set('poll', onPollRequest) requestHandler.set('status', onStatus) requestHandler.set('run-manual', onRunManual) + requestHandler.set('send-signal', onSendSignal) requestHandler.set('pause', onPause) requestHandler.set('continue', onContinue) requestHandler.set('add-target', onAddTarget) @@ -345,6 +349,18 @@ async function onRunManual(data) { return P } +async function onSendSignal(data) { + const {jobs: jobToSignalMap} = data + const results = {} + for (const id in jobToSignalMap) { + if (!jobToSignalMap.hasOwnProperty(id)) + continue + const signal = jobToSignalMap[id] + results[id] = worker.killJobProcess(id, signal) + } + return results +} + /** * @param {{targets: string[]}} data */ diff --git a/src/lib/worker.js b/src/lib/worker.js index 3a4bb83..673f0eb 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -44,6 +44,11 @@ class Worker extends EventEmitter { * @type {Logger} */ this.logger = getLogger('Worker') + + /** + * @type {{}} + */ + this.runningProcesses = {} } /** @@ -480,6 +485,7 @@ class Worker extends EventEmitter { cwd, env }) + this.runningProcesses[id] = process let stdoutChunks = [] let stderrChunks = [] @@ -490,6 +496,8 @@ class Worker extends EventEmitter { * @param {null|string} signal */ (code, signal) => { + delete this.runningProcesses[id] + let stdout = stdoutChunks.join('') let stderr = stderrChunks.join('') @@ -505,6 +513,7 @@ class Worker extends EventEmitter { }) process.on('error', (error) => { + delete this.runningProcesses[id] reject(error) }) @@ -601,6 +610,22 @@ class Worker extends EventEmitter { } } + /** + * @param {number} id + * @param {number} signal + * @return {boolean} + */ + killJobProcess(id, signal) { + if (this.runningProcesses[id] !== undefined) { + try { + return this.runningProcesses[id].kill(signal) + } catch (error) { + this.logger.error(`killJobProcess(${id}, ${signal})`, error) + } + } + return false + } + } module.exports = { diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index c779ec2..4fc5c53 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -3,6 +3,18 @@ const config = require('./config') const {getLogger} = require('./logger') const {RequestMessage, PingMessage} = require('./server') +const MANUAL_CALL_TYPE_RUN = 0 +const MANUAL_CALL_TYPE_SIGNALS = 1 + +function validateManualCallType(type) { + if (![ + MANUAL_CALL_TYPE_RUN, + MANUAL_CALL_TYPE_SIGNALS + ].includes(type)) { + throw new Error('invalid manual call type') + } +} + class WorkersList { constructor() { @@ -190,8 +202,9 @@ class WorkersList { * @param {{id: int, target: string}[]} jobs * @return {Promise<{jobs: {}, errors: {}}>} */ - async runManual(jobs) { - this.logger.debug('runManual:', jobs) + async _runManualCall(callType, jobs) { + validateManualCallType(callType) + this.logger.debug(`runManualCall[${callType}]:`, jobs) const workers = [...this.workers] @@ -211,7 +224,7 @@ class WorkersList { } } - this.logger.trace('runManual: targetWorkers:', targetWorkers) + this.logger.trace(`runManualCall[${callType}]: targetWorkers:`, targetWorkers) /** * List of job IDs with unsupported targets. @@ -219,10 +232,6 @@ class WorkersList { * @type {int[]} */ const exceptions = [] - - /** - * @type {object.} - */ const callMap = {} /** @@ -246,11 +255,11 @@ class WorkersList { if (callMap[workerIndex] === undefined) callMap[workerIndex] = [] - callMap[workerIndex].push(id) + callMap[workerIndex].push(job) } - this.logger.trace('runManual: callMap:', callMap) - this.logger.trace('runManual: exceptions:', exceptions) + this.logger.trace(`runManualCall[${callType}]: callMap:`, callMap) + this.logger.trace(`runManualCall[${callType}]: exceptions:`, exceptions) /** * @type {Promise[]} @@ -266,23 +275,38 @@ class WorkersList { if (!callMap.hasOwnProperty(workerIndex)) continue - let workerJobIds = callMap[workerIndex] + let workerJobsData = callMap[workerIndex] let worker = workers[workerIndex] let conn = worker.connection - let P = conn.sendRequest( - new RequestMessage('run-manual', {ids: workerJobIds}) - ) + let P + switch (callType) { + case MANUAL_CALL_TYPE_RUN: + P = conn.sendRequest( + new RequestMessage('run-manual', {ids: workerJobsData.map(j => j.id)}) + ) + break + + case MANUAL_CALL_TYPE_SIGNALS: + const data = {} + for (let jobData of workerJobsData) + data[jobData.id] = jobData.signal + + P = conn.sendRequest( + new RequestMessage('send-signal', {jobs: data}) + ) + break + } promises.push(P) - jobsByPromise.push(workerJobIds) + jobsByPromise.push(workerJobsData.map(j => j.id)) } - this.logger.trace('runManual: jobsByPromise:', jobsByPromise) + this.logger.trace(`runManualCall[${callType}]: jobsByPromise:`, jobsByPromise) const results = await Promise.allSettled(promises) - this.logger.trace('runManual: Promise.allSettled results:', results) + this.logger.trace(`runManualCall[${callType}]: Promise.allSettled results:`, results) const response = {} const setError = (id, value) => { @@ -314,14 +338,24 @@ class WorkersList { */ const responseMessage = result.value - const {jobs, errors} = responseMessage.data - this.logger.trace(`[${i}]:`, jobs, errors) + switch (callType) { + case MANUAL_CALL_TYPE_RUN: + const {jobs, errors} = responseMessage.data + this.logger.trace(`[${i}]:`, jobs, errors) - if (jobs) - setData(jobs) + if (jobs) + setData(jobs) + + if (errors) + setError(errors) + + break + + case MANUAL_CALL_TYPE_SIGNALS: + Object.assign(response, responseMessage.data) + break + } - if (errors) - setError(errors) } else if (result.status === 'rejected') { for (let jobIds of jobsByPromise[i]) { @@ -340,6 +374,14 @@ class WorkersList { return response } + async runManual(jobs) { + return await this._runManualCall(MANUAL_CALL_TYPE_RUN, jobs) + } + + async sendSignals(jobs) { + return await this._runManualCall(MANUAL_CALL_TYPE_SIGNALS, jobs) + } + /** * @param {null|string[]} targets */ -- cgit v1.2.3