From 12a2dda2b801487ccb10690d19d9f28aed90c57c Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 2 Mar 2021 23:54:12 +0300 Subject: jobd-master: support run-manual(); improve data validation here and there --- README.md | 5 +- src/jobd-master.js | 57 +++++++++++++++-- src/jobd.js | 29 +++++---- src/lib/data-validator.js | 22 ++++++- src/lib/server.js | 16 +++-- src/lib/workers-list.js | 159 +++++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 259 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index d21a215..b008058 100644 --- a/README.md +++ b/README.md @@ -53,8 +53,8 @@ For optimization purposes, you can turn fields `target` and `slot` into `ENUM`s. ### jobd requests -* **`poll(targets=[]: string[])`** — get new tasks for specified `targets` from database. - If `targets` is empty or not specified, get tasks for all serving targets. +* **`poll(targets: string[])`** — get new tasks for specified `targets` from database. + If `targets` argument is not specified, get tasks for all serving targets. * **`status()`** — returns status of internal queues and memory usage. @@ -93,7 +93,6 @@ other: - reload config at runtime - jobctl - ## License BSD-2c \ No newline at end of file diff --git a/src/jobd-master.js b/src/jobd-master.js index 05da7f4..8628764 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -4,6 +4,7 @@ const loggerModule = require('./lib/logger') const config = require('./lib/config') const {Server, ResponseMessage, RequestMessage} = require('./lib/server') const WorkersList = require('./lib/workers-list') +const {validateObjectSchema, validateTargetsList} = require('./lib/data-validator') const package_json = require('../package.json') /** @@ -90,12 +91,16 @@ async function onRequestMessage(message, connection) { switch (message.requestType) { case 'register-worker': { const targets = message.requestData?.targets || [] - if (!targets.length) { + + // validate data + try { + validateTargetsList(targets) + } catch (e) { connection.send( new ResponseMessage(message.requestNo) - .setError(`targets are empty`) + .setError(e.message) ) - break + return } workers.add(connection, targets) @@ -108,15 +113,22 @@ async function onRequestMessage(message, connection) { case 'poke': { const targets = message.requestData?.targets || [] - if (!targets.length) { + + // validate data + try { + validateTargetsList(targets) + } catch (e) { connection.send( new ResponseMessage(message.requestNo) - .setError(`targets are empty`) + .setError(e.message) ) - break + return } + // poke workers workers.poke(targets) + + // reply to user connection.send( new ResponseMessage(message.requestNo) .setData('ok') @@ -139,6 +151,39 @@ async function onRequestMessage(message, connection) { break + case 'run-manual': + const jobs = message.requestData.jobs + + // validate data + try { + if (!Array.isArray(jobs)) + throw new Error('jobs must be array') + + for (let job of jobs) { + validateObjectSchema(job, [ + // name // type // required + ['id', 'i', true], + ['target', 's', true], + ]) + } + } catch (e) { + connection.send( + new ResponseMessage(message.requestNo) + .setError(e.message) + ) + return + } + + // run jobs on workers + const data = await workers.runManual(jobs) + + // send result to the client + connection.send( + new ResponseMessage(message.requestNo) + .setData(data) + ) + break + default: connection.send( new ResponseMessage(message.requestNo) diff --git a/src/jobd.js b/src/jobd.js index e341f3d..4605137 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -5,6 +5,7 @@ const config = require('./lib/config') const db = require('./lib/db') const {uniq} = require('lodash') const {createCallablePromise} = require('./lib/util') +const {validateTargetsList} = require('./lib/data-validator') const { Server, Connection, @@ -145,22 +146,26 @@ async function onRequestMessage(message, connection) { switch (message.requestType) { case 'poll': { - const targets = message.requestData?.targets || [] - if (!targets.length) { - connection.send( - new ResponseMessage(message.requestNo) - .setError('empty targets') - ) - break - } + // null means all + let targets = null - for (const t of targets) { - if (!worker.hasTarget(t)) { + if (message.requestData?.targets !== undefined) { + targets = message.requestData?.targets + + // validate data + try { + validateTargetsList(targets) + + for (const t of targets) { + if (!worker.hasTarget(t)) + throw new Error(`invalid target '${t}'`) + } + } catch (e) { connection.send( new ResponseMessage(message.requestNo) - .setError(`invalid target '${t}'`) + .setError(e.message) ) - break + return } } diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js index 276ea9b..6d1d8f0 100644 --- a/src/lib/data-validator.js +++ b/src/lib/data-validator.js @@ -36,8 +36,9 @@ function checkType(expectedType, value) { /** * @param {object} data * @param {array} schema + * @throws Error */ -function validateMessageData(data, schema) { +function validateObjectSchema(data, schema) { if (!isObject(data)) throw new Error(`data is not an object`) @@ -68,4 +69,21 @@ function validateMessageData(data, schema) { } } -module.exports = {validateMessageData} \ No newline at end of file +function validateTargetsList(targets) { + if (!Array.isArray(targets)) + throw new Error('targets must be array') + + if (!targets.length) + throw new Error('targets are empty') + + for (const t of targets) { + const type = typeof t + if (type !== 'string') + throw new Error(`all targets must be strings, ${type} given`) + } +} + +module.exports = { + validateObjectSchema, + validateTargetsList +} \ No newline at end of file diff --git a/src/lib/server.js b/src/lib/server.js index dec9f06..81c2c84 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -4,7 +4,7 @@ const {getLogger} = require('./logger') const random = require('lodash/random') const config = require('./config') const {createCallablePromise} = require('./util') -const {validateMessageData} = require('./data-validator') +const {validateObjectSchema} = require('./data-validator') const EOT = 0x04 const REQUEST_NO_LIMIT = 999999 @@ -43,7 +43,15 @@ class ResponseMessage extends Message { super(Message.RESPONSE) this.requestNo = requestNo + + /** + * @type {null|string} + */ this.error = null + + /** + * @type {null|string|number|object|array} + */ this.data = null } @@ -466,7 +474,7 @@ class Connection extends EventEmitter { let data = json.shift() try { - validateMessageData(data, [ + validateObjectSchema(data, [ // name type required ['type', 's', true], ['no', 'i', true], @@ -489,7 +497,7 @@ class Connection extends EventEmitter { let data = json.shift() try { - validateMessageData(data, [ + validateObjectSchema(data, [ // name type required ['no', 'i', true], ['data', 'snoa', false], @@ -641,7 +649,7 @@ class Connection extends EventEmitter { this._handleClose() this.logger.warn(`socket error:`, error) } - + } module.exports = { diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index c7b3ab1..41b13e2 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -1,8 +1,7 @@ -const intersection = require('lodash/intersection') +const {intersection, throttle, sample} = require('lodash') const config = require('./config') const {getLogger} = require('./logger') const {RequestMessage, PingMessage} = require('./server') -const throttle = require('lodash/throttle') class WorkersList { @@ -172,6 +171,162 @@ class WorkersList { return info } + /** + * Send run-manual() requests to workers, aggregate and return results. + * + * @param {{id: int, target: string}[]} jobs + * @return {Promise<{jobs: {}, errors: {}}>} + */ + async runManual(jobs) { + this.logger.debug('runManual:', jobs) + + const workers = [...this.workers] + + /** + * @type {object.} + */ + const targetWorkers = {} + + for (let workerIndex = 0; workerIndex < workers.length; workerIndex++) { + const worker = workers[workerIndex] + + for (let target of worker.targets) { + if (targetWorkers[target] === undefined) + targetWorkers[target] = [] + + targetWorkers[target].push(workerIndex) + } + } + + this.logger.trace('runManual: targetWorkers:', targetWorkers) + + /** + * List of job IDs with unsupported targets. + * + * @type {int[]} + */ + const exceptions = [] + + /** + * @type {object.} + */ + const callMap = {} + + /** + * @type {object.} + */ + const jobToTargetMap = {} + + for (const job of jobs) { + const {id, target} = job + + jobToTargetMap[id] = target + + // if worker serving this target not found, skip the job + if (targetWorkers[target] === undefined) { + exceptions.push(id) + continue + } + + // get random worker index + let workerIndex = sample(targetWorkers[target]) + if (callMap[workerIndex] === undefined) + callMap[workerIndex] = [] + + callMap[workerIndex].push(id) + } + + this.logger.trace('runManual: callMap:', callMap) + this.logger.trace('runManual: exceptions:', exceptions) + + /** + * @type {Promise[]} + */ + const promises = [] + + /** + * @type {int[][]} + */ + const jobsByPromise = [] + + for (const workerIndex in callMap) { + if (!callMap.hasOwnProperty(workerIndex)) + continue + + let workerJobIds = callMap[workerIndex] + let worker = workers[workerIndex] + let conn = worker.connection + + let P = conn.sendRequest( + new RequestMessage('run-manual', {ids: workerJobIds}) + ) + + promises.push(P) + jobsByPromise.push(workerJobIds) + } + + this.logger.trace('runManual: jobsByPromise:', jobsByPromise) + + const results = await Promise.allSettled(promises) + + this.logger.trace('runManual: Promise.allSettled results:', results) + + const response = {} + const setError = (id, value) => { + if (!('errors' in response)) + response.errors = {} + + if (typeof id === 'object') { + Object.assign(response.errors, id) + } else { + response.errors[id] = value + } + } + const setData = (id, value) => { + if (!('jobs' in response)) + response.jobs = {} + + if (typeof id === 'object') { + Object.assign(response.jobs, id) + } else { + response.jobs[id] = value + } + } + + for (let i = 0; i < results.length; i++) { + let result = results[i] + if (result.status === 'fulfilled') { + /** + * @type {ResponseMessage} + */ + const responseMessage = result.value + + const {jobs, errors} = responseMessage.data + this.logger.trace(`[${i}]:`, jobs, errors) + + if (jobs) + setData(jobs) + + if (errors) + setError(errors) + + } else if (result.status === 'rejected') { + for (let jobIds of jobsByPromise[i]) { + for (let jobId of jobIds) + setError(jobId, result.reason?.message) + } + } + } + + // don't forget about skipped jobs + if (exceptions.length) { + for (let id of exceptions) + setError(id, `worker serving target for ${jobToTargetMap[id]} not found`) + } + + return response + } + /** * @private */ -- cgit v1.2.3