From 5e7d34458a6e60487393caa4f320ab1cfc1cf8e5 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Wed, 24 Feb 2021 03:59:25 +0300 Subject: initial commit --- src/workers-list.js | 145 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 src/workers-list.js (limited to 'src/workers-list.js') diff --git a/src/workers-list.js b/src/workers-list.js new file mode 100644 index 0000000..96a127a --- /dev/null +++ b/src/workers-list.js @@ -0,0 +1,145 @@ +const intersection = require('lodash/intersection') +const {masterConfig} = require('./config') +const {getLogger} = require('./logger') +const {RequestMessage} = require('./server') +const throttle = require('lodash/throttle') + +class WorkersList { + + constructor() { + /** + * @type {{connection: Connection, targets: string[]}[]} + */ + this.workers = [] + + /** + * @type {object.} + */ + this.targetsToPoke = {} + + /** + * @type {object.} + */ + this.targetsWaitingToPoke = {} + + /** + * @type {NodeJS.Timeout} + */ + this.pingInterval = setInterval(this.sendPings, masterConfig.ping_interval * 1000) + + /** + * @type {Logger} + */ + this.logger = getLogger('WorkersList') + } + + /** + * @param {Connection} connection + * @param {string[]} targets + */ + add(connection, targets) { + this.logger.info(`add: connection from ${connection.remoteAddr()}, targets ${JSON.stringify(targets)}`) + + this.workers.push({connection, targets}) + connection.on('close', () => { + this.logger.info(`connection from ${connection.remoteAddr()} closed, removing worker`) + this.workers = this.workers.filter(worker => { + return worker.connection !== connection + }) + }) + + let waiting = Object.keys(this.targetsWaitingToPoke) + if (!waiting.length) + return + + let intrs = intersection(waiting, targets) + if (intrs.length) { + this.logger.info('add: found intersection with waiting targets:', intrs, 'going to poke new worker') + this._pokeWorkerConnection(connection, intrs) + for (let target of intrs) + delete this.targetsWaitingToPoke[target] + this.logger.trace(`add: this.targetsWaitingToPoke:`, this.targetsWaitingToPoke) + } + } + + /** + * @param {string[]} targets + */ + poke(targets) { + this.logger.debug('poke:', targets) + if (!Array.isArray(targets)) + throw new Error('targets must be Array') + + for (let t of targets) + this.targetsToPoke[t] = true + + this._pokeWorkers() + } + + /** + * @private + */ + _pokeWorkers = throttle(() => { + const targets = Object.keys(this.targetsToPoke) + this.targetsToPoke = {} + + const found = {} + for (const worker of this.workers) { + const intrs = intersection(worker.targets, targets) + intrs.forEach(t => { + found[t] = true + }) + if (intrs.length > 0) + this._pokeWorkerConnection(worker.connection, targets) + } + + for (let target of targets) { + if (!(target in found)) { + this.logger.debug(`_pokeWorkers: worker responsible for ${target} not found. we'll remember it`) + this.targetsWaitingToPoke[target] = true + } + this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke) + } + }, masterConfig.poke_throttle_interval * 1000, {leading: true}) + + /** + * @param {Connection} connection + * @param {string[]} targets + * @private + */ + _pokeWorkerConnection(connection, targets) { + this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets) + connection.send( + new RequestMessage('poll', { + targets + }) + ) + } + + /** + * @return {{targets: string[], remoteAddr: string, remotePort: number}[]} + */ + getInfo() { + return this.workers.map(worker => { + return { + remoteAddr: worker.connection.socket?.remoteAddress, + remotePort: worker.connection.socket?.remotePort, + targets: worker.targets + } + }) + } + + /** + * @private + */ + sendPings = () => { + this.workers + .forEach(w => { + this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`) + w.connection.send(new RequestMessage('ping')) + }) + } + +} + +module.exports = WorkersList \ No newline at end of file -- cgit v1.2.3