aboutsummaryrefslogtreecommitdiff
path: root/src/lib/server.js
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-03-01 02:05:37 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-03-01 02:05:37 +0300
commit8783de4fe5945a7ea22995c25a3e546c0426c7f6 (patch)
tree17711b548483b57a5b5b2b81594ad2a19fc6eeab /src/lib/server.js
parent57f541a6ea6722a2902e7cc7774aefca883fe8de (diff)
update protocol
Diffstat (limited to 'src/lib/server.js')
-rw-r--r--src/lib/server.js320
1 files changed, 261 insertions, 59 deletions
diff --git a/src/lib/server.js b/src/lib/server.js
index 08eadca..ca06a6d 100644
--- a/src/lib/server.js
+++ b/src/lib/server.js
@@ -1,14 +1,21 @@
const net = require('net')
const EventEmitter = require('events')
const {getLogger} = require('./logger')
-const isObject = require('lodash/isObject')
+const random = require('lodash/random')
+const config = require('./config')
+const {createCallablePromise} = require('./util')
+const {validateMessageData} = require('./data-validator')
const EOT = 0x04
+const REQUEST_NO_LIMIT = 999999
+
class Message {
static REQUEST = 0
static RESPONSE = 1
+ static PING = 2
+ static PONG = 3
/**
* @param {number} type
@@ -20,37 +27,61 @@ class Message {
this.type = type
}
+ /**
+ * @return {array}
+ */
getAsObject() {
return [this.type]
}
-
}
class ResponseMessage extends Message {
- constructor() {
+ /**
+ * @param {number} requestNo
+ */
+ constructor(requestNo) {
super(Message.RESPONSE)
+ this.requestNo = requestNo
this.error = null
this.data = null
}
+ /**
+ * @param {string} error
+ * @return {ResponseMessage}
+ */
setError(error) {
this.error = error
return this
}
+ /**
+ * @param data
+ * @return {ResponseMessage}
+ */
setData(data) {
this.data = data
return this
}
+ /**
+ * @return {array}
+ */
getAsObject() {
+ let response = {
+ no: this.requestNo
+ }
+
+ if (this.error !== null)
+ response.error = this.error
+
+ if (this.data !== null)
+ response.data = this.data
+
return [
...super.getAsObject(),
- [
- this.error,
- this.data
- ]
+ response
]
}
}
@@ -77,16 +108,28 @@ class RequestMessage extends Message {
* @type {null|string}
*/
this.password = null
+
+ /**
+ * @type {null|number}
+ */
+ this.requestNo = null
}
+ /**
+ * @return {array}
+ */
getAsObject() {
let request = {
+ no: this.requestNo,
type: this.requestType
}
if (this.requestData)
request.data = this.requestData
+ if (this.password)
+ request.password = this.password
+
return [
...super.getAsObject(),
request
@@ -99,8 +142,28 @@ class RequestMessage extends Message {
setPassword(password) {
this.password = password
}
+
+ /**
+ * @param {number} no
+ */
+ setRequestNo(no) {
+ this.requestNo = no
+ }
}
+class PingMessage extends Message {
+ constructor() {
+ super(Message.PING)
+ }
+}
+
+class PongMessage extends Message {
+ constructor() {
+ super(Message.PONG)
+ }
+}
+
+
class Server extends EventEmitter {
constructor() {
@@ -137,14 +200,10 @@ class Server extends EventEmitter {
onConnection = (socket) => {
let connection = new Connection()
connection.setSocket(socket)
- connection.on('message', (message) => {
- this.emit('message', {
- message,
- connection
- })
- })
this.logger.info(`new connection from ${socket.remoteAddress}:${socket.remotePort}`)
+
+ this.emit('new-connection', connection)
}
onListening = () => {
@@ -158,6 +217,7 @@ class Server extends EventEmitter {
}
+
class Connection extends EventEmitter {
constructor() {
@@ -190,9 +250,32 @@ class Connection extends EventEmitter {
this.remotePort = null
/**
- * @type {null|number}
+ * @type {boolean}
+ * @private
+ */
+ this._isAuthorized = config.get('password') === ''
+
+ /**
+ * @type {boolean}
+ * @private
*/
- this.id = null
+ this._textConversationAllowed = false
+
+ /**
+ * @type {boolean}
+ */
+ this._isOutgoing = false
+
+ /**
+ * @type {number}
+ */
+ this._lastOutgoingRequestNo = random(0, REQUEST_NO_LIMIT)
+
+ /**
+ * @type {object.<number, Promise>}
+ * @private
+ */
+ this._requestPromises = {}
this._setLogger()
}
@@ -205,13 +288,14 @@ class Connection extends EventEmitter {
if (this.socket !== null)
throw new Error(`this Connection already has a socket`)
+ this._isOutgoing = true
+
this.socket = new net.Socket()
this.socket.connect({host, port})
this.remoteAddress = host
this.remotePort = port
- this._setId()
this._setLogger()
this._setSocketEvents()
}
@@ -225,7 +309,11 @@ class Connection extends EventEmitter {
this.remoteAddress = socket.remoteAddress
this.remotePort = socket.remotePort
- this._setId()
+ if (this.remoteAddress === '127.0.0.1') {
+ this._isAuthorized = true
+ this._textConversationAllowed = true
+ }
+
this._setLogger()
this._setSocketEvents()
}
@@ -235,14 +323,7 @@ class Connection extends EventEmitter {
*/
_setLogger() {
let addr = this.socket ? this.remoteAddr() : '?'
- this.logger = getLogger(`<Connection ${this.id} ${addr}>`)
- }
-
- /**
- * @private
- */
- _setId() {
- this.id = Math.floor(Math.random() * 10000)
+ this.logger = getLogger(`<Connection ${addr}>`)
}
/**
@@ -304,76 +385,183 @@ class Connection extends EventEmitter {
this.logger.trace(`processChunks (after parsing):`, this.data)
- for (let message of messages) {
+ for (let rawMessage of messages) {
try {
- let buf = message.toString('utf-8')
+ let buf = rawMessage.toString('utf-8')
this.logger.debug(buf)
let json = JSON.parse(buf)
- this._emitMessage(json)
+
+ // try to parse the message
+ let message
+ try {
+ message = this._parseMessage(json)
+ } catch (e) {
+ // message is malformed
+ this.logger.error(e.message)
+
+ // send error to the other size
+ this.send(
+ new ResponseMessage(0).setError(e.message)
+ )
+
+ continue
+ }
+
+ if (message instanceof PingMessage) {
+ this.send(new PongMessage())
+ continue
+ }
+
+ if (message instanceof PongMessage)
+ continue
+
+ if (message instanceof RequestMessage) {
+ if (!this._isAuthorized) {
+ if (message.password !== config.get('password')) {
+ this.send(new ResponseMessage(message.requestNo).setError('invalid password'))
+ this.close()
+ break
+ }
+
+ this._isAuthorized = true
+ }
+
+ this.emit('request-message', message, this)
+ }
+
+ if (message instanceof ResponseMessage) {
+ if (message.requestNo in this._requestPromises) {
+ const P = this._requestPromises[message.requestNo]
+ delete this._requestPromises[message.requestNo]
+
+ P.resolve(message)
+ } else {
+ this.logger.warn('received unexpected Response message:', message)
+ }
+ }
} catch (error) {
this.logger.error('failed to parse data as JSON')
- this.logger.debug(message)
+ this.logger.debug(rawMessage)
}
}
}
/**
+ * Parse incoming message
+ *
* @param {object} json
+ * @return {Message}
* @private
+ * @throws Error
*/
- _emitMessage(json) {
- if (!Array.isArray(json)) {
- this.logger.error('malformed message, JSON array expected', json)
- return
- }
+ _parseMessage(json) {
+ if (!Array.isArray(json))
+ throw new Error('JSON array expected, got: ' + json)
let type = json.shift()
let message
switch (type) {
case Message.REQUEST: {
let data = json.shift()
- if (!data || !isObject(data)) {
- this.logger.error('malformed REQUEST message')
- return
+
+ try {
+ validateMessageData(data, [
+ // name type required
+ ['type', 's', true],
+ ['no', 'i', true],
+ ['password', 's', false],
+ ['data', 'snoa', false]
+ ])
+ } catch (e) {
+ throw new Error(`malformed REQUEST message: ${e.message}`)
}
message = new RequestMessage(data.type, data.data || null)
+ message.setRequestNo(data.no)
if (data.password)
message.setPassword(data.password)
- break
+
+ return message
}
case Message.RESPONSE: {
let data = json.shift()
- if (!data || !Array.isArray(data) || data.length < 2) {
- this.logger.error('malformed RESPONSE message')
- return
+
+ try {
+ validateMessageData(data, [
+ // name type required
+ ['no', 'i', true],
+ ['data', 'snoa', false],
+ ['error', 's', false],
+ ])
+ } catch (e) {
+ throw new Error(`malformed RESPONSE message: ${e.message}`)
}
- message = new ResponseMessage()
- message.setError(data[0]).setData(data[1])
+ message = new ResponseMessage(data.no)
+ message.setError(data.error || null)
+ .setData(data.data || null)
- break
+ return message
}
+ case Message.PING:
+ return new PingMessage()
+
+ case Message.PONG:
+ return new PongMessage()
+
default:
- this.logger.error(`malformed message, unexpected type ${type}`)
- return
+ throw new Error(`unexpected type ${type}`)
+ }
+ }
+
+ /**
+ * Send request
+ *
+ * @param {RequestMessage} message
+ * @return {Promise}
+ */
+ sendRequest(message) {
+ if (!(message instanceof RequestMessage))
+ throw new Error('sendRequest only accepts RequestMessage, got:', message)
+
+ // send password once (when talking to jobd-master)
+ if (!this._isAuthorized) {
+ message.setPassword(config.get('password'))
+ this._isAuthorized = true
+ }
+
+ // assign request number
+ const no = this._getNextOutgoingRequestNo()
+ if (this._requestPromises[no] !== undefined) {
+ this.logger.error(`sendRequest: next request's No is ${no}, found a promise awaiting response with the same no, rejecting...`)
+ this._requestPromises[no].reject(new Error(`this should not happen, but another request needs this number (${no})`))
+ delete this._requestPromises[no]
}
- this.emit('message', message)
+ message.setRequestNo(no)
+
+ // send it
+ this.send(message)
+
+ // create and return promise
+ const P = createCallablePromise()
+ this._requestPromises[no] = P
+
+ return P
}
/**
+ * Send any Message
+ *
* @type {Message} data
* @param message
*/
send(message) {
if (!(message instanceof Message))
- throw new Error('send expects Message, got', message)
-
- // TODO set password!
+ throw new Error('send expects Message, got: ' + message)
let json = JSON.stringify(message.getAsObject())
let buf = Buffer.concat([
@@ -391,13 +579,20 @@ class Connection extends EventEmitter {
}
}
+ _getNextOutgoingRequestNo() {
+ this._lastOutgoingRequestNo++;
+ if (this._lastOutgoingRequestNo >= REQUEST_NO_LIMIT)
+ this._lastOutgoingRequestNo = 1
+ return this._lastOutgoingRequestNo
+ }
+
/**
*/
close() {
try {
this.socket.end()
this.socket.destroy()
- this._emitClose()
+ this._handleClose()
} catch (error) {
this.logger.error('close:', error)
}
@@ -406,12 +601,17 @@ class Connection extends EventEmitter {
/**
* @private
*/
- _emitClose() {
- if (this._closeEmitted)
- return
+ _handleClose() {
+ if (!this._closeEmitted) {
+ this._closeEmitted = true
+ this.emit('close')
+ }
+
+ for (const no in this._requestPromises) {
+ this._requestPromises[no].reject(new Error('socket is closed'))
+ }
- this._closeEmitted = true
- this.emit('close')
+ this._requestPromises = {}
}
onConnect = () => {
@@ -433,12 +633,12 @@ class Connection extends EventEmitter {
}
onClose = (hadError) => {
- this._emitClose()
+ this._handleClose()
this.logger.debug(`socket closed` + (hadError ? ` with error` : ''))
}
onError = (error) => {
- this._emitClose()
+ this._handleClose()
this.logger.warn(`socket error:`, error)
}
@@ -448,5 +648,7 @@ module.exports = {
Server,
Connection,
RequestMessage,
- ResponseMessage
+ ResponseMessage,
+ PingMessage,
+ PongMessage,
} \ No newline at end of file