aboutsummaryrefslogtreecommitdiff
path: root/src/jobd.js
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-03 01:05:25 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-03 01:05:25 +0300
commit4cc2e3029fedca4f48f4f112282a7e381b6292e4 (patch)
tree27f18feaabb6fd649e04f2a36898166db0e5f91a /src/jobd.js
parent04b28698f2267f324354abb44c4508ac2c5c1332 (diff)
refactor code and set process.title
Diffstat (limited to 'src/jobd.js')
-rwxr-xr-xsrc/jobd.js354
1 files changed, 185 insertions, 169 deletions
diff --git a/src/jobd.js b/src/jobd.js
index 4605137..0b980b7 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -6,6 +6,7 @@ const db = require('./lib/db')
const {uniq} = require('lodash')
const {createCallablePromise} = require('./lib/util')
const {validateTargetsList} = require('./lib/data-validator')
+const RequestHandler = require('./lib/request-handler')
const {
Server,
Connection,
@@ -37,6 +38,11 @@ let logger
let server
/**
+ * @type {RequestHandler}
+ */
+let requestHandler
+
+/**
* @type {object.<string, Promise>}
*/
let jobPromises = {}
@@ -49,6 +55,15 @@ main().catch(e => {
async function main() {
+ await initApp('jobd')
+ await initDatabase()
+ initWorker()
+ initRequestHandler()
+ initServer()
+ connectToMaster()
+}
+
+async function initApp(appName) {
if (process.argv.length < 3) {
usage()
process.exit(0)
@@ -85,18 +100,12 @@ async function main() {
levelFile: config.get('log_level_file'),
levelConsole: config.get('log_level_console'),
})
- logger = loggerModule.getLogger('jobd')
+ logger = loggerModule.getLogger(appName)
- // init database
- try {
- await db.init()
- } catch (error) {
- logger.error('failed to connect to MySQL', error)
- process.exit(1)
- }
- logger.info('db initialized')
+ process.title = appName
+}
- // init queue
+function initWorker() {
worker = new Worker()
for (let targetName in config.get('targets')) {
let slots = config.get('targets')[targetName].slots
@@ -119,186 +128,192 @@ async function main() {
logger.warn(`job-done: jobPromises[${data.id}] is undefined`)
}
})
- logger.info('queue initialized')
+}
- // start server
+function initRequestHandler() {
+ requestHandler = new RequestHandler()
+ requestHandler.set('poll', onPollRequest)
+ requestHandler.set('status', onStatus)
+ requestHandler.set('run-manual', onRunManual)
+}
+
+function initServer() {
server = new Server()
server.on('new-connection', (connection) => {
- connection.on('request-message', onRequestMessage)
+ connection.on('request-message', (message, connection) => {
+ requestHandler.process(message, connection)
+ })
})
server.start(config.get('port'), config.get('host'))
- logger.info('server started')
-
- // connect to master
- if (config.get('master_port') && config.get('master_host'))
- connectToMaster()
}
+async function initDatabase() {
+ try {
+ await db.init()
+ } catch (error) {
+ logger.error('failed to connect to MySQL', error)
+ process.exit(1)
+ }
+ logger.info('db initialized')
+}
/**
- * @param {RequestMessage|ResponseMessage} message
+ * @param {object} data
+ * @param {number} requestNo
* @param {Connection} connection
- * @return {Promise<*>}
*/
-async function onRequestMessage(message, connection) {
- try {
- logger.info('onMessage:', message)
-
- switch (message.requestType) {
- case 'poll': {
- // null means all
- let targets = null
-
- if (message.requestData?.targets !== undefined) {
- targets = message.requestData?.targets
-
- // validate data
- try {
- validateTargetsList(targets)
-
- for (const t of targets) {
- if (!worker.hasTarget(t))
- throw new Error(`invalid target '${t}'`)
- }
- } catch (e) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(e.message)
- )
- return
- }
- }
-
- worker.setPollTargets(targets)
- worker.poll()
-
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData('ok')
- )
- break
- }
+function onPollRequest(data, requestNo, connection) {
+ // null means all targets
+ let targets = null
- case 'status': {
- const qs = worker.getStatus()
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData({
- queue: qs,
- jobDoneAwaitersCount: Object.keys(jobPromises).length,
- memoryUsage: process.memoryUsage()
- })
- )
- break
- }
+ if (data.targets !== undefined) {
+ targets = data.targets
+
+ // validate data
+ try {
+ validateTargetsList(targets)
- case 'run-manual': {
- let {ids: jobIds} = message.requestData
- jobIds = uniq(jobIds)
-
- // if at least one of the jobs is already being run, reject
- // if at least one item is not a number, reject
- for (const id of jobIds) {
- if (typeof id !== 'number') {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(`all ids must be numbers, got ${typeof id}`)
- )
- return
- }
-
- if (id in jobPromises) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(`another client is already waiting for job ${id}`)
- )
- return
- }
- }
-
- // create a bunch of promises, one per job
- let promises = []
- for (const id of jobIds) {
- const P = createCallablePromise()
- jobPromises[id] = P
- promises.push(P)
- }
-
- // get jobs from database and enqueue for execution
- const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds})
-
- // wait till all jobs are done (or failed), then send a response
- Promise.allSettled(promises).then(results => {
- const response = {}
-
- for (let i = 0; i < results.length; i++) {
- let jobId = jobIds[i]
- let result = results[i]
-
- if (result.status === 'fulfilled') {
- if (!('jobs' in response))
- response.jobs = {}
-
- if (result.value?.id !== undefined)
- delete result.value.id
-
- response.jobs[jobId] = result.value
- } else if (result.status === 'rejected') {
- if (!('errors' in response))
- response.errors = {}
-
- response.errors[jobId] = result.reason?.message
- }
- }
-
- connection.send(
- new ResponseMessage(message.requestNo)
- .setData(response)
- )
- })
-
- // reject all ignored / non-found jobs
- for (const [id, value] of results.entries()) {
- if (!(id in jobPromises)) {
- this.logger.error(`run-manual: ${id} not found in jobPromises`)
- continue
- }
-
- if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) {
- const P = jobPromises[id]
- delete jobPromises[id]
-
- if (value.result === JOB_IGNORED)
- P.reject(new Error(value.reason))
-
- else if (value.result === JOB_NOTFOUND)
- P.reject(new Error(`job ${id} not found`))
- }
- }
-
- break
+ for (const t of targets) {
+ if (!worker.hasTarget(t))
+ throw new Error(`invalid target '${t}'`)
}
+ } catch (e) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(e.message)
+ )
+ return
+ }
+ }
+
+ worker.setPollTargets(targets)
+ worker.poll()
- default:
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError(`unknown request type: '${message.requestType}'`)
- )
- break
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData('ok')
+ )
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ */
+function onStatus(data, requestNo, connection) {
+ const qs = worker.getStatus()
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData({
+ queue: qs,
+ jobPromisesCount: Object.keys(jobPromises).length,
+ memoryUsage: process.memoryUsage()
+ })
+ )
+}
+
+/**
+ * @param {object} data
+ * @param {number} requestNo
+ * @param {Connection} connection
+ * @return {Promise<void>}
+ */
+async function onRunManual(data, requestNo, connection) {
+ let {ids: jobIds} = data
+ jobIds = uniq(jobIds)
+
+ // if at least one of the jobs is already being run, reject
+ // if at least one item is not a number, reject
+ for (const id of jobIds) {
+ if (typeof id !== 'number') {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(`all ids must be numbers, got ${typeof id}`)
+ )
+ return
}
- } catch (error) {
- logger.error(`error while handling message:`, message, error)
+
+ if (id in jobPromises) {
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setError(`another client is already waiting for job ${id}`)
+ )
+ return
+ }
+ }
+
+ // create a bunch of promises, one per job
+ let promises = []
+ for (const id of jobIds) {
+ const P = createCallablePromise()
+ jobPromises[id] = P
+ promises.push(P)
+ }
+
+ // get jobs from database and enqueue for execution
+ const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds})
+
+ // wait till all jobs are done (or failed), then send a response
+ Promise.allSettled(promises).then(results => {
+ const response = {}
+
+ for (let i = 0; i < results.length; i++) {
+ let jobId = jobIds[i]
+ let result = results[i]
+
+ if (result.status === 'fulfilled') {
+ if (!('jobs' in response))
+ response.jobs = {}
+
+ if (result.value?.id !== undefined)
+ delete result.value.id
+
+ response.jobs[jobId] = result.value
+ } else if (result.status === 'rejected') {
+ if (!('errors' in response))
+ response.errors = {}
+
+ response.errors[jobId] = result.reason?.message
+ }
+ }
+
connection.send(
- new ResponseMessage(message.requestNo)
- .setError('server error: ' + error?.message)
+ new ResponseMessage(requestNo)
+ .setData(response)
)
+ })
+
+ // reject all ignored / non-found jobs
+ for (const [id, value] of results.entries()) {
+ if (!(id in jobPromises)) {
+ this.logger.error(`run-manual: ${id} not found in jobPromises`)
+ continue
+ }
+
+ if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) {
+ const P = jobPromises[id]
+ delete jobPromises[id]
+
+ if (value.result === JOB_IGNORED)
+ P.reject(new Error(value.reason))
+
+ else if (value.result === JOB_NOTFOUND)
+ P.reject(new Error(`job ${id} not found`))
+ }
}
}
-
function connectToMaster() {
+ const port = config.get('master_port')
+ const host = config.get('master_host')
+
+ if (!host || port) {
+ logger.debug('connectToMaster: master host or port is not defined')
+ return
+ }
+
const connection = new Connection()
- connection.connect(config.get('master_host'), config.get('master_port'))
+ connection.connect(host, port)
connection.on('connect', function() {
connection.sendRequest(
@@ -321,10 +336,11 @@ function connectToMaster() {
}, config.get('master_reconnect_timeout') * 1000)
})
- connection.on('request-message', onRequestMessage)
+ connection.on('request-message', (message, connection) => {
+ requestHandler.process(message, connection)
+ })
}
-
function usage() {
let s = `${process.argv[1]} OPTIONS