diff options
-rw-r--r-- | README.md | 4 | ||||
-rwxr-xr-x | src/jobd-master.js | 14 | ||||
-rw-r--r-- | src/lib/server.js | 2 | ||||
-rw-r--r-- | src/lib/workers-list.js | 50 |
4 files changed, 59 insertions, 11 deletions
@@ -70,7 +70,9 @@ For optimization purposes, you can turn fields `target` and `slot` into `ENUM`s. * **`poke(targets: string[])`** — send `poll` requests to all registered workers that serve specified `targets`. -* **`status()`** — returns list of registered workers and memory usage. +* **`status(poll_workers=false: bool)`** — returns list of registered workers and + memory usage. If `pollWorkers` is true, sends `status()` request to all registered + workers and includes their responses. * **`run-manual(jobs: {id: int, target: string}[])`** — send `run-manual` requests to registered jobd instances serving specified targets, and return diff --git a/src/jobd-master.js b/src/jobd-master.js index 602af9a..05da7f4 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -125,14 +125,18 @@ async function onRequestMessage(message, connection) { } case 'status': - const info = workers.getInfo() + const info = await workers.getInfo(message.requestData?.poll_workers || false) + + let status = { + workers: info, + memoryUsage: process.memoryUsage() + } + connection.send( new ResponseMessage(message.requestNo) - .setData({ - workers: info, - memoryUsage: process.memoryUsage() - }) + .setData(status) ) + break default: diff --git a/src/lib/server.js b/src/lib/server.js index ca06a6d..dec9f06 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -521,7 +521,7 @@ class Connection extends EventEmitter { * Send request * * @param {RequestMessage} message - * @return {Promise} + * @return {Promise<ResponseMessage>} */ sendRequest(message) { if (!(message instanceof RequestMessage)) diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index 67df2f8..c7b3ab1 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -110,24 +110,66 @@ class WorkersList { */ _pokeWorkerConnection(connection, targets) { this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets) + connection.sendRequest( new RequestMessage('poll', { targets }) ) + .then(error => { + this.logger.error('_pokeWorkerConnection:', error) + }) } /** * @return {{targets: string[], remoteAddr: string, remotePort: number}[]} */ - getInfo() { - return this.workers.map(worker => { - return { + async getInfo(pollWorkers = false) { + const promises = [] + + const workers = [...this.workers] + + for (let i = 0; i < workers.length; i++) { + let worker = workers[i] + + let P + if (pollWorkers) { + P = worker.connection.sendRequest(new RequestMessage('status')) + } else { + P = Promise.resolve() + } + + promises.push(P) + } + + const results = await Promise.allSettled(promises) + + let info = [] + for (let i = 0; i < results.length; i++) { + const result = results[i] + const worker = workers[i] + const workerInfo = { remoteAddr: worker.connection.socket?.remoteAddress, remotePort: worker.connection.socket?.remotePort, targets: worker.targets } - }) + + if (pollWorkers) { + if (result.status === 'fulfilled') { + /** + * @type {ResponseMessage} + */ + let response = result.value + workerInfo.workerStatus = response.data + } else if (result.status === 'rejected') { + workerInfo.workerStatusError = result.reason?.message + } + } + + info.push(workerInfo) + } + + return info } /** |