aboutsummaryrefslogtreecommitdiff
path: root/src/server.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.js')
-rw-r--r--src/server.js450
1 files changed, 450 insertions, 0 deletions
diff --git a/src/server.js b/src/server.js
new file mode 100644
index 0000000..b5eca9a
--- /dev/null
+++ b/src/server.js
@@ -0,0 +1,450 @@
+const net = require('net')
+const EventEmitter = require('events')
+const {getLogger} = require('./logger')
+const isObject = require('lodash/isObject')
+
+const EOT = 0x04
+
+class Message {
+
+ static REQUEST = 0
+ static RESPONSE = 1
+
+ /**
+ * @param {number} type
+ */
+ constructor(type) {
+ /**
+ * @type {number}
+ */
+ this.type = type
+ }
+
+ getAsObject() {
+ return [this.type]
+ }
+
+}
+
+class ResponseMessage extends Message {
+ constructor() {
+ super(Message.RESPONSE)
+
+ this.error = null
+ this.data = null
+ }
+
+ setError(error) {
+ this.error = error
+ return this
+ }
+
+ setData(data) {
+ this.data = data
+ return this
+ }
+
+ getAsObject() {
+ return [
+ ...super.getAsObject(),
+ [
+ this.error,
+ this.data
+ ]
+ ]
+ }
+}
+
+class RequestMessage extends Message {
+ /**
+ * @param {string} type
+ * @param {any} data
+ */
+ constructor(type, data = null) {
+ super(Message.REQUEST)
+
+ /**
+ * @type string
+ */
+ this.requestType = type
+
+ /**
+ * @type any
+ */
+ this.requestData = data
+
+ /**
+ * @type {null|string}
+ */
+ this.password = null
+ }
+
+ getAsObject() {
+ let request = {
+ type: this.requestType
+ }
+
+ if (this.requestData)
+ request.data = this.requestData
+
+ return [
+ ...super.getAsObject(),
+ request
+ ]
+ }
+
+ /**
+ * @param {string} password
+ */
+ setPassword(password) {
+ this.password = password
+ }
+}
+
+class Server extends EventEmitter {
+
+ constructor() {
+ super()
+
+ /**
+ * @type {null|module:net.Server}
+ */
+ this.server = null
+
+ /**
+ * @type {Logger}
+ */
+ this.logger = getLogger('server')
+ }
+
+ /**
+ * @param {number} port
+ * @param {string} host
+ */
+ start(port, host) {
+ this.server = net.createServer()
+
+ this.server.on('connection', this.onConnection)
+ this.server.on('error', this.onError)
+ this.server.on('listening', this.onListening)
+
+ this.server.listen(port, host)
+ }
+
+ /**
+ * @param {module:net.Socket} socket
+ */
+ onConnection = (socket) => {
+ let connection = new Connection()
+ connection.setSocket(socket)
+ connection.on('message', (message) => {
+ this.emit('message', {
+ message,
+ connection
+ })
+ })
+
+ this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`)
+ }
+
+ onListening = () => {
+ let addr = this.server.address()
+ this.logger.info(`server is listening on ${addr.address}:${addr.port}`)
+ }
+
+ onError = (error) => {
+ this.logger.error('error: ', error)
+ }
+
+}
+
+class Connection extends EventEmitter {
+
+ constructor() {
+ super()
+
+ /**
+ * @type {null|module:net.Socket}
+ */
+ this.socket = null
+
+ /**
+ * @type {Buffer}
+ */
+ this.data = Buffer.from([])
+
+ /**
+ * @type {boolean}
+ * @private
+ */
+ this._closeEmitted = false
+
+ /**
+ * @type {null|string}
+ */
+ this.remoteAddress = null
+
+ /**
+ * @type {null|number}
+ */
+ this.remotePort = null
+
+ /**
+ * @type {null|number}
+ */
+ this.id = null
+
+ this._setLogger()
+ }
+
+ /**
+ * @param {string} host
+ * @param {number} port
+ */
+ connect(host, port) {
+ if (this.socket !== null)
+ throw new Error(`this Connection already has a socket`)
+
+ this.socket = new net.Socket()
+ this.socket.connect({host, port})
+
+ this.remoteAddress = host
+ this.remotePort = port
+
+ this._setId()
+ this._setLogger()
+ this._setSocketEvents()
+ }
+
+ /**
+ * @param {module:net.Socket} socket
+ */
+ setSocket(socket) {
+ this.socket = socket
+
+ this.remoteAddress = socket.remoteAddress
+ this.remotePort = socket.remotePort
+
+ this._setId()
+ this._setLogger()
+ this._setSocketEvents()
+ }
+
+ /**
+ * @private
+ */
+ _setLogger() {
+ let addr = this.socket ? this.remoteAddr() : '?'
+ this.logger = getLogger(`<Connection ${this.id} ${addr}>`)
+ }
+
+ /**
+ * @private
+ */
+ _setId() {
+ this.id = Math.floor(Math.random() * 10000)
+ }
+
+ /**
+ * @private
+ */
+ _setSocketEvents() {
+ this.socket.on('connect', this.onConnect)
+ this.socket.on('data', this.onData)
+ this.socket.on('end', this.onEnd)
+ this.socket.on('close', this.onClose)
+ this.socket.on('error', this.onError)
+ }
+
+ /**
+ * @param {Buffer} data
+ * @private
+ */
+ _appendToBuffer(data) {
+ this.data = Buffer.concat([this.data, data])
+ }
+
+ /**
+ * @return {string}
+ */
+ remoteAddr() {
+ return this.remoteAddress + ':' + this.remotePort
+ }
+
+ /**
+ * @private
+ */
+ _processChunks() {
+ if (!this.data.length)
+ return
+
+ this.logger.trace(`processChunks (start):`, this.data)
+
+ /**
+ * @type {Buffer[]}
+ */
+ let messages = []
+ let offset = 0
+ let eotPos
+ do {
+ eotPos = this.data.indexOf(EOT, offset)
+ if (eotPos !== -1) {
+ let message = this.data.slice(offset, eotPos)
+ messages.push(message)
+
+ this.logger.debug(`processChunks: found new message (${offset}, ${eotPos})`)
+ offset = eotPos + 1
+ }
+ } while (eotPos !== -1 && offset < this.data.length-1)
+
+ if (offset !== 0) {
+ this.data = this.data.slice(offset)
+ this.logger.trace(`processChunks: slicing data from ${offset}`)
+ }
+
+ this.logger.trace(`processChunks (after parsing):`, this.data)
+
+ for (let message of messages) {
+ try {
+ let buf = message.toString('utf-8')
+ this.logger.debug(buf)
+
+ let json = JSON.parse(buf)
+ this._emitMessage(json)
+ } catch (error) {
+ this.logger.error('failed to parse data as JSON')
+ this.logger.debug(message)
+ }
+ }
+ }
+
+ /**
+ * @param {object} json
+ * @private
+ */
+ _emitMessage(json) {
+ if (!Array.isArray(json)) {
+ this.logger.error('malformed message, JSON array expected', json)
+ return
+ }
+
+ let type = json.shift()
+ let message
+ switch (type) {
+ case Message.REQUEST: {
+ let data = json.shift()
+ if (!data || !isObject(data)) {
+ this.logger.error('malformed REQUEST message')
+ return
+ }
+
+ message = new RequestMessage(data.type, data.data || null)
+ if (data.password)
+ message.setPassword(data.password)
+ break
+ }
+
+ case Message.RESPONSE: {
+ let data = json.shift()
+ if (!data || !Array.isArray(data) || data.length < 2) {
+ this.logger.error('malformed RESPONSE message')
+ return
+ }
+
+ message = new ResponseMessage()
+ message.setError(data[0]).setData(data[1])
+
+ break
+ }
+
+ default:
+ this.logger.error(`malformed message, unexpected type ${type}`)
+ return
+ }
+
+ this.emit('message', message)
+ }
+
+ /**
+ * @type {Message} data
+ * @param message
+ */
+ send(message) {
+ if (!(message instanceof Message))
+ throw new Error('send expects Message, got', message)
+
+ let json = JSON.stringify(message.getAsObject())
+ let buf = Buffer.concat([
+ Buffer.from(json),
+ Buffer.from([EOT])
+ ])
+
+ this.logger.debug('send:', json)
+ this.logger.trace('send:', buf)
+
+ try {
+ this.socket.write(buf)
+ } catch (error) {
+ this.logger.error(`processChunks: failed to write response ${JSON.stringify(message)} to a socket`, error)
+ }
+ }
+
+ /**
+ */
+ close() {
+ try {
+ this.socket.end()
+ this.socket.destroy()
+ this._emitClose()
+ } catch (error) {
+ this.logger.error('close:', error)
+ }
+ }
+
+ /**
+ * @private
+ */
+ _emitClose() {
+ if (this._closeEmitted)
+ return
+
+ this._closeEmitted = true
+ this.emit('close')
+ }
+
+ onConnect = () => {
+ this.logger.debug('connection established')
+ this.emit('connect')
+ }
+
+ onData = (data) => {
+ this.logger.trace('onData', data)
+ this._appendToBuffer(data)
+ this._processChunks()
+ }
+
+ onEnd = (data) => {
+ if (data)
+ this._appendToBuffer(data)
+
+ this._processChunks()
+ }
+
+ onClose = (hadError) => {
+ this._emitClose()
+ this.logger.debug(`socket closed` + (hadError ? ` with error` : ''))
+ }
+
+ onError = (error) => {
+ this._emitClose()
+ this.logger.warn(`socket error:`, error)
+ }
+
+}
+
+module.exports = {
+ Server,
+ Connection,
+ RequestMessage,
+ ResponseMessage
+} \ No newline at end of file