aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-16 01:06:14 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-16 01:06:14 +0300
commit23c16a2c80f0614d0b31cba363bca66e1a60687b (patch)
tree6d019e63b5bc4d703d4319f444927ccc7c93f208
parentcbbe60df32bc591758dd98e624a9f3b3c53f1d23 (diff)
jobd: add add-target()/remove-target(); code refactoring
-rw-r--r--.gitignore1
-rw-r--r--README.md4
-rwxr-xr-xsrc/jobd-master.js214
-rwxr-xr-xsrc/jobd.js319
-rw-r--r--src/lib/data-validator.js58
-rw-r--r--src/lib/logger.js3
-rw-r--r--src/lib/request-handler.js17
-rw-r--r--src/lib/worker.js19
8 files changed, 280 insertions, 355 deletions
diff --git a/.gitignore b/.gitignore
index 7a1537b..1bc0523 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
.idea
node_modules
+Inspections.xml \ No newline at end of file
diff --git a/README.md b/README.md
index 9a4bc6f..a158235 100644
--- a/README.md
+++ b/README.md
@@ -132,6 +132,10 @@ Here is the list of supported requests, using `type(arguments)` notation.
* **`run-manual(ids: int[])`** — enqueue and run jobs with specified IDs and
`status` set to `manual`, and return results.
+* **`add-target(target: string, concurrency: int)`** — add target
+
+* **`remove-target(target: string, concurrency: int)`** — remove target
+
* **`set-target-concurrency(target: string, concurrency: int)`** — set concurrency
of target `target`.
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 9827126..eeac085 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -4,8 +4,12 @@ const loggerModule = require('./lib/logger')
const config = require('./lib/config')
const {Server, ResponseMessage} = require('./lib/server')
const WorkersList = require('./lib/workers-list')
-const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator')
-const RequestHandler = require('./lib/request-handler')
+const {
+ validateObjectSchema,
+ validateInputTargetsListFormat,
+ validateInputTargets
+} = require('./lib/data-validator')
+const {RequestHandler} = require('./lib/request-handler')
const package_json = require('../package.json')
const DEFAULT_CONFIG_PATH = "/etc/jobd-master.conf"
@@ -112,136 +116,94 @@ function initRequestHandler() {
requestHandler.set('continue', onContinue)
}
+function usage() {
+ let s = `${process.argv[1]} OPTIONS
+
+Options:
+ --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH}
+ --help Show this help.
+ --version Print version.`
+
+ console.log(s)
+}
+
+async function term() {
+ if (logger)
+ logger.info('shutdown')
+
+ await loggerModule.shutdown()
+ process.exit()
+}
+
+
+
+/****************************************/
+/** **/
+/** Request handlers **/
+/** **/
+/****************************************/
+
/**
* @param {object} data
- * @param {number} requestNo
* @param {Connection} connection
*/
-function onRegisterWorker(data, requestNo, connection) {
- const targets = data.targets || []
-
- // validate data
- try {
- validateTargetsListFormat(targets)
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return
- }
-
- // register worker and reply with OK
+async function onRegisterWorker(data, connection) {
+ const targets = validateInputTargets(data, null)
workers.add(connection, targets)
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
*/
-function onPoke(data, requestNo, connection) {
- const targets = data.targets || []
-
- // validate data
- try {
- validateTargetsListFormat(targets)
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return
- }
-
- // poke workers
+async function onPoke(data) {
+ const targets = validateInputTargets(data, null)
workers.poke(targets)
-
- // reply to user
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
* @return {Promise<*>}
*/
-async function onStatus(data, requestNo, connection) {
+async function onStatus(data) {
const info = await workers.getInfo(data.poll_workers || false)
-
- let status = {
+ return {
workers: info,
memoryUsage: process.memoryUsage()
}
-
- connection.send(
- new ResponseMessage(requestNo)
- .setData(status)
- )
}
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
* @return {Promise<*>}
*/
-async function onRunManual(data, requestNo, connection) {
+async function onRunManual(data) {
const {jobs} = data
- // validate data
- try {
- if (!Array.isArray(jobs))
- throw new Error('jobs must be array')
-
- for (let job of jobs) {
- validateObjectSchema(job, [
- // name // type // required
- ['id', 'i', true],
- ['target', 's', true],
- ])
- }
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return
- }
+ // validate input
+ if (!Array.isArray(jobs))
+ throw new Error('jobs must be array')
- // run jobs on workers
- const jobsData = await workers.runManual(jobs)
+ for (let job of jobs) {
+ validateObjectSchema(job, [
+ // name // type // required
+ ['id', 'i', true],
+ ['target', 's', true],
+ ])
+ }
- // send result to the client
- connection.send(
- new ResponseMessage(requestNo)
- .setData(jobsData)
- )
+ // run jobs, wait for results and send a response
+ return await workers.runManual(jobs)
}
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
*/
-function onPause(data, requestNo, connection) {
- let targets
- if ((targets = validateInputTargets(data, requestNo, connection)) === false)
- return
-
+function onPause(data) {
+ const targets = validateInputTargets(data, null)
workers.pauseTargets(targets)
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
@@ -250,66 +212,8 @@ function onPause(data, requestNo, connection) {
* @param {Connection} connection
*/
function onContinue(data, requestNo, connection) {
- let targets
- if ((targets = validateInputTargets(data, requestNo, connection)) === false)
- return
-
+ const targets = validateInputTargets(data, null)
workers.continueTargets(targets)
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
-}
-
-
-/**
- * @private
- * @param data
- * @param requestNo
- * @param connection
- * @return {null|boolean|string[]}
- */
-function validateInputTargets(data, requestNo, connection) {
- // null means all targets
- let targets = null
-
- if (data.targets !== undefined) {
- targets = data.targets
-
- // validate data
- try {
- validateTargetsListFormat(targets)
-
- // note: we don't check target names here
- // as in jobd
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return false
- }
- }
-
- return targets
+ return 'ok'
}
-
-function usage() {
- let s = `${process.argv[1]} OPTIONS
-
-Options:
- --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH}
- --help Show this help.
- --version Print version.`
-
- console.log(s)
-}
-
-async function term() {
- if (logger)
- logger.info('shutdown')
-
- await loggerModule.shutdown()
- process.exit()
-}
diff --git a/src/jobd.js b/src/jobd.js
index e1331d1..bb912fc 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -5,8 +5,11 @@ const config = require('./lib/config')
const db = require('./lib/db')
const {uniq} = require('lodash')
const {createCallablePromise} = require('./lib/util')
-const {validateObjectSchema, validateTargetsListFormat} = require('./lib/data-validator')
-const RequestHandler = require('./lib/request-handler')
+const {
+ validateInputTargetAndConcurrency,
+ validateInputTargets
+} = require('./lib/data-validator')
+const {RequestHandler} = require('./lib/request-handler')
const {
Server,
Connection,
@@ -139,6 +142,8 @@ function initRequestHandler() {
requestHandler.set('run-manual', onRunManual)
requestHandler.set('pause', onPause)
requestHandler.set('continue', onContinue)
+ requestHandler.set('add-target', onAddTarget)
+ requestHandler.set('remove-target', onRemoveTarget)
requestHandler.set('set-target-concurrency', onSetTargetConcurrency)
}
@@ -162,69 +167,119 @@ async function initDatabase() {
logger.info('db initialized')
}
+function connectToMaster() {
+ const port = config.get('master_port')
+ const host = config.get('master_host')
+
+ if (!host || !port) {
+ logger.debug('connectToMaster: master host or port is not defined')
+ return
+ }
+
+ async function connect() {
+ const connection = new Connection()
+ await connection.connect(host, port)
+
+ try {
+ let response = await connection.sendRequest(
+ new RequestMessage('register-worker', {
+ targets: worker.getTargets()
+ })
+ )
+ logger.debug('connectToMaster: response:', response)
+ } catch (error) {
+ logger.error('connectToMaster: error while awaiting response:', error)
+ }
+
+ connection.on('close', () => {
+ logger.warn(`connectToMaster: connection closed`)
+ tryToConnect()
+ })
+
+ connection.on('request-message', (message, connection) => {
+ requestHandler.process(message, connection)
+ })
+ }
+
+ function tryToConnect(now = false) {
+ setTimeout(() => {
+ connect().catch(error => {
+ logger.warn(`connectToMaster: connection failed`, error)
+ })
+ }, now ? 0 : config.get('master_reconnect_timeout') * 1000)
+ }
+
+ tryToConnect(true)
+}
+
+function usage() {
+ let s = `${process.argv[1]} OPTIONS
+
+Options:
+ --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH}
+ --help Show this help.
+ --version Print version.`
+
+ console.log(s)
+}
+
+async function term() {
+ if (logger)
+ logger.info('shutdown')
+
+ await loggerModule.shutdown()
+ process.exit()
+}
+
+
+
+/****************************************/
+/** **/
+/** Request handlers **/
+/** **/
+/****************************************/
+
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
+ * @return {Promise<string>}
*/
-function onPollRequest(data, requestNo, connection) {
- let targets
- if ((targets = validateInputTargets(data, requestNo, connection)) === false)
- return
+async function onPollRequest(data) {
+ let targets = validateInputTargets(data, worker)
worker.setPollTargets(targets)
worker.poll()
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
* @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
+ * @return {Promise<object>}
*/
-function onStatus(data, requestNo, connection) {
- connection.send(
- new ResponseMessage(requestNo)
- .setData({
- targets: worker.getStatus(),
- jobPromisesCount: Object.keys(jobPromises).length,
- memoryUsage: process.memoryUsage()
- })
- )
+async function onStatus(data) {
+ return {
+ targets: worker.getStatus(),
+ jobPromisesCount: Object.keys(jobPromises).length,
+ memoryUsage: process.memoryUsage()
+ }
}
/**
- * @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
- * @return {Promise<void>}
+ * @param {{ids: number[]}} data
+ * @return {Promise}
*/
-async function onRunManual(data, requestNo, connection) {
+async function onRunManual(data) {
let {ids: jobIds} = data
jobIds = uniq(jobIds)
- // if at least one of the jobs is already being run, reject
- // if at least one item is not a number, reject
for (const id of jobIds) {
- if (typeof id !== 'number') {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(`all ids must be numbers, got ${typeof id}`)
- )
- return
- }
+ // if at least one item is not a number, reject
+ if (typeof id !== 'number')
+ throw new Error(`all ids must be numbers, got ${typeof id}`)
- if (id in jobPromises) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(`another client is already waiting for job ${id}`)
- )
- return
- }
+ // if at least one of the jobs is already being run, reject
+ if (id in jobPromises)
+ throw new Error(`another client is already waiting for job ${id}`)
}
// create a bunch of promises, one per job
@@ -239,7 +294,7 @@ async function onRunManual(data, requestNo, connection) {
const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds})
// wait till all jobs are done (or failed), then send a response
- Promise.allSettled(promises).then(results => {
+ const P = Promise.allSettled(promises).then(results => {
const response = {}
for (let i = 0; i < results.length; i++) {
@@ -262,10 +317,7 @@ async function onRunManual(data, requestNo, connection) {
}
}
- connection.send(
- new ResponseMessage(requestNo)
- .setData(response)
- )
+ return response
})
// reject all ignored / non-found jobs
@@ -286,33 +338,25 @@ async function onRunManual(data, requestNo, connection) {
P.reject(new Error(`job ${id} not found`))
}
}
+
+ return P
}
/**
- * @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
+ * @param {{targets: string[]}} data
*/
-function onPause(data, requestNo, connection) {
- let targets
- if ((targets = validateInputTargets(data, requestNo, connection)) === false)
- return
-
+async function onPause(data) {
+ let targets = validateInputTargets(data, worker)
worker.pauseTargets(targets)
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
- * @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
+ * @param {{targets: string[]}} data
*/
-function onContinue(data, requestNo, connection) {
+async function onContinue(data) {
let targets
- if ((targets = validateInputTargets(data, requestNo, connection)) === false)
+ if ((targets = validateInputTargets(data, worker)) === false)
return
// continue queues
@@ -321,137 +365,32 @@ function onContinue(data, requestNo, connection) {
// poll just in case
worker.poll()
- // ok
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+ return 'ok'
}
/**
- * @param {object} data
- * @param {number} requestNo
- * @param {Connection} connection
+ * @param {{target: string, concurrency: int}} data
*/
-function onSetTargetConcurrency(data, requestNo, connection) {
- try {
- validateObjectSchema(data, [
- // name // type // required
- ['concurrency', 'i', true],
- ['target', 's', true],
- ])
-
- if (data.concurrency <= 0)
- throw new Error('Invalid concurrency value.')
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return
- }
-
- worker.setTargetConcurrency(data.target, data.concurrency)
- connection.send(
- new ResponseMessage(requestNo)
- .setData('ok')
- )
+async function onAddTarget(data) {
+ validateInputTargetAndConcurrency(data)
+ worker.addTarget(data.target, data.concurrency)
+ return 'ok'
}
/**
- * @private
- * @param data
- * @param requestNo
- * @param connection
- * @return {null|boolean|string[]}
+ * @param {{target: string}} data
*/
-function validateInputTargets(data, requestNo, connection) {
- // null means all targets
- let targets = null
-
- if (data.targets !== undefined) {
- targets = data.targets
-
- // validate data
- try {
- validateTargetsListFormat(targets)
-
- for (const t of targets) {
- if (!worker.hasTarget(t))
- throw new Error(`invalid target '${t}'`)
- }
- } catch (e) {
- connection.send(
- new ResponseMessage(requestNo)
- .setError(e.message)
- )
- return false
- }
- }
-
- return targets
+async function onRemoveTarget(data) {
+ validateInputTargetAndConcurrency(data, true)
+ worker.removeTarget(data.target)
+ return 'ok'
}
-function connectToMaster() {
- const port = config.get('master_port')
- const host = config.get('master_host')
-
- if (!host || !port) {
- logger.debug('connectToMaster: master host or port is not defined')
- return
- }
-
- async function connect() {
- const connection = new Connection()
- await connection.connect(host, port)
-
- try {
- let response = await connection.sendRequest(
- new RequestMessage('register-worker', {
- targets: worker.getTargets()
- })
- )
- logger.debug('connectToMaster: response:', response)
- } catch (error) {
- logger.error('connectToMaster: error while awaiting response:', error)
- }
-
- connection.on('close', () => {
- logger.warn(`connectToMaster: connection closed`)
- tryToConnect()
- })
-
- connection.on('request-message', (message, connection) => {
- requestHandler.process(message, connection)
- })
- }
-
- function tryToConnect(now = false) {
- setTimeout(() => {
- connect().catch(error => {
- logger.warn(`connectToMaster: connection failed`, error)
- })
- }, now ? 0 : config.get('master_reconnect_timeout') * 1000)
- }
-
- tryToConnect(true)
-}
-
-function usage() {
- let s = `${process.argv[1]} OPTIONS
-
-Options:
- --config <path> Path to config. Default: ${DEFAULT_CONFIG_PATH}
- --help Show this help.
- --version Print version.`
-
- console.log(s)
-}
-
-async function term() {
- if (logger)
- logger.info('shutdown')
-
- await loggerModule.shutdown()
- process.exit()
-}
+/**
+ * @param {object} data
+ */
+async function onSetTargetConcurrency(data) {
+ validateInputTargetAndConcurrency(data)
+ worker.setTargetConcurrency(data.target, data.concurrency)
+ return 'ok'
+} \ No newline at end of file
diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js
index 7419b34..74827c1 100644
--- a/src/lib/data-validator.js
+++ b/src/lib/data-validator.js
@@ -11,6 +11,11 @@ const typeNames = {
const logger = getLogger('data-validator')
+
+/**************************************/
+/** Common Functions **/
+/**************************************/
+
/**
* @param {string} expectedType
* @param value
@@ -69,7 +74,12 @@ function validateObjectSchema(data, schema) {
}
}
-function validateTargetsListFormat(targets) {
+
+/********************************************/
+/** Request input data validators */
+/********************************************/
+
+function validateInputTargetsListFormat(targets) {
if (!Array.isArray(targets))
throw new Error('targets must be array')
@@ -83,7 +93,51 @@ function validateTargetsListFormat(targets) {
}
}
+function validateInputTargetAndConcurrency(data, onlyTarget = false) {
+ const schema = [
+ ['target', 's', true],
+ ]
+
+ if (!onlyTarget) {
+ schema.push(
+ ['concurrency', 'i', true]
+ )
+ }
+
+ validateObjectSchema(data, schema)
+
+ if (!onlyTarget && data.concurrency <= 0)
+ throw new Error('Invalid concurrency value.')
+}
+
+/**
+ * @param data
+ * @param {Worker|null} worker
+ * @return {null|string[]}
+ */
+function validateInputTargets(data, worker) {
+ // null means all targets
+ let targets = null
+
+ if (data.targets !== undefined) {
+ targets = data.targets
+
+ validateInputTargetsListFormat(targets)
+
+ if (worker !== null) {
+ for (const t of targets) {
+ if (!worker.hasTarget(t))
+ throw new Error(`invalid target '${t}'`)
+ }
+ }
+ }
+
+ return targets
+}
+
module.exports = {
validateObjectSchema,
- validateTargetsListFormat
+ validateInputTargetsListFormat,
+ validateInputTargetAndConcurrency,
+ validateInputTargets,
} \ No newline at end of file
diff --git a/src/lib/logger.js b/src/lib/logger.js
index f886e0c..b71020c 100644
--- a/src/lib/logger.js
+++ b/src/lib/logger.js
@@ -101,10 +101,9 @@ module.exports = {
},
/**
- * @param cb
* @return {Promise}
*/
- shutdown(cb) {
+ shutdown() {
return new Promise((resolve, reject) => {
log4js.shutdown(resolve)
})
diff --git a/src/lib/request-handler.js b/src/lib/request-handler.js
index 4330b6b..e7f9fe2 100644
--- a/src/lib/request-handler.js
+++ b/src/lib/request-handler.js
@@ -35,14 +35,17 @@ class RequestHandler {
if (this.handlers.has(message.requestType)) {
const f = this.handlers.get(message.requestType)
- const result = f(message.requestData || {}, message.requestNo, connection)
+ const result = f(message.requestData || {}, connection)
if (result instanceof Promise) {
- result.catch(error => {
- this.logger.error(`${message.requestType}:`, error)
-
+ result.then(data => {
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData(data)
+ )
+ }).catch(error => {
connection.send(
new ResponseMessage(message.requestNo)
- .setError('server error: ' + error?.message)
+ .setError(error?.message)
)
})
}
@@ -56,4 +59,6 @@ class RequestHandler {
}
-module.exports = RequestHandler \ No newline at end of file
+module.exports = {
+ RequestHandler
+} \ No newline at end of file
diff --git a/src/lib/worker.js b/src/lib/worker.js
index e53d03f..0be6a19 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -73,6 +73,25 @@ class Worker extends EventEmitter {
}
/**
+ * Deletes a queue.
+ *
+ * @param {string} target
+ */
+ removeTarget(target) {
+ if (!(target in this.targets))
+ throw new Error(`target '${target}' not found`)
+
+ const {queue} = this.targets[target]
+ if (queue.length > 0)
+ throw new Error(`queue is not empty`)
+
+ this.logger.debug(`deleteTarget: deleting target' ${target}'`)
+ queue.removeAllListeners()
+ queue.end()
+ delete this.targets[target]
+ }
+
+ /**
* @param {string} target
* @param {number} concurrency
*/