From 5e7d34458a6e60487393caa4f320ab1cfc1cf8e5 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Wed, 24 Feb 2021 03:59:25 +0300 Subject: initial commit --- src/worker.js | 472 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 472 insertions(+) create mode 100644 src/worker.js (limited to 'src/worker.js') diff --git a/src/worker.js b/src/worker.js new file mode 100644 index 0000000..3151b40 --- /dev/null +++ b/src/worker.js @@ -0,0 +1,472 @@ +const Queue = require('queue') +const child_process = require('child_process') +const db = require('./db') +const {timestamp} = require('./util') +const {getLogger} = require('./logger') +const EventEmitter = require('events') +const {workerConfig} = require('./config') + +const STATUS_WAITING = 'waiting' +const STATUS_MANUAL = 'manual' +const STATUS_ACCEPTED = 'accepted' +const STATUS_IGNORED = 'ignored' +const STATUS_RUNNING = 'running' +const STATUS_DONE = 'done' + +const RESULT_OK = 'ok' +const RESULT_FAIL = 'fail' + +class Worker extends EventEmitter { + + constructor() { + super() + + /** + * @type {object.}>} + */ + this.targets = {} + + /** + * @type {boolean} + */ + this.polling = false + + /** + * @type {boolean} + */ + this.nextpoll = {} + + /** + * @type {Logger} + */ + this.logger = getLogger('Worker') + } + + /** + * @param {string} target + * @param {string} slot + * @param {number} limit + */ + addSlot(target, slot, limit) { + this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) + + if (this.targets[target] === undefined) + this.targets[target] = {slots: {}} + + if (this.targets[target].slots[slot] !== undefined) + throw new Error(`slot ${slot} for target ${target} has already been added`) + + let queue = Queue({ + concurrency: limit, + autostart: true + }) + queue.on('success', this.onJobFinished.bind(this, target, slot)) + queue.on('error', this.onJobFinished.bind(this, target, slot)) + queue.start() + + this.targets[target].slots[slot] = {limit, queue} + } + + /** + * @param {string} target + * @returns {boolean} + */ + hasTarget(target) { + return (target in this.targets) + } + + /** + * Returns status of all queues. + * + * @return {object} + */ + getStatus() { + let status = {targets: {}} + for (const targetName in this.targets) { + let target = this.targets[targetName] + status.targets[targetName] = {} + for (const slotName in target.slots) { + const {queue, limit} = target.slots[slotName] + status.targets[targetName][slotName] = { + concurrency: queue.concurrency, + limit, + length: queue.length, + } + } + } + return status + } + + /** + * @return {string[]} + */ + getTargets() { + return Object.keys(this.targets) + } + + /** + * + */ + poll() { + const LOGPREFIX = `poll():` + + let targets = this.getPollTargets() + if (!targets.length) { + this.poller.warn(`${LOGPREFIX} no targets`) + return + } + + // skip and postpone the poll, if we're in the middle on another poll + // it will be called again from the last .then() at the end of this method + if (this.polling) { + this.logger.debug(`${LOGPREFIX} already polling`) + return + } + + // skip and postpone the poll, if no free slots + // it will be called again from onJobFinished() + if (!this.hasFreeSlots(targets)) { + this.logger.debug(`${LOGPREFIX} no free slots`) + return + } + + // set polling flag + this.polling = true + + // clear postponed polls target list + this.setPollTargets() + + this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`) + this.getTasks(targets) + .then(({rows}) => { + let message = `${LOGPREFIX} ${rows} processed` + if (workerConfig.mysql_fetch_limit && rows >= workerConfig.mysql_fetch_limit) { + // it seems, there are more, so we'll need to perform another query + this.setPollTargets(targets) + message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})` + } + this.logger.debug(message) + }) + .catch((error) => { + this.logger.error(`${LOGPREFIX}`, error) + //this.setPollTargets(targets) + }) + .then(() => { + // unset polling flag + this.polling = false + + // perform another poll, if needed + if (this.getPollTargets().length > 0) { + this.logger.debug(`${LOGPREFIX} next poll scheduled, calling poll() again`) + this.poll() + } + }) + } + + /** + * @param {string|string[]|null} target + */ + setPollTargets(target) { + // when called without parameter, remove all targets + if (target === undefined) { + this.nextpoll = {} + return + } + + // just a fix + if (target === 'null') + target = null + + if (Array.isArray(target)) { + target.forEach(t => { + this.nextpoll[t] = true + }) + } else { + if (target === null) + this.nextpoll = {} + this.nextpoll[target] = true + } + } + + /** + * @return {string[]} + */ + getPollTargets() { + if (null in this.nextpoll) + return Object.keys(this.targets) + + return Object.keys(this.nextpoll) + } + + /** + * @param {string} target + * @return {boolean} + */ + hasPollTarget(target) { + return target in this.nextpoll || null in this.nextpoll + } + + /** + * @param {string|null|string[]} target + * @param {string} reqstatus + * @param {object} data + * @returns {Promise<{ignored: number, accepted: number, rows: number}>} + */ + async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) { + const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):` + + // get new jobs in transaction + await db.beginTransaction() + + let sqlFields = `id, status, target, slot` + let sql + if (data.id) { + sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE id=${db.escape(data.id)} FOR UPDATE` + } else { + let targets + if (target === null) { + targets = Object.keys(this.targets) + } else if (!Array.isArray(target)) { + targets = [target] + } else { + targets = target + } + let sqlLimit = workerConfig.mysql_fetch_limit !== 0 ? ` LIMIT 0, ${workerConfig.mysql_fetch_limit}` : '' + let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` + sql = `SELECT ${sqlFields} FROM ${workerConfig.mysql_table} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE` + } + + /** @type {object[]} results */ + let results = await db.query(sql) + this.logger.trace(`${LOGPREFIX} query result:`, results) + + /** + * @type {{target: string, slot: string, id: number}[]} + */ + let accepted = [] + + /** + * @type {number[]} + */ + let ignored = [] + + for (let result of results) { + let {id, slot, target, status} = result + id = parseInt(id) + + if (status !== reqstatus) { + this.logger.warn(`${LOGPREFIX} status = ${status} != ${reqstatus}`) + ignored.push(id) + continue + } + + if (!target || this.targets[target] === undefined) { + this.logger.error(`${LOGPREFIX} target '${target}' not found (job id=${id})`) + ignored.push(id) + continue + } + + if (!slot || this.targets[target].slots[slot] === undefined) { + this.logger.error(`${LOGPREFIX} slot '${slot}' of target '${target}' not found (job id=${id})`) + ignored.push(id) + continue + } + + this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`) + accepted.push({target, slot, id}) + } + + if (accepted.length) + await db.query(`UPDATE ${workerConfig.mysql_table} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`) + + if (ignored.length) + await db.query(`UPDATE ${workerConfig.mysql_table} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`) + + await db.commit() + + accepted.forEach(({id, target, slot}) => { + let q = this.targets[target].slots[slot].queue + q.push(async (cb) => { + let data = { + code: null, + signal: null, + stdout: '', + stderr: '' + } + let result = RESULT_OK + + try { + await this.setJobStatus(id, STATUS_RUNNING) + + Object.assign(data, (await this.run(id))) + if (data.code !== 0) + result = RESULT_FAIL + } catch (error) { + this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error) + result = RESULT_FAIL + data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '') + } finally { + this.emit('job-done', { + id, + result, + ...data + }) + + try { + await this.setJobStatus(id, STATUS_DONE, result, data) + } catch (error) { + this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error) + } + + cb() + } + }) + }) + + return { + rows: results.length, + accepted: accepted.length, + ignored: ignored.length, + } + } + + /** + * @param {number} id + */ + async run(id) { + let command = workerConfig.launcher.replace(/\{id\}/g, id) + let args = command.split(/ +/) + return new Promise((resolve, reject) => { + this.logger.info(`run(${id}): launching`, args) + + let process = child_process.spawn(args[0], args.slice(1), { + maxBuffer: workerConfig.max_output_buffer + }) + + let stdoutChunks = [] + let stderrChunks = [] + + process.on('exit', + /** + * @param {null|number} code + * @param {null|string} signal + */ + (code, signal) => { + let stdout = stdoutChunks.join('') + let stderr = stderrChunks.join('') + + stdoutChunks = undefined + stderrChunks = undefined + + resolve({ + code, + signal, + stdout, + stderr + }) + }) + + process.on('error', (error) => { + reject(error) + }) + + process.stdout.on('data', (data) => { + if (data instanceof Buffer) + data = data.toString('utf-8') + stdoutChunks.push(data) + }) + + process.stderr.on('data', (data) => { + if (data instanceof Buffer) + data = data.toString('utf-8') + stderrChunks.push(data) + }) + }) + } + + /** + * @param {number} id + * @param {string} status + * @param {string} result + * @param {object} data + * @return {Promise} + */ + async setJobStatus(id, status, result = RESULT_OK, data = {}) { + let update = { + status, + result + } + switch (status) { + case STATUS_RUNNING: + case STATUS_DONE: + update[status === STATUS_RUNNING ? 'time_started' : 'time_finished'] = timestamp() + break + } + if (data.code !== undefined) + update.return_code = data.code + if (data.signal !== undefined) + update.sig = data.signal + if (data.stderr !== undefined) + update.stderr = data.stderr + if (data.stdout !== undefined) + update.stdout = data.stdout + + let list = [] + for (let field in update) { + let val = update[field] + if (val !== null) + val = db.escape(val) + list.push(`${field}=${val}`) + } + + await db.query(`UPDATE ${workerConfig.mysql_table} SET ${list.join(', ')} WHERE id=?`, [id]) + } + + /** + * @param {string[]} inTargets + * @returns {boolean} + */ + hasFreeSlots(inTargets = []) { + const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):` + + this.logger.debug(`${LOGPREFIX} entered`) + + for (const target in this.targets) { + if (!inTargets.includes(target)) + continue + + for (const slot in this.targets[target].slots) { + const {limit, queue} = this.targets[target].slots[slot] + this.logger.debug(LOGPREFIX, limit, queue.length) + if (queue.length < limit) + return true + } + } + + return false + } + + /** + * @param {string} target + * @param {string} slot + */ + onJobFinished = (target, slot) => { + this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) + const {queue, limit} = this.targets[target].slots[slot] + if (queue.length < limit && this.hasPollTarget(target)) { + this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`) + this.poll() + } + } + +} + +module.exports = { + Worker, + STATUS_WAITING, + STATUS_MANUAL, + STATUS_ACCEPTED, + STATUS_IGNORED, + STATUS_RUNNING, + STATUS_DONE, +} \ No newline at end of file -- cgit v1.2.3