aboutsummaryrefslogtreecommitdiff
path: root/src/workers-list.js
diff options
context:
space:
mode:
authorEvgeny Zinoviev <me@ch1p.io>2021-02-28 16:11:57 +0300
committerEvgeny Zinoviev <me@ch1p.io>2021-02-28 16:11:57 +0300
commitfeaa5065f900a9c031ca7d66d80957040e2ee99f (patch)
treec6dbaf2861364725df1c54a8ba3423ca273107e0 /src/workers-list.js
parent142869948c40900569f339a2177e95a3be3bbdfb (diff)
refactor: moved some files to lib/
Diffstat (limited to 'src/workers-list.js')
-rw-r--r--src/workers-list.js145
1 files changed, 0 insertions, 145 deletions
diff --git a/src/workers-list.js b/src/workers-list.js
deleted file mode 100644
index 620f711..0000000
--- a/src/workers-list.js
+++ /dev/null
@@ -1,145 +0,0 @@
-const intersection = require('lodash/intersection')
-const config = require('./config')
-const {getLogger} = require('./logger')
-const {RequestMessage} = require('./server')
-const throttle = require('lodash/throttle')
-
-class WorkersList {
-
- constructor() {
- /**
- * @type {{connection: Connection, targets: string[]}[]}
- */
- this.workers = []
-
- /**
- * @type {object.<string, boolean>}
- */
- this.targetsToPoke = {}
-
- /**
- * @type {object.<string, boolean>}
- */
- this.targetsWaitingToPoke = {}
-
- /**
- * @type {NodeJS.Timeout}
- */
- this.pingInterval = setInterval(this.sendPings, config.get('ping_interval') * 1000)
-
- /**
- * @type {Logger}
- */
- this.logger = getLogger('WorkersList')
- }
-
- /**
- * @param {Connection} connection
- * @param {string[]} targets
- */
- add(connection, targets) {
- this.logger.info(`add: connection from ${connection.remoteAddr()}, targets ${JSON.stringify(targets)}`)
-
- this.workers.push({connection, targets})
- connection.on('close', () => {
- this.logger.info(`connection from ${connection.remoteAddr()} closed, removing worker`)
- this.workers = this.workers.filter(worker => {
- return worker.connection !== connection
- })
- })
-
- let waiting = Object.keys(this.targetsWaitingToPoke)
- if (!waiting.length)
- return
-
- let intrs = intersection(waiting, targets)
- if (intrs.length) {
- this.logger.info('add: found intersection with waiting targets:', intrs, 'going to poke new worker')
- this._pokeWorkerConnection(connection, intrs)
- for (let target of intrs)
- delete this.targetsWaitingToPoke[target]
- this.logger.trace(`add: this.targetsWaitingToPoke:`, this.targetsWaitingToPoke)
- }
- }
-
- /**
- * @param {string[]} targets
- */
- poke(targets) {
- this.logger.debug('poke:', targets)
- if (!Array.isArray(targets))
- throw new Error('targets must be Array')
-
- for (let t of targets)
- this.targetsToPoke[t] = true
-
- this._pokeWorkers()
- }
-
- /**
- * @private
- */
- _pokeWorkers = throttle(() => {
- const targets = Object.keys(this.targetsToPoke)
- this.targetsToPoke = {}
-
- const found = {}
- for (const worker of this.workers) {
- const intrs = intersection(worker.targets, targets)
- intrs.forEach(t => {
- found[t] = true
- })
- if (intrs.length > 0)
- this._pokeWorkerConnection(worker.connection, targets)
- }
-
- for (let target of targets) {
- if (!(target in found)) {
- this.logger.debug(`_pokeWorkers: worker responsible for ${target} not found. we'll remember it`)
- this.targetsWaitingToPoke[target] = true
- }
- this.logger.trace('_pokeWorkers: this.targetsWaitingToPoke:', this.targetsWaitingToPoke)
- }
- }, config.get('poke_throttle_interval') * 1000, {leading: true})
-
- /**
- * @param {Connection} connection
- * @param {string[]} targets
- * @private
- */
- _pokeWorkerConnection(connection, targets) {
- this.logger.debug('_pokeWorkerConnection:', connection.remoteAddr(), targets)
- connection.send(
- new RequestMessage('poll', {
- targets
- })
- )
- }
-
- /**
- * @return {{targets: string[], remoteAddr: string, remotePort: number}[]}
- */
- getInfo() {
- return this.workers.map(worker => {
- return {
- remoteAddr: worker.connection.socket?.remoteAddress,
- remotePort: worker.connection.socket?.remotePort,
- targets: worker.targets
- }
- })
- }
-
- /**
- * @private
- */
- sendPings = () => {
- this.workers
- .forEach(w => {
- this.logger.trace(`sending ping to ${w.connection.remoteAddr()}`)
- w.connection.send(new RequestMessage('ping'))
- })
- }
-
-}
-
-module.exports = WorkersList \ No newline at end of file