summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/jobd.js18
-rw-r--r--src/lib/config.js31
-rw-r--r--src/lib/worker.js149
3 files changed, 84 insertions, 114 deletions
diff --git a/src/jobd.js b/src/jobd.js
index 477934f..f8be71f 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -107,16 +107,13 @@ async function initApp(appName) {
function initWorker() {
worker = new Worker()
- for (let targetName in config.get('targets')) {
- let slots = config.get('targets')[targetName].slots
- // let target = new Target({name: targetName})
- // queue.addTarget(target)
-
- for (let slotName in slots) {
- let slotLimit = slots[slotName]
- worker.addSlot(targetName, slotName, slotLimit)
- }
+
+ const targets = config.get('targets')
+ for (const target in targets) {
+ let limit = targets[target]
+ worker.addTarget(target, limit)
}
+
worker.on('job-done', (data) => {
if (jobPromises[data.id] !== undefined) {
const P = jobPromises[data.id]
@@ -186,11 +183,10 @@ function onPollRequest(data, requestNo, connection) {
* @param {Connection} connection
*/
function onStatus(data, requestNo, connection) {
- const qs = worker.getStatus()
connection.send(
new ResponseMessage(requestNo)
.setData({
- targets: qs.targets,
+ targets: worker.getStatus(),
jobPromisesCount: Object.keys(jobPromises).length,
memoryUsage: process.memoryUsage()
})
diff --git a/src/lib/config.js b/src/lib/config.js
index eb1135d..ac711fd 100644
--- a/src/lib/config.js
+++ b/src/lib/config.js
@@ -13,7 +13,7 @@ function readFile(file) {
function processScheme(source, scheme) {
const result = {}
-
+
for (let key in scheme) {
let opts = scheme[key]
let ne = !(key in source) || !source[key]
@@ -34,11 +34,17 @@ function processScheme(source, scheme) {
throw new Error(`'${key}' must be a float`)
value = parseFloat(value)
break
+
+ case 'object':
+ if (typeof value !== 'object')
+ throw new Error(`'${key}' must be an object`)
+
+ break
}
result[key] = value
}
-
+
return result
}
@@ -69,26 +75,23 @@ function parseWorkerConfig(file) {
launcher: {required: true},
max_output_buffer: {default: 1024*1024, type: 'int'},
+ targets: {required: true, type: 'object'},
}
Object.assign(config, processScheme(raw, scheme))
config.targets = {}
-
- // targets
- for (let target in raw) {
+ for (let target in raw.targets) {
if (target === 'null')
throw new Error('word \'null\' is reserved, please don\'t use it as a target name')
- if (typeof raw[target] !== 'object')
- continue
+ if (!isNumeric(raw.targets[target]))
+ throw new Error(`value of target '${target}' must be a number`)
- config.targets[target] = {slots: {}}
- for (let slotName in raw[target]) {
- let slotLimit = parseInt(raw[target][slotName], 10)
- if (slotLimit < 1)
- throw new Error(`${target}: slot ${slotName} has invalid limit`)
- config.targets[target].slots[slotName] = slotLimit
- }
+ let value = parseInt(raw.targets[target], 10)
+ if (value < 1)
+ throw new Error(`target '${target}' has invalid value`)
+
+ config.targets[target] = value
}
}
diff --git a/src/lib/worker.js b/src/lib/worker.js
index 562e0b1..beba7f9 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -26,7 +26,7 @@ class Worker extends EventEmitter {
super()
/**
- * @type {object.<string, {slots: object.<string, Queue>, paused: boolean}>}
+ * @type {object.<string, {queue: Queue, paused: boolean}>}
*/
this.targets = {}
@@ -50,30 +50,26 @@ class Worker extends EventEmitter {
* Creates new queue.
*
* @param {string} target
- * @param {string} slot
* @param {number} limit
*/
- addSlot(target, slot, limit) {
- this.logger.debug(`addSlot: adding slot '${slot}' for target' ${target}' (limit: ${limit})`)
+ addTarget(target, limit) {
+ this.logger.debug(`addTarget: adding target' ${target}', limit = ${limit}`)
- if (this.targets[target] === undefined)
- this.targets[target] = {
- slots: {},
- paused: false
- }
-
- if (this.targets[target].slots[slot] !== undefined)
- throw new Error(`slot ${slot} for target ${target} has already been added`)
+ if (target in this.targets)
+ throw new Error(`target '${target}' already added`)
let queue = Queue({
concurrency: limit,
autostart: true
})
- queue.on('success', this.onJobFinished.bind(this, target, slot))
- queue.on('error', this.onJobFinished.bind(this, target, slot))
+ queue.on('success', this.onJobFinished.bind(this, target))
+ queue.on('error', this.onJobFinished.bind(this, target))
queue.start()
- this.targets[target].slots[slot] = queue
+ this.targets[target] = {
+ paused: false,
+ queue
+ }
}
/**
@@ -85,19 +81,17 @@ class Worker extends EventEmitter {
if (targets === null)
targets = this.getTargets()
- for (const targetName of targets) {
- const target = this.targets[targetName]
- if (target.paused) {
- this.logger.warn(`pauseTargets: ${targetName} is already paused`)
+ for (const target of targets) {
+ const {queue, paused} = this.targets[target]
+ if (paused) {
+ this.logger.warn(`pauseTargets: ${target} is already paused`)
continue
}
- for (const slotName in target.slots) {
- this.logger.debug(`pauseTargets: stopping ${targetName}/${slotName} queue`)
- target.slots[slotName].stop()
- }
+ this.logger.debug(`pauseTargets: stopping ${target}`)
+ queue.stop()
- target.paused = true
+ this.targets[target].paused = true
}
}
@@ -110,19 +104,17 @@ class Worker extends EventEmitter {
if (targets === null)
targets = this.getTargets()
- for (const targetName of targets) {
- const target = this.targets[targetName]
- if (!target.paused) {
- this.logger.warn(`continueTargets: ${targetName} is not paused`)
+ for (const target of targets) {
+ const {queue, paused} = this.targets[target]
+ if (!paused) {
+ this.logger.warn(`continueTargets: ${target} is not paused`)
continue
}
- for (const slotName in target.slots) {
- this.logger.debug(`pauseTargets: starting ${targetName}/${slotName} queue`)
- target.slots[slotName].start()
- }
+ this.logger.debug(`pauseTargets: starting ${target}`)
+ queue.start()
- target.paused = false
+ this.targets[target].paused = false
}
}
@@ -142,19 +134,16 @@ class Worker extends EventEmitter {
* @return {object}
*/
getStatus() {
- let status = {targets: {}}
- for (const targetName in this.targets) {
- let target = this.targets[targetName]
- status.targets[targetName] = {
- paused: target.paused,
- slots: {}
- }
- for (const slotName in target.slots) {
- const queue = target.slots[slotName]
- status.targets[targetName].slots[slotName] = {
- concurrency: queue.concurrency,
- length: queue.length,
- }
+ let status = {}
+ for (const target in this.targets) {
+ if (!this.targets.hasOwnProperty(target))
+ continue
+
+ const {queue, paused} = this.targets[target]
+ status[target] = {
+ paused,
+ concurrency: queue.concurrency,
+ length: queue.length,
}
}
return status
@@ -188,10 +177,10 @@ class Worker extends EventEmitter {
return
}
- // skip and postpone the poll, if no free slots
+ // skip and postpone the poll, if no free targets
// it will be called again from onJobFinished()
- if (!this.hasFreeSlots(targets)) {
- this.logger.debug(`${LOGPREFIX} no free slots`)
+ if (!this.hasFreeTargets(targets)) {
+ this.logger.debug(`${LOGPREFIX} no free targets`)
return
}
@@ -279,7 +268,7 @@ class Worker extends EventEmitter {
* @param {{ids: number[]}} data
* @returns
* {Promise<{
- * results: Map<number, {status: number, reason: string, slot: string, target: string}>,
+ * results: Map<number, {status: number, reason: string, target: string}>,
* rowsCount: number
* }>}
*/
@@ -290,11 +279,11 @@ class Worker extends EventEmitter {
await db.beginTransaction()
/**
- * @type {Map<number, {status: number, reason: string, slot: string, target: string}>}
+ * @type {Map<number, {status: number, reason: string, target: string}>}
*/
const jobsResults = new Map()
- let sqlFields = `id, status, target, slot`
+ let sqlFields = `id, status, target`
let sql
if (data.ids) {
sql = `SELECT ${sqlFields} FROM ${config.get('mysql_table')} WHERE id IN(`+data.ids.map(db.escape).join(',')+`) FOR UPDATE`
@@ -318,7 +307,6 @@ class Worker extends EventEmitter {
for (let result of rows) {
const id = parseInt(result.id)
- const slot = String(result.slot)
const target = String(result.target)
const status = String(result.status)
@@ -333,7 +321,7 @@ class Worker extends EventEmitter {
continue
}
- if (!target || this.targets[target] === undefined) {
+ if (!target || !(target in this.targets)) {
let reason = `target '${target}' not found (job id=${id})`
jobsResults.set(id, {
result: JOB_IGNORED,
@@ -344,23 +332,11 @@ class Worker extends EventEmitter {
continue
}
- if (!slot || this.targets[target].slots[slot] === undefined) {
- let reason = `slot '${slot}' of target '${target}' not found (job id=${id})`
- jobsResults.set(id, {
- result: JOB_IGNORED,
- reason
- })
-
- this.logger.error(`${LOGPREFIX} ${reason}`)
- continue
- }
-
- this.logger.debug(`${LOGPREFIX} accepted target='${target}', slot='${slot}', id=${id}`)
+ this.logger.debug(`${LOGPREFIX} accepted target='${target}', id=${id}`)
jobsResults.set(id, {
result: JOB_ACCEPTED,
- target,
- slot
+ target
})
}
@@ -400,8 +376,8 @@ class Worker extends EventEmitter {
if (result !== JOB_ACCEPTED)
continue
- const {slot, target} = jobResult
- this.enqueueJob(id, target, slot)
+ const {target} = jobResult
+ this.enqueueJob(id, target)
}
return {
@@ -415,10 +391,9 @@ class Worker extends EventEmitter {
*
* @param {int} id
* @param {string} target
- * @param {string} slot
*/
- enqueueJob(id, target, slot) {
- const queue = this.targets[target].slots[slot]
+ enqueueJob(id, target) {
+ const queue = this.targets[target].queue
queue.push(async (cb) => {
let data = {
code: null,
@@ -556,21 +531,20 @@ class Worker extends EventEmitter {
* @param {string[]} inTargets
* @returns {boolean}
*/
- hasFreeSlots(inTargets = []) {
- const LOGPREFIX = `hasFreeSlots(${JSON.stringify(inTargets)}):`
+ hasFreeTargets(inTargets = []) {
+ const LOGPREFIX = `hasFreeTargets(${JSON.stringify(inTargets)}):`
this.logger.debug(`${LOGPREFIX} entered`)
for (const target in this.targets) {
- if (!inTargets.includes(target))
+ if (!this.targets.hasOwnProperty(target) || !inTargets.includes(target))
continue
- for (const slot in this.targets[target].slots) {
- const queue = this.targets[target].slots[slot]
- this.logger.debug(LOGPREFIX, queue.concurrency, queue.length)
- if (queue.length < queue.concurrency)
- return true
- }
+ const {paused, queue} = this.targets[target]
+ this.logger.trace(LOGPREFIX, target, queue.concurrency, queue.length)
+
+ if (queue.length < queue.concurrency)
+ return true
}
return false
@@ -578,15 +552,12 @@ class Worker extends EventEmitter {
/**
* @param {string} target
- * @param {string} slot
*/
- onJobFinished = (target, slot) => {
- this.logger.debug(`onJobFinished: target=${target}, slot=${slot}`)
-
- const targetPaused = this.targets[target].paused
- const queue = this.targets[target].slots[slot]
+ onJobFinished = (target) => {
+ this.logger.debug(`onJobFinished: target=${target}`)
- if (!targetPaused && queue.length < queue.concurrency && this.hasPollTarget(target)) {
+ const {paused, queue} = this.targets[target]
+ if (!paused && queue.length < queue.concurrency && this.hasPollTarget(target)) {
this.logger.debug(`onJobFinished: ${queue.length} < ${queue.concurrency}, calling poll(${target})`)
this.poll()
}