aboutsummaryrefslogtreecommitdiff
path: root/src/lib/worker.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/worker.js')
-rw-r--r--src/lib/worker.js249
1 files changed, 162 insertions, 87 deletions
diff --git a/src/lib/worker.js b/src/lib/worker.js
index b4beeab..3713202 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -6,16 +6,20 @@ const {getLogger} = require('./logger')
const EventEmitter = require('events')
const config = require('./config')
-const STATUS_WAITING = 'waiting'
-const STATUS_MANUAL = 'manual'
+const STATUS_WAITING = 'waiting'
+const STATUS_MANUAL = 'manual'
const STATUS_ACCEPTED = 'accepted'
-const STATUS_IGNORED = 'ignored'
-const STATUS_RUNNING = 'running'
-const STATUS_DONE = 'done'
+const STATUS_IGNORED = 'ignored'
+const STATUS_RUNNING = 'running'
+const STATUS_DONE = 'done'
-const RESULT_OK = 'ok'
+const RESULT_OK = 'ok'
const RESULT_FAIL = 'fail'
+const JOB_ACCEPTED = 0x01
+const JOB_IGNORED = 0x02
+const JOB_NOTFOUND = 0x03
+
class Worker extends EventEmitter {
constructor() {
@@ -43,6 +47,8 @@ class Worker extends EventEmitter {
}
/**
+ * Creates new queue.
+ *
* @param {string} target
* @param {string} slot
* @param {number} limit
@@ -68,6 +74,8 @@ class Worker extends EventEmitter {
}
/**
+ * Checks whether target is being served.
+ *
* @param {string} target
* @returns {boolean}
*/
@@ -98,6 +106,8 @@ class Worker extends EventEmitter {
}
/**
+ * Returns list of serving targets.
+ *
* @return {string[]}
*/
getTargets() {
@@ -138,9 +148,9 @@ class Worker extends EventEmitter {
this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`)
this.getTasks(targets)
- .then(({rows}) => {
- let message = `${LOGPREFIX} ${rows} processed`
- if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) {
+ .then(({rowsCount}) => {
+ let message = `${LOGPREFIX} ${rowsCount} processed`
+ if (config.get('mysql_fetch_limit') && rowsCount >= config.get('mysql_fetch_limit')) {
// it seems, there are more, so we'll need to perform another query
this.setPollTargets(targets)
message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})`
@@ -207,23 +217,32 @@ class Worker extends EventEmitter {
}
/**
+ * Get new tasks from database.
+ *
* @param {string|null|string[]} target
- * @param {string} reqstatus
- * @param {object} data
- * @returns {Promise<{ignored: number, accepted: number, rows: number}>}
+ * @param {string} neededStatus
+ * @param {{ids: number[]}} data
+ * @returns
+ * {Promise<{
+ * results: Map<number, {status: number, reason: string, slot: string, target: string}>,
+ * rowsCount: number
+ * }>}
*/
- async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) {
- const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):`
-
+ async getTasks(target = null, neededStatus = STATUS_WAITING, data = {}) {
+ const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${neededStatus}', ${JSON.stringify(data)}):`
+
// get new jobs in transaction
await db.beginTransaction()
- let error = null
+ /**
+ * @type {Map<number, {status: number, reason: string, slot: string, target: string}>}
+ */
+ const jobsResults = new Map()
let sqlFields = `id, status, target, slot`
let sql
- if (data.id) {
- sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE`
+ if (data.ids) {
+ sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id IN(`+data.ids.map(db.escape).join(',')+`) FOR UPDATE`
} else {
let targets
if (target === null) {
@@ -234,109 +253,157 @@ class Worker extends EventEmitter {
targets = target
}
let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : ''
- let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)`
+ let sqlWhere = `status=${db.escape(neededStatus)} AND target IN (`+targets.map(db.escape).join(',')+`)`
sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE`
}
/** @type {object[]} results */
- let results = await db.query(sql)
- this.logger.trace(`${LOGPREFIX} query result:`, results)
-
- /**
- * @type {{target: string, slot: string, id: number}[]}
- */
- let accepted = []
-
- /**
- * @type {number[]}
- */
- let ignored = []
-
- for (let result of results) {
- let {id, slot, target, status} = result
- id = parseInt(id)
+ let rows = await db.query(sql)
+ this.logger.trace(`${LOGPREFIX} query result:`, rows)
+
+ for (let result of rows) {
+ const id = parseInt(result.id)
+ const slot = String(result.slot)
+ const target = String(result.target)
+ const status = String(result.status)
+
+ if (status !== neededStatus) {
+ let reason = `status = ${status} != ${neededStatus}`
+ jobsResults.set(id, {
+ result: JOB_IGNORED,
+ reason
+ })
- if (status !== reqstatus) {
- error = `status = ${status} != ${reqstatus}`
- this.logger.warn(`${LOGPREFIX} ${error}`)
- ignored.push(id)
+ this.logger.warn(`${LOGPREFIX} ${reason}`)
continue
}
if (!target || this.targets[target] === undefined) {
- error = `target '${target}' not found (job id=${id})`
- this.logger.error(`${LOGPREFIX} ${error}`)
- ignored.push(id)
+ let reason = `target '${target}' not found (job id=${id})`
+ jobsResults.set(id, {
+ result: JOB_IGNORED,
+ reason
+ })
+
+ this.logger.error(`${LOGPREFIX} ${reason}`)
continue
}
if (!slot || this.targets[target].slots[slot] === undefined) {
- error = `slot '${slot}' of target '${target}' not found (job id=${id})`
- this.logger.error(`${LOGPREFIX} ${error}`)
- ignored.push(id)
+ let reason = `slot '${slot}' of target '${target}' not found (job id=${id})`
+ jobsResults.set(id, {
+ result: JOB_IGNORED,
+ reason
+ })
+
+ this.logger.error(`${LOGPREFIX} ${reason}`)
continue
}
this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`)
- accepted.push({target, slot, id})
+
+ jobsResults.set(id, {
+ result: JOB_ACCEPTED,
+ target,
+ slot
+ })
+ }
+
+ if (data.ids) {
+ for (const id of data.ids) {
+ if (!jobsResults.has(id))
+ jobsResults.set(id, {
+ result: JOB_NOTFOUND
+ })
+ }
+ }
+
+ let accepted = [], ignored = []
+ for (const [id, jobResult] of jobsResults.entries()) {
+ const {result} = jobResult
+ switch (result) {
+ case JOB_ACCEPTED:
+ accepted.push(id)
+ break
+
+ case JOB_IGNORED:
+ ignored.push(id)
+ break
+ }
}
if (accepted.length)
- await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`)
+ await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.join(',')+`)`)
if (ignored.length)
await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`)
await db.commit()
- accepted.forEach(({id, target, slot}) => {
- let q = this.targets[target].slots[slot].queue
- q.push(async (cb) => {
- let data = {
- code: null,
- signal: null,
- stdout: '',
- stderr: ''
- }
- let result = RESULT_OK
+ for (const [id, jobResult] of jobsResults.entries()) {
+ const {result} = jobResult
+ if (result !== JOB_ACCEPTED)
+ continue
- try {
- await this.setJobStatus(id, STATUS_RUNNING)
+ const {slot, target} = jobResult
+ this.enqueueJob(id, target, slot)
+ }
- Object.assign(data, (await this.run(id)))
- if (data.code !== 0)
- result = RESULT_FAIL
- } catch (error) {
- this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error)
+ return {
+ results: jobsResults,
+ rowsCount: rows.length,
+ }
+ }
+
+ /**
+ * Enqueue job.
+ *
+ * @param {int} id
+ * @param {string} target
+ * @param {string} slot
+ */
+ enqueueJob(id, target, slot) {
+ const queue = this.targets[target].slots[slot].queue
+ queue.push(async (cb) => {
+ let data = {
+ code: null,
+ signal: null,
+ stdout: '',
+ stderr: ''
+ }
+ let result = RESULT_OK
+
+ try {
+ await this.setJobStatus(id, STATUS_RUNNING)
+
+ Object.assign(data, (await this.run(id)))
+ if (data.code !== 0)
result = RESULT_FAIL
- data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '')
- } finally {
- this.emit('job-done', {
- id,
- result,
- ...data
- })
+ } catch (error) {
+ this.logger.error(`job ${id}: error while run():`, error)
+ result = RESULT_FAIL
+ data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '')
+ } finally {
+ this.emit('job-done', {
+ id,
+ result,
+ ...data
+ })
- try {
- await this.setJobStatus(id, STATUS_DONE, result, data)
- } catch (error) {
- this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error)
- }
-
- cb()
+ try {
+ await this.setJobStatus(id, STATUS_DONE, result, data)
+ } catch (error) {
+ this.logger.error(`setJobStatus(${id})`, error)
}
- })
- })
- return {
- error,
- rows: results.length,
- accepted: accepted.length,
- ignored: ignored.length,
- }
+ cb()
+ }
+ })
}
/**
+ * Run job.
+ *
* @param {number} id
*/
async run(id) {
@@ -348,7 +415,7 @@ class Worker extends EventEmitter {
let process = child_process.spawn(args[0], args.slice(1), {
maxBuffer: config.get('max_output_buffer')
})
-
+
let stdoutChunks = []
let stderrChunks = []
@@ -391,6 +458,8 @@ class Worker extends EventEmitter {
}
/**
+ * Write job status to database.
+ *
* @param {number} id
* @param {string} status
* @param {string} result
@@ -458,6 +527,7 @@ class Worker extends EventEmitter {
*/
onJobFinished = (target, slot) => {
this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`)
+
const {queue, limit} = this.targets[target].slots[slot]
if (queue.length < limit && this.hasPollTarget(target)) {
this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`)
@@ -469,10 +539,15 @@ class Worker extends EventEmitter {
module.exports = {
Worker,
+
STATUS_WAITING,
STATUS_MANUAL,
STATUS_ACCEPTED,
STATUS_IGNORED,
STATUS_RUNNING,
STATUS_DONE,
+
+ JOB_ACCEPTED,
+ JOB_IGNORED,
+ JOB_NOTFOUND,
} \ No newline at end of file