summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-02 23:54:12 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-02 23:54:12 +0300
commit12a2dda2b801487ccb10690d19d9f28aed90c57c (patch)
treec03da32d532d50ad7b07034a40a56d905ff240b9 /src/lib
parent7247c31d8ad2a22f95a870939d25ae267376e0de (diff)
jobd-master: support run-manual(); improve data validation here and there
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/data-validator.js22
-rw-r--r--src/lib/server.js16
-rw-r--r--src/lib/workers-list.js159
3 files changed, 189 insertions, 8 deletions
diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js
index 276ea9b..6d1d8f0 100644
--- a/src/lib/data-validator.js
+++ b/src/lib/data-validator.js
@@ -36,8 +36,9 @@ function checkType(expectedType, value) {
/**
* @param {object} data
* @param {array} schema
+ * @throws Error
*/
-function validateMessageData(data, schema) {
+function validateObjectSchema(data, schema) {
if (!isObject(data))
throw new Error(`data is not an object`)
@@ -68,4 +69,21 @@ function validateMessageData(data, schema) {
}
}
-module.exports = {validateMessageData} \ No newline at end of file
+function validateTargetsList(targets) {
+ if (!Array.isArray(targets))
+ throw new Error('targets must be array')
+
+ if (!targets.length)
+ throw new Error('targets are empty')
+
+ for (const t of targets) {
+ const type = typeof t
+ if (type !== 'string')
+ throw new Error(`all targets must be strings, ${type} given`)
+ }
+}
+
+module.exports = {
+ validateObjectSchema,
+ validateTargetsList
+} \ No newline at end of file
diff --git a/src/lib/server.js b/src/lib/server.js
index dec9f06..81c2c84 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -4,7 +4,7 @@ const {getLogger} = require('./logger')
const random = require('lodash/random')
const config = require('./config')
const {createCallablePromise} = require('./util')
-const {validateMessageData} = require('./data-validator')
+const {validateObjectSchema} = require('./data-validator')
const EOT = 0x04
const REQUEST_NO_LIMIT = 999999
@@ -43,7 +43,15 @@ class ResponseMessage extends Message {
super(Message.RESPONSE)
this.requestNo = requestNo
+
+ /**
+ * @type {null|string}
+ */
this.error = null
+
+ /**
+ * @type {null|string|number|object|array}
+ */
this.data = null
}
@@ -466,7 +474,7 @@ class Connection extends EventEmitter {
let data = json.shift()
try {
- validateMessageData(data, [
+ validateObjectSchema(data, [
// name type required
['type', 's', true],
['no', 'i', true],
@@ -489,7 +497,7 @@ class Connection extends EventEmitter {
let data = json.shift()
try {
- validateMessageData(data, [
+ validateObjectSchema(data, [
// name type required
['no', 'i', true],
['data', 'snoa', false],
@@ -641,7 +649,7 @@ class Connection extends EventEmitter {
this._handleClose()
this.logger.warn(`socket error:`, error)
}
-
+
}
module.exports = {
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
index c7b3ab1..41b13e2 100644
--- a/src/lib/workers-list.js
+++ b/src/lib/workers-list.js
@@ -1,8 +1,7 @@
-const intersection = require('lodash/intersection')
+const {intersection, throttle, sample} = require('lodash')
const config = require('./config')
const {getLogger} = require('./logger')
const {RequestMessage, PingMessage} = require('./server')
-const throttle = require('lodash/throttle')
class WorkersList {
@@ -173,6 +172,162 @@ class WorkersList {
}
/**
+ * Send run-manual() requests to workers, aggregate and return results.
+ *
+ * @param {{id: int, target: string}[]} jobs
+ * @return {Promise<{jobs: {}, errors: {}}>}
+ */
+ async runManual(jobs) {
+ this.logger.debug('runManual:', jobs)
+
+ const workers = [...this.workers]
+
+ /**
+ * @type {object.<string, int[]>}
+ */
+ const targetWorkers = {}
+
+ for (let workerIndex = 0; workerIndex < workers.length; workerIndex++) {
+ const worker = workers[workerIndex]
+
+ for (let target of worker.targets) {
+ if (targetWorkers[target] === undefined)
+ targetWorkers[target] = []
+
+ targetWorkers[target].push(workerIndex)
+ }
+ }
+
+ this.logger.trace('runManual: targetWorkers:', targetWorkers)
+
+ /**
+ * List of job IDs with unsupported targets.
+ *
+ * @type {int[]}
+ */
+ const exceptions = []
+
+ /**
+ * @type {object.<int, int[]>}
+ */
+ const callMap = {}
+
+ /**
+ * @type {object.<int, string>}
+ */
+ const jobToTargetMap = {}
+
+ for (const job of jobs) {
+ const {id, target} = job
+
+ jobToTargetMap[id] = target
+
+ // if worker serving this target not found, skip the job
+ if (targetWorkers[target] === undefined) {
+ exceptions.push(id)
+ continue
+ }
+
+ // get random worker index
+ let workerIndex = sample(targetWorkers[target])
+ if (callMap[workerIndex] === undefined)
+ callMap[workerIndex] = []
+
+ callMap[workerIndex].push(id)
+ }
+
+ this.logger.trace('runManual: callMap:', callMap)
+ this.logger.trace('runManual: exceptions:', exceptions)
+
+ /**
+ * @type {Promise[]}
+ */
+ const promises = []
+
+ /**
+ * @type {int[][]}
+ */
+ const jobsByPromise = []
+
+ for (const workerIndex in callMap) {
+ if (!callMap.hasOwnProperty(workerIndex))
+ continue
+
+ let workerJobIds = callMap[workerIndex]
+ let worker = workers[workerIndex]
+ let conn = worker.connection
+
+ let P = conn.sendRequest(
+ new RequestMessage('run-manual', {ids: workerJobIds})
+ )
+
+ promises.push(P)
+ jobsByPromise.push(workerJobIds)
+ }
+
+ this.logger.trace('runManual: jobsByPromise:', jobsByPromise)
+
+ const results = await Promise.allSettled(promises)
+
+ this.logger.trace('runManual: Promise.allSettled results:', results)
+
+ const response = {}
+ const setError = (id, value) => {
+ if (!('errors' in response))
+ response.errors = {}
+
+ if (typeof id === 'object') {
+ Object.assign(response.errors, id)
+ } else {
+ response.errors[id] = value
+ }
+ }
+ const setData = (id, value) => {
+ if (!('jobs' in response))
+ response.jobs = {}
+
+ if (typeof id === 'object') {
+ Object.assign(response.jobs, id)
+ } else {
+ response.jobs[id] = value
+ }
+ }
+
+ for (let i = 0; i < results.length; i++) {
+ let result = results[i]
+ if (result.status === 'fulfilled') {
+ /**
+ * @type {ResponseMessage}
+ */
+ const responseMessage = result.value
+
+ const {jobs, errors} = responseMessage.data
+ this.logger.trace(`[${i}]:`, jobs, errors)
+
+ if (jobs)
+ setData(jobs)
+
+ if (errors)
+ setError(errors)
+
+ } else if (result.status === 'rejected') {
+ for (let jobIds of jobsByPromise[i]) {
+ for (let jobId of jobIds)
+ setError(jobId, result.reason?.message)
+ }
+ }
+ }
+
+ // don't forget about skipped jobs
+ if (exceptions.length) {
+ for (let id of exceptions)
+ setError(id, `worker serving target for ${jobToTargetMap[id]} not found`)
+ }
+
+ return response
+ }
+
+ /**
* @private
*/
sendPings = () => {