author    Evgeny Zinoviev <me@ch1p.io>  2022-05-07 16:53:03 +0300
committer Evgeny Zinoviev <me@ch1p.io>  2022-05-07 16:53:03 +0300
commit    3bfca2f2fbabb72ffbda01e016fd53852eb3998e
tree      ca7fafb886e47e07a52c0092f490061245fdc2e4
parent    d554e1c1c9ff6ae5b828dc4281208bb705dd350b

    refactor a bit

-rw-r--r--  mosgorsud/__init__.py                       5
-rw-r--r--  mosgorsud/parser.py (renamed from mgs.py)  87
-rwxr-xr-x  telegram_notify.py                         43
-rwxr-xr-x  to_csv.py                                  20
4 files changed, 90 insertions, 65 deletions
diff --git a/mosgorsud/__init__.py b/mosgorsud/__init__.py
new file mode 100644
index 0000000..3d9be46
--- /dev/null
+++ b/mosgorsud/__init__.py
@@ -0,0 +1,5 @@
+from .parser import get_cases
+
+__all__ = [
+    'get_cases'
+]
\ No newline at end of file
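
With the parser now packaged, callers import mosgorsud and call get_cases directly instead of instantiating MGSPiracy. A minimal usage sketch (the page range and the printed key are illustrative, not part of the commit):

    import mosgorsud

    # fetch cases from the first five search result pages
    cases = mosgorsud.get_cases(from_page=1, to_page=5)
    for case in cases:
        print(case['statement_number'])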
diff --git a/mgs.py b/mosgorsud/parser.py
index 96e22a9..0130354 100644
--- a/mgs.py
+++ b/mosgorsud/parser.py
@@ -5,9 +5,15 @@ import os
 import tempfile
 import random
 import string
+import logging
+
 from bs4 import BeautifulSoup
 from typing import List, Dict
 
+logger = logging.getLogger(__name__)
+
+BASE_URL = "https://mos-gorsud.ru/mgs/defend"
+
 headers = {
     'Referer': 'https://mos-gorsud.ru/',
     'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0'
@@ -24,35 +30,55 @@ def get_links(s: str) -> List[str]:
     return list(set(re.findall(regex, s)))
 
 
-class MGSPiracy:
-    BASE_URL = "https://mos-gorsud.ru/mgs/defend"
+def get_full_url(url: str) -> str:
+    if not url.startswith('http:') and not url.startswith('https:'):
+        if not url.startswith('/'):
+            url = '/' + url
+        url = 'https://mos-gorsud.ru' + url
+    return url
+
+
+def get_document_text(url: str) -> str:
+    print(f'downloading {url}')
+
+    r = requests.get(url, allow_redirects=True, headers=headers)
+    content_disposition = r.headers['Content-Disposition']
+    filename, file_extension = os.path.splitext(re.search('attachment; filename="(.*?)"', content_disposition).group(1))
+
+    tempname = '%s/%s%s' % (tempfile.gettempdir(), strgen(10), file_extension)
+
+    with open(tempname, 'wb') as f:
+        f.write(r.content)
 
-    def __init__(self, from_page: int, to_page: int):
-        self.from_page = from_page
-        self.to_page = to_page
+    text = textract.process(tempname).decode('utf-8')
+    os.unlink(tempname)
 
-    def get_cases(self) -> List[Dict]:
-        cases = []
+    return text
 
-        for page in range(self.from_page, self.to_page+1):
-            print(f'page {page}')
-            url = self.BASE_URL + '?page=' + str(page)
-            r = requests.get(url, headers=headers)
+
+def get_cases(from_page: int, to_page: int) -> List[Dict]:
+    cases = []
 
-            soup = BeautifulSoup(r.text, "html.parser")
-            rows = soup.select('.searchResultContainer table.custom_table tbody tr')
+    for page in range(from_page, to_page+1):
+        url = f'{BASE_URL}?page={page}'
+        print(f'page {page} ({url})')
 
-            for row in rows:
-                cols = row.find_all('td')
+        r = requests.get(url, headers=headers)
+        soup = BeautifulSoup(r.text, "html.parser")
+        rows = soup.select('.searchResultContainer table.custom_table tbody tr')
+
+        for row in rows:
+            cols = row.find_all('td')
+
+            try:
                 date = cols[0].get_text().strip()
                 statement_number = cols[1].get_text().strip()
                 applicant = cols[3].get_text().strip()
                 object = cols[4].get_text().strip()
-                link = self.mgs_url(cols[5].find('a')['href'])
+                link = get_full_url(cols[5].find('a')['href'])
 
-                decision_text = self.get_document_text(link)
+                decision_text = get_document_text(link)
                 violation_links = '\n'.join(get_links(decision_text))
 
                 cases.append(dict(
@@ -65,28 +91,7 @@ class MGSPiracy:
                     decision_text=decision_text
                 ))
-
-        return cases
-
-    def mgs_url(self, url: str) -> str:
-        if not url.startswith('http:') and not url.startswith('https:'):
-            if not url.startswith('/'):
-                url = '/' + url
-            url = 'https://mos-gorsud.ru' + url
-        return url
-
-    def get_document_text(self, url: str) -> str:
-        print(f'downloading {url}')
-
-        r = requests.get(url, allow_redirects=True, headers=headers)
-        content_disposition = r.headers['Content-Disposition']
-        filename, file_extension = os.path.splitext(re.search('attachment; filename="(.*?)"', content_disposition).group(1))
-
-        tempname = '%s/%s%s' % (tempfile.gettempdir(), strgen(10), file_extension)
-
-        with open(tempname, 'wb') as f:
-            f.write(r.content)
-
-        text = textract.process(tempname).decode('utf-8')
-        os.unlink(tempname)
+            except (TypeError, KeyError) as e:
+                logger.exception(e)
 
-        return text
+    return cases
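
The mgs_url method survives as the module-level get_full_url helper; it normalizes the hrefs scraped from the results table. A quick illustrative check of its behavior (paths are made up, assuming root-relative links like those on the site):

    from mosgorsud.parser import get_full_url

    # relative and root-relative paths get the host prepended;
    # absolute URLs pass through unchanged
    assert get_full_url('rs/case') == 'https://mos-gorsud.ru/rs/case'
    assert get_full_url('/rs/case') == 'https://mos-gorsud.ru/rs/case'
    assert get_full_url('https://mos-gorsud.ru/rs/case') == 'https://mos-gorsud.ru/rs/case'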
diff --git a/telegram_notify.py b/telegram_notify.py
index a834b34..0ccb2b9 100755
--- a/telegram_notify.py
+++ b/telegram_notify.py
@@ -1,6 +1,9 @@
 #!/usr/bin/env python3
 import traceback
-from mgs import MGSPiracy
+import mosgorsud
+import requests
+import urllib3.exceptions
+
 from argparse import ArgumentParser
 from ch1p import State, telegram_notify
 from html import escape
@@ -10,21 +13,24 @@ if __name__ == '__main__':
     # parse arguments
     parser = ArgumentParser()
     parser.add_argument('--state-file', required=True)
-    parser.add_argument('--token', help='Telegram bot token', required=True)
-    parser.add_argument('--chat-id', type=int, help='Telegram chat id (with bot)', required=True)
-    parser.add_argument('--from', type=int, default=1, help='First page', dest='_from')
-    parser.add_argument('--to', type=int, default=5, help='Last page')
+    parser.add_argument('--token', required=True,
+                        help='Telegram bot token')
+    parser.add_argument('--chat-id', type=int, required=True,
+                        help='Telegram chat id (with bot)')
+    parser.add_argument('--from', type=int, default=1, dest='_from',
+                        help='First page')
+    parser.add_argument('--to', type=int, default=5,
+                        help='Last page')
     parser.add_argument('--domains', nargs='+', required=True)
-    args = parser.parse_args()
+    arg = parser.parse_args()
 
     try:
         # get recent cases
-        mgs = MGSPiracy(from_page=args._from, to_page=args.to)
-        cases = mgs.get_cases()
+        cases = mosgorsud.get_cases(from_page=arg._from, to_page=arg.to)
 
         # read state
-        state = State(file=args.state_file,
+        state = State(file=arg.state_file,
                       default=dict(cases=[]))
 
         # loop through cases
@@ -34,10 +40,10 @@ if __name__ == '__main__':
                 continue
             matched = False
-            for mydomain in args.domains:
+            for mydomain in arg.domains:
                 if mydomain in case['decision_text']:
                     matched = True
-                    results.append('%s found in %s' % (mydomain, case['statement_number']))
+                    results.append('%s found in %s (%s)' % (mydomain, case['statement_number'], case['doc_link']))
 
             state['cases'].append(case['statement_number'])
 
             if matched:
@@ -50,14 +56,19 @@ if __name__ == '__main__':
         telegram_notify(text=escape(text),
                         parse_mode='HTML',
-                        token=args.token,
-                        chat_id=args.chat_id)
+                        token=arg.token,
+                        chat_id=arg.chat_id)
 
     except KeyboardInterrupt:
         pass
 
+    except (TimeoutError, requests.exceptions.ConnectionError, urllib3.exceptions.MaxRetryError):
+        telegram_notify(text='mosgorsud error: network timeout',
+                        token=arg.token,
+                        chat_id=arg.chat_id)
+
     except:
-        telegram_notify(text='error: '+escape(traceback.format_exc()),
+        telegram_notify(text='mosgorsud error: '+escape(traceback.format_exc()),
                         parse_mode='HTML',
-                        token=args.token,
-                        chat_id=args.chat_id)
+                        token=arg.token,
+                        chat_id=arg.chat_id)
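
For orientation, the notifier's dedupe-and-match loop shown in the hunks above boils down to the sketch below, assuming State from the ch1p package behaves like a dict persisted to --state-file (the file name and domain list are placeholders):

    from ch1p import State

    state = State(file='state.json', default=dict(cases=[]))
    results = []
    for case in cases:
        # skip statements already reported on a previous run
        if case['statement_number'] in state['cases']:
            continue
        for mydomain in ['example.com']:
            if mydomain in case['decision_text']:
                results.append('%s found in %s (%s)'
                               % (mydomain, case['statement_number'], case['doc_link']))
        state['cases'].append(case['statement_number'])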
diff --git a/to_csv.py b/to_csv.py
index e08bc24..85674fd 100755
--- a/to_csv.py
+++ b/to_csv.py
@@ -1,22 +1,26 @@
 #!/usr/bin/env python3
 import csv
-from mgs import MGSPiracy
+import mosgorsud
+
 from argparse import ArgumentParser
 
+
 if __name__ == '__main__':
     # parse arguments
     argp = ArgumentParser()
-    argp.add_argument('--output', type=str, default='output.csv', help='CSV output file name')
-    argp.add_argument('--from', type=int, default=0, help='First page', dest='_from')
-    argp.add_argument('--to', type=int, default=10, help='Last page')
-    args = argp.parse_args()
+    argp.add_argument('--output', type=str, default='output.csv',
+                      help='CSV output file name')
+    argp.add_argument('--from', type=int, default=1, dest='_from',
+                      help='First page')
+    argp.add_argument('--to', type=int, default=10,
+                      help='Last page')
+    arg = argp.parse_args()
 
     # get cases
-    mgs = MGSPiracy(from_page=args._from, to_page=args.to)
-    cases = mgs.get_cases()
+    cases = mosgorsud.get_cases(from_page=arg._from, to_page=arg.to)
 
     # write to csv
-    f = open(args.output, 'w', newline='')
+    f = open(arg.output, 'w', newline='')
     csv_writer = csv.writer(f)
 
     for case in cases:
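
The hunk is cut off above at the start of the write loop. Purely as orientation, one plausible shape for the remaining body (the column choice and order are a guess, not the commit's code):

    for case in cases:
        # one row per case, using the keys built by mosgorsud.parser.get_cases()
        csv_writer.writerow([case['date'], case['statement_number'],
                             case['applicant'], case['object'],
                             case['doc_link'], case['violation_links']])
    f.close()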