aboutsummaryrefslogtreecommitdiff
path: root/src/jobd-master.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/jobd-master.js')
-rwxr-xr-xsrc/jobd-master.js254
1 files changed, 138 insertions, 116 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 8628764..f603b92 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -2,9 +2,10 @@
const minimist = require('minimist')
const loggerModule = require('./lib/logger')
const config = require('./lib/config')
-const {Server, ResponseMessage, RequestMessage} = require('./lib/server')
+const {Server, ResponseMessage} = require('./lib/server')
const WorkersList = require('./lib/workers-list')
const {validateObjectSchema, validateTargetsList} = require('./lib/data-validator')
+const RequestHandler = require('./lib/request-handler')
const package_json = require('../package.json')
/**
@@ -22,6 +23,11 @@ let server
*/
let workers
+/**
+ * @type {RequestHandler}
+ */
+let requestHandler
+
main().catch(e => {
console.error(e)
@@ -30,6 +36,13 @@ main().catch(e => {
async function main() {
+ await initApp('jobd-master')
+ initWorkers()
+ initRequestHandler()
+ initServer()
+}
+
+async function initApp(appName) {
if (process.argv.length < 3) {
usage()
process.exit(0)
@@ -66,140 +79,149 @@ async function main() {
levelFile: config.get('log_level_file'),
levelConsole: config.get('log_level_console'),
})
- logger = loggerModule.getLogger('jobd-master')
+ logger = loggerModule.getLogger(appName)
- workers = new WorkersList()
+ process.title = appName
+}
- // start server
+function initServer() {
server = new Server()
server.on('new-connection', (connection) => {
- connection.on('request-message', onRequestMessage)
+ connection.on('request-message', (message, connection) => {
+ requestHandler.process(message, connection)
+ })
})
server.start(config.get('port'), config.get('host'))
- logger.info('server started')
+}
+
+function initWorkers() {
+ workers = new WorkersList()
+}
+
+function initRequestHandler() {
+ requestHandler = new RequestHandler()
+ requestHandler.set('poke', onPoke)
+ requestHandler.set('register-worker', onRegisterWorker)
+ requestHandler.set('status', onStatus)
+ requestHandler.set('run-manual', onRunManual)
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onRegisterWorker(data, requestNo, connection) {
+ const targets = data.targets || []
+
+ // validate data
+ try {
+ validateTargetsList(targets)
+ } catch (e) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(e.message)
+ )
+ return
+ }
+
+ // register worker and reply with OK
+ workers.add(connection, targets)
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
}
/**
- * @param {RequestMessage|ResponseMessage} message
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onPoke(data, requestNo, connection) {
+ const targets = data.targets || []
+
+ // validate data
+ try {
+ validateTargetsList(targets)
+ } catch (e) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(e.message)
+ )
+ return
+ }
+
+ // poke workers
+ workers.poke(targets)
+
+ // reply to user
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
* @param {Connection} connection
* @return {Promise<*>}
*/
-async function onRequestMessage(message, connection) {
+async function onStatus(data, requestNo, connection) {
+ const info = await workers.getInfo(data.poll_workers || false)
+
+ let status = {
+ workers: info,
+ memoryUsage: process.memoryUsage()
+ }
+
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData(status)
+ )
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ * @return {Promise<*>}
+ */
+async function onRunManual(data, requestNo, connection) {
+ const {jobs} = data
+
+ // validate data
try {
- logger.info('onMessage:', message)
-
- switch (message.requestType) {
- case 'register-worker': {
- const targets = message.requestData?.targets || []
-
- // validate data
- try {
- validateTargetsList(targets)
- } catch (e) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(e.message)
- )
- return
- }
-
- workers.add(connection, targets)
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData('ok')
- )
- break
- }
-
- case 'poke': {
- const targets = message.requestData?.targets || []
-
- // validate data
- try {
- validateTargetsList(targets)
- } catch (e) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(e.message)
- )
- return
- }
-
- // poke workers
- workers.poke(targets)
-
- // reply to user
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData('ok')
- )
- break
- }
-
- case 'status':
- const info = await workers.getInfo(message.requestData?.poll_workers || false)
-
- let status = {
- workers: info,
- memoryUsage: process.memoryUsage()
- }
-
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData(status)
- )
-
- break
-
- case 'run-manual':
- const jobs = message.requestData.jobs
-
- // validate data
- try {
- if (!Array.isArray(jobs))
- throw new Error('jobs must be array')
-
- for (let job of jobs) {
- validateObjectSchema(job, [
- // name // type // required
- ['id', 'i', true],
- ['target', 's', true],
- ])
- }
- } catch (e) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(e.message)
- )
- return
- }
-
- // run jobs on workers
- const data = await workers.runManual(jobs)
-
- // send result to the client
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData(data)
- )
- break
-
- default:
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(`unknown request type: '${message.requestType}'`)
- )
- break
+ if (!Array.isArray(jobs))
+ throw new Error('jobs must be array')
+
+ for (let job of jobs) {
+ validateObjectSchema(job, [
+ // name // type // required
+ ['id', 'i', true],
+ ['target', 's', true],
+ ])
}
- } catch (error) {
- logger.error(`error while handling message:`, message, error)
+ } catch (e) {
connection.send(
- new ResponseMessage(message.requestNo)
- .setError('server error: ' + error?.message)
+ new ResponseMessage(requestNo)
+ .setError(e.message)
)
+ return
}
+
+ // run jobs on workers
+ const jobsData = await workers.runManual(jobs)
+
+ // send result to the client
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData(jobsData)
+ )
}
+
function usage() {
let s = `${process.argv[1]} OPTIONS