diff options
author | Evgeny Zinoviev <me@ch1p.io> | 2021-02-28 16:11:57 +0300 |
---|---|---|
committer | Evgeny Zinoviev <me@ch1p.io> | 2021-02-28 16:11:57 +0300 |
commit | feaa5065f900a9c031ca7d66d80957040e2ee99f (patch) | |
tree | c6dbaf2861364725df1c54a8ba3423ca273107e0 /src/worker.js | |
parent | 142869948c40900569f339a2177e95a3be3bbdfb (diff) |
refactor: moved some files to lib/
Diffstat (limited to 'src/worker.js')
-rw-r--r-- | src/worker.js | 478 |
1 files changed, 0 insertions, 478 deletions
diff --git a/src/worker.js b/src/worker.js deleted file mode 100644 index b4beeab..0000000 --- a/src/worker.js +++ /dev/null @@ -1,478 +0,0 @@ -const Queue = require('queue') -const child_process = require('child_process') -const db = require('./db') -const {timestamp} = require('./util') -const {getLogger} = require('./logger') -const EventEmitter = require('events') -const config = require('./config') - -const STATUS_WAITING = 'waiting' -const STATUS_MANUAL = 'manual' -const STATUS_ACCEPTED = 'accepted' -const STATUS_IGNORED = 'ignored' -const STATUS_RUNNING = 'running' -const STATUS_DONE = 'done' - -const RESULT_OK = 'ok' -const RESULT_FAIL = 'fail' - -class Worker extends EventEmitter { - - constructor() { - super() - - /** - * @type {object.<string, {slots: object.<string, {limit: number, queue: Queue}>}>} - */ - this.targets = {} - - /** - * @type {boolean} - */ - this.polling = false - - /** - * @type {boolean} - */ - this.nextpoll = {} - - /** - * @type {Logger} - */ - this.logger = getLogger('Worker') - } - - /** - * @param {string} target - * @param {string} slot - * @param {number} limit - */ - addSlot(target, slot, limit) { - this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) - - if (this.targets[target] === undefined) - this.targets[target] = {slots: {}} - - if (this.targets[target].slots[slot] !== undefined) - throw new Error(`slot ${slot} for target ${target} has already been added`) - - let queue = Queue({ - concurrency: limit, - autostart: true - }) - queue.on('success', this.onJobFinished.bind(this, target, slot)) - queue.on('error', this.onJobFinished.bind(this, target, slot)) - queue.start() - - this.targets[target].slots[slot] = {limit, queue} - } - - /** - * @param {string} target - * @returns {boolean} - */ - hasTarget(target) { - return (target in this.targets) - } - - /** - * Returns status of all queues. - * - * @return {object} - */ - getStatus() { - let status = {targets: {}} - for (const targetName in this.targets) { - let target = this.targets[targetName] - status.targets[targetName] = {} - for (const slotName in target.slots) { - const {queue, limit} = target.slots[slotName] - status.targets[targetName][slotName] = { - concurrency: queue.concurrency, - limit, - length: queue.length, - } - } - } - return status - } - - /** - * @return {string[]} - */ - getTargets() { - return Object.keys(this.targets) - } - - /** - * - */ - poll() { - const LOGPREFIX = `poll():` - - let targets = this.getPollTargets() - if (!targets.length) { - this.poller.warn(`${LOGPREFIX} no targets`) - return - } - - // skip and postpone the poll, if we're in the middle on another poll - // it will be called again from the last .then() at the end of this method - if (this.polling) { - this.logger.debug(`${LOGPREFIX} already polling`) - return - } - - // skip and postpone the poll, if no free slots - // it will be called again from onJobFinished() - if (!this.hasFreeSlots(targets)) { - this.logger.debug(`${LOGPREFIX} no free slots`) - return - } - - // set polling flag - this.polling = true - - // clear postponed polls target list - this.setPollTargets() - - this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`) - this.getTasks(targets) - .then(({rows}) => { - let message = `${LOGPREFIX} ${rows} processed` - if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) { - // it seems, there are more, so we'll need to perform another query - this.setPollTargets(targets) - message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})` - } - this.logger.debug(message) - }) - .catch((error) => { - this.logger.error(`${LOGPREFIX}`, error) - //this.setPollTargets(targets) - }) - .then(() => { - // unset polling flag - this.polling = false - - // perform another poll, if needed - if (this.getPollTargets().length > 0) { - this.logger.debug(`${LOGPREFIX} next poll scheduled, calling poll() again`) - this.poll() - } - }) - } - - /** - * @param {string|string[]|null} target - */ - setPollTargets(target) { - // when called without parameter, remove all targets - if (target === undefined) { - this.nextpoll = {} - return - } - - // just a fix - if (target === 'null') - target = null - - if (Array.isArray(target)) { - target.forEach(t => { - this.nextpoll[t] = true - }) - } else { - if (target === null) - this.nextpoll = {} - this.nextpoll[target] = true - } - } - - /** - * @return {string[]} - */ - getPollTargets() { - if (null in this.nextpoll) - return Object.keys(this.targets) - - return Object.keys(this.nextpoll) - } - - /** - * @param {string} target - * @return {boolean} - */ - hasPollTarget(target) { - return target in this.nextpoll || null in this.nextpoll - } - - /** - * @param {string|null|string[]} target - * @param {string} reqstatus - * @param {object} data - * @returns {Promise<{ignored: number, accepted: number, rows: number}>} - */ - async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) { - const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):` - - // get new jobs in transaction - await db.beginTransaction() - - let error = null - - let sqlFields = `id, status, target, slot` - let sql - if (data.id) { - sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE` - } else { - let targets - if (target === null) { - targets = Object.keys(this.targets) - } else if (!Array.isArray(target)) { - targets = [target] - } else { - targets = target - } - let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : '' - let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)` - sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE` - } - - /** @type {object[]} results */ - let results = await db.query(sql) - this.logger.trace(`${LOGPREFIX} query result:`, results) - - /** - * @type {{target: string, slot: string, id: number}[]} - */ - let accepted = [] - - /** - * @type {number[]} - */ - let ignored = [] - - for (let result of results) { - let {id, slot, target, status} = result - id = parseInt(id) - - if (status !== reqstatus) { - error = `status = ${status} != ${reqstatus}` - this.logger.warn(`${LOGPREFIX} ${error}`) - ignored.push(id) - continue - } - - if (!target || this.targets[target] === undefined) { - error = `target '${target}' not found (job id=${id})` - this.logger.error(`${LOGPREFIX} ${error}`) - ignored.push(id) - continue - } - - if (!slot || this.targets[target].slots[slot] === undefined) { - error = `slot '${slot}' of target '${target}' not found (job id=${id})` - this.logger.error(`${LOGPREFIX} ${error}`) - ignored.push(id) - continue - } - - this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`) - accepted.push({target, slot, id}) - } - - if (accepted.length) - await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`) - - if (ignored.length) - await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`) - - await db.commit() - - accepted.forEach(({id, target, slot}) => { - let q = this.targets[target].slots[slot].queue - q.push(async (cb) => { - let data = { - code: null, - signal: null, - stdout: '', - stderr: '' - } - let result = RESULT_OK - - try { - await this.setJobStatus(id, STATUS_RUNNING) - - Object.assign(data, (await this.run(id))) - if (data.code !== 0) - result = RESULT_FAIL - } catch (error) { - this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error) - result = RESULT_FAIL - data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '') - } finally { - this.emit('job-done', { - id, - result, - ...data - }) - - try { - await this.setJobStatus(id, STATUS_DONE, result, data) - } catch (error) { - this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error) - } - - cb() - } - }) - }) - - return { - error, - rows: results.length, - accepted: accepted.length, - ignored: ignored.length, - } - } - - /** - * @param {number} id - */ - async run(id) { - let command = config.get('launcher').replace(/\{id\}/g, id) - let args = command.split(/ +/) - return new Promise((resolve, reject) => { - this.logger.info(`run(${id}): launching`, args) - - let process = child_process.spawn(args[0], args.slice(1), { - maxBuffer: config.get('max_output_buffer') - }) - - let stdoutChunks = [] - let stderrChunks = [] - - process.on('exit', - /** - * @param {null|number} code - * @param {null|string} signal - */ - (code, signal) => { - let stdout = stdoutChunks.join('') - let stderr = stderrChunks.join('') - - stdoutChunks = undefined - stderrChunks = undefined - - resolve({ - code, - signal, - stdout, - stderr - }) - }) - - process.on('error', (error) => { - reject(error) - }) - - process.stdout.on('data', (data) => { - if (data instanceof Buffer) - data = data.toString('utf-8') - stdoutChunks.push(data) - }) - - process.stderr.on('data', (data) => { - if (data instanceof Buffer) - data = data.toString('utf-8') - stderrChunks.push(data) - }) - }) - } - - /** - * @param {number} id - * @param {string} status - * @param {string} result - * @param {object} data - * @return {Promise<void>} - */ - async setJobStatus(id, status, result = RESULT_OK, data = {}) { - let update = { - status, - result - } - switch (status) { - case STATUS_RUNNING: - case STATUS_DONE: - update[status === STATUS_RUNNING ? 'time_started' : 'time_finished'] = timestamp() - break - } - if (data.code !== undefined) - update.return_code = data.code - if (data.signal !== undefined) - update.sig = data.signal - if (data.stderr !== undefined) - update.stderr = data.stderr - if (data.stdout !== undefined) - update.stdout = data.stdout - - let list = [] - for (let field in update) { - let val = update[field] - if (val !== null) - val = db.escape(val) - list.push(`${field}=${val}`) - } - - await db.query(`UPDATE ${config.get('mysql_table')} SET ${list.join(', ')} WHERE id=?`, [id]) - } - - /** - * @param {string[]} inTargets - * @returns {boolean} - */ - hasFreeSlots(inTargets = []) { - const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):` - - this.logger.debug(`${LOGPREFIX} entered`) - - for (const target in this.targets) { - if (!inTargets.includes(target)) - continue - - for (const slot in this.targets[target].slots) { - const {limit, queue} = this.targets[target].slots[slot] - this.logger.debug(LOGPREFIX, limit, queue.length) - if (queue.length < limit) - return true - } - } - - return false - } - - /** - * @param {string} target - * @param {string} slot - */ - onJobFinished = (target, slot) => { - this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) - const {queue, limit} = this.targets[target].slots[slot] - if (queue.length < limit && this.hasPollTarget(target)) { - this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`) - this.poll() - } - } - -} - -module.exports = { - Worker, - STATUS_WAITING, - STATUS_MANUAL, - STATUS_ACCEPTED, - STATUS_IGNORED, - STATUS_RUNNING, - STATUS_DONE, -}
\ No newline at end of file |