summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-02 01:48:32 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-02 21:31:57 +0300
commit1b803a258a32b87e3589b950702b005fb3062632 (patch)
tree4149722fdb0dc515d109973cf0bcee80cf8dd314
parent7307003e57eebffbd063c097e2a19e57a79ff05e (diff)
jobd-master: support poll_workers in status()
-rw-r--r--README.md4
-rwxr-xr-xsrc/jobd-master.js14
-rw-r--r--src/lib/server.js2
-rw-r--r--src/lib/workers-list.js50
4 files changed, 59 insertions, 11 deletions
diff --git a/README.md b/README.md
index b107382..61df993 100644
--- a/README.md
+++ b/README.md
@@ -70,7 +70,9 @@ For optimization purposes, you can turn fields `target` and `slot` into `ENUM`s.
* **`poke(targets: string[])`** — send `poll` requests to all registered workers that serve
specified `targets`.
-* **`status()`** — returns list of registered workers and memory usage.
+* **`status(poll_workers=false: bool)`** — returns list of registered workers and
+ memory usage. If `pollWorkers` is true, sends `status()` request to all registered
+ workers and includes their responses.
* **`run-manual(jobs: {id: int, target: string}[])`** — send `run-manual`
requests to registered jobd instances serving specified targets, and return
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 602af9a..05da7f4 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -125,14 +125,18 @@ async function onRequestMessage(message, connection) {
}
case 'status':
- const info = workers.getInfo()
+ const info = await workers.getInfo(message.requestData?.poll_workers || false)
+
+ let status = {
+ workers: info,
+ memoryUsage: process.memoryUsage()
+ }
+
connection.send(
new ResponseMessage(message.requestNo)
- .setData({
- workers: info,
- memoryUsage: process.memoryUsage()
- })
+ .setData(status)
)
+
break
default:
diff --git a/src/lib/server.js b/src/lib/server.js
index ca06a6d..dec9f06 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -521,7 +521,7 @@ class Connection extends EventEmitter {
* Send request
*
* @param {RequestMessage} message
- * @return {Promise}
+ * @return {Promise<ResponseMessage>}
*/
sendRequest(message) {
if (!(message instanceof RequestMessage))
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
index 67df2f8..c7b3ab1 100644
--- a/src/lib/workers-list.js
+++ b/src/lib/workers-list.js
@@ -110,24 +110,66 @@ class WorkersList {
*/
_pokeWorkerConnection(connection, targets) {
this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets)
+
connection.sendRequest(
new RequestMessage('poll', {
targets
})
)
+ .then(error => {
+ this.logger.error('_pokeWorkerConnection:', error)
+ })
}
/**
* @return {{targets: string[], remoteAddr: string, remotePort: number}[]}
*/
- getInfo() {
- return this.workers.map(worker => {
- return {
+ async getInfo(pollWorkers = false) {
+ const promises = []
+
+ const workers = [...this.workers]
+
+ for (let i = 0; i < workers.length; i++) {
+ let worker = workers[i]
+
+ let P
+ if (pollWorkers) {
+ P = worker.connection.sendRequest(new RequestMessage('status'))
+ } else {
+ P = Promise.resolve()
+ }
+
+ promises.push(P)
+ }
+
+ const results = await Promise.allSettled(promises)
+
+ let info = []
+ for (let i = 0; i < results.length; i++) {
+ const result = results[i]
+ const worker = workers[i]
+ const workerInfo = {
remoteAddr: worker.connection.socket?.remoteAddress,
remotePort: worker.connection.socket?.remotePort,
targets: worker.targets
}
- })
+
+ if (pollWorkers) {
+ if (result.status === 'fulfilled') {
+ /**
+ * @type {ResponseMessage}
+ */
+ let response = result.value
+ workerInfo.workerStatus = response.data
+ } else if (result.status === 'rejected') {
+ workerInfo.workerStatusError = result.reason?.message
+ }
+ }
+
+ info.push(workerInfo)
+ }
+
+ return info
}
/**