diff options
Diffstat (limited to 'src/lib/server.js')
-rw-r--r-- | src/lib/server.js | 320 |
1 files changed, 261 insertions, 59 deletions
diff --git a/src/lib/server.js b/src/lib/server.js index 08eadca..ca06a6d 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -1,14 +1,21 @@ const net = require('net') const EventEmitter = require('events') const {getLogger} = require('./logger') -const isObject = require('lodash/isObject') +const random = require('lodash/random') +const config = require('./config') +const {createCallablePromise} = require('./util') +const {validateMessageData} = require('./data-validator') const EOT = 0x04 +const REQUEST_NO_LIMIT = 999999 + class Message { static REQUEST = 0 static RESPONSE = 1 + static PING = 2 + static PONG = 3 /** * @param {number} type @@ -20,37 +27,61 @@ class Message { this.type = type } + /** + * @return {array} + */ getAsObject() { return [this.type] } - } class ResponseMessage extends Message { - constructor() { + /** + * @param {number} requestNo + */ + constructor(requestNo) { super(Message.RESPONSE) + this.requestNo = requestNo this.error = null this.data = null } + /** + * @param {string} error + * @return {ResponseMessage} + */ setError(error) { this.error = error return this } + /** + * @param data + * @return {ResponseMessage} + */ setData(data) { this.data = data return this } + /** + * @return {array} + */ getAsObject() { + let response = { + no: this.requestNo + } + + if (this.error !== null) + response.error = this.error + + if (this.data !== null) + response.data = this.data + return [ ...super.getAsObject(), - [ - this.error, - this.data - ] + response ] } } @@ -77,16 +108,28 @@ class RequestMessage extends Message { * @type {null|string} */ this.password = null + + /** + * @type {null|number} + */ + this.requestNo = null } + /** + * @return {array} + */ getAsObject() { let request = { + no: this.requestNo, type: this.requestType } if (this.requestData) request.data = this.requestData + if (this.password) + request.password = this.password + return [ ...super.getAsObject(), request @@ -99,8 +142,28 @@ class RequestMessage extends Message { setPassword(password) { this.password = password } + + /** + * @param {number} no + */ + setRequestNo(no) { + this.requestNo = no + } } +class PingMessage extends Message { + constructor() { + super(Message.PING) + } +} + +class PongMessage extends Message { + constructor() { + super(Message.PONG) + } +} + + class Server extends EventEmitter { constructor() { @@ -137,14 +200,10 @@ class Server extends EventEmitter { onConnection = (socket) => { let connection = new Connection() connection.setSocket(socket) - connection.on('message', (message) => { - this.emit('message', { - message, - connection - }) - }) this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`) + + this.emit('new-connection', connection) } onListening = () => { @@ -158,6 +217,7 @@ class Server extends EventEmitter { } + class Connection extends EventEmitter { constructor() { @@ -190,9 +250,32 @@ class Connection extends EventEmitter { this.remotePort = null /** - * @type {null|number} + * @type {boolean} + * @private + */ + this._isAuthorized = config.get('password') === '' + + /** + * @type {boolean} + * @private */ - this.id = null + this._textConversationAllowed = false + + /** + * @type {boolean} + */ + this._isOutgoing = false + + /** + * @type {number} + */ + this._lastOutgoingRequestNo = random(0, REQUEST_NO_LIMIT) + + /** + * @type {object.<number, Promise>} + * @private + */ + this._requestPromises = {} this._setLogger() } @@ -205,13 +288,14 @@ class Connection extends EventEmitter { if (this.socket !== null) throw new Error(`this Connection already has a socket`) + this._isOutgoing = true + this.socket = new net.Socket() this.socket.connect({host, port}) this.remoteAddress = host this.remotePort = port - this._setId() this._setLogger() this._setSocketEvents() } @@ -225,7 +309,11 @@ class Connection extends EventEmitter { this.remoteAddress = socket.remoteAddress this.remotePort = socket.remotePort - this._setId() + if (this.remoteAddress === '127.0.0.1') { + this._isAuthorized = true + this._textConversationAllowed = true + } + this._setLogger() this._setSocketEvents() } @@ -235,14 +323,7 @@ class Connection extends EventEmitter { */ _setLogger() { let addr = this.socket ? this.remoteAddr() : '?' - this.logger = getLogger(`<Connection ${this.id} ${addr}>`) - } - - /** - * @private - */ - _setId() { - this.id = Math.floor(Math.random() * 10000) + this.logger = getLogger(`<Connection ${addr}>`) } /** @@ -304,76 +385,183 @@ class Connection extends EventEmitter { this.logger.trace(`processChunks (after parsing):`, this.data) - for (let message of messages) { + for (let rawMessage of messages) { try { - let buf = message.toString('utf-8') + let buf = rawMessage.toString('utf-8') this.logger.debug(buf) let json = JSON.parse(buf) - this._emitMessage(json) + + // try to parse the message + let message + try { + message = this._parseMessage(json) + } catch (e) { + // message is malformed + this.logger.error(e.message) + + // send error to the other size + this.send( + new ResponseMessage(0).setError(e.message) + ) + + continue + } + + if (message instanceof PingMessage) { + this.send(new PongMessage()) + continue + } + + if (message instanceof PongMessage) + continue + + if (message instanceof RequestMessage) { + if (!this._isAuthorized) { + if (message.password !== config.get('password')) { + this.send(new ResponseMessage(message.requestNo).setError('invalid password')) + this.close() + break + } + + this._isAuthorized = true + } + + this.emit('request-message', message, this) + } + + if (message instanceof ResponseMessage) { + if (message.requestNo in this._requestPromises) { + const P = this._requestPromises[message.requestNo] + delete this._requestPromises[message.requestNo] + + P.resolve(message) + } else { + this.logger.warn('received unexpected Response message:', message) + } + } } catch (error) { this.logger.error('failed to parse data as JSON') - this.logger.debug(message) + this.logger.debug(rawMessage) } } } /** + * Parse incoming message + * * @param {object} json + * @return {Message} * @private + * @throws Error */ - _emitMessage(json) { - if (!Array.isArray(json)) { - this.logger.error('malformed message, JSON array expected', json) - return - } + _parseMessage(json) { + if (!Array.isArray(json)) + throw new Error('JSON array expected, got: ' + json) let type = json.shift() let message switch (type) { case Message.REQUEST: { let data = json.shift() - if (!data || !isObject(data)) { - this.logger.error('malformed REQUEST message') - return + + try { + validateMessageData(data, [ + // name type required + ['type', 's', true], + ['no', 'i', true], + ['password', 's', false], + ['data', 'snoa', false] + ]) + } catch (e) { + throw new Error(`malformed REQUEST message: ${e.message}`) } message = new RequestMessage(data.type, data.data || null) + message.setRequestNo(data.no) if (data.password) message.setPassword(data.password) - break + + return message } case Message.RESPONSE: { let data = json.shift() - if (!data || !Array.isArray(data) || data.length < 2) { - this.logger.error('malformed RESPONSE message') - return + + try { + validateMessageData(data, [ + // name type required + ['no', 'i', true], + ['data', 'snoa', false], + ['error', 's', false], + ]) + } catch (e) { + throw new Error(`malformed RESPONSE message: ${e.message}`) } - message = new ResponseMessage() - message.setError(data[0]).setData(data[1]) + message = new ResponseMessage(data.no) + message.setError(data.error || null) + .setData(data.data || null) - break + return message } + case Message.PING: + return new PingMessage() + + case Message.PONG: + return new PongMessage() + default: - this.logger.error(`malformed message, unexpected type ${type}`) - return + throw new Error(`unexpected type ${type}`) + } + } + + /** + * Send request + * + * @param {RequestMessage} message + * @return {Promise} + */ + sendRequest(message) { + if (!(message instanceof RequestMessage)) + throw new Error('sendRequest only accepts RequestMessage, got:', message) + + // send password once (when talking to jobd-master) + if (!this._isAuthorized) { + message.setPassword(config.get('password')) + this._isAuthorized = true + } + + // assign request number + const no = this._getNextOutgoingRequestNo() + if (this._requestPromises[no] !== undefined) { + this.logger.error(`sendRequest: next request's No is ${no}, found a promise awaiting response with the same no, rejecting...`) + this._requestPromises[no].reject(new Error(`this should not happen, but another request needs this number (${no})`)) + delete this._requestPromises[no] } - this.emit('message', message) + message.setRequestNo(no) + + // send it + this.send(message) + + // create and return promise + const P = createCallablePromise() + this._requestPromises[no] = P + + return P } /** + * Send any Message + * * @type {Message} data * @param message */ send(message) { if (!(message instanceof Message)) - throw new Error('send expects Message, got', message) - - // TODO set password! + throw new Error('send expects Message, got: ' + message) let json = JSON.stringify(message.getAsObject()) let buf = Buffer.concat([ @@ -391,13 +579,20 @@ class Connection extends EventEmitter { } } + _getNextOutgoingRequestNo() { + this._lastOutgoingRequestNo++; + if (this._lastOutgoingRequestNo >= REQUEST_NO_LIMIT) + this._lastOutgoingRequestNo = 1 + return this._lastOutgoingRequestNo + } + /** */ close() { try { this.socket.end() this.socket.destroy() - this._emitClose() + this._handleClose() } catch (error) { this.logger.error('close:', error) } @@ -406,12 +601,17 @@ class Connection extends EventEmitter { /** * @private */ - _emitClose() { - if (this._closeEmitted) - return + _handleClose() { + if (!this._closeEmitted) { + this._closeEmitted = true + this.emit('close') + } + + for (const no in this._requestPromises) { + this._requestPromises[no].reject(new Error('socket is closed')) + } - this._closeEmitted = true - this.emit('close') + this._requestPromises = {} } onConnect = () => { @@ -433,12 +633,12 @@ class Connection extends EventEmitter { } onClose = (hadError) => { - this._emitClose() + this._handleClose() this.logger.debug(`socket closed` + (hadError ? ` with error` : '')) } onError = (error) => { - this._emitClose() + this._handleClose() this.logger.warn(`socket error:`, error) } @@ -448,5 +648,7 @@ module.exports = { Server, Connection, RequestMessage, - ResponseMessage + ResponseMessage, + PingMessage, + PongMessage, }
\ No newline at end of file |