From 1b803a258a32b87e3589b950702b005fb3062632 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 2 Mar 2021 01:48:32 +0300 Subject: jobd-master: support poll_workers in status() --- src/jobd-master.js | 14 +++++++++----- src/lib/server.js | 2 +- src/lib/workers-list.js | 50 +++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 56 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/jobd-master.js b/src/jobd-master.js index 602af9a..05da7f4 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -125,14 +125,18 @@ async function onRequestMessage(message, connection) { } case 'status': - const info = workers.getInfo() + const info = await workers.getInfo(message.requestData?.poll_workers || false) + + let status = { + workers: info, + memoryUsage: process.memoryUsage() + } + connection.send( new ResponseMessage(message.requestNo) - .setData({ - workers: info, - memoryUsage: process.memoryUsage() - }) + .setData(status) ) + break default: diff --git a/src/lib/server.js b/src/lib/server.js index ca06a6d..dec9f06 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -521,7 +521,7 @@ class Connection extends EventEmitter { * Send request * * @param {RequestMessage} message - * @return {Promise} + * @return {Promise} */ sendRequest(message) { if (!(message instanceof RequestMessage)) diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index 67df2f8..c7b3ab1 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -110,24 +110,66 @@ class WorkersList { */ _pokeWorkerConnection(connection, targets) { this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets) + connection.sendRequest( new RequestMessage('poll', { targets }) ) + .then(error => { + this.logger.error('_pokeWorkerConnection:', error) + }) } /** * @return {{targets: string[], remoteAddr: string, remotePort: number}[]} */ - getInfo() { - return this.workers.map(worker => { - return { + async getInfo(pollWorkers = false) { + const promises = [] + + const workers = [...this.workers] + + for (let i = 0; i < workers.length; i++) { + let worker = workers[i] + + let P + if (pollWorkers) { + P = worker.connection.sendRequest(new RequestMessage('status')) + } else { + P = Promise.resolve() + } + + promises.push(P) + } + + const results = await Promise.allSettled(promises) + + let info = [] + for (let i = 0; i < results.length; i++) { + const result = results[i] + const worker = workers[i] + const workerInfo = { remoteAddr: worker.connection.socket?.remoteAddress, remotePort: worker.connection.socket?.remotePort, targets: worker.targets } - }) + + if (pollWorkers) { + if (result.status === 'fulfilled') { + /** + * @type {ResponseMessage} + */ + let response = result.value + workerInfo.workerStatus = response.data + } else if (result.status === 'rejected') { + workerInfo.workerStatusError = result.reason?.message + } + } + + info.push(workerInfo) + } + + return info } /** -- cgit v1.2.3