summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-02 23:54:12 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-02 23:54:12 +0300
commit12a2dda2b801487ccb10690d19d9f28aed90c57c (patch)
treec03da32d532d50ad7b07034a40a56d905ff240b9
parent7247c31d8ad2a22f95a870939d25ae267376e0de (diff)
jobd-master: support run-manual(); improve data validation here and there
-rw-r--r--README.md5
-rwxr-xr-xsrc/jobd-master.js57
-rwxr-xr-xsrc/jobd.js29
-rw-r--r--src/lib/data-validator.js22
-rw-r--r--src/lib/server.js16
-rw-r--r--src/lib/workers-list.js159
6 files changed, 259 insertions, 29 deletions
diff --git a/README.md b/README.md
index d21a215..b008058 100644
--- a/README.md
+++ b/README.md
@@ -53,8 +53,8 @@ For optimization purposes, you can turn fields `target` and `slot` into `ENUM`s.
### jobd requests
-* **`poll(targets=[]: string[])`** — get new tasks for specified `targets` from database.
- If `targets` is empty or not specified, get tasks for all serving targets.
+* **`poll(targets: string[])`** — get new tasks for specified `targets` from database.
+ If `targets` argument is not specified, get tasks for all serving targets.
* **`status()`** — returns status of internal queues and memory usage.
@@ -93,7 +93,6 @@ other:
- reload config at runtime
- jobctl
-
## License
BSD-2c \ No newline at end of file
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 05da7f4..8628764 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -4,6 +4,7 @@ const loggerModule = require('./lib/logger')
const config = require('./lib/config')
const {Server, ResponseMessage, RequestMessage} = require('./lib/server')
const WorkersList = require('./lib/workers-list')
+const {validateObjectSchema, validateTargetsList} = require('./lib/data-validator')
const package_json = require('../package.json')
/**
@@ -90,12 +91,16 @@ async function onRequestMessage(message, connection) {
switch (message.requestType) {
case 'register-worker': {
const targets = message.requestData?.targets || []
- if (!targets.length) {
+
+ // validate data
+ try {
+ validateTargetsList(targets)
+ } catch (e) {
connection.send(
new ResponseMessage(message.requestNo)
- .setError(`targets are empty`)
+ .setError(e.message)
)
- break
+ return
}
workers.add(connection, targets)
@@ -108,15 +113,22 @@ async function onRequestMessage(message, connection) {
case 'poke': {
const targets = message.requestData?.targets || []
- if (!targets.length) {
+
+ // validate data
+ try {
+ validateTargetsList(targets)
+ } catch (e) {
connection.send(
new ResponseMessage(message.requestNo)
- .setError(`targets are empty`)
+ .setError(e.message)
)
- break
+ return
}
+ // poke workers
workers.poke(targets)
+
+ // reply to user
connection.send(
new ResponseMessage(message.requestNo)
.setData('ok')
@@ -139,6 +151,39 @@ async function onRequestMessage(message, connection) {
break
+ case 'run-manual':
+ const jobs = message.requestData.jobs
+
+ // validate data
+ try {
+ if (!Array.isArray(jobs))
+ throw new Error('jobs must be array')
+
+ for (let job of jobs) {
+ validateObjectSchema(job, [
+ // name // type // required
+ ['id', 'i', true],
+ ['target', 's', true],
+ ])
+ }
+ } catch (e) {
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(e.message)
+ )
+ return
+ }
+
+ // run jobs on workers
+ const data = await workers.runManual(jobs)
+
+ // send result to the client
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData(data)
+ )
+ break
+
default:
connection.send(
new ResponseMessage(message.requestNo)
diff --git a/src/jobd.js b/src/jobd.js
index e341f3d..4605137 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -5,6 +5,7 @@ const config = require('./lib/config')
const db = require('./lib/db')
const {uniq} = require('lodash')
const {createCallablePromise} = require('./lib/util')
+const {validateTargetsList} = require('./lib/data-validator')
const {
Server,
Connection,
@@ -145,22 +146,26 @@ async function onRequestMessage(message, connection) {
switch (message.requestType) {
case 'poll': {
- const targets = message.requestData?.targets || []
- if (!targets.length) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError('empty targets')
- )
- break
- }
+ // null means all
+ let targets = null
- for (const t of targets) {
- if (!worker.hasTarget(t)) {
+ if (message.requestData?.targets !== undefined) {
+ targets = message.requestData?.targets
+
+ // validate data
+ try {
+ validateTargetsList(targets)
+
+ for (const t of targets) {
+ if (!worker.hasTarget(t))
+ throw new Error(`invalid target '${t}'`)
+ }
+ } catch (e) {
connection.send(
new ResponseMessage(message.requestNo)
- .setError(`invalid target '${t}'`)
+ .setError(e.message)
)
- break
+ return
}
}
diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js
index 276ea9b..6d1d8f0 100644
--- a/src/lib/data-validator.js
+++ b/src/lib/data-validator.js
@@ -36,8 +36,9 @@ function checkType(expectedType, value) {
/**
* @param {object} data
* @param {array} schema
+ * @throws Error
*/
-function validateMessageData(data, schema) {
+function validateObjectSchema(data, schema) {
if (!isObject(data))
throw new Error(`data is not an object`)
@@ -68,4 +69,21 @@ function validateMessageData(data, schema) {
}
}
-module.exports = {validateMessageData} \ No newline at end of file
+function validateTargetsList(targets) {
+ if (!Array.isArray(targets))
+ throw new Error('targets must be array')
+
+ if (!targets.length)
+ throw new Error('targets are empty')
+
+ for (const t of targets) {
+ const type = typeof t
+ if (type !== 'string')
+ throw new Error(`all targets must be strings, ${type} given`)
+ }
+}
+
+module.exports = {
+ validateObjectSchema,
+ validateTargetsList
+} \ No newline at end of file
diff --git a/src/lib/server.js b/src/lib/server.js
index dec9f06..81c2c84 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -4,7 +4,7 @@ const {getLogger} = require('./logger')
const random = require('lodash/random')
const config = require('./config')
const {createCallablePromise} = require('./util')
-const {validateMessageData} = require('./data-validator')
+const {validateObjectSchema} = require('./data-validator')
const EOT = 0x04
const REQUEST_NO_LIMIT = 999999
@@ -43,7 +43,15 @@ class ResponseMessage extends Message {
super(Message.RESPONSE)
this.requestNo = requestNo
+
+ /**
+ * @type {null|string}
+ */
this.error = null
+
+ /**
+ * @type {null|string|number|object|array}
+ */
this.data = null
}
@@ -466,7 +474,7 @@ class Connection extends EventEmitter {
let data = json.shift()
try {
- validateMessageData(data, [
+ validateObjectSchema(data, [
// name type required
['type', 's', true],
['no', 'i', true],
@@ -489,7 +497,7 @@ class Connection extends EventEmitter {
let data = json.shift()
try {
- validateMessageData(data, [
+ validateObjectSchema(data, [
// name type required
['no', 'i', true],
['data', 'snoa', false],
@@ -641,7 +649,7 @@ class Connection extends EventEmitter {
this._handleClose()
this.logger.warn(`socket error:`, error)
}
-
+
}
module.exports = {
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
index c7b3ab1..41b13e2 100644
--- a/src/lib/workers-list.js
+++ b/src/lib/workers-list.js
@@ -1,8 +1,7 @@
-const intersection = require('lodash/intersection')
+const {intersection, throttle, sample} = require('lodash')
const config = require('./config')
const {getLogger} = require('./logger')
const {RequestMessage, PingMessage} = require('./server')
-const throttle = require('lodash/throttle')
class WorkersList {
@@ -173,6 +172,162 @@ class WorkersList {
}
/**
+ * Send run-manual() requests to workers, aggregate and return results.
+ *
+ * @param {{id: int, target: string}[]} jobs
+ * @return {Promise<{jobs: {}, errors: {}}>}
+ */
+ async runManual(jobs) {
+ this.logger.debug('runManual:', jobs)
+
+ const workers = [...this.workers]
+
+ /**
+ * @type {object.<string, int[]>}
+ */
+ const targetWorkers = {}
+
+ for (let workerIndex = 0; workerIndex < workers.length; workerIndex++) {
+ const worker = workers[workerIndex]
+
+ for (let target of worker.targets) {
+ if (targetWorkers[target] === undefined)
+ targetWorkers[target] = []
+
+ targetWorkers[target].push(workerIndex)
+ }
+ }
+
+ this.logger.trace('runManual: targetWorkers:', targetWorkers)
+
+ /**
+ * List of job IDs with unsupported targets.
+ *
+ * @type {int[]}
+ */
+ const exceptions = []
+
+ /**
+ * @type {object.<int, int[]>}
+ */
+ const callMap = {}
+
+ /**
+ * @type {object.<int, string>}
+ */
+ const jobToTargetMap = {}
+
+ for (const job of jobs) {
+ const {id, target} = job
+
+ jobToTargetMap[id] = target
+
+ // if worker serving this target not found, skip the job
+ if (targetWorkers[target] === undefined) {
+ exceptions.push(id)
+ continue
+ }
+
+ // get random worker index
+ let workerIndex = sample(targetWorkers[target])
+ if (callMap[workerIndex] === undefined)
+ callMap[workerIndex] = []
+
+ callMap[workerIndex].push(id)
+ }
+
+ this.logger.trace('runManual: callMap:', callMap)
+ this.logger.trace('runManual: exceptions:', exceptions)
+
+ /**
+ * @type {Promise[]}
+ */
+ const promises = []
+
+ /**
+ * @type {int[][]}
+ */
+ const jobsByPromise = []
+
+ for (const workerIndex in callMap) {
+ if (!callMap.hasOwnProperty(workerIndex))
+ continue
+
+ let workerJobIds = callMap[workerIndex]
+ let worker = workers[workerIndex]
+ let conn = worker.connection
+
+ let P = conn.sendRequest(
+ new RequestMessage('run-manual', {ids: workerJobIds})
+ )
+
+ promises.push(P)
+ jobsByPromise.push(workerJobIds)
+ }
+
+ this.logger.trace('runManual: jobsByPromise:', jobsByPromise)
+
+ const results = await Promise.allSettled(promises)
+
+ this.logger.trace('runManual: Promise.allSettled results:', results)
+
+ const response = {}
+ const setError = (id, value) => {
+ if (!('errors' in response))
+ response.errors = {}
+
+ if (typeof id === 'object') {
+ Object.assign(response.errors, id)
+ } else {
+ response.errors[id] = value
+ }
+ }
+ const setData = (id, value) => {
+ if (!('jobs' in response))
+ response.jobs = {}
+
+ if (typeof id === 'object') {
+ Object.assign(response.jobs, id)
+ } else {
+ response.jobs[id] = value
+ }
+ }
+
+ for (let i = 0; i < results.length; i++) {
+ let result = results[i]
+ if (result.status === 'fulfilled') {
+ /**
+ * @type {ResponseMessage}
+ */
+ const responseMessage = result.value
+
+ const {jobs, errors} = responseMessage.data
+ this.logger.trace(`[${i}]:`, jobs, errors)
+
+ if (jobs)
+ setData(jobs)
+
+ if (errors)
+ setError(errors)
+
+ } else if (result.status === 'rejected') {
+ for (let jobIds of jobsByPromise[i]) {
+ for (let jobId of jobIds)
+ setError(jobId, result.reason?.message)
+ }
+ }
+ }
+
+ // don't forget about skipped jobs
+ if (exceptions.length) {
+ for (let id of exceptions)
+ setError(id, `worker serving target for ${jobToTargetMap[id]} not found`)
+ }
+
+ return response
+ }
+
+ /**
* @private
*/
sendPings = () => {