summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xsrc/jobd-master.js254
-rwxr-xr-xsrc/jobd.js354
-rw-r--r--src/lib/request-handler.js59
-rw-r--r--src/lib/server.js4
4 files changed, 384 insertions, 287 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 8628764..f603b92 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -2,9 +2,10 @@
const minimist = require('minimist')
const loggerModule = require('./lib/logger')
const config = require('./lib/config')
-const {Server, ResponseMessage, RequestMessage} = require('./lib/server')
+const {Server, ResponseMessage} = require('./lib/server')
const WorkersList = require('./lib/workers-list')
const {validateObjectSchema, validateTargetsList} = require('./lib/data-validator')
+const RequestHandler = require('./lib/request-handler')
const package_json = require('../package.json')
/**
@@ -22,6 +23,11 @@ let server
*/
let workers
+/**
+ * @type {RequestHandler}
+ */
+let requestHandler
+
main().catch(e => {
console.error(e)
@@ -30,6 +36,13 @@ main().catch(e => {
async function main() {
+ await initApp('jobd-master')
+ initWorkers()
+ initRequestHandler()
+ initServer()
+}
+
+async function initApp(appName) {
if (process.argv.length < 3) {
usage()
process.exit(0)
@@ -66,140 +79,149 @@ async function main() {
levelFile: config.get('log_level_file'),
levelConsole: config.get('log_level_console'),
})
- logger = loggerModule.getLogger('jobd-master')
+ logger = loggerModule.getLogger(appName)
- workers = new WorkersList()
+ process.title = appName
+}
- // start server
+function initServer() {
server = new Server()
server.on('new-connection', (connection) => {
- connection.on('request-message', onRequestMessage)
+ connection.on('request-message', (message, connection) => {
+ requestHandler.process(message, connection)
+ })
})
server.start(config.get('port'), config.get('host'))
- logger.info('server started')
+}
+
+function initWorkers() {
+ workers = new WorkersList()
+}
+
+function initRequestHandler() {
+ requestHandler = new RequestHandler()
+ requestHandler.set('poke', onPoke)
+ requestHandler.set('register-worker', onRegisterWorker)
+ requestHandler.set('status', onStatus)
+ requestHandler.set('run-manual', onRunManual)
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onRegisterWorker(data, requestNo, connection) {
+ const targets = data.targets || []
+
+ // validate data
+ try {
+ validateTargetsList(targets)
+ } catch (e) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(e.message)
+ )
+ return
+ }
+
+ // register worker and reply with OK
+ workers.add(connection, targets)
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
}
/**
- * @param {RequestMessage|ResponseMessage} message
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onPoke(data, requestNo, connection) {
+ const targets = data.targets || []
+
+ // validate data
+ try {
+ validateTargetsList(targets)
+ } catch (e) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(e.message)
+ )
+ return
+ }
+
+ // poke workers
+ workers.poke(targets)
+
+ // reply to user
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
* @param {Connection} connection
* @return {Promise<*>}
*/
-async function onRequestMessage(message, connection) {
+async function onStatus(data, requestNo, connection) {
+ const info = await workers.getInfo(data.poll_workers || false)
+
+ let status = {
+ workers: info,
+ memoryUsage: process.memoryUsage()
+ }
+
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData(status)
+ )
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ * @return {Promise<*>}
+ */
+async function onRunManual(data, requestNo, connection) {
+ const {jobs} = data
+
+ // validate data
try {
- logger.info('onMessage:', message)
-
- switch (message.requestType) {
- case 'register-worker': {
- const targets = message.requestData?.targets || []
-
- // validate data
- try {
- validateTargetsList(targets)
- } catch (e) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(e.message)
- )
- return
- }
-
- workers.add(connection, targets)
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData('ok')
- )
- break
- }
-
- case 'poke': {
- const targets = message.requestData?.targets || []
-
- // validate data
- try {
- validateTargetsList(targets)
- } catch (e) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(e.message)
- )
- return
- }
-
- // poke workers
- workers.poke(targets)
-
- // reply to user
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData('ok')
- )
- break
- }
-
- case 'status':
- const info = await workers.getInfo(message.requestData?.poll_workers || false)
-
- let status = {
- workers: info,
- memoryUsage: process.memoryUsage()
- }
-
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData(status)
- )
-
- break
-
- case 'run-manual':
- const jobs = message.requestData.jobs
-
- // validate data
- try {
- if (!Array.isArray(jobs))
- throw new Error('jobs must be array')
-
- for (let job of jobs) {
- validateObjectSchema(job, [
- // name // type // required
- ['id', 'i', true],
- ['target', 's', true],
- ])
- }
- } catch (e) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(e.message)
- )
- return
- }
-
- // run jobs on workers
- const data = await workers.runManual(jobs)
-
- // send result to the client
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData(data)
- )
- break
-
- default:
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(`unknown request type: '${message.requestType}'`)
- )
- break
+ if (!Array.isArray(jobs))
+ throw new Error('jobs must be array')
+
+ for (let job of jobs) {
+ validateObjectSchema(job, [
+ // name // type // required
+ ['id', 'i', true],
+ ['target', 's', true],
+ ])
}
- } catch (error) {
- logger.error(`error while handling message:`, message, error)
+ } catch (e) {
connection.send(
- new ResponseMessage(message.requestNo)
- .setError('server error: ' + error?.message)
+ new ResponseMessage(requestNo)
+ .setError(e.message)
)
+ return
}
+
+ // run jobs on workers
+ const jobsData = await workers.runManual(jobs)
+
+ // send result to the client
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData(jobsData)
+ )
}
+
function usage() {
let s = `${process.argv[1]} OPTIONS
diff --git a/src/jobd.js b/src/jobd.js
index 4605137..0b980b7 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -6,6 +6,7 @@ const db = require('./lib/db')
const {uniq} = require('lodash')
const {createCallablePromise} = require('./lib/util')
const {validateTargetsList} = require('./lib/data-validator')
+const RequestHandler = require('./lib/request-handler')
const {
Server,
Connection,
@@ -37,6 +38,11 @@ let logger
let server
/**
+ * @type {RequestHandler}
+ */
+let requestHandler
+
+/**
* @type {object.<string, Promise>}
*/
let jobPromises = {}
@@ -49,6 +55,15 @@ main().catch(e => {
async function main() {
+ await initApp('jobd')
+ await initDatabase()
+ initWorker()
+ initRequestHandler()
+ initServer()
+ connectToMaster()
+}
+
+async function initApp(appName) {
if (process.argv.length < 3) {
usage()
process.exit(0)
@@ -85,18 +100,12 @@ async function main() {
levelFile: config.get('log_level_file'),
levelConsole: config.get('log_level_console'),
})
- logger = loggerModule.getLogger('jobd')
+ logger = loggerModule.getLogger(appName)
- // init database
- try {
- await db.init()
- } catch (error) {
- logger.error('failed to connect to MySQL', error)
- process.exit(1)
- }
- logger.info('db initialized')
+ process.title = appName
+}
- // init queue
+function initWorker() {
worker = new Worker()
for (let targetName in config.get('targets')) {
let slots = config.get('targets')[targetName].slots
@@ -119,186 +128,192 @@ async function main() {
logger.warn(`job-done: jobPromises[${data.id}] is undefined`)
}
})
- logger.info('queue initialized')
+}
- // start server
+function initRequestHandler() {
+ requestHandler = new RequestHandler()
+ requestHandler.set('poll', onPollRequest)
+ requestHandler.set('status', onStatus)
+ requestHandler.set('run-manual', onRunManual)
+}
+
+function initServer() {
server = new Server()
server.on('new-connection', (connection) => {
- connection.on('request-message', onRequestMessage)
+ connection.on('request-message', (message, connection) => {
+ requestHandler.process(message, connection)
+ })
})
server.start(config.get('port'), config.get('host'))
- logger.info('server started')
-
- // connect to master
- if (config.get('master_port') && config.get('master_host'))
- connectToMaster()
}
+async function initDatabase() {
+ try {
+ await db.init()
+ } catch (error) {
+ logger.error('failed to connect to MySQL', error)
+ process.exit(1)
+ }
+ logger.info('db initialized')
+}
/**
- * @param {RequestMessage|ResponseMessage} message
+ * @param {object} data
+ * @param {number} requestNo
* @param {Connection} connection
- * @return {Promise<*>}
*/
-async function onRequestMessage(message, connection) {
- try {
- logger.info('onMessage:', message)
-
- switch (message.requestType) {
- case 'poll': {
- // null means all
- let targets = null
-
- if (message.requestData?.targets !== undefined) {
- targets = message.requestData?.targets
-
- // validate data
- try {
- validateTargetsList(targets)
-
- for (const t of targets) {
- if (!worker.hasTarget(t))
- throw new Error(`invalid target '${t}'`)
- }
- } catch (e) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(e.message)
- )
- return
- }
- }
-
- worker.setPollTargets(targets)
- worker.poll()
-
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData('ok')
- )
- break
- }
+function onPollRequest(data, requestNo, connection) {
+ // null means all targets
+ let targets = null
- case 'status': {
- const qs = worker.getStatus()
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData({
- queue: qs,
- jobDoneAwaitersCount: Object.keys(jobPromises).length,
- memoryUsage: process.memoryUsage()
- })
- )
- break
- }
+ if (data.targets !== undefined) {
+ targets = data.targets
+
+ // validate data
+ try {
+ validateTargetsList(targets)
- case 'run-manual': {
- let {ids: jobIds} = message.requestData
- jobIds = uniq(jobIds)
-
- // if at least one of the jobs is already being run, reject
- // if at least one item is not a number, reject
- for (const id of jobIds) {
- if (typeof id !== 'number') {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(`all ids must be numbers, got ${typeof id}`)
- )
- return
- }
-
- if (id in jobPromises) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(`another client is already waiting for job ${id}`)
- )
- return
- }
- }
-
- // create a bunch of promises, one per job
- let promises = []
- for (const id of jobIds) {
- const P = createCallablePromise()
- jobPromises[id] = P
- promises.push(P)
- }
-
- // get jobs from database and enqueue for execution
- const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds})
-
- // wait till all jobs are done (or failed), then send a response
- Promise.allSettled(promises).then(results => {
- const response = {}
-
- for (let i = 0; i < results.length; i++) {
- let jobId = jobIds[i]
- let result = results[i]
-
- if (result.status === 'fulfilled') {
- if (!('jobs' in response))
- response.jobs = {}
-
- if (result.value?.id !== undefined)
- delete result.value.id
-
- response.jobs[jobId] = result.value
- } else if (result.status === 'rejected') {
- if (!('errors' in response))
- response.errors = {}
-
- response.errors[jobId] = result.reason?.message
- }
- }
-
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData(response)
- )
- })
-
- // reject all ignored / non-found jobs
- for (const [id, value] of results.entries()) {
- if (!(id in jobPromises)) {
- this.logger.error(`run-manual: ${id} not found in jobPromises`)
- continue
- }
-
- if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) {
- const P = jobPromises[id]
- delete jobPromises[id]
-
- if (value.result === JOB_IGNORED)
- P.reject(new Error(value.reason))
-
- else if (value.result === JOB_NOTFOUND)
- P.reject(new Error(`job ${id} not found`))
- }
- }
-
- break
+ for (const t of targets) {
+ if (!worker.hasTarget(t))
+ throw new Error(`invalid target '${t}'`)
}
+ } catch (e) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(e.message)
+ )
+ return
+ }
+ }
+
+ worker.setPollTargets(targets)
+ worker.poll()
- default:
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(`unknown request type: '${message.requestType}'`)
- )
- break
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onStatus(data, requestNo, connection) {
+ const qs = worker.getStatus()
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData({
+ queue: qs,
+ jobPromisesCount: Object.keys(jobPromises).length,
+ memoryUsage: process.memoryUsage()
+ })
+ )
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ * @return {Promise<void>}
+ */
+async function onRunManual(data, requestNo, connection) {
+ let {ids: jobIds} = data
+ jobIds = uniq(jobIds)
+
+ // if at least one of the jobs is already being run, reject
+ // if at least one item is not a number, reject
+ for (const id of jobIds) {
+ if (typeof id !== 'number') {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(`all ids must be numbers, got ${typeof id}`)
+ )
+ return
}
- } catch (error) {
- logger.error(`error while handling message:`, message, error)
+
+ if (id in jobPromises) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(`another client is already waiting for job ${id}`)
+ )
+ return
+ }
+ }
+
+ // create a bunch of promises, one per job
+ let promises = []
+ for (const id of jobIds) {
+ const P = createCallablePromise()
+ jobPromises[id] = P
+ promises.push(P)
+ }
+
+ // get jobs from database and enqueue for execution
+ const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds})
+
+ // wait till all jobs are done (or failed), then send a response
+ Promise.allSettled(promises).then(results => {
+ const response = {}
+
+ for (let i = 0; i < results.length; i++) {
+ let jobId = jobIds[i]
+ let result = results[i]
+
+ if (result.status === 'fulfilled') {
+ if (!('jobs' in response))
+ response.jobs = {}
+
+ if (result.value?.id !== undefined)
+ delete result.value.id
+
+ response.jobs[jobId] = result.value
+ } else if (result.status === 'rejected') {
+ if (!('errors' in response))
+ response.errors = {}
+
+ response.errors[jobId] = result.reason?.message
+ }
+ }
+
connection.send(
- new ResponseMessage(message.requestNo)
- .setError('server error: ' + error?.message)
+ new ResponseMessage(requestNo)
+ .setData(response)
)
+ })
+
+ // reject all ignored / non-found jobs
+ for (const [id, value] of results.entries()) {
+ if (!(id in jobPromises)) {
+ this.logger.error(`run-manual: ${id} not found in jobPromises`)
+ continue
+ }
+
+ if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) {
+ const P = jobPromises[id]
+ delete jobPromises[id]
+
+ if (value.result === JOB_IGNORED)
+ P.reject(new Error(value.reason))
+
+ else if (value.result === JOB_NOTFOUND)
+ P.reject(new Error(`job ${id} not found`))
+ }
}
}
-
function connectToMaster() {
+ const port = config.get('master_port')
+ const host = config.get('master_host')
+
+ if (!host || port) {
+ logger.debug('connectToMaster: master host or port is not defined')
+ return
+ }
+
const connection = new Connection()
- connection.connect(config.get('master_host'), config.get('master_port'))
+ connection.connect(host, port)
connection.on('connect', function() {
connection.sendRequest(
@@ -321,10 +336,11 @@ function connectToMaster() {
}, config.get('master_reconnect_timeout') * 1000)
})
- connection.on('request-message', onRequestMessage)
+ connection.on('request-message', (message, connection) => {
+ requestHandler.process(message, connection)
+ })
}
-
function usage() {
let s = `${process.argv[1]} OPTIONS
diff --git a/src/lib/request-handler.js b/src/lib/request-handler.js
new file mode 100644
index 0000000..4ab06fb
--- /dev/null
+++ b/src/lib/request-handler.js
@@ -0,0 +1,59 @@
+const {getLogger} = require('./logger')
+const {ResponseMessage} = require('./server')
+
+class RequestHandler {
+
+ constructor() {
+ /**
+ * @type {Map<string, Function>}
+ */
+ this.handlers = new Map()
+
+ /**
+ * @type {Logger}
+ */
+ this.logger = getLogger('RequestHandler')
+ }
+
+ /**
+ * @param {string} requestType
+ * @param {Function} handler
+ */
+ set(requestType, handler) {
+ if (this.handlers.has(requestType))
+ throw new Error(`handler for '${requestType}' has already been set`)
+
+ this.handlers.set(requestType, handler)
+ }
+
+ /**
+ * @param {RequestMessage} message
+ * @param {Connection} connection
+ */
+ process(message, connection) {
+ this.logger.info('process:', message)
+
+ if (this.handlers.has(message.requestType)) {
+ const f = this.handlers.get(message.requestType)
+ const result = f(message.requestData || {}, message.requestNo, connection)
+ if (result instanceof Promise) {
+ result.catch(error => {
+ this.logger.error(`${message.requestType}:`, error)
+
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError('server error: ' + error?.message)
+ )
+ })
+ }
+ } else {
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`unknown request type: '${message.requestType}'`)
+ )
+ }
+ }
+
+}
+
+module.exports = RequestHandler \ No newline at end of file
diff --git a/src/lib/server.js b/src/lib/server.js
index 81c2c84..1d923a8 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -50,7 +50,7 @@ class ResponseMessage extends Message {
this.error = null
/**
- * @type {null|string|number|object|array}
+ * @type {null|object}
*/
this.data = null
}
@@ -108,7 +108,7 @@ class RequestMessage extends Message {
this.requestType = type
/**
- * @type any
+ * @type {null|string|number|object|array}
*/
this.requestData = data