From 5e7d34458a6e60487393caa4f320ab1cfc1cf8e5 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Wed, 24 Feb 2021 03:59:25 +0300 Subject: initial commit --- src/server.js | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 450 insertions(+) create mode 100644 src/server.js (limited to 'src/server.js') diff --git a/src/server.js b/src/server.js new file mode 100644 index 0000000..b5eca9a --- /dev/null +++ b/src/server.js @@ -0,0 +1,450 @@ +const net = require('net') +const EventEmitter = require('events') +const {getLogger} = require('./logger') +const isObject = require('lodash/isObject') + +const EOT = 0x04 + +class Message { + + static REQUEST = 0 + static RESPONSE = 1 + + /** + * @param {number} type + */ + constructor(type) { + /** + * @type {number} + */ + this.type = type + } + + getAsObject() { + return [this.type] + } + +} + +class ResponseMessage extends Message { + constructor() { + super(Message.RESPONSE) + + this.error = null + this.data = null + } + + setError(error) { + this.error = error + return this + } + + setData(data) { + this.data = data + return this + } + + getAsObject() { + return [ + ...super.getAsObject(), + [ + this.error, + this.data + ] + ] + } +} + +class RequestMessage extends Message { + /** + * @param {string} type + * @param {any} data + */ + constructor(type, data = null) { + super(Message.REQUEST) + + /** + * @type string + */ + this.requestType = type + + /** + * @type any + */ + this.requestData = data + + /** + * @type {null|string} + */ + this.password = null + } + + getAsObject() { + let request = { + type: this.requestType + } + + if (this.requestData) + request.data = this.requestData + + return [ + ...super.getAsObject(), + request + ] + } + + /** + * @param {string} password + */ + setPassword(password) { + this.password = password + } +} + +class Server extends EventEmitter { + + constructor() { + super() + + /** + * @type {null|module:net.Server} + */ + this.server = null + + /** + * @type {Logger} + */ + this.logger = getLogger('server') + } + + /** + * @param {number} port + * @param {string} host + */ + start(port, host) { + this.server = net.createServer() + + this.server.on('connection', this.onConnection) + this.server.on('error', this.onError) + this.server.on('listening', this.onListening) + + this.server.listen(port, host) + } + + /** + * @param {module:net.Socket} socket + */ + onConnection = (socket) => { + let connection = new Connection() + connection.setSocket(socket) + connection.on('message', (message) => { + this.emit('message', { + message, + connection + }) + }) + + this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`) + } + + onListening = () => { + let addr = this.server.address() + this.logger.info(`server is listening on ${addr.address}:${addr.port}`) + } + + onError = (error) => { + this.logger.error('error: ', error) + } + +} + +class Connection extends EventEmitter { + + constructor() { + super() + + /** + * @type {null|module:net.Socket} + */ + this.socket = null + + /** + * @type {Buffer} + */ + this.data = Buffer.from([]) + + /** + * @type {boolean} + * @private + */ + this._closeEmitted = false + + /** + * @type {null|string} + */ + this.remoteAddress = null + + /** + * @type {null|number} + */ + this.remotePort = null + + /** + * @type {null|number} + */ + this.id = null + + this._setLogger() + } + + /** + * @param {string} host + * @param {number} port + */ + connect(host, port) { + if (this.socket !== null) + throw new Error(`this Connection already has a socket`) + + this.socket = new net.Socket() + this.socket.connect({host, port}) + + this.remoteAddress = host + this.remotePort = port + + this._setId() + this._setLogger() + this._setSocketEvents() + } + + /** + * @param {module:net.Socket} socket + */ + setSocket(socket) { + this.socket = socket + + this.remoteAddress = socket.remoteAddress + this.remotePort = socket.remotePort + + this._setId() + this._setLogger() + this._setSocketEvents() + } + + /** + * @private + */ + _setLogger() { + let addr = this.socket ? this.remoteAddr() : '?' + this.logger = getLogger(``) + } + + /** + * @private + */ + _setId() { + this.id = Math.floor(Math.random() * 10000) + } + + /** + * @private + */ + _setSocketEvents() { + this.socket.on('connect', this.onConnect) + this.socket.on('data', this.onData) + this.socket.on('end', this.onEnd) + this.socket.on('close', this.onClose) + this.socket.on('error', this.onError) + } + + /** + * @param {Buffer} data + * @private + */ + _appendToBuffer(data) { + this.data = Buffer.concat([this.data, data]) + } + + /** + * @return {string} + */ + remoteAddr() { + return this.remoteAddress + ':' + this.remotePort + } + + /** + * @private + */ + _processChunks() { + if (!this.data.length) + return + + this.logger.trace(`processChunks (start):`, this.data) + + /** + * @type {Buffer[]} + */ + let messages = [] + let offset = 0 + let eotPos + do { + eotPos = this.data.indexOf(EOT, offset) + if (eotPos !== -1) { + let message = this.data.slice(offset, eotPos) + messages.push(message) + + this.logger.debug(`processChunks: found new message (${offset}, ${eotPos})`) + offset = eotPos + 1 + } + } while (eotPos !== -1 && offset < this.data.length-1) + + if (offset !== 0) { + this.data = this.data.slice(offset) + this.logger.trace(`processChunks: slicing data from ${offset}`) + } + + this.logger.trace(`processChunks (after parsing):`, this.data) + + for (let message of messages) { + try { + let buf = message.toString('utf-8') + this.logger.debug(buf) + + let json = JSON.parse(buf) + this._emitMessage(json) + } catch (error) { + this.logger.error('failed to parse data as JSON') + this.logger.debug(message) + } + } + } + + /** + * @param {object} json + * @private + */ + _emitMessage(json) { + if (!Array.isArray(json)) { + this.logger.error('malformed message, JSON array expected', json) + return + } + + let type = json.shift() + let message + switch (type) { + case Message.REQUEST: { + let data = json.shift() + if (!data || !isObject(data)) { + this.logger.error('malformed REQUEST message') + return + } + + message = new RequestMessage(data.type, data.data || null) + if (data.password) + message.setPassword(data.password) + break + } + + case Message.RESPONSE: { + let data = json.shift() + if (!data || !Array.isArray(data) || data.length < 2) { + this.logger.error('malformed RESPONSE message') + return + } + + message = new ResponseMessage() + message.setError(data[0]).setData(data[1]) + + break + } + + default: + this.logger.error(`malformed message, unexpected type ${type}`) + return + } + + this.emit('message', message) + } + + /** + * @type {Message} data + * @param message + */ + send(message) { + if (!(message instanceof Message)) + throw new Error('send expects Message, got', message) + + let json = JSON.stringify(message.getAsObject()) + let buf = Buffer.concat([ + Buffer.from(json), + Buffer.from([EOT]) + ]) + + this.logger.debug('send:', json) + this.logger.trace('send:', buf) + + try { + this.socket.write(buf) + } catch (error) { + this.logger.error(`processChunks: failed to write response ${JSON.stringify(message)} to a socket`, error) + } + } + + /** + */ + close() { + try { + this.socket.end() + this.socket.destroy() + this._emitClose() + } catch (error) { + this.logger.error('close:', error) + } + } + + /** + * @private + */ + _emitClose() { + if (this._closeEmitted) + return + + this._closeEmitted = true + this.emit('close') + } + + onConnect = () => { + this.logger.debug('connection established') + this.emit('connect') + } + + onData = (data) => { + this.logger.trace('onData', data) + this._appendToBuffer(data) + this._processChunks() + } + + onEnd = (data) => { + if (data) + this._appendToBuffer(data) + + this._processChunks() + } + + onClose = (hadError) => { + this._emitClose() + this.logger.debug(`socket closed` + (hadError ? ` with error` : '')) + } + + onError = (error) => { + this._emitClose() + this.logger.warn(`socket error:`, error) + } + +} + +module.exports = { + Server, + Connection, + RequestMessage, + ResponseMessage +} \ No newline at end of file -- cgit v1.2.3