diff options
-rw-r--r-- | src/lib/worker.js | 21 |
1 files changed, 10 insertions, 11 deletions
diff --git a/src/lib/worker.js b/src/lib/worker.js index 3713202..a271a31 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -26,7 +26,7 @@ class Worker extends EventEmitter { super() /** - * @type {object.<string, {slots: object.<string, {limit: number, queue: Queue}>}>} + * @type {object.<string, {slots: object.<string, Queue>}>} */ this.targets = {} @@ -70,7 +70,7 @@ class Worker extends EventEmitter { queue.on('error', this.onJobFinished.bind(this, target, slot)) queue.start() - this.targets[target].slots[slot] = {limit, queue} + this.targets[target].slots[slot] = queue } /** @@ -94,10 +94,9 @@ class Worker extends EventEmitter { let target = this.targets[targetName] status.targets[targetName] = {} for (const slotName in target.slots) { - const {queue, limit} = target.slots[slotName] + const queue = target.slots[slotName] status.targets[targetName][slotName] = { concurrency: queue.concurrency, - limit, length: queue.length, } } @@ -363,7 +362,7 @@ class Worker extends EventEmitter { * @param {string} slot */ enqueueJob(id, target, slot) { - const queue = this.targets[target].slots[slot].queue + const queue = this.targets[target].slots[slot] queue.push(async (cb) => { let data = { code: null, @@ -511,9 +510,9 @@ class Worker extends EventEmitter { continue for (const slot in this.targets[target].slots) { - const {limit, queue} = this.targets[target].slots[slot] - this.logger.debug(LOGPREFIX, limit, queue.length) - if (queue.length < limit) + const queue = this.targets[target].slots[slot] + this.logger.debug(LOGPREFIX, queue.concurrency, queue.length) + if (queue.length < queue.concurrency) return true } } @@ -528,9 +527,9 @@ class Worker extends EventEmitter { onJobFinished = (target, slot) => { this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) - const {queue, limit} = this.targets[target].slots[slot] - if (queue.length < limit && this.hasPollTarget(target)) { - this.logger.debug(`onJobFinished: ${queue.length} < ${limit}, calling poll(${target})`) + const queue = this.targets[target].slots[slot] + if (queue.length < queue.concurrency && this.hasPollTarget(target)) { + this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`) this.poll() } } |