From 8783de4fe5945a7ea22995c25a3e546c0426c7f6 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Mon, 1 Mar 2021 02:05:37 +0300 Subject: update protocol --- src/jobd-master.js | 64 ++++++---- src/jobd.js | 107 +++++++++------- src/lib/config.js | 2 +- src/lib/data-validator.js | 71 ++++++++++ src/lib/server.js | 320 +++++++++++++++++++++++++++++++++++++--------- src/lib/util.js | 23 +++- src/lib/workers-list.js | 4 +- 7 files changed, 454 insertions(+), 137 deletions(-) create mode 100644 src/lib/data-validator.js (limited to 'src') diff --git a/src/jobd-master.js b/src/jobd-master.js index 608750d..d6bc09a 100755 --- a/src/jobd-master.js +++ b/src/jobd-master.js @@ -65,7 +65,9 @@ async function main() { // start server server = new Server() - server.on('message', onMessage) + server.on('new-connection', (connection) => { + connection.on('request-message', onRequestMessage) + }) server.start(config.get('port'), config.get('host')) logger.info('server started') } @@ -75,65 +77,71 @@ async function main() { * @param {Connection} connection * @return {Promise<*>} */ -async function onMessage({message, connection}) { +async function onRequestMessage(message, connection) { try { - if (!(message instanceof RequestMessage)) { - logger.debug('ignoring message', message) - return - } - - if (message.requestType !== 'ping') - logger.info('onMessage:', message) - - if (config.get('password') && message.password !== config.get('password')) { - connection.send(new ResponseMessage().setError('invalid password')) - return connection.close() - } + logger.info('onMessage:', message) switch (message.requestType) { - case 'ping': - connection.send(new ResponseMessage().setError('pong')) - break - case 'register-worker': { const targets = message.requestData?.targets || [] if (!targets.length) { - connection.send(new ResponseMessage().setError(`targets are empty`)) + connection.send( + new ResponseMessage(message.requestNo) + .setError(`targets are empty`) + ) break } workers.add(connection, targets) - connection.send(new ResponseMessage().setData('ok')) + connection.send( + new ResponseMessage(message.requestNo) + .setData('ok') + ) break } case 'poke': { const targets = message.requestData?.targets || [] if (!targets.length) { - connection.send(new ResponseMessage().setError(`targets are empty`)) + connection.send( + new ResponseMessage(message.requestNo) + .setError(`targets are empty`) + ) break } workers.poke(targets) - connection.send(new ResponseMessage().setData('ok')) + connection.send( + new ResponseMessage(message.requestNo) + .setData('ok') + ) break } case 'status': const info = workers.getInfo() - connection.send(new ResponseMessage().setData({ - workers: info, - memoryUsage: process.memoryUsage() - })) + connection.send( + new ResponseMessage(message.requestNo) + .setData({ + workers: info, + memoryUsage: process.memoryUsage() + }) + ) break default: - connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`)) + connection.send( + new ResponseMessage(message.requestNo) + .setError(`unknown request type: '${message.requestType}'`) + ) break } } catch (error) { logger.error(`error while handling message:`, message, error) - connection.send(new ResponseMessage().setError('server error: ' + error?.message)) + connection.send( + new ResponseMessage(message.requestNo) + .setError('server error: ' + error?.message) + ) } } diff --git a/src/jobd.js b/src/jobd.js index a175b63..0df4e70 100755 --- a/src/jobd.js +++ b/src/jobd.js @@ -22,7 +22,7 @@ let logger let server /** - * @type {object.} + * @type {object.} */ let jobDoneAwaiters = {} @@ -90,8 +90,14 @@ async function main() { } worker.on('job-done', (data) => { if (jobDoneAwaiters[data.id] !== undefined) { - jobDoneAwaiters[data.id].send(new ResponseMessage().setData(data)) - jobDoneAwaiters[data.id].close() + const {connection, requestNo} = jobDoneAwaiters[data.id] + + connection.send( + new ResponseMessage(requestNo) + .setData(data) + ) + connection.close() + delete jobDoneAwaiters[data.id] } }) @@ -99,7 +105,9 @@ async function main() { // start server server = new Server() - server.on('message', onMessage) + server.on('new-connection', (connection) => { + connection.on('request-message', onRequestMessage) + }) server.start(config.get('port'), config.get('host')) logger.info('server started') @@ -114,36 +122,27 @@ async function main() { * @param {Connection} connection * @return {Promise<*>} */ -async function onMessage({message, connection}) { +async function onRequestMessage(message, connection) { try { - if (!(message instanceof RequestMessage)) { - logger.debug('ignoring message', message) - return - } - - if (message.requestType !== 'ping') - logger.info('onMessage:', message) - - if (config.get('password') && message.password !== config.get('password')) { - connection.send(new ResponseMessage().setError('invalid password')) - return connection.close() - } + logger.info('onMessage:', message) switch (message.requestType) { - case 'ping': - connection.send(new ResponseMessage().setData('pong')) - break - case 'poll': const targets = message.requestData?.targets || [] if (!targets.length) { - connection.send(new ResponseMessage().setError('empty targets')) + connection.send( + new ResponseMessage(message.requestNo) + .setError('empty targets') + ) break } for (const t of targets) { if (!worker.hasTarget(t)) { - connection.send(new ResponseMessage().setError(`invalid target '${t}'`)) + connection.send( + new ResponseMessage(message.requestNo) + .setError(`invalid target '${t}'`) + ) break } } @@ -151,28 +150,38 @@ async function onMessage({message, connection}) { worker.setPollTargets(targets) worker.poll() - connection.send(new ResponseMessage().setData('ok')); + connection.send( + new ResponseMessage(message.requestNo) + .setData('ok') + ) break case 'status': const qs = worker.getStatus() connection.send( - new ResponseMessage().setData({ - queue: qs, - jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length, - memoryUsage: process.memoryUsage() - }) + new ResponseMessage(message.requestNo) + .setData({ + queue: qs, + jobDoneAwaitersCount: Object.keys(jobDoneAwaiters).length, + memoryUsage: process.memoryUsage() + }) ) break case 'run-manual': const {id} = message.requestData if (id in jobDoneAwaiters) { - connection.send(new ResponseMessage().setError('another client is already waiting this job')) + connection.send( + new ResponseMessage(message.requestNo) + .setError('another client is already waiting this job') + ) break } - jobDoneAwaiters[id] = connection + jobDoneAwaiters[id] = { + connection, + requestNo: message.requestNo + } const {accepted, error} = await worker.getTasks(null, STATUS_MANUAL, {id}) if (!accepted) { @@ -181,18 +190,28 @@ async function onMessage({message, connection}) { let message = 'failed to run task' if (typeof error === 'string') message += `: ${error}` - connection.send(new ResponseMessage().setError(message)) + + connection.send( + new ResponseMessage(message.requestNo) + .setError(message) + ) } break default: - connection.send(new ResponseMessage().setError(`unknown request type: '${message.requestType}'`)) + connection.send( + new ResponseMessage(message.requestNo) + .setError(`unknown request type: '${message.requestType}'`) + ) break } } catch (error) { logger.error(`error while handling message:`, message, error) - connection.send(new ResponseMessage().setError('server error: ' + error?.message)) + connection.send( + new ResponseMessage(message.requestNo) + .setError('server error: ' + error?.message) + ) } } @@ -202,11 +221,17 @@ function connectToMaster() { connection.connect(config.get('master_host'), config.get('master_port')) connection.on('connect', function() { - connection.send( + connection.sendRequest( new RequestMessage('register-worker', { targets: worker.getTargets() }) ) + .then(response => { + logger.debug('connectToMaster: response:', response) + }) + .catch(error => { + logger.error('connectToMaster: error while awaiting response:', error) + }) }) connection.on('close', () => { @@ -216,17 +241,7 @@ function connectToMaster() { }, config.get('master_reconnect_timeout') * 1000) }) - connection.on('message', (message) => { - if (!(message instanceof RequestMessage)) { - logger.debug('message from master is not a request, hmm... skipping', message) - return - } - - onMessage({message, connection}) - .catch((error) => { - logger.error('connectToMaster: onMessage:', error) - }) - }) + connection.on('request-message', onRequestMessage) } diff --git a/src/lib/config.js b/src/lib/config.js index a59615e..eb1135d 100644 --- a/src/lib/config.js +++ b/src/lib/config.js @@ -112,7 +112,7 @@ function parseMasterConfig(file) { } /** - * @param {string} key + * @param {string|null} key * @return {string|number|object} */ function get(key = null) { diff --git a/src/lib/data-validator.js b/src/lib/data-validator.js new file mode 100644 index 0000000..276ea9b --- /dev/null +++ b/src/lib/data-validator.js @@ -0,0 +1,71 @@ +const {isInteger, isObject} = require('lodash') +const {getLogger} = require('./logger') + +const typeNames = { + 'i': 'integer', + 'n': 'number', + 's': 'string', + 'o': 'object', + 'a': 'array', +} + +const logger = getLogger('data-validator') + +/** + * @param {string} expectedType + * @param value + */ +function checkType(expectedType, value) { + switch (expectedType) { + case 'i': + return isInteger(value) + case 'n': + return typeof value === 'number' + case 's': + return typeof value === 'string' + case 'o': + return typeof value === 'object' + case 'a': + return Array.isArray(value) + default: + logger.error(`checkType: unknown type ${expectedType}`) + return false + } +} + +/** + * @param {object} data + * @param {array} schema + */ +function validateMessageData(data, schema) { + if (!isObject(data)) + throw new Error(`data is not an object`) + + for (const field of schema) { + let [name, types, required] = field + if (!(name in data)) { + if (required) + throw new Error(`missing required field ${name}`) + + continue + } + + types = types.split('') + + if (!types + .map(type => checkType(type, data[name])) + .some(result => result === true)) { + + let error = `'${name}' must be ` + if (types.length === 1) { + error += typeNames[types[0]] + } else { + error += 'any of: ' + types.map(t => typeNames[t]).join(', ') + } + + throw new Error(error) + } + } +} + +module.exports = {validateMessageData} \ No newline at end of file diff --git a/src/lib/server.js b/src/lib/server.js index 08eadca..ca06a6d 100644 --- a/src/lib/server.js +++ b/src/lib/server.js @@ -1,14 +1,21 @@ const net = require('net') const EventEmitter = require('events') const {getLogger} = require('./logger') -const isObject = require('lodash/isObject') +const random = require('lodash/random') +const config = require('./config') +const {createCallablePromise} = require('./util') +const {validateMessageData} = require('./data-validator') const EOT = 0x04 +const REQUEST_NO_LIMIT = 999999 + class Message { static REQUEST = 0 static RESPONSE = 1 + static PING = 2 + static PONG = 3 /** * @param {number} type @@ -20,37 +27,61 @@ class Message { this.type = type } + /** + * @return {array} + */ getAsObject() { return [this.type] } - } class ResponseMessage extends Message { - constructor() { + /** + * @param {number} requestNo + */ + constructor(requestNo) { super(Message.RESPONSE) + this.requestNo = requestNo this.error = null this.data = null } + /** + * @param {string} error + * @return {ResponseMessage} + */ setError(error) { this.error = error return this } + /** + * @param data + * @return {ResponseMessage} + */ setData(data) { this.data = data return this } + /** + * @return {array} + */ getAsObject() { + let response = { + no: this.requestNo + } + + if (this.error !== null) + response.error = this.error + + if (this.data !== null) + response.data = this.data + return [ ...super.getAsObject(), - [ - this.error, - this.data - ] + response ] } } @@ -77,16 +108,28 @@ class RequestMessage extends Message { * @type {null|string} */ this.password = null + + /** + * @type {null|number} + */ + this.requestNo = null } + /** + * @return {array} + */ getAsObject() { let request = { + no: this.requestNo, type: this.requestType } if (this.requestData) request.data = this.requestData + if (this.password) + request.password = this.password + return [ ...super.getAsObject(), request @@ -99,8 +142,28 @@ class RequestMessage extends Message { setPassword(password) { this.password = password } + + /** + * @param {number} no + */ + setRequestNo(no) { + this.requestNo = no + } } +class PingMessage extends Message { + constructor() { + super(Message.PING) + } +} + +class PongMessage extends Message { + constructor() { + super(Message.PONG) + } +} + + class Server extends EventEmitter { constructor() { @@ -137,14 +200,10 @@ class Server extends EventEmitter { onConnection = (socket) => { let connection = new Connection() connection.setSocket(socket) - connection.on('message', (message) => { - this.emit('message', { - message, - connection - }) - }) this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`) + + this.emit('new-connection', connection) } onListening = () => { @@ -158,6 +217,7 @@ class Server extends EventEmitter { } + class Connection extends EventEmitter { constructor() { @@ -190,9 +250,32 @@ class Connection extends EventEmitter { this.remotePort = null /** - * @type {null|number} + * @type {boolean} + * @private + */ + this._isAuthorized = config.get('password') === '' + + /** + * @type {boolean} + * @private */ - this.id = null + this._textConversationAllowed = false + + /** + * @type {boolean} + */ + this._isOutgoing = false + + /** + * @type {number} + */ + this._lastOutgoingRequestNo = random(0, REQUEST_NO_LIMIT) + + /** + * @type {object.} + * @private + */ + this._requestPromises = {} this._setLogger() } @@ -205,13 +288,14 @@ class Connection extends EventEmitter { if (this.socket !== null) throw new Error(`this Connection already has a socket`) + this._isOutgoing = true + this.socket = new net.Socket() this.socket.connect({host, port}) this.remoteAddress = host this.remotePort = port - this._setId() this._setLogger() this._setSocketEvents() } @@ -225,7 +309,11 @@ class Connection extends EventEmitter { this.remoteAddress = socket.remoteAddress this.remotePort = socket.remotePort - this._setId() + if (this.remoteAddress === '127.0.0.1') { + this._isAuthorized = true + this._textConversationAllowed = true + } + this._setLogger() this._setSocketEvents() } @@ -235,14 +323,7 @@ class Connection extends EventEmitter { */ _setLogger() { let addr = this.socket ? this.remoteAddr() : '?' - this.logger = getLogger(``) - } - - /** - * @private - */ - _setId() { - this.id = Math.floor(Math.random() * 10000) + this.logger = getLogger(``) } /** @@ -304,76 +385,183 @@ class Connection extends EventEmitter { this.logger.trace(`processChunks (after parsing):`, this.data) - for (let message of messages) { + for (let rawMessage of messages) { try { - let buf = message.toString('utf-8') + let buf = rawMessage.toString('utf-8') this.logger.debug(buf) let json = JSON.parse(buf) - this._emitMessage(json) + + // try to parse the message + let message + try { + message = this._parseMessage(json) + } catch (e) { + // message is malformed + this.logger.error(e.message) + + // send error to the other size + this.send( + new ResponseMessage(0).setError(e.message) + ) + + continue + } + + if (message instanceof PingMessage) { + this.send(new PongMessage()) + continue + } + + if (message instanceof PongMessage) + continue + + if (message instanceof RequestMessage) { + if (!this._isAuthorized) { + if (message.password !== config.get('password')) { + this.send(new ResponseMessage(message.requestNo).setError('invalid password')) + this.close() + break + } + + this._isAuthorized = true + } + + this.emit('request-message', message, this) + } + + if (message instanceof ResponseMessage) { + if (message.requestNo in this._requestPromises) { + const P = this._requestPromises[message.requestNo] + delete this._requestPromises[message.requestNo] + + P.resolve(message) + } else { + this.logger.warn('received unexpected Response message:', message) + } + } } catch (error) { this.logger.error('failed to parse data as JSON') - this.logger.debug(message) + this.logger.debug(rawMessage) } } } /** + * Parse incoming message + * * @param {object} json + * @return {Message} * @private + * @throws Error */ - _emitMessage(json) { - if (!Array.isArray(json)) { - this.logger.error('malformed message, JSON array expected', json) - return - } + _parseMessage(json) { + if (!Array.isArray(json)) + throw new Error('JSON array expected, got: ' + json) let type = json.shift() let message switch (type) { case Message.REQUEST: { let data = json.shift() - if (!data || !isObject(data)) { - this.logger.error('malformed REQUEST message') - return + + try { + validateMessageData(data, [ + // name type required + ['type', 's', true], + ['no', 'i', true], + ['password', 's', false], + ['data', 'snoa', false] + ]) + } catch (e) { + throw new Error(`malformed REQUEST message: ${e.message}`) } message = new RequestMessage(data.type, data.data || null) + message.setRequestNo(data.no) if (data.password) message.setPassword(data.password) - break + + return message } case Message.RESPONSE: { let data = json.shift() - if (!data || !Array.isArray(data) || data.length < 2) { - this.logger.error('malformed RESPONSE message') - return + + try { + validateMessageData(data, [ + // name type required + ['no', 'i', true], + ['data', 'snoa', false], + ['error', 's', false], + ]) + } catch (e) { + throw new Error(`malformed RESPONSE message: ${e.message}`) } - message = new ResponseMessage() - message.setError(data[0]).setData(data[1]) + message = new ResponseMessage(data.no) + message.setError(data.error || null) + .setData(data.data || null) - break + return message } + case Message.PING: + return new PingMessage() + + case Message.PONG: + return new PongMessage() + default: - this.logger.error(`malformed message, unexpected type ${type}`) - return + throw new Error(`unexpected type ${type}`) + } + } + + /** + * Send request + * + * @param {RequestMessage} message + * @return {Promise} + */ + sendRequest(message) { + if (!(message instanceof RequestMessage)) + throw new Error('sendRequest only accepts RequestMessage, got:', message) + + // send password once (when talking to jobd-master) + if (!this._isAuthorized) { + message.setPassword(config.get('password')) + this._isAuthorized = true + } + + // assign request number + const no = this._getNextOutgoingRequestNo() + if (this._requestPromises[no] !== undefined) { + this.logger.error(`sendRequest: next request's No is ${no}, found a promise awaiting response with the same no, rejecting...`) + this._requestPromises[no].reject(new Error(`this should not happen, but another request needs this number (${no})`)) + delete this._requestPromises[no] } - this.emit('message', message) + message.setRequestNo(no) + + // send it + this.send(message) + + // create and return promise + const P = createCallablePromise() + this._requestPromises[no] = P + + return P } /** + * Send any Message + * * @type {Message} data * @param message */ send(message) { if (!(message instanceof Message)) - throw new Error('send expects Message, got', message) - - // TODO set password! + throw new Error('send expects Message, got: ' + message) let json = JSON.stringify(message.getAsObject()) let buf = Buffer.concat([ @@ -391,13 +579,20 @@ class Connection extends EventEmitter { } } + _getNextOutgoingRequestNo() { + this._lastOutgoingRequestNo++; + if (this._lastOutgoingRequestNo >= REQUEST_NO_LIMIT) + this._lastOutgoingRequestNo = 1 + return this._lastOutgoingRequestNo + } + /** */ close() { try { this.socket.end() this.socket.destroy() - this._emitClose() + this._handleClose() } catch (error) { this.logger.error('close:', error) } @@ -406,12 +601,17 @@ class Connection extends EventEmitter { /** * @private */ - _emitClose() { - if (this._closeEmitted) - return + _handleClose() { + if (!this._closeEmitted) { + this._closeEmitted = true + this.emit('close') + } + + for (const no in this._requestPromises) { + this._requestPromises[no].reject(new Error('socket is closed')) + } - this._closeEmitted = true - this.emit('close') + this._requestPromises = {} } onConnect = () => { @@ -433,12 +633,12 @@ class Connection extends EventEmitter { } onClose = (hadError) => { - this._emitClose() + this._handleClose() this.logger.debug(`socket closed` + (hadError ? ` with error` : '')) } onError = (error) => { - this._emitClose() + this._handleClose() this.logger.warn(`socket error:`, error) } @@ -448,5 +648,7 @@ module.exports = { Server, Connection, RequestMessage, - ResponseMessage + ResponseMessage, + PingMessage, + PongMessage, } \ No newline at end of file diff --git a/src/lib/util.js b/src/lib/util.js index 6de07e5..a4e0025 100644 --- a/src/lib/util.js +++ b/src/lib/util.js @@ -5,5 +5,26 @@ module.exports = { isNumeric(n) { return !isNaN(parseFloat(n)) && isFinite(n) + }, + + /** + * Creates a Promise that can be resolved or rejected from the outside + * + * @param {Function} abortCallback + * @return {Promise} + */ + createCallablePromise(abortCallback = null) { + let promise, resolve, reject + promise = new Promise(function(_resolve, _reject) { + resolve = _resolve + reject = _reject + }) + promise.resolve = function(result) { + resolve(result) + } + promise.reject = function(result) { + reject(result) + } + return promise } -} \ No newline at end of file +} diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js index 620f711..ed436e0 100644 --- a/src/lib/workers-list.js +++ b/src/lib/workers-list.js @@ -1,7 +1,7 @@ const intersection = require('lodash/intersection') const config = require('./config') const {getLogger} = require('./logger') -const {RequestMessage} = require('./server') +const {RequestMessage, PingMessage} = require('./server') const throttle = require('lodash/throttle') class WorkersList { @@ -136,7 +136,7 @@ class WorkersList { this.workers .forEach(w => { this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`) - w.connection.send(new RequestMessage('ping')) + w.connection.send(new PingMessage()) }) } -- cgit v1.2.3