from threading import Lock from lib.util import Colored from lib.scanner import PortState try: from ch1p import telegram_notify except ModuleNotFoundError: pass class Results: def __init__(self): self.warnings = [] self.mutex = Lock() def add(self, worker): host = worker.get_host() with self.mutex: if not worker.done: print(f'{Colored.RED}{worker.name}: scanning failed{Colored.END}') return if worker.name != host: print(f'{worker.name} ({host}):') else: print(f'{host}:') opened = [] results = worker.get_results() for port, state in results: if state != PortState.OPEN: continue opened.append(port) if not worker.is_expected(port): self.warnings.append(f'{worker.name} ({host}): port {port} is open') print(f' {Colored.RED}{port} opened{Colored.END}') else: print(f' {Colored.GREEN}{port} opened{Colored.END}') if worker.opened: for port in worker.opened: if port not in opened: self.warnings.append( f'{worker.name} ({host}): port {port} is NOT open') print(f' {Colored.RED}{port} not opened{Colored.END}') print() def has_warnings(self): return len(self.warnings) > 0 def notify(self, chat_id=None, token=None): text = '❗️Attention!\n\n' text += '\n'.join(self.warnings) telegram_notify(text, parse_mode='html', chat_id=chat_id, token=token)