From 61ceac1e5643200a75e5fb6287473c6e2316b060 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Fri, 5 Mar 2021 02:49:32 +0300 Subject: drop slots, leave only targets. change config format. --- src/lib/worker.js | 149 ++++++++++++++++++++++-------------------------------- 1 file changed, 60 insertions(+), 89 deletions(-) (limited to 'src/lib/worker.js') diff --git a/src/lib/worker.js b/src/lib/worker.js index 562e0b1..beba7f9 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -26,7 +26,7 @@ class Worker extends EventEmitter { super() /** - * @type {object., paused: boolean}>} + * @type {object.} */ this.targets = {} @@ -50,30 +50,26 @@ class Worker extends EventEmitter { * Creates new queue. * * @param {string} target - * @param {string} slot * @param {number} limit */ - addSlot(target, slot, limit) { - this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) + addTarget(target, limit) { + this.logger.debug(`addTarget: adding target' ${target}', limit = ${limit}`) - if (this.targets[target] === undefined) - this.targets[target] = { - slots: {}, - paused: false - } - - if (this.targets[target].slots[slot] !== undefined) - throw new Error(`slot ${slot} for target ${target} has already been added`) + if (target in this.targets) + throw new Error(`target '${target}' already added`) let queue = Queue({ concurrency: limit, autostart: true }) - queue.on('success', this.onJobFinished.bind(this, target, slot)) - queue.on('error', this.onJobFinished.bind(this, target, slot)) + queue.on('success', this.onJobFinished.bind(this, target)) + queue.on('error', this.onJobFinished.bind(this, target)) queue.start() - this.targets[target].slots[slot] = queue + this.targets[target] = { + paused: false, + queue + } } /** @@ -85,19 +81,17 @@ class Worker extends EventEmitter { if (targets === null) targets = this.getTargets() - for (const targetName of targets) { - const target = this.targets[targetName] - if (target.paused) { - this.logger.warn(`pauseTargets: ${targetName} is already paused`) + for (const target of targets) { + const {queue, paused} = this.targets[target] + if (paused) { + this.logger.warn(`pauseTargets: ${target} is already paused`) continue } - for (const slotName in target.slots) { - this.logger.debug(`pauseTargets: stopping ${targetName}/${slotName} queue`) - target.slots[slotName].stop() - } + this.logger.debug(`pauseTargets: stopping ${target}`) + queue.stop() - target.paused = true + this.targets[target].paused = true } } @@ -110,19 +104,17 @@ class Worker extends EventEmitter { if (targets === null) targets = this.getTargets() - for (const targetName of targets) { - const target = this.targets[targetName] - if (!target.paused) { - this.logger.warn(`continueTargets: ${targetName} is not paused`) + for (const target of targets) { + const {queue, paused} = this.targets[target] + if (!paused) { + this.logger.warn(`continueTargets: ${target} is not paused`) continue } - for (const slotName in target.slots) { - this.logger.debug(`pauseTargets: starting ${targetName}/${slotName} queue`) - target.slots[slotName].start() - } + this.logger.debug(`pauseTargets: starting ${target}`) + queue.start() - target.paused = false + this.targets[target].paused = false } } @@ -142,19 +134,16 @@ class Worker extends EventEmitter { * @return {object} */ getStatus() { - let status = {targets: {}} - for (const targetName in this.targets) { - let target = this.targets[targetName] - status.targets[targetName] = { - paused: target.paused, - slots: {} - } - for (const slotName in target.slots) { - const queue = target.slots[slotName] - status.targets[targetName].slots[slotName] = { - concurrency: queue.concurrency, - length: queue.length, - } + let status = {} + for (const target in this.targets) { + if (!this.targets.hasOwnProperty(target)) + continue + + const {queue, paused} = this.targets[target] + status[target] = { + paused, + concurrency: queue.concurrency, + length: queue.length, } } return status @@ -188,10 +177,10 @@ class Worker extends EventEmitter { return } - // skip and postpone the poll, if no free slots + // skip and postpone the poll, if no free targets // it will be called again from onJobFinished() - if (!this.hasFreeSlots(targets)) { - this.logger.debug(`${LOGPREFIX} no free slots`) + if (!this.hasFreeTargets(targets)) { + this.logger.debug(`${LOGPREFIX} no free targets`) return } @@ -279,7 +268,7 @@ class Worker extends EventEmitter { * @param {{ids: number[]}} data * @returns * {Promise<{ - * results: Map, + * results: Map, * rowsCount: number * }>} */ @@ -290,11 +279,11 @@ class Worker extends EventEmitter { await db.beginTransaction() /** - * @type {Map} + * @type {Map} */ const jobsResults = new Map() - let sqlFields = `id, status, target, slot` + let sqlFields = `id, status, target` let sql if (data.ids) { sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id IN(`+data.ids.map(db.escape).join(',')+`) FOR UPDATE` @@ -318,7 +307,6 @@ class Worker extends EventEmitter { for (let result of rows) { const id = parseInt(result.id) - const slot = String(result.slot) const target = String(result.target) const status = String(result.status) @@ -333,7 +321,7 @@ class Worker extends EventEmitter { continue } - if (!target || this.targets[target] === undefined) { + if (!target || !(target in this.targets)) { let reason = `target '${target}' not found (job id=${id})` jobsResults.set(id, { result: JOB_IGNORED, @@ -344,23 +332,11 @@ class Worker extends EventEmitter { continue } - if (!slot || this.targets[target].slots[slot] === undefined) { - let reason = `slot '${slot}' of target '${target}' not found (job id=${id})` - jobsResults.set(id, { - result: JOB_IGNORED, - reason - }) - - this.logger.error(`${LOGPREFIX} ${reason}`) - continue - } - - this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`) + this.logger.debug(`${LOGPREFIX} accepted target='${target}', id=${id}`) jobsResults.set(id, { result: JOB_ACCEPTED, - target, - slot + target }) } @@ -400,8 +376,8 @@ class Worker extends EventEmitter { if (result !== JOB_ACCEPTED) continue - const {slot, target} = jobResult - this.enqueueJob(id, target, slot) + const {target} = jobResult + this.enqueueJob(id, target) } return { @@ -415,10 +391,9 @@ class Worker extends EventEmitter { * * @param {int} id * @param {string} target - * @param {string} slot */ - enqueueJob(id, target, slot) { - const queue = this.targets[target].slots[slot] + enqueueJob(id, target) { + const queue = this.targets[target].queue queue.push(async (cb) => { let data = { code: null, @@ -556,21 +531,20 @@ class Worker extends EventEmitter { * @param {string[]} inTargets * @returns {boolean} */ - hasFreeSlots(inTargets = []) { - const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):` + hasFreeTargets(inTargets = []) { + const LOGPREFIX = `hasFreeTargets(${JSON.stringify(inTargets)}):` this.logger.debug(`${LOGPREFIX} entered`) for (const target in this.targets) { - if (!inTargets.includes(target)) + if (!this.targets.hasOwnProperty(target) || !inTargets.includes(target)) continue - for (const slot in this.targets[target].slots) { - const queue = this.targets[target].slots[slot] - this.logger.debug(LOGPREFIX, queue.concurrency, queue.length) - if (queue.length < queue.concurrency) - return true - } + const {paused, queue} = this.targets[target] + this.logger.trace(LOGPREFIX, target, queue.concurrency, queue.length) + + if (queue.length < queue.concurrency) + return true } return false @@ -578,15 +552,12 @@ class Worker extends EventEmitter { /** * @param {string} target - * @param {string} slot */ - onJobFinished = (target, slot) => { - this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) - - const targetPaused = this.targets[target].paused - const queue = this.targets[target].slots[slot] + onJobFinished = (target) => { + this.logger.debug(`onJobFinished: target=${target}`) - if (!targetPaused && queue.length < queue.concurrency && this.hasPollTarget(target)) { + const {paused, queue} = this.targets[target] + if (!paused && queue.length < queue.concurrency && this.hasPollTarget(target)) { this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`) this.poll() } -- cgit v1.2.3