diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-03-02 21:06:51 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-03-02 21:06:51 +0300 |
commit | 2be11422804aaf5a088e0c5ffdba53fe1df37365 (patch) | |
tree | 8c1fecb0157d52850d66ebb16b2c79a530f7148c /src/lib | |
parent | 6cd5d5aef1fad6152af67f2f642577d57700d833 (diff) |
run-manual: support multiple jobs
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/worker.js | 249 |
1 files changed, 162 insertions, 87 deletions
diff --git a/src/lib/worker.js b/src/lib/worker.js index b4beeab..3713202 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -6,16 +6,20 @@ const {getLogger} = require('./logger') const EventEmitter = require('events') const config = require('./config') -const STATUS_WAITING = 'waiting' -const STATUS_MANUAL = 'manual' +const STATUS_WAITING = 'waiting' +const STATUS_MANUAL = 'manual' const STATUS_ACCEPTED = 'accepted' -const STATUS_IGNORED = 'ignored' -const STATUS_RUNNING = 'running' -const STATUS_DONE = 'done' +const STATUS_IGNORED = 'ignored' +const STATUS_RUNNING = 'running' +const STATUS_DONE = 'done' -const RESULT_OK = 'ok' +const RESULT_OK = 'ok' const RESULT_FAIL = 'fail' +const JOB_ACCEPTED = 0x01 +const JOB_IGNORED = 0x02 +const JOB_NOTFOUND = 0x03 + class Worker extends EventEmitter { constructor() { @@ -43,6 +47,8 @@ class Worker extends EventEmitter { } /** + * Creates new queue. + * * @param {string} target * @param {string} slot * @param {number} limit @@ -68,6 +74,8 @@ class Worker extends EventEmitter { } /** + * Checks whether target is being served. + * * @param {string} target * @returns {boolean} */ @@ -98,6 +106,8 @@ class Worker extends EventEmitter { } /** + * Returns list of serving targets. + * * @return {string[]} */ getTargets() { @@ -138,9 +148,9 @@ class Worker extends EventEmitter { this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`) this.getTasks(targets) - .then(({rows}) => { - let message = `${LOGPREFIX} ${rows} processed` - if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) { + .then(({rowsCount}) => { + let message = `${LOGPREFIX} ${rowsCount} processed` + if (config.get('mysql_fetch_limit') && rowsCount >= config.get('mysql_fetch_limit')) { // it seems, there are more, so we'll need to perform another query this.setPollTargets(targets) message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})` @@ -207,23 +217,32 @@ class Worker extends EventEmitter { } /** + * Get new tasks from database. + * * @param {string|null|string[]} target - * @param {string} reqstatus - * @param {object} data - * @returns {Promise<{ignored: number, accepted: number, rows: number}>} + * @param {string} neededStatus + * @param {{ids: number[]}} data + * @returns + * {Promise<{ + * results: Map<number, {status: number, reason: string, slot: string, target: string}>, + * rowsCount: number + * }>} */ - async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) { - const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):` - + async getTasks(target = null, neededStatus = STATUS_WAITING, data = {}) { + const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${neededStatus}', ${JSON.stringify(data)}):` + // get new jobs in transaction await db.beginTransaction() - let error = null + /** + * @type {Map<number, {status: number, reason: string, slot: string, target: string}>} + */ + const jobsResults = new Map() let sqlFields = `id, status, target, slot` let sql - if (data.id) { - sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE` + if (data.ids) { + sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id IN(`+data.ids.map(db.escape).join(',')+`) FOR UPDATE` } else { let targets if (target === null) { @@ -234,109 +253,157 @@ class Worker extends EventEmitter { targets = target } let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : '' - let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` + let sqlWhere = `status=${db.escape(neededStatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE` } /** @type {object[]} results */ - let results = await db.query(sql) - this.logger.trace(`${LOGPREFIX} query result:`, results) - - /** - * @type {{target: string, slot: string, id: number}[]} - */ - let accepted = [] - - /** - * @type {number[]} - */ - let ignored = [] - - for (let result of results) { - let {id, slot, target, status} = result - id = parseInt(id) + let rows = await db.query(sql) + this.logger.trace(`${LOGPREFIX} query result:`, rows) + + for (let result of rows) { + const id = parseInt(result.id) + const slot = String(result.slot) + const target = String(result.target) + const status = String(result.status) + + if (status !== neededStatus) { + let reason = `status = ${status} != ${neededStatus}` + jobsResults.set(id, { + result: JOB_IGNORED, + reason + }) - if (status !== reqstatus) { - error = `status = ${status} != ${reqstatus}` - this.logger.warn(`${LOGPREFIX} ${error}`) - ignored.push(id) + this.logger.warn(`${LOGPREFIX} ${reason}`) continue } if (!target || this.targets[target] === undefined) { - error = `target '${target}' not found (job id=${id})` - this.logger.error(`${LOGPREFIX} ${error}`) - ignored.push(id) + let reason = `target '${target}' not found (job id=${id})` + jobsResults.set(id, { + result: JOB_IGNORED, + reason + }) + + this.logger.error(`${LOGPREFIX} ${reason}`) continue } if (!slot || this.targets[target].slots[slot] === undefined) { - error = `slot '${slot}' of target '${target}' not found (job id=${id})` - this.logger.error(`${LOGPREFIX} ${error}`) - ignored.push(id) + let reason = `slot '${slot}' of target '${target}' not found (job id=${id})` + jobsResults.set(id, { + result: JOB_IGNORED, + reason + }) + + this.logger.error(`${LOGPREFIX} ${reason}`) continue } this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`) - accepted.push({target, slot, id}) + + jobsResults.set(id, { + result: JOB_ACCEPTED, + target, + slot + }) + } + + if (data.ids) { + for (const id of data.ids) { + if (!jobsResults.has(id)) + jobsResults.set(id, { + result: JOB_NOTFOUND + }) + } + } + + let accepted = [], ignored = [] + for (const [id, jobResult] of jobsResults.entries()) { + const {result} = jobResult + switch (result) { + case JOB_ACCEPTED: + accepted.push(id) + break + + case JOB_IGNORED: + ignored.push(id) + break + } } if (accepted.length) - await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`) + await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.join(',')+`)`) if (ignored.length) await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`) await db.commit() - accepted.forEach(({id, target, slot}) => { - let q = this.targets[target].slots[slot].queue - q.push(async (cb) => { - let data = { - code: null, - signal: null, - stdout: '', - stderr: '' - } - let result = RESULT_OK + for (const [id, jobResult] of jobsResults.entries()) { + const {result} = jobResult + if (result !== JOB_ACCEPTED) + continue - try { - await this.setJobStatus(id, STATUS_RUNNING) + const {slot, target} = jobResult + this.enqueueJob(id, target, slot) + } - Object.assign(data, (await this.run(id))) - if (data.code !== 0) - result = RESULT_FAIL - } catch (error) { - this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error) + return { + results: jobsResults, + rowsCount: rows.length, + } + } + + /** + * Enqueue job. + * + * @param {int} id + * @param {string} target + * @param {string} slot + */ + enqueueJob(id, target, slot) { + const queue = this.targets[target].slots[slot].queue + queue.push(async (cb) => { + let data = { + code: null, + signal: null, + stdout: '', + stderr: '' + } + let result = RESULT_OK + + try { + await this.setJobStatus(id, STATUS_RUNNING) + + Object.assign(data, (await this.run(id))) + if (data.code !== 0) result = RESULT_FAIL - data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '') - } finally { - this.emit('job-done', { - id, - result, - ...data - }) + } catch (error) { + this.logger.error(`job ${id}: error while run():`, error) + result = RESULT_FAIL + data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '') + } finally { + this.emit('job-done', { + id, + result, + ...data + }) - try { - await this.setJobStatus(id, STATUS_DONE, result, data) - } catch (error) { - this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error) - } - - cb() + try { + await this.setJobStatus(id, STATUS_DONE, result, data) + } catch (error) { + this.logger.error(`setJobStatus(${id})`, error) } - }) - }) - return { - error, - rows: results.length, - accepted: accepted.length, - ignored: ignored.length, - } + cb() + } + }) } /** + * Run job. + * * @param {number} id */ async run(id) { @@ -348,7 +415,7 @@ class Worker extends EventEmitter { let process = child_process.spawn(args[0], args.slice(1), { maxBuffer: config.get('max_output_buffer') }) - + let stdoutChunks = [] let stderrChunks = [] @@ -391,6 +458,8 @@ class Worker extends EventEmitter { } /** + * Write job status to database. + * * @param {number} id * @param {string} status * @param {string} result @@ -458,6 +527,7 @@ class Worker extends EventEmitter { */ onJobFinished = (target, slot) => { this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) + const {queue, limit} = this.targets[target].slots[slot] if (queue.length < limit && this.hasPollTarget(target)) { this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`) @@ -469,10 +539,15 @@ class Worker extends EventEmitter { module.exports = { Worker, + STATUS_WAITING, STATUS_MANUAL, STATUS_ACCEPTED, STATUS_IGNORED, STATUS_RUNNING, STATUS_DONE, + + JOB_ACCEPTED, + JOB_IGNORED, + JOB_NOTFOUND, }
\ No newline at end of file |