aboutsummaryrefslogtreecommitdiff
path: root/src/lib/workers-list.js
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-02-28 16:11:57 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-02-28 16:11:57 +0300
commitfeaa5065f900a9c031ca7d66d80957040e2ee99f (patch)
treec6dbaf2861364725df1c54a8ba3423ca273107e0 /src/lib/workers-list.js
parent142869948c40900569f339a2177e95a3be3bbdfb (diff)
refactor: moved some files to lib/
Diffstat (limited to 'src/lib/workers-list.js')
-rw-r--r--src/lib/workers-list.js145
1 files changed, 145 insertions, 0 deletions
diff --git a/src/lib/workers-list.js b/src/lib/workers-list.js
new file mode 100644
index 0000000..620f711
--- /dev/null
+++ b/src/lib/workers-list.js
@@ -0,0 +1,145 @@
+const intersection = require('lodash/intersection')
+const config = require('./config')
+const {getLogger} = require('./logger')
+const {RequestMessage} = require('./server')
+const throttle = require('lodash/throttle')
+
+class WorkersList {
+
+ constructor() {
+ /**
+ * @type {{connection: Connection, targets: string[]}[]}
+ */
+ this.workers = []
+
+ /**
+ * @type {object.<string, boolean>}
+ */
+ this.targetsToPoke = {}
+
+ /**
+ * @type {object.<string, boolean>}
+ */
+ this.targetsWaitingToPoke = {}
+
+ /**
+ * @type {NodeJS.Timeout}
+ */
+ this.pingInterval = setInterval(this.sendPings, config.get('ping_interval') * 1000)
+
+ /**
+ * @type {Logger}
+ */
+ this.logger = getLogger('WorkersList')
+ }
+
+ /**
+ * @param {Connection} connection
+ * @param {string[]} targets
+ */
+ add(connection, targets) {
+ this.logger.info(`add: connection from ${connection.remoteAddr()}, targets ${JSON.stringify(targets)}`)
+
+ this.workers.push({connection, targets})
+ connection.on('close', () => {
+ this.logger.info(`connection from ${connection.remoteAddr()} closed, removing worker`)
+ this.workers = this.workers.filter(worker => {
+ return worker.connection !== connection
+ })
+ })
+
+ let waiting = Object.keys(this.targetsWaitingToPoke)
+ if (!waiting.length)
+ return
+
+ let intrs = intersection(waiting, targets)
+ if (intrs.length) {
+ this.logger.info('add: found intersection with waiting targets:', intrs, 'going to poke new worker')
+ this._pokeWorkerConnection(connection, intrs)
+ for (let target of intrs)
+ delete this.targetsWaitingToPoke[target]
+ this.logger.trace(`add: this.targetsWaitingToPoke:`, this.targetsWaitingToPoke)
+ }
+ }
+
+ /**
+ * @param {string[]} targets
+ */
+ poke(targets) {
+ this.logger.debug('poke:', targets)
+ if (!Array.isArray(targets))
+ throw new Error('targets must be Array')
+
+ for (let t of targets)
+ this.targetsToPoke[t] = true
+
+ this._pokeWorkers()
+ }
+
+ /**
+ * @private
+ */
+ _pokeWorkers = throttle(() => {
+ const targets = Object.keys(this.targetsToPoke)
+ this.targetsToPoke = {}
+
+ const found = {}
+ for (const worker of this.workers) {
+ const intrs = intersection(worker.targets, targets)
+ intrs.forEach(t => {
+ found[t] = true
+ })
+ if (intrs.length > 0)
+ this._pokeWorkerConnection(worker.connection, targets)
+ }
+
+ for (let target of targets) {
+ if (!(target in found)) {
+ this.logger.debug(`_pokeWorkers: worker responsible for ${target} not found. we'll remember it`)
+ this.targetsWaitingToPoke[target] = true
+ }
+ this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke)
+ }
+ }, config.get('poke_throttle_interval') * 1000, {leading: true})
+
+ /**
+ * @param {Connection} connection
+ * @param {string[]} targets
+ * @private
+ */
+ _pokeWorkerConnection(connection, targets) {
+ this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets)
+ connection.send(
+ new RequestMessage('poll', {
+ targets
+ })
+ )
+ }
+
+ /**
+ * @return {{targets: string[], remoteAddr: string, remotePort: number}[]}
+ */
+ getInfo() {
+ return this.workers.map(worker => {
+ return {
+ remoteAddr: worker.connection.socket?.remoteAddress,
+ remotePort: worker.connection.socket?.remotePort,
+ targets: worker.targets
+ }
+ })
+ }
+
+ /**
+ * @private
+ */
+ sendPings = () => {
+ this.workers
+ .forEach(w => {
+ this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`)
+ w.connection.send(new RequestMessage('ping'))
+ })
+ }
+
+}
+
+module.exports = WorkersList \ No newline at end of file