From 2be11422804aaf5a088e0c5ffdba53fe1df37365 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Tue, 2 Mar 2021 21:06:51 +0300 Subject: run-manual: support multiple jobs --- src/jobd.js | 139 ++++++++++++++++++++++-------- src/lib/worker.js | 249 +++++++++++++++++++++++++++++++++++------------------- 2 files changed, 266 insertions(+), 122 deletions(-) diff --git a/src/jobd.js b/src/jobd.js index 6f3c85e..e341f3d 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -3,8 +3,21 @@ const minimist = require('minimist') const loggerModule = require('./lib/logger') const config = require('./lib/config') const db = require('./lib/db') -const {Server, Connection, RequestMessage, ResponseMessage} = require('./lib/server') -const {Worker, STATUS_MANUAL} = require('./lib/worker') +const {uniq} = require('lodash') +const {createCallablePromise} = require('./lib/util') +const { + Server, + Connection, + RequestMessage, + ResponseMessage +} = require('./lib/server') +const { + Worker, + STATUS_MANUAL, + JOB_NOTFOUND, + JOB_ACCEPTED, + JOB_IGNORED +} = require('./lib/worker') const package_json = require('../package.json') /** @@ -23,9 +36,9 @@ let logger let server /** - * @type {object.} + * @type {object.} */ -let jobDoneAwaiters = {} +let jobPromises = {} main().catch(e => { @@ -95,16 +108,14 @@ async function main() { } } worker.on('job-done', (data) => { - if (jobDoneAwaiters[data.id] !== undefined) { - const {connection, requestNo} = jobDoneAwaiters[data.id] - - connection.send( - new ResponseMessage(requestNo) - .setData(data) - ) - connection.close() - - delete jobDoneAwaiters[data.id] + if (jobPromises[data.id] !== undefined) { + const P = jobPromises[data.id] + delete jobPromises[data.id] + + logger.trace(`job-done: resolving promise of job ${data.id}`) + P.resolve(data) + } else { + logger.warn(`job-done: jobPromises[${data.id}] is undefined`) } }) logger.info('queue initialized') @@ -133,7 +144,7 @@ async function onRequestMessage(message, connection) { logger.info('onMessage:', message) switch (message.requestType) { - case 'poll': + case 'poll': { const targets = message.requestData?.targets || [] if (!targets.length) { connection.send( @@ -161,49 +172,107 @@ async function onRequestMessage(message, connection) { .setData('ok') ) break + } - case 'status': + case 'status': { const qs = worker.getStatus() connection.send( new ResponseMessage(message.requestNo) .setData({ queue: qs, - jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length, + jobDoneAwaitersCount: Object.keys(jobPromises).length, memoryUsage: process.memoryUsage() }) ) break + } - case 'run-manual': - const {id} = message.requestData - if (id in jobDoneAwaiters) { - connection.send( - new ResponseMessage(message.requestNo) - .setError('another client is already waiting this job') - ) - break + case 'run-manual': { + let {ids: jobIds} = message.requestData + jobIds = uniq(jobIds) + + // if at least one of the jobs is already being run, reject + // if at least one item is not a number, reject + for (const id of jobIds) { + if (typeof id !== 'number') { + connection.send( + new ResponseMessage(message.requestNo) + .setError(`all ids must be numbers, got ${typeof id}`) + ) + return + } + + if (id in jobPromises) { + connection.send( + new ResponseMessage(message.requestNo) + .setError(`another client is already waiting for job ${id}`) + ) + return + } } - jobDoneAwaiters[id] = { - connection, - requestNo: message.requestNo + // create a bunch of promises, one per job + let promises = [] + for (const id of jobIds) { + const P = createCallablePromise() + jobPromises[id] = P + promises.push(P) } - const {accepted, error} = await worker.getTasks(null, STATUS_MANUAL, {id}) - if (!accepted) { - delete jobDoneAwaiters[id] + // get jobs from database and enqueue for execution + const {results} = await worker.getTasks(null, STATUS_MANUAL, {ids: jobIds}) + + // wait till all jobs are done (or failed), then send a response + Promise.allSettled(promises).then(results => { + const response = {} + + for (let i = 0; i < results.length; i++) { + let jobId = jobIds[i] + let result = results[i] + + if (result.status === 'fulfilled') { + if (!('jobs' in response)) + response.jobs = {} + + if (result.value?.id !== undefined) + delete result.value.id + + response.jobs[jobId] = result.value + } else if (result.status === 'rejected') { + if (!('errors' in response)) + response.errors = {} - let message = 'failed to run task' - if (typeof error === 'string') - message += `: ${error}` + response.errors[jobId] = result.reason?.message + } + } connection.send( new ResponseMessage(message.requestNo) - .setError(message) + .setData(response) ) + }) + + // reject all ignored / non-found jobs + for (const [id, value] of results.entries()) { + if (!(id in jobPromises)) { + this.logger.error(`run-manual: ${id} not found in jobPromises`) + continue + } + + if (value.result === JOB_IGNORED || value.result === JOB_NOTFOUND) { + const P = jobPromises[id] + delete jobPromises[id] + + if (value.result === JOB_IGNORED) + P.reject(new Error(value.reason)) + + else if (value.result === JOB_NOTFOUND) + P.reject(new Error(`job ${id} not found`)) + } } break + } default: connection.send( diff --git a/src/lib/worker.js b/src/lib/worker.js index b4beeab..3713202 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -6,16 +6,20 @@ const {getLogger} = require('./logger') const EventEmitter = require('events') const config = require('./config') -const STATUS_WAITING = 'waiting' -const STATUS_MANUAL = 'manual' +const STATUS_WAITING = 'waiting' +const STATUS_MANUAL = 'manual' const STATUS_ACCEPTED = 'accepted' -const STATUS_IGNORED = 'ignored' -const STATUS_RUNNING = 'running' -const STATUS_DONE = 'done' +const STATUS_IGNORED = 'ignored' +const STATUS_RUNNING = 'running' +const STATUS_DONE = 'done' -const RESULT_OK = 'ok' +const RESULT_OK = 'ok' const RESULT_FAIL = 'fail' +const JOB_ACCEPTED = 0x01 +const JOB_IGNORED = 0x02 +const JOB_NOTFOUND = 0x03 + class Worker extends EventEmitter { constructor() { @@ -43,6 +47,8 @@ class Worker extends EventEmitter { } /** + * Creates new queue. + * * @param {string} target * @param {string} slot * @param {number} limit @@ -68,6 +74,8 @@ class Worker extends EventEmitter { } /** + * Checks whether target is being served. + * * @param {string} target * @returns {boolean} */ @@ -98,6 +106,8 @@ class Worker extends EventEmitter { } /** + * Returns list of serving targets. + * * @return {string[]} */ getTargets() { @@ -138,9 +148,9 @@ class Worker extends EventEmitter { this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`) this.getTasks(targets) - .then(({rows}) => { - let message = `${LOGPREFIX} ${rows} processed` - if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) { + .then(({rowsCount}) => { + let message = `${LOGPREFIX} ${rowsCount} processed` + if (config.get('mysql_fetch_limit') && rowsCount >= config.get('mysql_fetch_limit')) { // it seems, there are more, so we'll need to perform another query this.setPollTargets(targets) message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})` @@ -207,23 +217,32 @@ class Worker extends EventEmitter { } /** + * Get new tasks from database. + * * @param {string|null|string[]} target - * @param {string} reqstatus - * @param {object} data - * @returns {Promise<{ignored: number, accepted: number, rows: number}>} + * @param {string} neededStatus + * @param {{ids: number[]}} data + * @returns + * {Promise<{ + * results: Map, + * rowsCount: number + * }>} */ - async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) { - const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):` - + async getTasks(target = null, neededStatus = STATUS_WAITING, data = {}) { + const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${neededStatus}', ${JSON.stringify(data)}):` + // get new jobs in transaction await db.beginTransaction() - let error = null + /** + * @type {Map} + */ + const jobsResults = new Map() let sqlFields = `id, status, target, slot` let sql - if (data.id) { - sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE` + if (data.ids) { + sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id IN(`+data.ids.map(db.escape).join(',')+`) FOR UPDATE` } else { let targets if (target === null) { @@ -234,109 +253,157 @@ class Worker extends EventEmitter { targets = target } let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : '' - let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` + let sqlWhere = `status=${db.escape(neededStatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE` } /** @type {object[]} results */ - let results = await db.query(sql) - this.logger.trace(`${LOGPREFIX} query result:`, results) - - /** - * @type {{target: string, slot: string, id: number}[]} - */ - let accepted = [] - - /** - * @type {number[]} - */ - let ignored = [] - - for (let result of results) { - let {id, slot, target, status} = result - id = parseInt(id) + let rows = await db.query(sql) + this.logger.trace(`${LOGPREFIX} query result:`, rows) + + for (let result of rows) { + const id = parseInt(result.id) + const slot = String(result.slot) + const target = String(result.target) + const status = String(result.status) + + if (status !== neededStatus) { + let reason = `status = ${status} != ${neededStatus}` + jobsResults.set(id, { + result: JOB_IGNORED, + reason + }) - if (status !== reqstatus) { - error = `status = ${status} != ${reqstatus}` - this.logger.warn(`${LOGPREFIX} ${error}`) - ignored.push(id) + this.logger.warn(`${LOGPREFIX} ${reason}`) continue } if (!target || this.targets[target] === undefined) { - error = `target '${target}' not found (job id=${id})` - this.logger.error(`${LOGPREFIX} ${error}`) - ignored.push(id) + let reason = `target '${target}' not found (job id=${id})` + jobsResults.set(id, { + result: JOB_IGNORED, + reason + }) + + this.logger.error(`${LOGPREFIX} ${reason}`) continue } if (!slot || this.targets[target].slots[slot] === undefined) { - error = `slot '${slot}' of target '${target}' not found (job id=${id})` - this.logger.error(`${LOGPREFIX} ${error}`) - ignored.push(id) + let reason = `slot '${slot}' of target '${target}' not found (job id=${id})` + jobsResults.set(id, { + result: JOB_IGNORED, + reason + }) + + this.logger.error(`${LOGPREFIX} ${reason}`) continue } this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`) - accepted.push({target, slot, id}) + + jobsResults.set(id, { + result: JOB_ACCEPTED, + target, + slot + }) + } + + if (data.ids) { + for (const id of data.ids) { + if (!jobsResults.has(id)) + jobsResults.set(id, { + result: JOB_NOTFOUND + }) + } + } + + let accepted = [], ignored = [] + for (const [id, jobResult] of jobsResults.entries()) { + const {result} = jobResult + switch (result) { + case JOB_ACCEPTED: + accepted.push(id) + break + + case JOB_IGNORED: + ignored.push(id) + break + } } if (accepted.length) - await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`) + await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.join(',')+`)`) if (ignored.length) await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`) await db.commit() - accepted.forEach(({id, target, slot}) => { - let q = this.targets[target].slots[slot].queue - q.push(async (cb) => { - let data = { - code: null, - signal: null, - stdout: '', - stderr: '' - } - let result = RESULT_OK + for (const [id, jobResult] of jobsResults.entries()) { + const {result} = jobResult + if (result !== JOB_ACCEPTED) + continue - try { - await this.setJobStatus(id, STATUS_RUNNING) + const {slot, target} = jobResult + this.enqueueJob(id, target, slot) + } - Object.assign(data, (await this.run(id))) - if (data.code !== 0) - result = RESULT_FAIL - } catch (error) { - this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error) + return { + results: jobsResults, + rowsCount: rows.length, + } + } + + /** + * Enqueue job. + * + * @param {int} id + * @param {string} target + * @param {string} slot + */ + enqueueJob(id, target, slot) { + const queue = this.targets[target].slots[slot].queue + queue.push(async (cb) => { + let data = { + code: null, + signal: null, + stdout: '', + stderr: '' + } + let result = RESULT_OK + + try { + await this.setJobStatus(id, STATUS_RUNNING) + + Object.assign(data, (await this.run(id))) + if (data.code !== 0) result = RESULT_FAIL - data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '') - } finally { - this.emit('job-done', { - id, - result, - ...data - }) + } catch (error) { + this.logger.error(`job ${id}: error while run():`, error) + result = RESULT_FAIL + data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '') + } finally { + this.emit('job-done', { + id, + result, + ...data + }) - try { - await this.setJobStatus(id, STATUS_DONE, result, data) - } catch (error) { - this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error) - } - - cb() + try { + await this.setJobStatus(id, STATUS_DONE, result, data) + } catch (error) { + this.logger.error(`setJobStatus(${id})`, error) } - }) - }) - return { - error, - rows: results.length, - accepted: accepted.length, - ignored: ignored.length, - } + cb() + } + }) } /** + * Run job. + * * @param {number} id */ async run(id) { @@ -348,7 +415,7 @@ class Worker extends EventEmitter { let process = child_process.spawn(args[0], args.slice(1), { maxBuffer: config.get('max_output_buffer') }) - + let stdoutChunks = [] let stderrChunks = [] @@ -391,6 +458,8 @@ class Worker extends EventEmitter { } /** + * Write job status to database. + * * @param {number} id * @param {string} status * @param {string} result @@ -458,6 +527,7 @@ class Worker extends EventEmitter { */ onJobFinished = (target, slot) => { this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) + const {queue, limit} = this.targets[target].slots[slot] if (queue.length < limit && this.hasPollTarget(target)) { this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`) @@ -469,10 +539,15 @@ class Worker extends EventEmitter { module.exports = { Worker, + STATUS_WAITING, STATUS_MANUAL, STATUS_ACCEPTED, STATUS_IGNORED, STATUS_RUNNING, STATUS_DONE, + + JOB_ACCEPTED, + JOB_IGNORED, + JOB_NOTFOUND, } \ No newline at end of file -- cgit v1.2.3