diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/jobd.js | 18 | ||||
-rw-r--r-- | src/lib/config.js | 31 | ||||
-rw-r--r-- | src/lib/worker.js | 149 |
3 files changed, 84 insertions, 114 deletions
diff --git a/src/jobd.js b/src/jobd.js index 477934f..f8be71f 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -107,16 +107,13 @@ async function initApp(appName) { function initWorker() { worker = new Worker() - for (let targetName in config.get('targets')) { - let slots = config.get('targets')[targetName].slots - // let target = new Target({name: targetName}) - // queue.addTarget(target) - - for (let slotName in slots) { - let slotLimit = slots[slotName] - worker.addSlot(targetName, slotName, slotLimit) - } + + const targets = config.get('targets') + for (const target in targets) { + let limit = targets[target] + worker.addTarget(target, limit) } + worker.on('job-done', (data) => { if (jobPromises[data.id] !== undefined) { const P = jobPromises[data.id] @@ -186,11 +183,10 @@ function onPollRequest(data, requestNo, connection) { * @param {Connection} connection */ function onStatus(data, requestNo, connection) { - const qs = worker.getStatus() connection.send( new ResponseMessage(requestNo) .setData({ - targets: qs.targets, + targets: worker.getStatus(), jobPromisesCount: Object.keys(jobPromises).length, memoryUsage: process.memoryUsage() }) diff --git a/src/lib/config.js b/src/lib/config.js index eb1135d..ac711fd 100644 --- a/src/lib/config.js +++ b/src/lib/config.js @@ -13,7 +13,7 @@ function readFile(file) { function processScheme(source, scheme) { const result = {} - + for (let key in scheme) { let opts = scheme[key] let ne = !(key in source) || !source[key] @@ -34,11 +34,17 @@ function processScheme(source, scheme) { throw new Error(`'${key}' must be a float`) value = parseFloat(value) break + + case 'object': + if (typeof value !== 'object') + throw new Error(`'${key}' must be an object`) + + break } result[key] = value } - + return result } @@ -69,26 +75,23 @@ function parseWorkerConfig(file) { launcher: {required: true}, max_output_buffer: {default: 1024*1024, type: 'int'}, + targets: {required: true, type: 'object'}, } Object.assign(config, processScheme(raw, scheme)) config.targets = {} - - // targets - for (let target in raw) { + for (let target in raw.targets) { if (target === 'null') throw new Error('word \'null\' is reserved, please don\'t use it as a target name') - if (typeof raw[target] !== 'object') - continue + if (!isNumeric(raw.targets[target])) + throw new Error(`value of target '${target}' must be a number`) - config.targets[target] = {slots: {}} - for (let slotName in raw[target]) { - let slotLimit = parseInt(raw[target][slotName], 10) - if (slotLimit < 1) - throw new Error(`${target}: slot ${slotName} has invalid limit`) - config.targets[target].slots[slotName] = slotLimit - } + let value = parseInt(raw.targets[target], 10) + if (value < 1) + throw new Error(`target '${target}' has invalid value`) + + config.targets[target] = value } } diff --git a/src/lib/worker.js b/src/lib/worker.js index 562e0b1..beba7f9 100644 --- a/src/lib/worker.js +++ b/src/lib/worker.js @@ -26,7 +26,7 @@ class Worker extends EventEmitter { super() /** - * @type {object.<string, {slots: object.<string, Queue>, paused: boolean}>} + * @type {object.<string, {queue: Queue, paused: boolean}>} */ this.targets = {} @@ -50,30 +50,26 @@ class Worker extends EventEmitter { * Creates new queue. * * @param {string} target - * @param {string} slot * @param {number} limit */ - addSlot(target, slot, limit) { - this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`) + addTarget(target, limit) { + this.logger.debug(`addTarget: adding target' ${target}', limit = ${limit}`) - if (this.targets[target] === undefined) - this.targets[target] = { - slots: {}, - paused: false - } - - if (this.targets[target].slots[slot] !== undefined) - throw new Error(`slot ${slot} for target ${target} has already been added`) + if (target in this.targets) + throw new Error(`target '${target}' already added`) let queue = Queue({ concurrency: limit, autostart: true }) - queue.on('success', this.onJobFinished.bind(this, target, slot)) - queue.on('error', this.onJobFinished.bind(this, target, slot)) + queue.on('success', this.onJobFinished.bind(this, target)) + queue.on('error', this.onJobFinished.bind(this, target)) queue.start() - this.targets[target].slots[slot] = queue + this.targets[target] = { + paused: false, + queue + } } /** @@ -85,19 +81,17 @@ class Worker extends EventEmitter { if (targets === null) targets = this.getTargets() - for (const targetName of targets) { - const target = this.targets[targetName] - if (target.paused) { - this.logger.warn(`pauseTargets: ${targetName} is already paused`) + for (const target of targets) { + const {queue, paused} = this.targets[target] + if (paused) { + this.logger.warn(`pauseTargets: ${target} is already paused`) continue } - for (const slotName in target.slots) { - this.logger.debug(`pauseTargets: stopping ${targetName}/${slotName} queue`) - target.slots[slotName].stop() - } + this.logger.debug(`pauseTargets: stopping ${target}`) + queue.stop() - target.paused = true + this.targets[target].paused = true } } @@ -110,19 +104,17 @@ class Worker extends EventEmitter { if (targets === null) targets = this.getTargets() - for (const targetName of targets) { - const target = this.targets[targetName] - if (!target.paused) { - this.logger.warn(`continueTargets: ${targetName} is not paused`) + for (const target of targets) { + const {queue, paused} = this.targets[target] + if (!paused) { + this.logger.warn(`continueTargets: ${target} is not paused`) continue } - for (const slotName in target.slots) { - this.logger.debug(`pauseTargets: starting ${targetName}/${slotName} queue`) - target.slots[slotName].start() - } + this.logger.debug(`pauseTargets: starting ${target}`) + queue.start() - target.paused = false + this.targets[target].paused = false } } @@ -142,19 +134,16 @@ class Worker extends EventEmitter { * @return {object} */ getStatus() { - let status = {targets: {}} - for (const targetName in this.targets) { - let target = this.targets[targetName] - status.targets[targetName] = { - paused: target.paused, - slots: {} - } - for (const slotName in target.slots) { - const queue = target.slots[slotName] - status.targets[targetName].slots[slotName] = { - concurrency: queue.concurrency, - length: queue.length, - } + let status = {} + for (const target in this.targets) { + if (!this.targets.hasOwnProperty(target)) + continue + + const {queue, paused} = this.targets[target] + status[target] = { + paused, + concurrency: queue.concurrency, + length: queue.length, } } return status @@ -188,10 +177,10 @@ class Worker extends EventEmitter { return } - // skip and postpone the poll, if no free slots + // skip and postpone the poll, if no free targets // it will be called again from onJobFinished() - if (!this.hasFreeSlots(targets)) { - this.logger.debug(`${LOGPREFIX} no free slots`) + if (!this.hasFreeTargets(targets)) { + this.logger.debug(`${LOGPREFIX} no free targets`) return } @@ -279,7 +268,7 @@ class Worker extends EventEmitter { * @param {{ids: number[]}} data * @returns * {Promise<{ - * results: Map<number, {status: number, reason: string, slot: string, target: string}>, + * results: Map<number, {status: number, reason: string, target: string}>, * rowsCount: number * }>} */ @@ -290,11 +279,11 @@ class Worker extends EventEmitter { await db.beginTransaction() /** - * @type {Map<number, {status: number, reason: string, slot: string, target: string}>} + * @type {Map<number, {status: number, reason: string, target: string}>} */ const jobsResults = new Map() - let sqlFields = `id, status, target, slot` + let sqlFields = `id, status, target` let sql if (data.ids) { sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id IN(`+data.ids.map(db.escape).join(',')+`) FOR UPDATE` @@ -318,7 +307,6 @@ class Worker extends EventEmitter { for (let result of rows) { const id = parseInt(result.id) - const slot = String(result.slot) const target = String(result.target) const status = String(result.status) @@ -333,7 +321,7 @@ class Worker extends EventEmitter { continue } - if (!target || this.targets[target] === undefined) { + if (!target || !(target in this.targets)) { let reason = `target '${target}' not found (job id=${id})` jobsResults.set(id, { result: JOB_IGNORED, @@ -344,23 +332,11 @@ class Worker extends EventEmitter { continue } - if (!slot || this.targets[target].slots[slot] === undefined) { - let reason = `slot '${slot}' of target '${target}' not found (job id=${id})` - jobsResults.set(id, { - result: JOB_IGNORED, - reason - }) - - this.logger.error(`${LOGPREFIX} ${reason}`) - continue - } - - this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`) + this.logger.debug(`${LOGPREFIX} accepted target='${target}', id=${id}`) jobsResults.set(id, { result: JOB_ACCEPTED, - target, - slot + target }) } @@ -400,8 +376,8 @@ class Worker extends EventEmitter { if (result !== JOB_ACCEPTED) continue - const {slot, target} = jobResult - this.enqueueJob(id, target, slot) + const {target} = jobResult + this.enqueueJob(id, target) } return { @@ -415,10 +391,9 @@ class Worker extends EventEmitter { * * @param {int} id * @param {string} target - * @param {string} slot */ - enqueueJob(id, target, slot) { - const queue = this.targets[target].slots[slot] + enqueueJob(id, target) { + const queue = this.targets[target].queue queue.push(async (cb) => { let data = { code: null, @@ -556,21 +531,20 @@ class Worker extends EventEmitter { * @param {string[]} inTargets * @returns {boolean} */ - hasFreeSlots(inTargets = []) { - const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):` + hasFreeTargets(inTargets = []) { + const LOGPREFIX = `hasFreeTargets(${JSON.stringify(inTargets)}):` this.logger.debug(`${LOGPREFIX} entered`) for (const target in this.targets) { - if (!inTargets.includes(target)) + if (!this.targets.hasOwnProperty(target) || !inTargets.includes(target)) continue - for (const slot in this.targets[target].slots) { - const queue = this.targets[target].slots[slot] - this.logger.debug(LOGPREFIX, queue.concurrency, queue.length) - if (queue.length < queue.concurrency) - return true - } + const {paused, queue} = this.targets[target] + this.logger.trace(LOGPREFIX, target, queue.concurrency, queue.length) + + if (queue.length < queue.concurrency) + return true } return false @@ -578,15 +552,12 @@ class Worker extends EventEmitter { /** * @param {string} target - * @param {string} slot */ - onJobFinished = (target, slot) => { - this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`) - - const targetPaused = this.targets[target].paused - const queue = this.targets[target].slots[slot] + onJobFinished = (target) => { + this.logger.debug(`onJobFinished: target=${target}`) - if (!targetPaused && queue.length < queue.concurrency && this.hasPollTarget(target)) { + const {paused, queue} = this.targets[target] + if (!paused && queue.length < queue.concurrency && this.hasPollTarget(target)) { this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`) this.poll() } |