aboutsummaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/config.js2
-rw-r--r--src/lib/data-validator.js71
-rw-r--r--src/lib/server.js320
-rw-r--r--src/lib/util.js23
-rw-r--r--src/lib/workers-list.js4
5 files changed, 357 insertions, 63 deletions
diff --git a/src/lib/config.js b/src/lib/config.js
index a59615e..eb1135d 100644
--- a/src/lib/config.js
+++ b/src/lib/config.js
@@ -112,7 +112,7 @@ function parseMasterConfig(file) {
}
/**
- * @param {string} key
+ * @param {string|null} key
* @return {string|number|object}
*/
function get(key = null) {
diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js
new file mode 100644
index 0000000..276ea9b
--- /dev/null
+++ b/src/lib/data-validator.js
@@ -0,0 +1,71 @@
+const {isInteger, isObject} = require('lodash')
+const {getLogger} = require('./logger')
+
+const typeNames = {
+ 'i': 'integer',
+ 'n': 'number',
+ 's': 'string',
+ 'o': 'object',
+ 'a': 'array',
+}
+
+const logger = getLogger('data-validator')
+
+/**
+ * @param {string} expectedType
+ * @param value
+ */
+function checkType(expectedType, value) {
+ switch (expectedType) {
+ case 'i':
+ return isInteger(value)
+ case 'n':
+ return typeof value === 'number'
+ case 's':
+ return typeof value === 'string'
+ case 'o':
+ return typeof value === 'object'
+ case 'a':
+ return Array.isArray(value)
+ default:
+ logger.error(`checkType: unknown type ${expectedType}`)
+ return false
+ }
+}
+
+/**
+ * @param {object} data
+ * @param {array} schema
+ */
+function validateMessageData(data, schema) {
+ if (!isObject(data))
+ throw new Error(`data is not an object`)
+
+ for (const field of schema) {
+ let [name, types, required] = field
+ if (!(name in data)) {
+ if (required)
+ throw new Error(`missing required field ${name}`)
+
+ continue
+ }
+
+ types = types.split('')
+
+ if (!types
+ .map(type => checkType(type, data[name]))
+ .some(result => result === true)) {
+
+ let error = `'${name}' must be `
+ if (types.length === 1) {
+ error += typeNames[types[0]]
+ } else {
+ error += 'any of: ' + types.map(t => typeNames[t]).join(', ')
+ }
+
+ throw new Error(error)
+ }
+ }
+}
+
+module.exports = {validateMessageData} \ No newline at end of file
diff --git a/src/lib/server.js b/src/lib/server.js
index 08eadca..ca06a6d 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -1,14 +1,21 @@
const net = require('net')
const EventEmitter = require('events')
const {getLogger} = require('./logger')
-const isObject = require('lodash/isObject')
+const random = require('lodash/random')
+const config = require('./config')
+const {createCallablePromise} = require('./util')
+const {validateMessageData} = require('./data-validator')
const EOT = 0x04
+const REQUEST_NO_LIMIT = 999999
+
class Message {
static REQUEST = 0
static RESPONSE = 1
+ static PING = 2
+ static PONG = 3
/**
* @param {number} type
@@ -20,37 +27,61 @@ class Message {
this.type = type
}
+ /**
+ * @return {array}
+ */
getAsObject() {
return [this.type]
}
-
}
class ResponseMessage extends Message {
- constructor() {
+ /**
+ * @param {number} requestNo
+ */
+ constructor(requestNo) {
super(Message.RESPONSE)
+ this.requestNo = requestNo
this.error = null
this.data = null
}
+ /**
+ * @param {string} error
+ * @return {ResponseMessage}
+ */
setError(error) {
this.error = error
return this
}
+ /**
+ * @param data
+ * @return {ResponseMessage}
+ */
setData(data) {
this.data = data
return this
}
+ /**
+ * @return {array}
+ */
getAsObject() {
+ let response = {
+ no: this.requestNo
+ }
+
+ if (this.error !== null)
+ response.error = this.error
+
+ if (this.data !== null)
+ response.data = this.data
+
return [
...super.getAsObject(),
- [
- this.error,
- this.data
- ]
+ response
]
}
}
@@ -77,16 +108,28 @@ class RequestMessage extends Message {
* @type {null|string}
*/
this.password = null
+
+ /**
+ * @type {null|number}
+ */
+ this.requestNo = null
}
+ /**
+ * @return {array}
+ */
getAsObject() {
let request = {
+ no: this.requestNo,
type: this.requestType
}
if (this.requestData)
request.data = this.requestData
+ if (this.password)
+ request.password = this.password
+
return [
...super.getAsObject(),
request
@@ -99,8 +142,28 @@ class RequestMessage extends Message {
setPassword(password) {
this.password = password
}
+
+ /**
+ * @param {number} no
+ */
+ setRequestNo(no) {
+ this.requestNo = no
+ }
}
+class PingMessage extends Message {
+ constructor() {
+ super(Message.PING)
+ }
+}
+
+class PongMessage extends Message {
+ constructor() {
+ super(Message.PONG)
+ }
+}
+
+
class Server extends EventEmitter {
constructor() {
@@ -137,14 +200,10 @@ class Server extends EventEmitter {
onConnection = (socket) => {
let connection = new Connection()
connection.setSocket(socket)
- connection.on('message', (message) => {
- this.emit('message', {
- message,
- connection
- })
- })
this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`)
+
+ this.emit('new-connection', connection)
}
onListening = () => {
@@ -158,6 +217,7 @@ class Server extends EventEmitter {
}
+
class Connection extends EventEmitter {
constructor() {
@@ -190,9 +250,32 @@ class Connection extends EventEmitter {
this.remotePort = null
/**
- * @type {null|number}
+ * @type {boolean}
+ * @private
+ */
+ this._isAuthorized = config.get('password') === ''
+
+ /**
+ * @type {boolean}
+ * @private
*/
- this.id = null
+ this._textConversationAllowed = false
+
+ /**
+ * @type {boolean}
+ */
+ this._isOutgoing = false
+
+ /**
+ * @type {number}
+ */
+ this._lastOutgoingRequestNo = random(0, REQUEST_NO_LIMIT)
+
+ /**
+ * @type {object.<number, Promise>}
+ * @private
+ */
+ this._requestPromises = {}
this._setLogger()
}
@@ -205,13 +288,14 @@ class Connection extends EventEmitter {
if (this.socket !== null)
throw new Error(`this Connection already has a socket`)
+ this._isOutgoing = true
+
this.socket = new net.Socket()
this.socket.connect({host, port})
this.remoteAddress = host
this.remotePort = port
- this._setId()
this._setLogger()
this._setSocketEvents()
}
@@ -225,7 +309,11 @@ class Connection extends EventEmitter {
this.remoteAddress = socket.remoteAddress
this.remotePort = socket.remotePort
- this._setId()
+ if (this.remoteAddress === '127.0.0.1') {
+ this._isAuthorized = true
+ this._textConversationAllowed = true
+ }
+
this._setLogger()
this._setSocketEvents()
}
@@ -235,14 +323,7 @@ class Connection extends EventEmitter {
*/
_setLogger() {
let addr = this.socket ? this.remoteAddr() : '?'
- this.logger = getLogger(`<Connection ${this.id} ${addr}>`)
- }
-
- /**
- * @private
- */
- _setId() {
- this.id = Math.floor(Math.random() * 10000)
+ this.logger = getLogger(`<Connection ${addr}>`)
}
/**
@@ -304,76 +385,183 @@ class Connection extends EventEmitter {
this.logger.trace(`processChunks (after parsing):`, this.data)
- for (let message of messages) {
+ for (let rawMessage of messages) {
try {
- let buf = message.toString('utf-8')
+ let buf = rawMessage.toString('utf-8')
this.logger.debug(buf)
let json = JSON.parse(buf)
- this._emitMessage(json)
+
+ // try to parse the message
+ let message
+ try {
+ message = this._parseMessage(json)
+ } catch (e) {
+ // message is malformed
+ this.logger.error(e.message)
+
+ // send error to the other size
+ this.send(
+ new ResponseMessage(0).setError(e.message)
+ )
+
+ continue
+ }
+
+ if (message instanceof PingMessage) {
+ this.send(new PongMessage())
+ continue
+ }
+
+ if (message instanceof PongMessage)
+ continue
+
+ if (message instanceof RequestMessage) {
+ if (!this._isAuthorized) {
+ if (message.password !== config.get('password')) {
+ this.send(new ResponseMessage(message.requestNo).setError('invalid password'))
+ this.close()
+ break
+ }
+
+ this._isAuthorized = true
+ }
+
+ this.emit('request-message', message, this)
+ }
+
+ if (message instanceof ResponseMessage) {
+ if (message.requestNo in this._requestPromises) {
+ const P = this._requestPromises[message.requestNo]
+ delete this._requestPromises[message.requestNo]
+
+ P.resolve(message)
+ } else {
+ this.logger.warn('received unexpected Response message:', message)
+ }
+ }
} catch (error) {
this.logger.error('failed to parse data as JSON')
- this.logger.debug(message)
+ this.logger.debug(rawMessage)
}
}
}
/**
+ * Parse incoming message
+ *
* @param {object} json
+ * @return {Message}
* @private
+ * @throws Error
*/
- _emitMessage(json) {
- if (!Array.isArray(json)) {
- this.logger.error('malformed message, JSON array expected', json)
- return
- }
+ _parseMessage(json) {
+ if (!Array.isArray(json))
+ throw new Error('JSON array expected, got: ' + json)
let type = json.shift()
let message
switch (type) {
case Message.REQUEST: {
let data = json.shift()
- if (!data || !isObject(data)) {
- this.logger.error('malformed REQUEST message')
- return
+
+ try {
+ validateMessageData(data, [
+ // name type required
+ ['type', 's', true],
+ ['no', 'i', true],
+ ['password', 's', false],
+ ['data', 'snoa', false]
+ ])
+ } catch (e) {
+ throw new Error(`malformed REQUEST message: ${e.message}`)
}
message = new RequestMessage(data.type, data.data || null)
+ message.setRequestNo(data.no)
if (data.password)
message.setPassword(data.password)
- break
+
+ return message
}
case Message.RESPONSE: {
let data = json.shift()
- if (!data || !Array.isArray(data) || data.length < 2) {
- this.logger.error('malformed RESPONSE message')
- return
+
+ try {
+ validateMessageData(data, [
+ // name type required
+ ['no', 'i', true],
+ ['data', 'snoa', false],
+ ['error', 's', false],
+ ])
+ } catch (e) {
+ throw new Error(`malformed RESPONSE message: ${e.message}`)
}
- message = new ResponseMessage()
- message.setError(data[0]).setData(data[1])
+ message = new ResponseMessage(data.no)
+ message.setError(data.error || null)
+ .setData(data.data || null)
- break
+ return message
}
+ case Message.PING:
+ return new PingMessage()
+
+ case Message.PONG:
+ return new PongMessage()
+
default:
- this.logger.error(`malformed message, unexpected type ${type}`)
- return
+ throw new Error(`unexpected type ${type}`)
+ }
+ }
+
+ /**
+ * Send request
+ *
+ * @param {RequestMessage} message
+ * @return {Promise}
+ */
+ sendRequest(message) {
+ if (!(message instanceof RequestMessage))
+ throw new Error('sendRequest only accepts RequestMessage, got:', message)
+
+ // send password once (when talking to jobd-master)
+ if (!this._isAuthorized) {
+ message.setPassword(config.get('password'))
+ this._isAuthorized = true
+ }
+
+ // assign request number
+ const no = this._getNextOutgoingRequestNo()
+ if (this._requestPromises[no] !== undefined) {
+ this.logger.error(`sendRequest: next request's No is ${no}, found a promise awaiting response with the same no, rejecting...`)
+ this._requestPromises[no].reject(new Error(`this should not happen, but another request needs this number (${no})`))
+ delete this._requestPromises[no]
}
- this.emit('message', message)
+ message.setRequestNo(no)
+
+ // send it
+ this.send(message)
+
+ // create and return promise
+ const P = createCallablePromise()
+ this._requestPromises[no] = P
+
+ return P
}
/**
+ * Send any Message
+ *
* @type {Message} data
* @param message
*/
send(message) {
if (!(message instanceof Message))
- throw new Error('send expects Message, got', message)
-
- // TODO set password!
+ throw new Error('send expects Message, got: ' + message)
let json = JSON.stringify(message.getAsObject())
let buf = Buffer.concat([
@@ -391,13 +579,20 @@ class Connection extends EventEmitter {
}
}
+ _getNextOutgoingRequestNo() {
+ this._lastOutgoingRequestNo++;
+ if (this._lastOutgoingRequestNo >= REQUEST_NO_LIMIT)
+ this._lastOutgoingRequestNo = 1
+ return this._lastOutgoingRequestNo
+ }
+
/**
*/
close() {
try {
this.socket.end()
this.socket.destroy()
- this._emitClose()
+ this._handleClose()
} catch (error) {
this.logger.error('close:', error)
}
@@ -406,12 +601,17 @@ class Connection extends EventEmitter {
/**
* @private
*/
- _emitClose() {
- if (this._closeEmitted)
- return
+ _handleClose() {
+ if (!this._closeEmitted) {
+ this._closeEmitted = true
+ this.emit('close')
+ }
+
+ for (const no in this._requestPromises) {
+ this._requestPromises[no].reject(new Error('socket is closed'))
+ }
- this._closeEmitted = true
- this.emit('close')
+ this._requestPromises = {}
}
onConnect = () => {
@@ -433,12 +633,12 @@ class Connection extends EventEmitter {
}
onClose = (hadError) => {
- this._emitClose()
+ this._handleClose()
this.logger.debug(`socket closed` + (hadError ? ` with error` : ''))
}
onError = (error) => {
- this._emitClose()
+ this._handleClose()
this.logger.warn(`socket error:`, error)
}
@@ -448,5 +648,7 @@ module.exports = {
Server,
Connection,
RequestMessage,
- ResponseMessage
+ ResponseMessage,
+ PingMessage,
+ PongMessage,
} \ No newline at end of file
diff --git a/src/lib/util.js b/src/lib/util.js
index 6de07e5..a4e0025 100644
--- a/src/lib/util.js
+++ b/src/lib/util.js
@@ -5,5 +5,26 @@ module.exports = {
isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n)
+ },
+
+ /**
+ * Creates a Promise that can be resolved or rejected from the outside
+ *
+ * @param {Function} abortCallback
+ * @return {Promise}
+ */
+ createCallablePromise(abortCallback = null) {
+ let promise, resolve, reject
+ promise = new Promise(function(_resolve, _reject) {
+ resolve = _resolve
+ reject = _reject
+ })
+ promise.resolve = function(result) {
+ resolve(result)
+ }
+ promise.reject = function(result) {
+ reject(result)
+ }
+ return promise
}
-} \ No newline at end of file
+}
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
index 620f711..ed436e0 100644
--- a/src/lib/workers-list.js
+++ b/src/lib/workers-list.js
@@ -1,7 +1,7 @@
const intersection = require('lodash/intersection')
const config = require('./config')
const {getLogger} = require('./logger')
-const {RequestMessage} = require('./server')
+const {RequestMessage, PingMessage} = require('./server')
const throttle = require('lodash/throttle')
class WorkersList {
@@ -136,7 +136,7 @@ class WorkersList {
this.workers
.forEach(w => {
this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`)
- w.connection.send(new RequestMessage('ping'))
+ w.connection.send(new PingMessage())
})
}