aboutsummaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/data-validator.js58
-rw-r--r--src/lib/logger.js3
-rw-r--r--src/lib/request-handler.js17
-rw-r--r--src/lib/worker.js19
4 files changed, 87 insertions, 10 deletions
diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js
index 7419b34..74827c1 100644
--- a/src/lib/data-validator.js
+++ b/src/lib/data-validator.js
@@ -11,6 +11,11 @@ const typeNames = {
const logger = getLogger('data-validator')
+
+/**************************************/
+/** Common Functions **/
+/**************************************/
+
/**
* @param {string} expectedType
* @param value
@@ -69,7 +74,12 @@ function validateObjectSchema(data, schema) {
}
}
-function validateTargetsListFormat(targets) {
+
+/********************************************/
+/** Request input data validators */
+/********************************************/
+
+function validateInputTargetsListFormat(targets) {
if (!Array.isArray(targets))
throw new Error('targets must be array')
@@ -83,7 +93,51 @@ function validateTargetsListFormat(targets) {
}
}
+function validateInputTargetAndConcurrency(data, onlyTarget = false) {
+ const schema = [
+ ['target', 's', true],
+ ]
+
+ if (!onlyTarget) {
+ schema.push(
+ ['concurrency', 'i', true]
+ )
+ }
+
+ validateObjectSchema(data, schema)
+
+ if (!onlyTarget && data.concurrency <= 0)
+ throw new Error('Invalid concurrency value.')
+}
+
+/**
+ * @param data
+ * @param {Worker|null} worker
+ * @return {null|string[]}
+ */
+function validateInputTargets(data, worker) {
+ // null means all targets
+ let targets = null
+
+ if (data.targets !== undefined) {
+ targets = data.targets
+
+ validateInputTargetsListFormat(targets)
+
+ if (worker !== null) {
+ for (const t of targets) {
+ if (!worker.hasTarget(t))
+ throw new Error(`invalid target '${t}'`)
+ }
+ }
+ }
+
+ return targets
+}
+
module.exports = {
validateObjectSchema,
- validateTargetsListFormat
+ validateInputTargetsListFormat,
+ validateInputTargetAndConcurrency,
+ validateInputTargets,
} \ No newline at end of file
diff --git a/src/lib/logger.js b/src/lib/logger.js
index f886e0c..b71020c 100644
--- a/src/lib/logger.js
+++ b/src/lib/logger.js
@@ -101,10 +101,9 @@ module.exports = {
},
/**
- * @param cb
* @return {Promise}
*/
- shutdown(cb) {
+ shutdown() {
return new Promise((resolve, reject) => {
log4js.shutdown(resolve)
})
diff --git a/src/lib/request-handler.js b/src/lib/request-handler.js
index 4330b6b..e7f9fe2 100644
--- a/src/lib/request-handler.js
+++ b/src/lib/request-handler.js
@@ -35,14 +35,17 @@ class RequestHandler {
if (this.handlers.has(message.requestType)) {
const f = this.handlers.get(message.requestType)
- const result = f(message.requestData || {}, message.requestNo, connection)
+ const result = f(message.requestData || {}, connection)
if (result instanceof Promise) {
- result.catch(error => {
- this.logger.error(`${message.requestType}:`, error)
-
+ result.then(data => {
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData(data)
+ )
+ }).catch(error => {
connection.send(
new ResponseMessage(message.requestNo)
- .setError('server error: ' + error?.message)
+ .setError(error?.message)
)
})
}
@@ -56,4 +59,6 @@ class RequestHandler {
}
-module.exports = RequestHandler \ No newline at end of file
+module.exports = {
+ RequestHandler
+} \ No newline at end of file
diff --git a/src/lib/worker.js b/src/lib/worker.js
index e53d03f..0be6a19 100644
--- a/src/lib/worker.js
+++ b/src/lib/worker.js
@@ -73,6 +73,25 @@ class Worker extends EventEmitter {
}
/**
+ * Deletes a queue.
+ *
+ * @param {string} target
+ */
+ removeTarget(target) {
+ if (!(target in this.targets))
+ throw new Error(`target '${target}' not found`)
+
+ const {queue} = this.targets[target]
+ if (queue.length > 0)
+ throw new Error(`queue is not empty`)
+
+ this.logger.debug(`deleteTarget: deleting target' ${target}'`)
+ queue.removeAllListeners()
+ queue.end()
+ delete this.targets[target]
+ }
+
+ /**
* @param {string} target
* @param {number} concurrency
*/