From feaa5065f900a9c031ca7d66d80957040e2ee99f Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sun, 28 Feb 2021 16:11:57 +0300 Subject: refactor: moved some files to lib/ --- src/workers-list.js | 145 ---------------------------------------------------- 1 file changed, 145 deletions(-) delete mode 100644 src/workers-list.js (limited to 'src/workers-list.js') diff --git a/src/workers-list.js b/src/workers-list.js deleted file mode 100644 index 620f711..0000000 --- a/src/workers-list.js +++ /dev/null @@ -1,145 +0,0 @@ -const intersection = require('lodash/intersection') -const config = require('./config') -const {getLogger} = require('./logger') -const {RequestMessage} = require('./server') -const throttle = require('lodash/throttle') - -class WorkersList { - - constructor() { - /** - * @type {{connection: Connection, targets: string[]}[]} - */ - this.workers = [] - - /** - * @type {object.} - */ - this.targetsToPoke = {} - - /** - * @type {object.} - */ - this.targetsWaitingToPoke = {} - - /** - * @type {NodeJS.Timeout} - */ - this.pingInterval = setInterval(this.sendPings, config.get('ping_interval') * 1000) - - /** - * @type {Logger} - */ - this.logger = getLogger('WorkersList') - } - - /** - * @param {Connection} connection - * @param {string[]} targets - */ - add(connection, targets) { - this.logger.info(`add: connection from ${connection.remoteAddr()}, targets ${JSON.stringify(targets)}`) - - this.workers.push({connection, targets}) - connection.on('close', () => { - this.logger.info(`connection from ${connection.remoteAddr()} closed, removing worker`) - this.workers = this.workers.filter(worker => { - return worker.connection !== connection - }) - }) - - let waiting = Object.keys(this.targetsWaitingToPoke) - if (!waiting.length) - return - - let intrs = intersection(waiting, targets) - if (intrs.length) { - this.logger.info('add: found intersection with waiting targets:', intrs, 'going to poke new worker') - this._pokeWorkerConnection(connection, intrs) - for (let target of intrs) - delete this.targetsWaitingToPoke[target] - this.logger.trace(`add: this.targetsWaitingToPoke:`, this.targetsWaitingToPoke) - } - } - - /** - * @param {string[]} targets - */ - poke(targets) { - this.logger.debug('poke:', targets) - if (!Array.isArray(targets)) - throw new Error('targets must be Array') - - for (let t of targets) - this.targetsToPoke[t] = true - - this._pokeWorkers() - } - - /** - * @private - */ - _pokeWorkers = throttle(() => { - const targets = Object.keys(this.targetsToPoke) - this.targetsToPoke = {} - - const found = {} - for (const worker of this.workers) { - const intrs = intersection(worker.targets, targets) - intrs.forEach(t => { - found[t] = true - }) - if (intrs.length > 0) - this._pokeWorkerConnection(worker.connection, targets) - } - - for (let target of targets) { - if (!(target in found)) { - this.logger.debug(`_pokeWorkers: worker responsible for ${target} not found. we'll remember it`) - this.targetsWaitingToPoke[target] = true - } - this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke) - } - }, config.get('poke_throttle_interval') * 1000, {leading: true}) - - /** - * @param {Connection} connection - * @param {string[]} targets - * @private - */ - _pokeWorkerConnection(connection, targets) { - this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets) - connection.send( - new RequestMessage('poll', { - targets - }) - ) - } - - /** - * @return {{targets: string[], remoteAddr: string, remotePort: number}[]} - */ - getInfo() { - return this.workers.map(worker => { - return { - remoteAddr: worker.connection.socket?.remoteAddress, - remotePort: worker.connection.socket?.remotePort, - targets: worker.targets - } - }) - } - - /** - * @private - */ - sendPings = () => { - this.workers - .forEach(w => { - this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`) - w.connection.send(new RequestMessage('ping')) - }) - } - -} - -module.exports = WorkersList \ No newline at end of file -- cgit v1.2.3