aboutsummaryrefslogtreecommitdiff
path: root/src/worker.js
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-02-28 16:11:57 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-02-28 16:11:57 +0300
commitfeaa5065f900a9c031ca7d66d80957040e2ee99f (patch)
treec6dbaf2861364725df1c54a8ba3423ca273107e0 /src/worker.js
parent142869948c40900569f339a2177e95a3be3bbdfb (diff)
refactor: moved some files to lib/
Diffstat (limited to 'src/worker.js')
-rw-r--r--src/worker.js478
1 files changed, 0 insertions, 478 deletions
diff --git a/src/worker.js b/src/worker.js
deleted file mode 100644
index b4beeab..0000000
--- a/src/worker.js
+++ /dev/null
@@ -1,478 +0,0 @@
-const Queue = require('queue')
-const child_process = require('child_process')
-const db = require('./db')
-const {timestamp} = require('./util')
-const {getLogger} = require('./logger')
-const EventEmitter = require('events')
-const config = require('./config')
-
-const STATUS_WAITING = 'waiting'
-const STATUS_MANUAL = 'manual'
-const STATUS_ACCEPTED = 'accepted'
-const STATUS_IGNORED = 'ignored'
-const STATUS_RUNNING = 'running'
-const STATUS_DONE = 'done'
-
-const RESULT_OK = 'ok'
-const RESULT_FAIL = 'fail'
-
-class Worker extends EventEmitter {
-
- constructor() {
- super()
-
- /**
- * @type {object.<string, {slots: object.<string, {limit: number, queue: Queue}>}>}
- */
- this.targets = {}
-
- /**
- * @type {boolean}
- */
- this.polling = false
-
- /**
- * @type {boolean}
- */
- this.nextpoll = {}
-
- /**
- * @type {Logger}
- */
- this.logger = getLogger('Worker')
- }
-
- /**
- * @param {string} target
- * @param {string} slot
- * @param {number} limit
- */
- addSlot(target, slot, limit) {
- this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`)
-
- if (this.targets[target] === undefined)
- this.targets[target] = {slots: {}}
-
- if (this.targets[target].slots[slot] !== undefined)
- throw new Error(`slot ${slot} for target ${target} has already been added`)
-
- let queue = Queue({
- concurrency: limit,
- autostart: true
- })
- queue.on('success', this.onJobFinished.bind(this, target, slot))
- queue.on('error', this.onJobFinished.bind(this, target, slot))
- queue.start()
-
- this.targets[target].slots[slot] = {limit, queue}
- }
-
- /**
- * @param {string} target
- * @returns {boolean}
- */
- hasTarget(target) {
- return (target in this.targets)
- }
-
- /**
- * Returns status of all queues.
- *
- * @return {object}
- */
- getStatus() {
- let status = {targets: {}}
- for (const targetName in this.targets) {
- let target = this.targets[targetName]
- status.targets[targetName] = {}
- for (const slotName in target.slots) {
- const {queue, limit} = target.slots[slotName]
- status.targets[targetName][slotName] = {
- concurrency: queue.concurrency,
- limit,
- length: queue.length,
- }
- }
- }
- return status
- }
-
- /**
- * @return {string[]}
- */
- getTargets() {
- return Object.keys(this.targets)
- }
-
- /**
- *
- */
- poll() {
- const LOGPREFIX = `poll():`
-
- let targets = this.getPollTargets()
- if (!targets.length) {
- this.poller.warn(`${LOGPREFIX} no targets`)
- return
- }
-
- // skip and postpone the poll, if we're in the middle on another poll
- // it will be called again from the last .then() at the end of this method
- if (this.polling) {
- this.logger.debug(`${LOGPREFIX} already polling`)
- return
- }
-
- // skip and postpone the poll, if no free slots
- // it will be called again from onJobFinished()
- if (!this.hasFreeSlots(targets)) {
- this.logger.debug(`${LOGPREFIX} no free slots`)
- return
- }
-
- // set polling flag
- this.polling = true
-
- // clear postponed polls target list
- this.setPollTargets()
-
- this.logger.debug(`${LOGPREFIX} calling getTasks(${JSON.stringify(targets)})`)
- this.getTasks(targets)
- .then(({rows}) => {
- let message = `${LOGPREFIX} ${rows} processed`
- if (config.get('mysql_fetch_limit') && rows >= config.get('mysql_fetch_limit')) {
- // it seems, there are more, so we'll need to perform another query
- this.setPollTargets(targets)
- message += `, scheduling more polls (targets: ${JSON.stringify(this.getPollTargets())})`
- }
- this.logger.debug(message)
- })
- .catch((error) => {
- this.logger.error(`${LOGPREFIX}`, error)
- //this.setPollTargets(targets)
- })
- .then(() => {
- // unset polling flag
- this.polling = false
-
- // perform another poll, if needed
- if (this.getPollTargets().length > 0) {
- this.logger.debug(`${LOGPREFIX} next poll scheduled, calling poll() again`)
- this.poll()
- }
- })
- }
-
- /**
- * @param {string|string[]|null} target
- */
- setPollTargets(target) {
- // when called without parameter, remove all targets
- if (target === undefined) {
- this.nextpoll = {}
- return
- }
-
- // just a fix
- if (target === 'null')
- target = null
-
- if (Array.isArray(target)) {
- target.forEach(t => {
- this.nextpoll[t] = true
- })
- } else {
- if (target === null)
- this.nextpoll = {}
- this.nextpoll[target] = true
- }
- }
-
- /**
- * @return {string[]}
- */
- getPollTargets() {
- if (null in this.nextpoll)
- return Object.keys(this.targets)
-
- return Object.keys(this.nextpoll)
- }
-
- /**
- * @param {string} target
- * @return {boolean}
- */
- hasPollTarget(target) {
- return target in this.nextpoll || null in this.nextpoll
- }
-
- /**
- * @param {string|null|string[]} target
- * @param {string} reqstatus
- * @param {object} data
- * @returns {Promise<{ignored: number, accepted: number, rows: number}>}
- */
- async getTasks(target = null, reqstatus = STATUS_WAITING, data = {}) {
- const LOGPREFIX = `getTasks(${JSON.stringify(target)}, '${reqstatus}', ${JSON.stringify(data)}):`
-
- // get new jobs in transaction
- await db.beginTransaction()
-
- let error = null
-
- let sqlFields = `id, status, target, slot`
- let sql
- if (data.id) {
- sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id=${db.escape(data.id)} FOR UPDATE`
- } else {
- let targets
- if (target === null) {
- targets = Object.keys(this.targets)
- } else if (!Array.isArray(target)) {
- targets = [target]
- } else {
- targets = target
- }
- let sqlLimit = config.get('mysql_fetch_limit') !== 0 ? ` LIMIT 0, ${config.get('mysql_fetch_limit')}` : ''
- let sqlWhere = `status=${db.escape(reqstatus)} AND target IN (`+targets.map(db.escape).join(',')+`)`
- sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE ${sqlWhere} ORDER BY id ${sqlLimit} FOR UPDATE`
- }
-
- /** @type {object[]} results */
- let results = await db.query(sql)
- this.logger.trace(`${LOGPREFIX} query result:`, results)
-
- /**
- * @type {{target: string, slot: string, id: number}[]}
- */
- let accepted = []
-
- /**
- * @type {number[]}
- */
- let ignored = []
-
- for (let result of results) {
- let {id, slot, target, status} = result
- id = parseInt(id)
-
- if (status !== reqstatus) {
- error = `status = ${status} != ${reqstatus}`
- this.logger.warn(`${LOGPREFIX} ${error}`)
- ignored.push(id)
- continue
- }
-
- if (!target || this.targets[target] === undefined) {
- error = `target '${target}' not found (job id=${id})`
- this.logger.error(`${LOGPREFIX} ${error}`)
- ignored.push(id)
- continue
- }
-
- if (!slot || this.targets[target].slots[slot] === undefined) {
- error = `slot '${slot}' of target '${target}' not found (job id=${id})`
- this.logger.error(`${LOGPREFIX} ${error}`)
- ignored.push(id)
- continue
- }
-
- this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`)
- accepted.push({target, slot, id})
- }
-
- if (accepted.length)
- await db.query(`UPDATE ${config.get('mysql_table')} SET status='accepted' WHERE id IN (`+accepted.map(j => j.id).join(',')+`)`)
-
- if (ignored.length)
- await db.query(`UPDATE ${config.get('mysql_table')} SET status='ignored' WHERE id IN (`+ignored.join(',')+`)`)
-
- await db.commit()
-
- accepted.forEach(({id, target, slot}) => {
- let q = this.targets[target].slots[slot].queue
- q.push(async (cb) => {
- let data = {
- code: null,
- signal: null,
- stdout: '',
- stderr: ''
- }
- let result = RESULT_OK
-
- try {
- await this.setJobStatus(id, STATUS_RUNNING)
-
- Object.assign(data, (await this.run(id)))
- if (data.code !== 0)
- result = RESULT_FAIL
- } catch (error) {
- this.logger.error(`${LOGPREFIX} job ${id}: error while run():`, error)
- result = RESULT_FAIL
- data.stderr = (error instanceof Error) ? (error.message + '\n' + error.stack) : (error + '')
- } finally {
- this.emit('job-done', {
- id,
- result,
- ...data
- })
-
- try {
- await this.setJobStatus(id, STATUS_DONE, result, data)
- } catch (error) {
- this.logger.error(`${LOGPREFIX} setJobStatus(${id})`, error)
- }
-
- cb()
- }
- })
- })
-
- return {
- error,
- rows: results.length,
- accepted: accepted.length,
- ignored: ignored.length,
- }
- }
-
- /**
- * @param {number} id
- */
- async run(id) {
- let command = config.get('launcher').replace(/\{id\}/g, id)
- let args = command.split(/ +/)
- return new Promise((resolve, reject) => {
- this.logger.info(`run(${id}): launching`, args)
-
- let process = child_process.spawn(args[0], args.slice(1), {
- maxBuffer: config.get('max_output_buffer')
- })
-
- let stdoutChunks = []
- let stderrChunks = []
-
- process.on('exit',
- /**
- * @param {null|number} code
- * @param {null|string} signal
- */
- (code, signal) => {
- let stdout = stdoutChunks.join('')
- let stderr = stderrChunks.join('')
-
- stdoutChunks = undefined
- stderrChunks = undefined
-
- resolve({
- code,
- signal,
- stdout,
- stderr
- })
- })
-
- process.on('error', (error) => {
- reject(error)
- })
-
- process.stdout.on('data', (data) => {
- if (data instanceof Buffer)
- data = data.toString('utf-8')
- stdoutChunks.push(data)
- })
-
- process.stderr.on('data', (data) => {
- if (data instanceof Buffer)
- data = data.toString('utf-8')
- stderrChunks.push(data)
- })
- })
- }
-
- /**
- * @param {number} id
- * @param {string} status
- * @param {string} result
- * @param {object} data
- * @return {Promise<void>}
- */
- async setJobStatus(id, status, result = RESULT_OK, data = {}) {
- let update = {
- status,
- result
- }
- switch (status) {
- case STATUS_RUNNING:
- case STATUS_DONE:
- update[status === STATUS_RUNNING ? 'time_started' : 'time_finished'] = timestamp()
- break
- }
- if (data.code !== undefined)
- update.return_code = data.code
- if (data.signal !== undefined)
- update.sig = data.signal
- if (data.stderr !== undefined)
- update.stderr = data.stderr
- if (data.stdout !== undefined)
- update.stdout = data.stdout
-
- let list = []
- for (let field in update) {
- let val = update[field]
- if (val !== null)
- val = db.escape(val)
- list.push(`${field}=${val}`)
- }
-
- await db.query(`UPDATE ${config.get('mysql_table')} SET ${list.join(', ')} WHERE id=?`, [id])
- }
-
- /**
- * @param {string[]} inTargets
- * @returns {boolean}
- */
- hasFreeSlots(inTargets = []) {
- const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):`
-
- this.logger.debug(`${LOGPREFIX} entered`)
-
- for (const target in this.targets) {
- if (!inTargets.includes(target))
- continue
-
- for (const slot in this.targets[target].slots) {
- const {limit, queue} = this.targets[target].slots[slot]
- this.logger.debug(LOGPREFIX, limit, queue.length)
- if (queue.length < limit)
- return true
- }
- }
-
- return false
- }
-
- /**
- * @param {string} target
- * @param {string} slot
- */
- onJobFinished = (target, slot) => {
- this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`)
- const {queue, limit} = this.targets[target].slots[slot]
- if (queue.length < limit && this.hasPollTarget(target)) {
- this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`)
- this.poll()
- }
- }
-
-}
-
-module.exports = {
- Worker,
- STATUS_WAITING,
- STATUS_MANUAL,
- STATUS_ACCEPTED,
- STATUS_IGNORED,
- STATUS_RUNNING,
- STATUS_DONE,
-} \ No newline at end of file