summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xsrc/jobd.js139
-rw-r--r--src/lib/worker.js249
2 files changed, 266 insertions, 122 deletions
diff --git a/src/jobd.js b/src/jobd.js
index 6f3c85e..e341f3d 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -3,8 +3,21 @@ const minimist = require('minimist')
const loggerModule = require('./lib/logger')
const config = require('./lib/config')
const db = require('./lib/db')
-const {Server, Connection, RequestMessage, ResponseMessage} = require('./lib/server')
-const {Worker, STATUS_MANUAL} = require('./lib/worker')
+const {uniq} = require('lodash')
+const {createCallablePromise} = require('./lib/util')
+const {
+ Server,
+ Connection,
+ RequestMessage,
+ ResponseMessage
+} = require('./lib/server')
+const {
+ Worker,
+ STATUS_MANUAL,
+ JOB_NOTFOUND,
+ JOB_ACCEPTED,
+ JOB_IGNORED
+} = require('./lib/worker')
const package_json = require('../package.json')
/**
@@ -23,9 +36,9 @@ let logger
let server
/**
- * @type {object.<string, {connection: Connection, requestNo: number}>}
+ * @type {object.<string, Promise>}
*/
-let jobDoneAwaiters = {}
+let jobPromises = {}
main().catch(e => {
@@ -95,16 +108,14 @@ async function main() {
}
}
worker.on('job-done', (data) => {
- if (jobDoneAwaiters[data.id] !== undefined) {
- const {connection, requestNo} = jobDoneAwaiters[data.id]
-
- connection.send(
- new ResponseMessage(requestNo)
- .setData(data)
- )
- connection.close()
-
- delete jobDoneAwaiters[data.id]
+ if (jobPromises[data.id] !== undefined) {
+ const P = jobPromises[data.id]
+ delete jobPromises[data.id]
+
+ logger.trace(`job-done: resolving promise of job ${data.id}`)
+ P.resolve(data)
+ } else {
+ logger.warn(`job-done: jobPromises[${data.id}] is undefined`)
}
})
logger.info('queue initialized')
@@ -133,7 +144,7 @@ async function onRequestMessage(message, connection) {
logger.info('onMessage:', message)
switch (message.requestType) {
- case 'poll':
+ case 'poll': {
const targets = message.requestData?.targets || []
if (!targets.length) {
connection.send(
@@ -161,49 +172,107 @@ async function onRequestMessage(message, connection) {
.setData('ok')
)
break
+ }
- case 'status':
+ case 'status': {
const qs = worker.getStatus()
connection.send(
new ResponseMessage(message.requestNo)
.setData({
queue: qs,
- jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length,
+ jobDoneAwaitersCount: Object.keys(jobPromises).length,
memoryUsage: process.memoryUsage()
})
)
break
+ }
- case 'run-manual':
- const {id} = message.requestData
- if (id in jobDoneAwaiters) {
- connection.send(
- new ResponseMessage(message.requestNo)
- .setError('another client is already waiting this job')
- )
- break
+ case 'run-manual': {
+ let {ids: jobIds} = message.requestData
+ jobIds = uniq(jobIds)
+
+ // if at least one of the jobs is already being run, reject
+ // if at least one item is not a number, reject
+ for (const id of jobIds) {
+ if (typeof id !== 'number') {
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`all ids must be numbers, got ${typeof id}`)
+ )
+ return
+ }
+
+ if (id in jobPromises) {
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`another client is already waiting for job ${id}`)
+ )
+ return
+ }
}
- jobDoneAwaiters[id] = {
- connection,
- requestNo: message.requestNo
+ // create a bunch of promises, one per job
+ let promises = []
+ for (const id of jobIds) {
+ const P = createCallablePromise()
+ jobPromises[id] = P
+ promises.push(P)
}
- const {accepted, error} = await worker.getTasks(null, STATUS_MANUAL, {id})
- if (!accepted) {
- delete jobDoneAwaiters[id]
+ // get jobs from database and enqueue for execution
+ const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds})
+
+ // wait till all jobs are done (or failed), then send a response
+ Promise.allSettled(promises).then(results => {
+ const response = {}
+
+ for (let i = 0; i < results.length; i++) {
+ let jobId = jobIds[i]
+ let result = results[i]
+
+ if (result.status === 'fulfilled') {
+ if (!('jobs' in response))
+ response.jobs = {}
+
+ if (result.value?.id !== undefined)
+ delete result.value.id
+
+ response.jobs[jobId] = result.value
+ } else if (result.status === 'rejected') {
+ if (!('errors' in response))
+ response.errors = {}
- let message = 'failed to run task'
- if (typeof error === 'string')
- message += `: ${error}`
+ response.errors[jobId] = result.reason?.message
+ }
+ }
connection.send(
new ResponseMessage(message.requestNo)
- .setError(message)
+ .setData(response)
)
+ })
+
+ // reject all ignored / non-found jobs
+ for (const [id, value] of results.entries()) {
+ if (!(id in jobPromises)) {
+ this.logger.error(`run-manual: ${id} not found in jobPromises`)
+ continue
+ }
+
+ if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) {
+ const P = jobPromises[id]
+ delete jobPromises[id]
+
+ if (value.result === JOB_IGNORED)
+ P.reject(new Error(value.reason))
+
+ else if (value.result === JOB_NOTFOUND)
+ P.reject(new Error(`job ${id} not found`))
+ }
}
break
+ }
default:
connection.send(
diff --git a/src/lib/worker.js b/src/lib/worker.js
index b4beeab..3713202 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -6,16 +6,20 @@ const {getLogger} = require('./logger')
const EventEmitter = require('events')
const config = require('./config')
-const STATUS_WAITING = 'waiting'
-const STATUS_MANUAL = 'manual'
+const STATUS_WAITING = 'waiting'
+const STATUS_MANUAL = 'manual'
const STATUS_ACCEPTED = 'accepted'
-const STATUS_IGNORED = 'ignored'
-const STATUS_RUNNING = 'running'
-const STATUS_DONE = 'done'
+const STATUS_IGNORED = 'ignored'
+const STATUS_RUNNING = 'running'
+const STATUS_DONE = 'done'
-const RESULT_OK = 'ok'
+const RESULT_OK = 'ok'
const RESULT_FAIL = 'fail'
+const JOB_ACCEPTED = 0x01
+const JOB_IGNORED = 0x02
+const JOB_NOTFOUND = 0x03
+
class Worker extends EventEmitter {
constructor() {
@@ -43,6 +47,8 @@ class Worker extends EventEmitter {
}
/**
+ * Creates new queue.
+ *
* @param {string} target
* @param {string} slot
* @param {number} limit
@@ -68,6 +74,8 @@ class Worker extends EventEmitter {
}
/**
+ * Checks whether target is being served.
+ *
* @param {string} target
* @returns {boolean}
*/
@@ -98,6 +106,8 @@ class Worker extends EventEmitter {
}
/**
+ * Returns list of serving targets.
+ *
* @return {string[]}
*/
getTargets() {
@@ -138,9 +148,9 @@ class Worker extends EventEmitter {
this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`)
this.getTasks(targets)
- .then(({rows}) => {
- let message = `${LOGPREFIX} ${rows} processed`
- if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) {
+ .then(({rowsCount}) => {
+ let message = `${LOGPREFIX} ${rowsCount} processed`
+ if (config.get('mysql_fetch_limit') && rowsCount >= config.get('mysql_fetch_limit')) {
// it seems, there are more, so we'll need to perform another query
this.setPollTargets(targets)
message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})`
@@ -207,23 +217,32 @@ class Worker extends EventEmitter {
}
/**
+ * Get new tasks from database.
+ *
* @param {string|null|string[]} target
- * @param {string} reqstatus
- * @param {object} data
- * @returns {Promise<{ignored: number, accepted: number, rows: number}>}
+ * @param {string} neededStatus
+ * @param {{ids: number[]}} data
+ * @returns
+ * {Promise<{
+ * results: Map<number, {status: number, reason: string, slot: string, target: string}>,
+ * rowsCount: number
+ * }>}
*/
- async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) {
- const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):`
-
+ async getTasks(target = null, neededStatus = STATUS_WAITING, data = {}) {
+ const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${neededStatus}', ${JSON.stringify(data)}):`
+
// get new jobs in transaction
await db.beginTransaction()
- let error = null
+ /**
+ * @type {Map<number, {status: number, reason: string, slot: string, target: string}>}
+ */
+ const jobsResults = new Map()
let sqlFields = `id, status, target, slot`
let sql
- if (data.id) {
- sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE`
+ if (data.ids) {
+ sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id IN(`+data.ids.map(db.escape).join(',')+`) FOR UPDATE`
} else {
let targets
if (target === null) {
@@ -234,109 +253,157 @@ class Worker extends EventEmitter {
targets = target
}
let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : ''
- let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)`
+ let sqlWhere = `status=${db.escape(neededStatus)} AND target IN (`+targets.map(db.escape).join(',')+`)`
sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE`
}
/** @type {object[]} results */
- let results = await db.query(sql)
- this.logger.trace(`${LOGPREFIX} query result:`, results)
-
- /**
- * @type {{target: string, slot: string, id: number}[]}
- */
- let accepted = []
-
- /**
- * @type {number[]}
- */
- let ignored = []
-
- for (let result of results) {
- let {id, slot, target, status} = result
- id = parseInt(id)
+ let rows = await db.query(sql)
+ this.logger.trace(`${LOGPREFIX} query result:`, rows)
+
+ for (let result of rows) {
+ const id = parseInt(result.id)
+ const slot = String(result.slot)
+ const target = String(result.target)
+ const status = String(result.status)
+
+ if (status !== neededStatus) {
+ let reason = `status = ${status} != ${neededStatus}`
+ jobsResults.set(id, {
+ result: JOB_IGNORED,
+ reason
+ })
- if (status !== reqstatus) {
- error = `status = ${status} != ${reqstatus}`
- this.logger.warn(`${LOGPREFIX} ${error}`)
- ignored.push(id)
+ this.logger.warn(`${LOGPREFIX} ${reason}`)
continue
}
if (!target || this.targets[target] === undefined) {
- error = `target '${target}' not found (job id=${id})`
- this.logger.error(`${LOGPREFIX} ${error}`)
- ignored.push(id)
+ let reason = `target '${target}' not found (job id=${id})`
+ jobsResults.set(id, {
+ result: JOB_IGNORED,
+ reason
+ })
+
+ this.logger.error(`${LOGPREFIX} ${reason}`)
continue
}
if (!slot || this.targets[target].slots[slot] === undefined) {
- error = `slot '${slot}' of target '${target}' not found (job id=${id})`
- this.logger.error(`${LOGPREFIX} ${error}`)
- ignored.push(id)
+ let reason = `slot '${slot}' of target '${target}' not found (job id=${id})`
+ jobsResults.set(id, {
+ result: JOB_IGNORED,
+ reason
+ })
+
+ this.logger.error(`${LOGPREFIX} ${reason}`)
continue
}
this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`)
- accepted.push({target, slot, id})
+
+ jobsResults.set(id, {
+ result: JOB_ACCEPTED,
+ target,
+ slot
+ })
+ }
+
+ if (data.ids) {
+ for (const id of data.ids) {
+ if (!jobsResults.has(id))
+ jobsResults.set(id, {
+ result: JOB_NOTFOUND
+ })
+ }
+ }
+
+ let accepted = [], ignored = []
+ for (const [id, jobResult] of jobsResults.entries()) {
+ const {result} = jobResult
+ switch (result) {
+ case JOB_ACCEPTED:
+ accepted.push(id)
+ break
+
+ case JOB_IGNORED:
+ ignored.push(id)
+ break
+ }
}
if (accepted.length)
- await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`)
+ await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.join(',')+`)`)
if (ignored.length)
await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`)
await db.commit()
- accepted.forEach(({id, target, slot}) => {
- let q = this.targets[target].slots[slot].queue
- q.push(async (cb) => {
- let data = {
- code: null,
- signal: null,
- stdout: '',
- stderr: ''
- }
- let result = RESULT_OK
+ for (const [id, jobResult] of jobsResults.entries()) {
+ const {result} = jobResult
+ if (result !== JOB_ACCEPTED)
+ continue
- try {
- await this.setJobStatus(id, STATUS_RUNNING)
+ const {slot, target} = jobResult
+ this.enqueueJob(id, target, slot)
+ }
- Object.assign(data, (await this.run(id)))
- if (data.code !== 0)
- result = RESULT_FAIL
- } catch (error) {
- this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error)
+ return {
+ results: jobsResults,
+ rowsCount: rows.length,
+ }
+ }
+
+ /**
+ * Enqueue job.
+ *
+ * @param {int} id
+ * @param {string} target
+ * @param {string} slot
+ */
+ enqueueJob(id, target, slot) {
+ const queue = this.targets[target].slots[slot].queue
+ queue.push(async (cb) => {
+ let data = {
+ code: null,
+ signal: null,
+ stdout: '',
+ stderr: ''
+ }
+ let result = RESULT_OK
+
+ try {
+ await this.setJobStatus(id, STATUS_RUNNING)
+
+ Object.assign(data, (await this.run(id)))
+ if (data.code !== 0)
result = RESULT_FAIL
- data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '')
- } finally {
- this.emit('job-done', {
- id,
- result,
- ...data
- })
+ } catch (error) {
+ this.logger.error(`job ${id}: error while run():`, error)
+ result = RESULT_FAIL
+ data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '')
+ } finally {
+ this.emit('job-done', {
+ id,
+ result,
+ ...data
+ })
- try {
- await this.setJobStatus(id, STATUS_DONE, result, data)
- } catch (error) {
- this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error)
- }
-
- cb()
+ try {
+ await this.setJobStatus(id, STATUS_DONE, result, data)
+ } catch (error) {
+ this.logger.error(`setJobStatus(${id})`, error)
}
- })
- })
- return {
- error,
- rows: results.length,
- accepted: accepted.length,
- ignored: ignored.length,
- }
+ cb()
+ }
+ })
}
/**
+ * Run job.
+ *
* @param {number} id
*/
async run(id) {
@@ -348,7 +415,7 @@ class Worker extends EventEmitter {
let process = child_process.spawn(args[0], args.slice(1), {
maxBuffer: config.get('max_output_buffer')
})
-
+
let stdoutChunks = []
let stderrChunks = []
@@ -391,6 +458,8 @@ class Worker extends EventEmitter {
}
/**
+ * Write job status to database.
+ *
* @param {number} id
* @param {string} status
* @param {string} result
@@ -458,6 +527,7 @@ class Worker extends EventEmitter {
*/
onJobFinished = (target, slot) => {
this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`)
+
const {queue, limit} = this.targets[target].slots[slot]
if (queue.length < limit && this.hasPollTarget(target)) {
this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`)
@@ -469,10 +539,15 @@ class Worker extends EventEmitter {
module.exports = {
Worker,
+
STATUS_WAITING,
STATUS_MANUAL,
STATUS_ACCEPTED,
STATUS_IGNORED,
STATUS_RUNNING,
STATUS_DONE,
+
+ JOB_ACCEPTED,
+ JOB_IGNORED,
+ JOB_NOTFOUND,
} \ No newline at end of file