summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/jobd-master.js64
-rwxr-xr-xsrc/jobd.js107
-rw-r--r--src/lib/config.js2
-rw-r--r--src/lib/data-validator.js71
-rw-r--r--src/lib/server.js320
-rw-r--r--src/lib/util.js23
-rw-r--r--src/lib/workers-list.js4
7 files changed, 454 insertions, 137 deletions
diff --git a/src/jobd-master.js b/src/jobd-master.js
index 608750d..d6bc09a 100755
--- a/src/jobd-master.js
+++ b/src/jobd-master.js
@@ -65,7 +65,9 @@ async function main() {
// start server
server = new Server()
- server.on('message', onMessage)
+ server.on('new-connection', (connection) => {
+ connection.on('request-message', onRequestMessage)
+ })
server.start(config.get('port'), config.get('host'))
logger.info('server started')
}
@@ -75,65 +77,71 @@ async function main() {
* @param {Connection} connection
* @return {Promise<*>}
*/
-async function onMessage({message, connection}) {
+async function onRequestMessage(message, connection) {
try {
- if (!(message instanceof RequestMessage)) {
- logger.debug('ignoring message', message)
- return
- }
-
- if (message.requestType !== 'ping')
- logger.info('onMessage:', message)
-
- if (config.get('password') && message.password !== config.get('password')) {
- connection.send(new ResponseMessage().setError('invalid password'))
- return connection.close()
- }
+ logger.info('onMessage:', message)
switch (message.requestType) {
- case 'ping':
- connection.send(new ResponseMessage().setError('pong'))
- break
-
case 'register-worker': {
const targets = message.requestData?.targets || []
if (!targets.length) {
- connection.send(new ResponseMessage().setError(`targets are empty`))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`targets are empty`)
+ )
break
}
workers.add(connection, targets)
- connection.send(new ResponseMessage().setData('ok'))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData('ok')
+ )
break
}
case 'poke': {
const targets = message.requestData?.targets || []
if (!targets.length) {
- connection.send(new ResponseMessage().setError(`targets are empty`))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`targets are empty`)
+ )
break
}
workers.poke(targets)
- connection.send(new ResponseMessage().setData('ok'))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData('ok')
+ )
break
}
case 'status':
const info = workers.getInfo()
- connection.send(new ResponseMessage().setData({
- workers: info,
- memoryUsage: process.memoryUsage()
- }))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData({
+ workers: info,
+ memoryUsage: process.memoryUsage()
+ })
+ )
break
default:
- connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`unknown request type: '${message.requestType}'`)
+ )
break
}
} catch (error) {
logger.error(`error while handling message:`, message, error)
- connection.send(new ResponseMessage().setError('server error: ' + error?.message))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError('server error: ' + error?.message)
+ )
}
}
diff --git a/src/jobd.js b/src/jobd.js
index a175b63..0df4e70 100755
--- a/src/jobd.js
+++ b/src/jobd.js
@@ -22,7 +22,7 @@ let logger
let server
/**
- * @type {object.<string, Connection>}
+ * @type {object.<string, {connection: Connection, requestNo: number}>}
*/
let jobDoneAwaiters = {}
@@ -90,8 +90,14 @@ async function main() {
}
worker.on('job-done', (data) => {
if (jobDoneAwaiters[data.id] !== undefined) {
- jobDoneAwaiters[data.id].send(new ResponseMessage().setData(data))
- jobDoneAwaiters[data.id].close()
+ const {connection, requestNo} = jobDoneAwaiters[data.id]
+
+ connection.send(
+ new ResponseMessage(requestNo)
+ .setData(data)
+ )
+ connection.close()
+
delete jobDoneAwaiters[data.id]
}
})
@@ -99,7 +105,9 @@ async function main() {
// start server
server = new Server()
- server.on('message', onMessage)
+ server.on('new-connection', (connection) => {
+ connection.on('request-message', onRequestMessage)
+ })
server.start(config.get('port'), config.get('host'))
logger.info('server started')
@@ -114,36 +122,27 @@ async function main() {
* @param {Connection} connection
* @return {Promise<*>}
*/
-async function onMessage({message, connection}) {
+async function onRequestMessage(message, connection) {
try {
- if (!(message instanceof RequestMessage)) {
- logger.debug('ignoring message', message)
- return
- }
-
- if (message.requestType !== 'ping')
- logger.info('onMessage:', message)
-
- if (config.get('password') && message.password !== config.get('password')) {
- connection.send(new ResponseMessage().setError('invalid password'))
- return connection.close()
- }
+ logger.info('onMessage:', message)
switch (message.requestType) {
- case 'ping':
- connection.send(new ResponseMessage().setData('pong'))
- break
-
case 'poll':
const targets = message.requestData?.targets || []
if (!targets.length) {
- connection.send(new ResponseMessage().setError('empty targets'))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError('empty targets')
+ )
break
}
for (const t of targets) {
if (!worker.hasTarget(t)) {
- connection.send(new ResponseMessage().setError(`invalid target '${t}'`))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`invalid target '${t}'`)
+ )
break
}
}
@@ -151,28 +150,38 @@ async function onMessage({message, connection}) {
worker.setPollTargets(targets)
worker.poll()
- connection.send(new ResponseMessage().setData('ok'));
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setData('ok')
+ )
break
case 'status':
const qs = worker.getStatus()
connection.send(
- new ResponseMessage().setData({
- queue: qs,
- jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length,
- memoryUsage: process.memoryUsage()
- })
+ new ResponseMessage(message.requestNo)
+ .setData({
+ queue: qs,
+ jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length,
+ memoryUsage: process.memoryUsage()
+ })
)
break
case 'run-manual':
const {id} = message.requestData
if (id in jobDoneAwaiters) {
- connection.send(new ResponseMessage().setError('another client is already waiting this job'))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError('another client is already waiting this job')
+ )
break
}
- jobDoneAwaiters[id] = connection
+ jobDoneAwaiters[id] = {
+ connection,
+ requestNo: message.requestNo
+ }
const {accepted, error} = await worker.getTasks(null, STATUS_MANUAL, {id})
if (!accepted) {
@@ -181,18 +190,28 @@ async function onMessage({message, connection}) {
let message = 'failed to run task'
if (typeof error === 'string')
message += `: ${error}`
- connection.send(new ResponseMessage().setError(message))
+
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(message)
+ )
}
break
default:
- connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError(`unknown request type: '${message.requestType}'`)
+ )
break
}
} catch (error) {
logger.error(`error while handling message:`, message, error)
- connection.send(new ResponseMessage().setError('server error: ' + error?.message))
+ connection.send(
+ new ResponseMessage(message.requestNo)
+ .setError('server error: ' + error?.message)
+ )
}
}
@@ -202,11 +221,17 @@ function connectToMaster() {
connection.connect(config.get('master_host'), config.get('master_port'))
connection.on('connect', function() {
- connection.send(
+ connection.sendRequest(
new RequestMessage('register-worker', {
targets: worker.getTargets()
})
)
+ .then(response => {
+ logger.debug('connectToMaster: response:', response)
+ })
+ .catch(error => {
+ logger.error('connectToMaster: error while awaiting response:', error)
+ })
})
connection.on('close', () => {
@@ -216,17 +241,7 @@ function connectToMaster() {
}, config.get('master_reconnect_timeout') * 1000)
})
- connection.on('message', (message) => {
- if (!(message instanceof RequestMessage)) {
- logger.debug('message from master is not a request, hmm... skipping', message)
- return
- }
-
- onMessage({message, connection})
- .catch((error) => {
- logger.error('connectToMaster: onMessage:', error)
- })
- })
+ connection.on('request-message', onRequestMessage)
}
diff --git a/src/lib/config.js b/src/lib/config.js
index a59615e..eb1135d 100644
--- a/src/lib/config.js
+++ b/src/lib/config.js
@@ -112,7 +112,7 @@ function parseMasterConfig(file) {
}
/**
- * @param {string} key
+ * @param {string|null} key
* @return {string|number|object}
*/
function get(key = null) {
diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js
new file mode 100644
index 0000000..276ea9b
--- /dev/null
+++ b/src/lib/data-validator.js
@@ -0,0 +1,71 @@
+const {isInteger, isObject} = require('lodash')
+const {getLogger} = require('./logger')
+
+const typeNames = {
+ 'i': 'integer',
+ 'n': 'number',
+ 's': 'string',
+ 'o': 'object',
+ 'a': 'array',
+}
+
+const logger = getLogger('data-validator')
+
+/**
+ * @param {string} expectedType
+ * @param value
+ */
+function checkType(expectedType, value) {
+ switch (expectedType) {
+ case 'i':
+ return isInteger(value)
+ case 'n':
+ return typeof value === 'number'
+ case 's':
+ return typeof value === 'string'
+ case 'o':
+ return typeof value === 'object'
+ case 'a':
+ return Array.isArray(value)
+ default:
+ logger.error(`checkType: unknown type ${expectedType}`)
+ return false
+ }
+}
+
+/**
+ * @param {object} data
+ * @param {array} schema
+ */
+function validateMessageData(data, schema) {
+ if (!isObject(data))
+ throw new Error(`data is not an object`)
+
+ for (const field of schema) {
+ let [name, types, required] = field
+ if (!(name in data)) {
+ if (required)
+ throw new Error(`missing required field ${name}`)
+
+ continue
+ }
+
+ types = types.split('')
+
+ if (!types
+ .map(type => checkType(type, data[name]))
+ .some(result => result === true)) {
+
+ let error = `'${name}' must be `
+ if (types.length === 1) {
+ error += typeNames[types[0]]
+ } else {
+ error += 'any of: ' + types.map(t => typeNames[t]).join(', ')
+ }
+
+ throw new Error(error)
+ }
+ }
+}
+
+module.exports = {validateMessageData} \ No newline at end of file
diff --git a/src/lib/server.js b/src/lib/server.js
index 08eadca..ca06a6d 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -1,14 +1,21 @@
const net = require('net')
const EventEmitter = require('events')
const {getLogger} = require('./logger')
-const isObject = require('lodash/isObject')
+const random = require('lodash/random')
+const config = require('./config')
+const {createCallablePromise} = require('./util')
+const {validateMessageData} = require('./data-validator')
const EOT = 0x04
+const REQUEST_NO_LIMIT = 999999
+
class Message {
static REQUEST = 0
static RESPONSE = 1
+ static PING = 2
+ static PONG = 3
/**
* @param {number} type
@@ -20,37 +27,61 @@ class Message {
this.type = type
}
+ /**
+ * @return {array}
+ */
getAsObject() {
return [this.type]
}
-
}
class ResponseMessage extends Message {
- constructor() {
+ /**
+ * @param {number} requestNo
+ */
+ constructor(requestNo) {
super(Message.RESPONSE)
+ this.requestNo = requestNo
this.error = null
this.data = null
}
+ /**
+ * @param {string} error
+ * @return {ResponseMessage}
+ */
setError(error) {
this.error = error
return this
}
+ /**
+ * @param data
+ * @return {ResponseMessage}
+ */
setData(data) {
this.data = data
return this
}
+ /**
+ * @return {array}
+ */
getAsObject() {
+ let response = {
+ no: this.requestNo
+ }
+
+ if (this.error !== null)
+ response.error = this.error
+
+ if (this.data !== null)
+ response.data = this.data
+
return [
...super.getAsObject(),
- [
- this.error,
- this.data
- ]
+ response
]
}
}
@@ -77,16 +108,28 @@ class RequestMessage extends Message {
* @type {null|string}
*/
this.password = null
+
+ /**
+ * @type {null|number}
+ */
+ this.requestNo = null
}
+ /**
+ * @return {array}
+ */
getAsObject() {
let request = {
+ no: this.requestNo,
type: this.requestType
}
if (this.requestData)
request.data = this.requestData
+ if (this.password)
+ request.password = this.password
+
return [
...super.getAsObject(),
request
@@ -99,8 +142,28 @@ class RequestMessage extends Message {
setPassword(password) {
this.password = password
}
+
+ /**
+ * @param {number} no
+ */
+ setRequestNo(no) {
+ this.requestNo = no
+ }
}
+class PingMessage extends Message {
+ constructor() {
+ super(Message.PING)
+ }
+}
+
+class PongMessage extends Message {
+ constructor() {
+ super(Message.PONG)
+ }
+}
+
+
class Server extends EventEmitter {
constructor() {
@@ -137,14 +200,10 @@ class Server extends EventEmitter {
onConnection = (socket) => {
let connection = new Connection()
connection.setSocket(socket)
- connection.on('message', (message) => {
- this.emit('message', {
- message,
- connection
- })
- })
this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`)
+
+ this.emit('new-connection', connection)
}
onListening = () => {
@@ -158,6 +217,7 @@ class Server extends EventEmitter {
}
+
class Connection extends EventEmitter {
constructor() {
@@ -190,9 +250,32 @@ class Connection extends EventEmitter {
this.remotePort = null
/**
- * @type {null|number}
+ * @type {boolean}
+ * @private
+ */
+ this._isAuthorized = config.get('password') === ''
+
+ /**
+ * @type {boolean}
+ * @private
*/
- this.id = null
+ this._textConversationAllowed = false
+
+ /**
+ * @type {boolean}
+ */
+ this._isOutgoing = false
+
+ /**
+ * @type {number}
+ */
+ this._lastOutgoingRequestNo = random(0, REQUEST_NO_LIMIT)
+
+ /**
+ * @type {object.<number, Promise>}
+ * @private
+ */
+ this._requestPromises = {}
this._setLogger()
}
@@ -205,13 +288,14 @@ class Connection extends EventEmitter {
if (this.socket !== null)
throw new Error(`this Connection already has a socket`)
+ this._isOutgoing = true
+
this.socket = new net.Socket()
this.socket.connect({host, port})
this.remoteAddress = host
this.remotePort = port
- this._setId()
this._setLogger()
this._setSocketEvents()
}
@@ -225,7 +309,11 @@ class Connection extends EventEmitter {
this.remoteAddress = socket.remoteAddress
this.remotePort = socket.remotePort
- this._setId()
+ if (this.remoteAddress === '127.0.0.1') {
+ this._isAuthorized = true
+ this._textConversationAllowed = true
+ }
+
this._setLogger()
this._setSocketEvents()
}
@@ -235,14 +323,7 @@ class Connection extends EventEmitter {
*/
_setLogger() {
let addr = this.socket ? this.remoteAddr() : '?'
- this.logger = getLogger(`<Connection ${this.id} ${addr}>`)
- }
-
- /**
- * @private
- */
- _setId() {
- this.id = Math.floor(Math.random() * 10000)
+ this.logger = getLogger(`<Connection ${addr}>`)
}
/**
@@ -304,76 +385,183 @@ class Connection extends EventEmitter {
this.logger.trace(`processChunks (after parsing):`, this.data)
- for (let message of messages) {
+ for (let rawMessage of messages) {
try {
- let buf = message.toString('utf-8')
+ let buf = rawMessage.toString('utf-8')
this.logger.debug(buf)
let json = JSON.parse(buf)
- this._emitMessage(json)
+
+ // try to parse the message
+ let message
+ try {
+ message = this._parseMessage(json)
+ } catch (e) {
+ // message is malformed
+ this.logger.error(e.message)
+
+ // send error to the other size
+ this.send(
+ new ResponseMessage(0).setError(e.message)
+ )
+
+ continue
+ }
+
+ if (message instanceof PingMessage) {
+ this.send(new PongMessage())
+ continue
+ }
+
+ if (message instanceof PongMessage)
+ continue
+
+ if (message instanceof RequestMessage) {
+ if (!this._isAuthorized) {
+ if (message.password !== config.get('password')) {
+ this.send(new ResponseMessage(message.requestNo).setError('invalid password'))
+ this.close()
+ break
+ }
+
+ this._isAuthorized = true
+ }
+
+ this.emit('request-message', message, this)
+ }
+
+ if (message instanceof ResponseMessage) {
+ if (message.requestNo in this._requestPromises) {
+ const P = this._requestPromises[message.requestNo]
+ delete this._requestPromises[message.requestNo]
+
+ P.resolve(message)
+ } else {
+ this.logger.warn('received unexpected Response message:', message)
+ }
+ }
} catch (error) {
this.logger.error('failed to parse data as JSON')
- this.logger.debug(message)
+ this.logger.debug(rawMessage)
}
}
}
/**
+ * Parse incoming message
+ *
* @param {object} json
+ * @return {Message}
* @private
+ * @throws Error
*/
- _emitMessage(json) {
- if (!Array.isArray(json)) {
- this.logger.error('malformed message, JSON array expected', json)
- return
- }
+ _parseMessage(json) {
+ if (!Array.isArray(json))
+ throw new Error('JSON array expected, got: ' + json)
let type = json.shift()
let message
switch (type) {
case Message.REQUEST: {
let data = json.shift()
- if (!data || !isObject(data)) {
- this.logger.error('malformed REQUEST message')
- return
+
+ try {
+ validateMessageData(data, [
+ // name type required
+ ['type', 's', true],
+ ['no', 'i', true],
+ ['password', 's', false],
+ ['data', 'snoa', false]
+ ])
+ } catch (e) {
+ throw new Error(`malformed REQUEST message: ${e.message}`)
}
message = new RequestMessage(data.type, data.data || null)
+ message.setRequestNo(data.no)
if (data.password)
message.setPassword(data.password)
- break
+
+ return message
}
case Message.RESPONSE: {
let data = json.shift()
- if (!data || !Array.isArray(data) || data.length < 2) {
- this.logger.error('malformed RESPONSE message')
- return
+
+ try {
+ validateMessageData(data, [
+ // name type required
+ ['no', 'i', true],
+ ['data', 'snoa', false],
+ ['error', 's', false],
+ ])
+ } catch (e) {
+ throw new Error(`malformed RESPONSE message: ${e.message}`)
}
- message = new ResponseMessage()
- message.setError(data[0]).setData(data[1])
+ message = new ResponseMessage(data.no)
+ message.setError(data.error || null)
+ .setData(data.data || null)
- break
+ return message
}
+ case Message.PING:
+ return new PingMessage()
+
+ case Message.PONG:
+ return new PongMessage()
+
default:
- this.logger.error(`malformed message, unexpected type ${type}`)
- return
+ throw new Error(`unexpected type ${type}`)
+ }
+ }
+
+ /**
+ * Send request
+ *
+ * @param {RequestMessage} message
+ * @return {Promise}
+ */
+ sendRequest(message) {
+ if (!(message instanceof RequestMessage))
+ throw new Error('sendRequest only accepts RequestMessage, got:', message)
+
+ // send password once (when talking to jobd-master)
+ if (!this._isAuthorized) {
+ message.setPassword(config.get('password'))
+ this._isAuthorized = true
+ }
+
+ // assign request number
+ const no = this._getNextOutgoingRequestNo()
+ if (this._requestPromises[no] !== undefined) {
+ this.logger.error(`sendRequest: next request's No is ${no}, found a promise awaiting response with the same no, rejecting...`)
+ this._requestPromises[no].reject(new Error(`this should not happen, but another request needs this number (${no})`))
+ delete this._requestPromises[no]
}
- this.emit('message', message)
+ message.setRequestNo(no)
+
+ // send it
+ this.send(message)
+
+ // create and return promise
+ const P = createCallablePromise()
+ this._requestPromises[no] = P
+
+ return P
}
/**
+ * Send any Message
+ *
* @type {Message} data
* @param message
*/
send(message) {
if (!(message instanceof Message))
- throw new Error('send expects Message, got', message)
-
- // TODO set password!
+ throw new Error('send expects Message, got: ' + message)
let json = JSON.stringify(message.getAsObject())
let buf = Buffer.concat([
@@ -391,13 +579,20 @@ class Connection extends EventEmitter {
}
}
+ _getNextOutgoingRequestNo() {
+ this._lastOutgoingRequestNo++;
+ if (this._lastOutgoingRequestNo >= REQUEST_NO_LIMIT)
+ this._lastOutgoingRequestNo = 1
+ return this._lastOutgoingRequestNo
+ }
+
/**
*/
close() {
try {
this.socket.end()
this.socket.destroy()
- this._emitClose()
+ this._handleClose()
} catch (error) {
this.logger.error('close:', error)
}
@@ -406,12 +601,17 @@ class Connection extends EventEmitter {
/**
* @private
*/
- _emitClose() {
- if (this._closeEmitted)
- return
+ _handleClose() {
+ if (!this._closeEmitted) {
+ this._closeEmitted = true
+ this.emit('close')
+ }
+
+ for (const no in this._requestPromises) {
+ this._requestPromises[no].reject(new Error('socket is closed'))
+ }
- this._closeEmitted = true
- this.emit('close')
+ this._requestPromises = {}
}
onConnect = () => {
@@ -433,12 +633,12 @@ class Connection extends EventEmitter {
}
onClose = (hadError) => {
- this._emitClose()
+ this._handleClose()
this.logger.debug(`socket closed` + (hadError ? ` with error` : ''))
}
onError = (error) => {
- this._emitClose()
+ this._handleClose()
this.logger.warn(`socket error:`, error)
}
@@ -448,5 +648,7 @@ module.exports = {
Server,
Connection,
RequestMessage,
- ResponseMessage
+ ResponseMessage,
+ PingMessage,
+ PongMessage,
} \ No newline at end of file
diff --git a/src/lib/util.js b/src/lib/util.js
index 6de07e5..a4e0025 100644
--- a/src/lib/util.js
+++ b/src/lib/util.js
@@ -5,5 +5,26 @@ module.exports = {
isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n)
+ },
+
+ /**
+ * Creates a Promise that can be resolved or rejected from the outside
+ *
+ * @param {Function} abortCallback
+ * @return {Promise}
+ */
+ createCallablePromise(abortCallback = null) {
+ let promise, resolve, reject
+ promise = new Promise(function(_resolve, _reject) {
+ resolve = _resolve
+ reject = _reject
+ })
+ promise.resolve = function(result) {
+ resolve(result)
+ }
+ promise.reject = function(result) {
+ reject(result)
+ }
+ return promise
}
-} \ No newline at end of file
+}
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
index 620f711..ed436e0 100644
--- a/src/lib/workers-list.js
+++ b/src/lib/workers-list.js
@@ -1,7 +1,7 @@
const intersection = require('lodash/intersection')
const config = require('./config')
const {getLogger} = require('./logger')
-const {RequestMessage} = require('./server')
+const {RequestMessage, PingMessage} = require('./server')
const throttle = require('lodash/throttle')
class WorkersList {
@@ -136,7 +136,7 @@ class WorkersList {
this.workers
.forEach(w => {
this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`)
- w.connection.send(new RequestMessage('ping'))
+ w.connection.send(new PingMessage())
})
}