From abd1975def213891afdf0d87adbf79c2c7dbc0cb Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sat, 30 Dec 2023 15:37:08 +0300 Subject: upd --- database/__init__.py | 1 - database/database.py | 152 --------------- dl-from-db.py | 45 +++++ dl-retronews.py | 80 ++++++++ grab-retronews.py | 457 --------------------------------------------- mdf/__init__.py | 3 + mdf/database/__init__.py | 1 + mdf/database/database.py | 154 +++++++++++++++ mdf/retronews/__init__.py | 15 ++ mdf/retronews/retronews.py | 397 +++++++++++++++++++++++++++++++++++++++ mdf/util/__init__.py | 0 mdf/util/util.py | 44 +++++ retronews/__init__.py | 8 - retronews/retronews.py | 50 ----- 14 files changed, 739 insertions(+), 668 deletions(-) delete mode 100644 database/__init__.py delete mode 100644 database/database.py create mode 100755 dl-from-db.py create mode 100755 dl-retronews.py delete mode 100755 grab-retronews.py create mode 100644 mdf/__init__.py create mode 100644 mdf/database/__init__.py create mode 100644 mdf/database/database.py create mode 100644 mdf/retronews/__init__.py create mode 100644 mdf/retronews/retronews.py create mode 100644 mdf/util/__init__.py create mode 100644 mdf/util/util.py delete mode 100644 retronews/__init__.py delete mode 100644 retronews/retronews.py diff --git a/database/__init__.py b/database/__init__.py deleted file mode 100644 index ef3f969..0000000 --- a/database/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .database import Database diff --git a/database/database.py b/database/database.py deleted file mode 100644 index 04902f1..0000000 --- a/database/database.py +++ /dev/null @@ -1,152 +0,0 @@ -import sqlite3 -import logging -import os.path -import retronews -import threading - -from typing import Optional - - -class Database: - SCHEMA = 6 - - def __init__(self): - self.logger = logging.getLogger(self.__class__.__name__) - - file = os.path.join(os.path.dirname(__file__), '..', 'mdf-retrobase.sqlite3') - self.sqlite = sqlite3.connect(file, check_same_thread=False) - self.lock = threading.Lock() - - sqlite_version = self._get_sqlite_version() - self.logger.debug(f'SQLite version: {sqlite_version}') - - schema_version = self.schema_get_version() - self.logger.debug(f'Schema version: {schema_version}') - - self.schema_init(schema_version) - self.schema_set_version(self.SCHEMA) - - def __del__(self): - if self.sqlite: - self.sqlite.commit() - self.sqlite.close() - - def _get_sqlite_version(self) -> str: - cursor = self.sqlite.cursor() - cursor.execute("SELECT sqlite_version()") - return cursor.fetchone()[0] - - def schema_get_version(self) -> int: - cursor = self.sqlite.execute('PRAGMA user_version') - return int(cursor.fetchone()[0]) - - def schema_set_version(self, v) -> None: - self.sqlite.execute('PRAGMA user_version={:d}'.format(v)) - self.logger.info(f'Schema set to {v}') - - def cursor(self) -> sqlite3.Cursor: - return self.sqlite.cursor() - - def commit(self) -> None: - return self.sqlite.commit() - - def schema_init(self, version: int) -> None: - cursor = self.cursor() - - if version < 1: - # timestamps - cursor.execute("""CREATE TABLE IF NOT EXISTS mdf_links ( - issue_date TEXT PRIMARY KEY, - url TEXT NOT NULL, - pages INTEGER NOT NULL - )""") - - if version < 2: - cursor.execute("""CREATE TABLE IF NOT EXISTS mdf_pages ( - collection_id INTEGER NOT NULL, - doc_id INTEGER NOT NULL, - page INTEGER NOT NULL, - height INTEGER NOT NULL, - width INTEGER NOT NULL, - dpi INTEGER NOT NULL - )""") - cursor.execute("""CREATE UNIQUE INDEX mdf_pages_idx ON mdf_pages (collection_id, 
doc_id, page)""") - - if version < 3: - cursor.execute("ALTER TABLE mdf_pages ADD fail INTEGER NOT NULL") - - if version < 4: - cursor.execute("""CREATE INDEX mdf_pages_fail_idx ON mdf_pages (fail)""") - - if version < 5: - for col in ('collection_id', 'doc_id'): - cursor.execute(f"ALTER TABLE mdf_links ADD {col} INTEGER NOT NULL DEFAULT '0'") - cursor.execute("CREATE INDEX mdf_links_col_doc_idx ON mdf_links (collection_id, doc_id)") - - if version < 6: - cursor.execute("DROP INDEX mdf_links_col_doc_idx") - cursor.execute("CREATE UNIQUE INDEX mdf_links_col_doc_idx ON mdf_links (collection_id, doc_id)") - - self.commit() - - def add_link(self, issue_date: str, url: str, pages: int): - with self.lock: - self.cursor().execute("REPLACE INTO mdf_links (issue_date, url, pages) VALUES (?, ?, ?)", - (issue_date, url, str(pages))) - self.commit() - - def add_page(self, collection_id: int, doc_id: int, page: int, width: int, height: int, dpi: int): - with self.lock: - self.cursor().execute("INSERT INTO mdf_pages (collection_id, doc_id, page, width, height, dpi, fail) VALUES (?, ?, ?, ?, ?, ?, 0)", - (collection_id, doc_id, page, width, height, dpi)) - self.commit() - - def update_page(self, collection_id: int, doc_id: int, page: int, width: int, height: int, dpi: int): - with self.lock: - self.cursor().execute("UPDATE mdf_pages SET width=?, height=?, dpi=?, fail=0 WHERE collection_id=? AND doc_id=? AND page=?", - (width, height, dpi, collection_id, doc_id, page)) - self.commit() - - def add_page_failed(self, collection_id, doc_id, page): - with self.lock: - self.cursor().execute("INSERT INTO mdf_pages (collection_id, doc_id, page, width, height, dpi, fail) VALUES (?, ?, ?, 0, 0, 0, 1)", - (collection_id, doc_id, page)) - self.commit() - - def get_existing_pages(self, fail=0): - cur = self.cursor() - cur.execute("SELECT collection_id, doc_id, page FROM mdf_pages WHERE fail=?", (fail,)) - return cur.fetchall() - - def get_documents(self, range: Optional[tuple[str, str]] = None): - cur = self.cursor() - docs = [] - - sql = "SELECT issue_date, url, pages FROM mdf_links" - if range: - sql += f" WHERE issue_date BETWEEN '{range[0]}' AND '{range[1]}'" - cur.execute(sql) - for issue_date, url, pages in cur.fetchall(): - pub_date, collection_id, doc_id = retronews.parse_url(url) - docs.append(dict( - collection_id=collection_id, - doc_id=doc_id, - pages=pages - )) - - return docs - - def get_doc_pages(self, collection_id, doc_id): - cur = self.cursor() - cur.execute("SELECT page, width, height, dpi FROM mdf_pages WHERE collection_id=? AND doc_id=?", - (collection_id, doc_id)) - return cur.fetchall() - - def fix_documents(self): - cur = self.cursor() - cur.execute("SELECT issue_date, url FROM mdf_links") - for issue_date, url in cur.fetchall(): - pub_date, cid, did = retronews.parse_url(url) - cur.execute("UPDATE mdf_links SET collection_id=?, doc_id=? 
WHERE issue_date=?", - (cid, did, issue_date)) - self.commit() diff --git a/dl-from-db.py b/dl-from-db.py new file mode 100755 index 0000000..526c3a4 --- /dev/null +++ b/dl-from-db.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 + +import warnings +warnings.filterwarnings("ignore", category=DeprecationWarning) + +import logging + +from mdf import Database, retronews +from argparse import ArgumentParser + +database = Database() + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--output', type=str, required=True, + help='output directory') + parser.add_argument('--from-date', type=str) + parser.add_argument('--to-date', type=str) + parser.add_argument('--merge-threads', default=retronews.TILE_MERGING_POOL_SIZE, type=int) + parser.add_argument('--fetch-threads', default=retronews.PAGE_FETCHING_POOL_SIZE, type=int) + parser.add_argument('--only-fetch', action='store_true', + help='only fetch magazine tiles and exit, do not merge anything') + parser.add_argument('--force-overwrite', action='store_true', + help='if file yyyy-mm-dd.pdf already exists, delete it and start over') + parser.add_argument('--force-probe', action='store_true', + help='force all pages to use the \'probe\' method') + parser.add_argument('--fetch-probe-pages', nargs='+', type=int, + help='force some pages to use the \'probe\' method, when count of vertical and horizontal tiles is unknown') + + args = parser.parse_args() + + retronews.set_tile_merging_pool_size(args.merge_threads) + retronews.set_page_fetching_pool_size(args.fetch_threads) + + for doc in database.get_documents((args.from_date, args.to_date)): + url = doc['url'] + print(f'grabbing {url}...') + if not retronews.grab_magazine(url, + output_root=args.output, + probe_pages=args.fetch_probe_pages, + probe_all=args.force_probe, + only_fetch=args.only_fetch, + force_overwrite=args.force_overwrite): + logging.error(f'failed to grab {url}') diff --git a/dl-retronews.py b/dl-retronews.py new file mode 100755 index 0000000..6ee4325 --- /dev/null +++ b/dl-retronews.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 + +import warnings +warnings.filterwarnings("ignore", category=DeprecationWarning) + +import re +import requests +import logging + +from mdf import retronews +from argparse import ArgumentParser + + + +if __name__ == '__main__': + parser = ArgumentParser() + parser.add_argument('--url', type=str, required=True) + parser.add_argument('--output', type=str, required=True, + help='output directory') + parser.add_argument('--merge-threads', default=retronews.TILE_MERGING_POOL_SIZE, type=int) + parser.add_argument('--fetch-threads', default=retronews.PAGE_FETCHING_POOL_SIZE, type=int) + parser.add_argument('--continue-prev', action='store_true', + help='keep scrapping backwards in time') + parser.add_argument('--continue-next', action='store_true', + help='keep scrapping forwards in time') + parser.add_argument('--only-fetch', action='store_true', + help='only fetch magazine tiles and exit, do not merge anything') + parser.add_argument('--force-overwrite', action='store_true', + help='if file yyyy-mm-dd.pdf already exists, delete it and start over') + parser.add_argument('--force-probe', action='store_true', + help='force all pages to use the \'probe\' method') + parser.add_argument('--fetch-probe-pages', nargs='+', type=int, + help='force some pages to use the \'probe\' method, when count of vertical and horizontal tiles is unknown') + + args = parser.parse_args() + + with_continuation = args.continue_prev or args.continue_next + if 
args.fetch_probe_pages and with_continuation: + raise RuntimeError('--fetch-probe-pages cannot be used together with --continue-* options, it\'s a one time hack') + if args.only_fetch and with_continuation: + raise RuntimeError('--only-fetch cannot be used together with --continue-* options, it\'s a one time hack') + + TILE_MERGING_POOL_SIZE = args.merge_threads + PAGE_FETCHING_POOL_SIZE = args.fetch_threads + + url = args.url + while True: + print(f'grabbing {url}...') + if not retronews.grab_magazine(url, + output_root=args.output, + probe_pages=args.fetch_probe_pages, + probe_all=args.force_probe, + only_fetch=args.only_fetch, + force_overwrite=args.force_overwrite): + logging.error('failed to grab') + break + + if not args.continue_prev and not args.continue_next: + break + + r = requests.get(url) + + try: + next_url = None + if args.continue_next: + next_url = re.search(r'SUIVANT', r.text, re.S).groups()[0] + elif args.continue_prev: + next_url = re.search(r'\s+\s+PRÉCÉDENT', r.text, re.S).groups()[0] + + if not next_url: + break + + if next_url.startswith('/'): + next_url = f'https://www.retronews.fr{next_url}' + + url = next_url + + except: + print('error: failed to find previous link! exiting') + break diff --git a/grab-retronews.py b/grab-retronews.py deleted file mode 100755 index ac4dbf1..0000000 --- a/grab-retronews.py +++ /dev/null @@ -1,457 +0,0 @@ -#!/usr/bin/env python3 -import logging -import warnings -warnings.filterwarnings("ignore", category=DeprecationWarning) - -import os -import sys -import json -import re -import imghdr -import requests -import urllib.request -import urllib.error -import http.client -import subprocess -import shutil -import queue -import traceback -import retronews -import logging - -from database import Database -from typing import Optional -from threading import Thread, Lock -from time import sleep -from argparse import ArgumentParser - -warnings.filterwarnings("ignore", category=DeprecationWarning) - -VTILES = 3 -HTILES = 2 -TILE_MERGING_POOL_SIZE = 8 -PAGE_FETCHING_POOL_SIZE = 8 - -database = Database() -print_lock = Lock() - -pages_queue = queue.Queue() -merging_queue = queue.Queue() - - -def safe_print(*args, **kwargs): - with print_lock: - print(*args, **kwargs) - - -def run(args: list, **kwargs): - p = subprocess.run(args, **kwargs) - if p.returncode != 0: - raise OSError(f'convert returned {p.returncode} ('+' '.join(args)+')') - - -class DownloaderThread(Thread): - _url: str - _save_as: str - _download_result: Optional[bool] - _handle_http: bool - user_info: dict - - def __init__(self, url: str, save_as: str, thread_name=None, handle_http=False, user_info=None): - super().__init__() - if user_info is None: - user_info = {} - if thread_name: - self.name = thread_name - - self._url = url - self._save_as = save_as - self._download_result = None - self._handle_http = handle_http - self.user_info = user_info - - def run(self): - try: - self._download_result = download_file(self._url, self._save_as, handle_http_errors=not self._handle_http) - except urllib.error.HTTPError: - pass - - def is_downloaded(self) -> bool: - return self._download_result is True - - -class TileMergeWorker(Thread): - _working_dir: str - _number: int - - def __init__(self, working_dir: str, number: int): - super().__init__() - self._working_dir = working_dir - self._number = number - - def run(self): - safe_print(f'[tile merger {self._number}] started') - - while not merging_queue.empty(): - try: - page = merging_queue.get_nowait() - page_dir = 
os.path.join(self._working_dir, str(page)) - thumbnail_path = os.path.join(page_dir, 'thumbnail.jpg') - meta_path = os.path.join(page_dir, 'meta.json') - - if os.path.exists(thumbnail_path): - shutil.copy(thumbnail_path, os.path.join(self._working_dir, f'{page}.jpg')) - continue - - if os.path.exists(meta_path): - with open(meta_path, 'r') as f: - meta = json.loads(f.read()) - htiles = meta['h'] - vtiles = meta['v'] - else: - htiles = HTILES - vtiles = VTILES - - hfiles = [] - for h in range(htiles): - vfiles = [] - for v in range(vtiles): - vfiles.append(f'{h}x{v}.jpg') - run(['convert', '-append', *vfiles, f'{h}.jpg'], cwd=page_dir) - hfiles.append(f'{h}.jpg') - - run(['convert', '+append', *hfiles, os.path.join(self._working_dir, f'{page}.jpg')], cwd=page_dir) - # shutil.rmtree(page_dir) - - safe_print(f'[tile merger {self._number}] page {page} done') - - except queue.Empty: - break - - -class PageFetchWorker(Thread): - _working_dir: str - _number: int - _failed: bool - _error: Optional[str] - _probe_pages: Optional[list[int]] - _probe_all: bool - - def __init__(self, working_dir: str, number: int, collection_id, doc_id, probe_pages: Optional[list[int]] = None, probe_all=False): - super().__init__() - self._working_dir = working_dir - self._number = number - self._collection_id = collection_id - self._doc_id = doc_id - self._failed = False - self._error = None - self._probe_pages = probe_pages - self._probe_all = probe_all - - def run(self): - safe_print(f'[pf-{self._number}] started') - page = 0 - - try: - while not pages_queue.empty(): - try: - page = pages_queue.get_nowait() - safe_print(f'[pf-{self._number}] page {page} started') - - if self._probe_all or page in self._probe_pages: - self.probe_dl(page) - else: - try: - self.normal_dl(page) - except OSError: - safe_print(f'[pf-{self._number}] normal_dl() failed, trying probe_dl()') - self.probe_dl(page) - - except queue.Empty: - break - - except Exception as e: - self._failed = True - self._error = f'while fetching page {page}: {str(e)}' - - def _get_page_dir(self, page): - page_dir = os.path.join(self._working_dir, str(page)) - if not os.path.exists(page_dir): - os.makedirs(page_dir) - return page_dir - - def is_failed(self) -> bool: - return self._failed - - def get_error(self) -> str: - return self._error if self._error is not None else '' - - def normal_dl(self, page): - page_dir = self._get_page_dir(page) - dl_tasks = [] - for horiz_tile in range(HTILES): - for vert_tile in range(VTILES): - url = retronews.tile_url(self._collection_id, self._doc_id, page, h_tile=horiz_tile, v_tile=vert_tile) - output_file = f'{page_dir}/v{vert_tile}_h{horiz_tile}.jpg' - if os.path.isfile(output_file): - if os.path.getsize(output_file) < 4: - os.unlink(output_file) - # safe_print(f'[pf-{self._number}] already exists') - continue - - dl_tasks.append(DownloaderThread(url=url, - save_as=os.path.join(page_dir, output_file), - thread_name=f'p{page}-v{vert_tile}-h{horiz_tile}')) - - for task in dl_tasks: - task.start() - - data_error = False - - for task in dl_tasks: - task.join() - if not task.is_downloaded(): - # safe_print(f'failed to download file {task._url}') - raise OSError(f'network error, failed to download {task._url}') - - elif not imghdr.what(task._save_as): - data_error = True - - if data_error: - self.thumbnail_dl(page) - else: - safe_print(f'[pf-{self._number}] page {page}: all files saved') - - def probe_dl(self, page): - page_dir = self._get_page_dir(page) - real_h = 0 - real_v = 0 - data_error = False - dl_tasks = [] - for h in 
range(10): - for v in range(10): - url = retronews.tile_url(self._collection_id, self._doc_id, page, h_tile=h, v_tile=v) - output_file = f'{page_dir}/{h}x{v}.jpg' - if os.path.isfile(output_file): - safe_print(f'[pf-{self._number}] probing page {page}: v={v} h={h} ALREADY') - if os.path.getsize(output_file) < 4: - os.unlink(output_file) - continue - - dl_tasks.append(DownloaderThread(url=url, - save_as=os.path.join(page_dir, output_file), - handle_http=True, - thread_name=f'p{page}-v{v}-h{h}', - user_info=dict(h=h, v=v))) - - for task in dl_tasks: - task.start() - for task in dl_tasks: - task.join() - - if task.is_downloaded(): - task_h = task.user_info['h'] - task_v = task.user_info['v'] - if task_h > real_h: - real_h = task_h - if task_v > real_v: - real_v = task_v - - if not imghdr.what(task._save_as): - data_error = True - - # try: - # if not download_file(url, output_file, handle_http_errors=False): - # raise OSError('network failure') - # if not imghdr.what(output_file): - # data_error = True - # break - # real_v = v - # real_h = h - # safe_print(f'[pf-{self._number}] probing page {page}: v={v} h={h} OK') - # - # except urllib.error.HTTPError: - # safe_print(f'[pf-{self._number}] probing page {page}: v={v} h={h} FAIL') - # break - - if data_error: - self.thumbnail_dl(page) - else: - with open(os.path.join(page_dir, 'meta.json'), 'w') as f: - f.write(json.dumps(dict(v=real_v+1, h=real_h+1))) - safe_print(f'[pf-{self._number}] page {page}: all files saved (seemingly...)') - - def thumbnail_dl(self, page): - page_dir = self._get_page_dir(page) - thumbnail_url = retronews.thumbnail_url(self._collection_id, self._doc_id, page) - if not download_file(thumbnail_url, os.path.join(page_dir, 'thumbnail.jpg')): - raise RuntimeError(f'network error, failed to download thumbnail ({thumbnail_url})') - safe_print(f'[pf-{self._number}] page {page}: corrupt files; replaced with a thumbnail') - - -def download_file(url, output, handle_http_errors=True) -> bool: - tries_left = 3 - ok = False - while tries_left > 0: - try: - urllib.request.urlretrieve(url, output) - ok = True - break - except http.client.RemoteDisconnected: - ok = False - print(' caught an exception, sleeping for 2 seconds and retrying...') - sleep(2) - tries_left -= 1 - except urllib.error.HTTPError as e: - if not handle_http_errors: - raise e - else: - print(f' failed to download {url}: {str(e)}') - return False - return ok - - -def grab_magazine(url: str, - output_root: str, - probe_pages: Optional[list[int]] = None, - probe_all=False, only_fetch=False, force_overwrite=False): - try: - pub_date, collection_id, doc_id = retronews.parse_url(url) - except AttributeError: - return False - - data = retronews.doc_info(collection_id, doc_id) - pages = int(data['nbPages']) - print(f'found {pages} pages') - - y, m, d = retronews.convert_date(pub_date) - if os.path.exists(os.path.join(output_root, f'{y}-{m}-{d}.pdf')): - if not force_overwrite: - print(f'{y}-{m}-{d}.pdf already exists, not continuing') - return True - else: - os.unlink(os.path.join(output_root, f'{y}-{m}-{d}.pdf')) - print(f'{y}-{m}-{d}.pdf already exists, deleting and continuing (force_overwrite=on)') - - output_dir = os.path.join(output_root, pub_date) - if not os.path.exists(output_dir): - os.makedirs(output_dir) - - # fetch pages - for page in range(pages): - pages_queue.put(page+1) - - pool = [] - for i in range(PAGE_FETCHING_POOL_SIZE): - pool.append(PageFetchWorker(working_dir=output_dir, - number=i+1, - collection_id=collection_id, - doc_id=doc_id, - 
probe_pages=probe_pages, - probe_all=probe_all)) - for worker in pool: - worker.start() - - for worker in pool: - worker.join() - if worker.is_failed(): - with open(os.path.join(output_dir, 'error.txt'), 'w') as f: - f.write(f'error: {worker.get_error()}') - print(f'ERROR: failed to download {pub_date} magazine') - return False - - if only_fetch: - return True - - # merge tiles - for page in range(pages): - page += 1 - merging_queue.put(page) - - pool = [] - for i in range(TILE_MERGING_POOL_SIZE): - pool.append(TileMergeWorker(working_dir=output_dir, number=i+1)) - for worker in pool: - worker.start() - try: - for worker in pool: - worker.join() - - # merge images into pdf - files = [str(page + 1) + '.jpg' for page in range(pages)] - run(['convert', *files, os.path.join(output_root, f'{y}-{m}-{d}.pdf')], cwd=output_dir) - shutil.rmtree(output_dir) - except: - traceback.print_exc() - - return True - - -if __name__ == '__main__': - parser = ArgumentParser() - parser.add_argument('--url', type=str, required=True) - parser.add_argument('--output', type=str, required=True, - help='output directory') - parser.add_argument('--merge-threads', default=TILE_MERGING_POOL_SIZE, type=int) - parser.add_argument('--fetch-threads', default=PAGE_FETCHING_POOL_SIZE, type=int) - parser.add_argument('--continue-prev', action='store_true', - help='keep scrapping backwards in time') - parser.add_argument('--continue-next', action='store_true', - help='keep scrapping forwards in time') - parser.add_argument('--only-fetch', action='store_true', - help='only fetch magazine tiles and exit, do not merge anything') - parser.add_argument('--force-overwrite', action='store_true', - help='if file yyyy-mm-dd.pdf already exists, delete it and start over') - parser.add_argument('--force-probe', action='store_true', - help='force all pages to use the \'probe\' method') - parser.add_argument('--fetch-probe-pages', nargs='+', type=int, - help='force some pages to use the \'probe\' method, when count of vertical and horizontal tiles is unknown') - - args = parser.parse_args() - - with_continuation = args.continue_prev or args.continue_next - if args.fetch_probe_pages and with_continuation: - raise RuntimeError('--fetch-probe-pages cannot be used together with --continue-* options, it\'s a one time hack') - if args.only_fetch and with_continuation: - raise RuntimeError('--only-fetch cannot be used together with --continue-* options, it\'s a one time hack') - - TILE_MERGING_POOL_SIZE = args.merge_threads - PAGE_FETCHING_POOL_SIZE = args.fetch_threads - - url = args.url - while True: - print(f'grabbing {url}...') - if not grab_magazine(url, - output_root=args.output, - probe_pages=args.fetch_probe_pages, - probe_all=args.force_probe, - only_fetch=args.only_fetch, - force_overwrite=args.force_overwrite): - logging.error('failed to grab') - break - - if not args.continue_prev and not args.continue_next: - break - - r = requests.get(url) - - try: - next_url = None - if args.continue_next: - next_url = re.search(r'SUIVANT', r.text, re.S).groups()[0] - elif args.continue_prev: - next_url = re.search(r'\s+\s+PRÉCÉDENT', r.text, re.S).groups()[0] - - if not next_url: - if not next_url: - break - - if next_url.startswith('/'): - next_url = f'https://www.retronews.fr{next_url}' - - url = next_url - - except: - print('error: failed to find previous link! 
exiting') - break - diff --git a/mdf/__init__.py b/mdf/__init__.py new file mode 100644 index 0000000..9466436 --- /dev/null +++ b/mdf/__init__.py @@ -0,0 +1,3 @@ +from .retronews import retronews +from .util import util +from .database import Database \ No newline at end of file diff --git a/mdf/database/__init__.py b/mdf/database/__init__.py new file mode 100644 index 0000000..ef3f969 --- /dev/null +++ b/mdf/database/__init__.py @@ -0,0 +1 @@ +from .database import Database diff --git a/mdf/database/database.py b/mdf/database/database.py new file mode 100644 index 0000000..fd08e38 --- /dev/null +++ b/mdf/database/database.py @@ -0,0 +1,154 @@ +import sqlite3 +import logging +import os.path +from ..retronews import retronews +import threading + +from typing import Optional + + +class Database: + SCHEMA = 6 + + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + + file = os.path.join(os.path.dirname(__file__), '..', '..', 'mdf-retrobase.sqlite3') + self.sqlite = sqlite3.connect(file, check_same_thread=False) + self.lock = threading.Lock() + + sqlite_version = self._get_sqlite_version() + self.logger.debug(f'SQLite version: {sqlite_version}') + + schema_version = self.schema_get_version() + self.logger.debug(f'Schema version: {schema_version}') + + self.schema_init(schema_version) + self.schema_set_version(self.SCHEMA) + + def __del__(self): + if self.sqlite: + self.sqlite.commit() + self.sqlite.close() + + def _get_sqlite_version(self) -> str: + cursor = self.sqlite.cursor() + cursor.execute("SELECT sqlite_version()") + return cursor.fetchone()[0] + + def schema_get_version(self) -> int: + cursor = self.sqlite.execute('PRAGMA user_version') + return int(cursor.fetchone()[0]) + + def schema_set_version(self, v) -> None: + self.sqlite.execute('PRAGMA user_version={:d}'.format(v)) + self.logger.info(f'Schema set to {v}') + + def cursor(self) -> sqlite3.Cursor: + return self.sqlite.cursor() + + def commit(self) -> None: + return self.sqlite.commit() + + def schema_init(self, version: int) -> None: + cursor = self.cursor() + + if version < 1: + # timestamps + cursor.execute("""CREATE TABLE IF NOT EXISTS mdf_links ( + issue_date TEXT PRIMARY KEY, + url TEXT NOT NULL, + pages INTEGER NOT NULL + )""") + + if version < 2: + cursor.execute("""CREATE TABLE IF NOT EXISTS mdf_pages ( + collection_id INTEGER NOT NULL, + doc_id INTEGER NOT NULL, + page INTEGER NOT NULL, + height INTEGER NOT NULL, + width INTEGER NOT NULL, + dpi INTEGER NOT NULL + )""") + cursor.execute("""CREATE UNIQUE INDEX mdf_pages_idx ON mdf_pages (collection_id, doc_id, page)""") + + if version < 3: + cursor.execute("ALTER TABLE mdf_pages ADD fail INTEGER NOT NULL") + + if version < 4: + cursor.execute("""CREATE INDEX mdf_pages_fail_idx ON mdf_pages (fail)""") + + if version < 5: + for col in ('collection_id', 'doc_id'): + cursor.execute(f"ALTER TABLE mdf_links ADD {col} INTEGER NOT NULL DEFAULT '0'") + cursor.execute("CREATE INDEX mdf_links_col_doc_idx ON mdf_links (collection_id, doc_id)") + + if version < 6: + cursor.execute("DROP INDEX mdf_links_col_doc_idx") + cursor.execute("CREATE UNIQUE INDEX mdf_links_col_doc_idx ON mdf_links (collection_id, doc_id)") + + self.commit() + + def add_link(self, issue_date: str, url: str, pages: int): + with self.lock: + self.cursor().execute("REPLACE INTO mdf_links (issue_date, url, pages) VALUES (?, ?, ?)", + (issue_date, url, str(pages))) + self.commit() + + def add_page(self, collection_id: int, doc_id: int, page: int, width: int, height: int, dpi: int): + with 
self.lock: + self.cursor().execute("INSERT INTO mdf_pages (collection_id, doc_id, page, width, height, dpi, fail) VALUES (?, ?, ?, ?, ?, ?, 0)", + (collection_id, doc_id, page, width, height, dpi)) + self.commit() + + def update_page(self, collection_id: int, doc_id: int, page: int, width: int, height: int, dpi: int): + with self.lock: + self.cursor().execute("UPDATE mdf_pages SET width=?, height=?, dpi=?, fail=0 WHERE collection_id=? AND doc_id=? AND page=?", + (width, height, dpi, collection_id, doc_id, page)) + self.commit() + + def add_page_failed(self, collection_id, doc_id, page): + with self.lock: + self.cursor().execute("INSERT INTO mdf_pages (collection_id, doc_id, page, width, height, dpi, fail) VALUES (?, ?, ?, 0, 0, 0, 1)", + (collection_id, doc_id, page)) + self.commit() + + def get_existing_pages(self, fail=0): + cur = self.cursor() + cur.execute("SELECT collection_id, doc_id, page FROM mdf_pages WHERE fail=?", (fail,)) + return cur.fetchall() + + def get_documents(self, range: Optional[tuple[str, str]] = None): + cur = self.cursor() + docs = [] + + sql = "SELECT issue_date, url, pages FROM mdf_links" + if range: + sql += f" WHERE issue_date BETWEEN '{range[0]}' AND '{range[1]}'" + sql += " ORDER BY issue_date" + cur.execute(sql) + for issue_date, url, pages in cur.fetchall(): + pub_date, collection_id, doc_id = retronews.parse_url(url) + docs.append(dict( + url=url, + collection_id=collection_id, + doc_id=doc_id, + pages=pages + )) + + return docs + + def get_doc_pages(self, collection_id, doc_id): + cur = self.cursor() + cur.execute("SELECT page, width, height, dpi FROM mdf_pages WHERE collection_id=? AND doc_id=?", + (collection_id, doc_id)) + return cur.fetchall() + + def fix_documents(self): + cur = self.cursor() + cur.execute("SELECT issue_date, url FROM mdf_links") + for issue_date, url in cur.fetchall(): + pub_date, cid, did = retronews.parse_url(url) + cur.execute("UPDATE mdf_links SET collection_id=?, doc_id=? 
WHERE issue_date=?", + (cid, did, issue_date)) + self.commit() diff --git a/mdf/retronews/__init__.py b/mdf/retronews/__init__.py new file mode 100644 index 0000000..105ca70 --- /dev/null +++ b/mdf/retronews/__init__.py @@ -0,0 +1,15 @@ +from .retronews import ( + convert_date, + parse_url, + _doc_info, + page_info, + thumbnail_url, + tile_url, + HTILES, + VTILES, + PAGE_FETCHING_POOL_SIZE, + TILE_MERGING_POOL_SIZE, + set_tile_merging_pool_size, + set_page_fetching_pool_size, + grab_magazine +) \ No newline at end of file diff --git a/mdf/retronews/retronews.py b/mdf/retronews/retronews.py new file mode 100644 index 0000000..4697e55 --- /dev/null +++ b/mdf/retronews/retronews.py @@ -0,0 +1,397 @@ +import re +import requests +import imghdr +import json +import os +import queue +import shutil +import traceback + +from ..util.util import safe_print, download_file, run +from typing import Optional +from threading import Thread +import urllib.error + +_pages_queue = queue.Queue() +_merging_queue = queue.Queue() + +VTILES = 3 +HTILES = 2 +TILE_MERGING_POOL_SIZE = 8 +PAGE_FETCHING_POOL_SIZE = 8 + + +MONTHS = dict( + jan=1, + feb=2, + mar=3, + apr=4, + may=5, + jun=6, + jul=7, + juillet=7, + aout=8, + aug=8, + sep=9, + oct=10, + nov=11, + novembre=11, # https://www.retronews.fr/journal/mercure-de-france/15-novembre-1905/118/2617647/1 + dec=12 +) + + +def convert_date(s: str) -> tuple[str, str, str]: + m = re.match(r'^(\d{2})-(.*?)-(\d{4})$', s).groups() + year = m[2] + month = '%02d' % MONTHS[m[1]] + day = m[0] + return year, month, day + + +def parse_url(url: str) -> tuple: + return re.search(r'/(?:[\-\d\w]+)/([^/]+)/(\d+)/(\d+)/', url).groups() + + +def _doc_info(collection_id, doc_id): + r = requests.get(f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}') + return r.json() + + +def page_info(collection_id, doc_id, page): + r = requests.get(f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}/page/{page}/') + return r.json() + + +def thumbnail_url(collection_id, doc_id, page) -> str: + return f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}/page/{page}/thumbnail' + + +def tile_url(collection_id, doc_id, page, v_tile, h_tile) -> str: + return f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}/page/{page}/tile/{h_tile}/{v_tile}/0' + + + + +class DownloaderThread(Thread): + _url: str + _save_as: str + _download_result: Optional[bool] + _handle_http: bool + user_info: dict + + def __init__(self, url: str, save_as: str, thread_name=None, handle_http=False, user_info=None): + super().__init__() + if user_info is None: + user_info = {} + if thread_name: + self.name = thread_name + + self._url = url + self._save_as = save_as + self._download_result = None + self._handle_http = handle_http + self.user_info = user_info + + def run(self): + try: + self._download_result = download_file(self._url, self._save_as, handle_http_errors=not self._handle_http) + except urllib.error.HTTPError: + pass + + def is_downloaded(self) -> bool: + return self._download_result is True + + +class TileMergeWorker(Thread): + _working_dir: str + _number: int + + def __init__(self, working_dir: str, number: int): + super().__init__() + self._working_dir = working_dir + self._number = number + + def run(self): + safe_print(f'[tile merger {self._number}] started') + + while not _merging_queue.empty(): + try: + page = _merging_queue.get_nowait() + page_dir = os.path.join(self._working_dir, str(page)) + thumbnail_path = os.path.join(page_dir, 'thumbnail.jpg') + 
meta_path = os.path.join(page_dir, 'meta.json') + + if os.path.exists(thumbnail_path): + shutil.copy(thumbnail_path, os.path.join(self._working_dir, f'{page}.jpg')) + continue + + if os.path.exists(meta_path): + with open(meta_path, 'r') as f: + meta = json.loads(f.read()) + htiles = meta['h'] + vtiles = meta['v'] + else: + htiles = HTILES + vtiles = VTILES + + hfiles = [] + for h in range(htiles): + vfiles = [] + for v in range(vtiles): + vfiles.append(f'v{v}_h{h}.jpg') + run(['convert', '-append', *vfiles, f'{h}.jpg'], cwd=page_dir) + hfiles.append(f'{h}.jpg') + + run(['convert', '+append', *hfiles, os.path.join(self._working_dir, f'{page}.jpg')], cwd=page_dir) + # shutil.rmtree(page_dir) + + safe_print(f'[tile merger {self._number}] page {page} done') + + except queue.Empty: + break + + +class PageFetchWorker(Thread): + _working_dir: str + _number: int + _failed: bool + _error: Optional[str] + _probe_pages: Optional[list[int]] + _probe_all: bool + + def __init__(self, working_dir: str, number: int, collection_id, doc_id, probe_pages: Optional[list[int]] = None, probe_all=False): + super().__init__() + self._working_dir = working_dir + self._number = number + self._collection_id = collection_id + self._doc_id = doc_id + self._failed = False + self._error = None + self._probe_pages = probe_pages + self._probe_all = probe_all + + def run(self): + safe_print(f'[pf-{self._number}] started') + page = 0 + + try: + while not _pages_queue.empty(): + try: + page = _pages_queue.get_nowait() + safe_print(f'[pf-{self._number}] page {page} started') + + if self._probe_all or (self._probe_pages is not None and page in self._probe_pages): + self.probe_dl(page) + else: + try: + self.normal_dl(page) + except OSError: + safe_print(f'[pf-{self._number}] normal_dl() failed, trying probe_dl()') + self.probe_dl(page) + + except queue.Empty: + break + + except Exception as e: + self._failed = True + self._error = f'while fetching page {page}: {str(e)}' + traceback.format_exc() + + def _get_page_dir(self, page): + page_dir = os.path.join(self._working_dir, str(page)) + if not os.path.exists(page_dir): + os.makedirs(page_dir) + return page_dir + + def is_failed(self) -> bool: + return self._failed + + def get_error(self) -> str: + return self._error if self._error is not None else '' + + def normal_dl(self, page): + page_dir = self._get_page_dir(page) + dl_tasks = [] + for horiz_tile in range(HTILES): + for vert_tile in range(VTILES): + url = tile_url(self._collection_id, self._doc_id, page, h_tile=horiz_tile, v_tile=vert_tile) + output_file = f'{page_dir}/v{vert_tile}_h{horiz_tile}.jpg' + if os.path.isfile(output_file): + if os.path.getsize(output_file) < 4: + os.unlink(output_file) + # safe_print(f'[pf-{self._number}] already exists') + continue + + dl_tasks.append(DownloaderThread(url=url, + save_as=os.path.join(page_dir, output_file), + thread_name=f'p{page}-v{vert_tile}-h{horiz_tile}')) + + for task in dl_tasks: + task.start() + + data_error = False + + for task in dl_tasks: + task.join() + if not task.is_downloaded(): + # safe_print(f'failed to download file {task._url}') + raise OSError(f'network error, failed to download {task._url}') + + elif not imghdr.what(task._save_as): + data_error = True + + if data_error: + self.thumbnail_dl(page) + else: + safe_print(f'[pf-{self._number}] page {page}: all files saved') + + def probe_dl(self, page): + page_dir = self._get_page_dir(page) + real_h = 0 + real_v = 0 + data_error = False + dl_tasks = [] + for h in range(10): + for v in range(10): + url = 
tile_url(self._collection_id, self._doc_id, page, h_tile=h, v_tile=v) + output_file = f'{page_dir}/{h}x{v}.jpg' + if os.path.isfile(output_file): + safe_print(f'[pf-{self._number}] probing page {page}: v={v} h={h} ALREADY') + if os.path.getsize(output_file) < 4: + os.unlink(output_file) + continue + + dl_tasks.append(DownloaderThread(url=url, + save_as=os.path.join(page_dir, output_file), + handle_http=True, + thread_name=f'p{page}-v{v}-h{h}', + user_info=dict(h=h, v=v))) + + for task in dl_tasks: + task.start() + for task in dl_tasks: + task.join() + + if task.is_downloaded(): + task_h = task.user_info['h'] + task_v = task.user_info['v'] + if task_h > real_h: + real_h = task_h + if task_v > real_v: + real_v = task_v + + if not imghdr.what(task._save_as): + data_error = True + + # try: + # if not download_file(url, output_file, handle_http_errors=False): + # raise OSError('network failure') + # if not imghdr.what(output_file): + # data_error = True + # break + # real_v = v + # real_h = h + # safe_print(f'[pf-{self._number}] probing page {page}: v={v} h={h} OK') + # + # except urllib.error.HTTPError: + # safe_print(f'[pf-{self._number}] probing page {page}: v={v} h={h} FAIL') + # break + + if data_error: + self.thumbnail_dl(page) + else: + with open(os.path.join(page_dir, 'meta.json'), 'w') as f: + f.write(json.dumps(dict(v=real_v+1, h=real_h+1))) + safe_print(f'[pf-{self._number}] page {page}: all files saved (seemingly...)') + + def thumbnail_dl(self, page): + page_dir = self._get_page_dir(page) + thumb_url = thumbnail_url(self._collection_id, self._doc_id, page) + if not download_file(thumb_url, os.path.join(page_dir, 'thumbnail.jpg')): + raise RuntimeError(f'network error, failed to download thumbnail ({thumb_url})') + safe_print(f'[pf-{self._number}] page {page}: corrupt files; replaced with a thumbnail') + + +def grab_magazine(url: str, + output_root: str, + probe_pages: Optional[list[int]] = None, + probe_all=False, only_fetch=False, force_overwrite=False): + try: + pub_date, collection_id, doc_id = parse_url(url) + except AttributeError: + return False + + data = _doc_info(collection_id, doc_id) + pages = int(data['nbPages']) + print(f'found {pages} pages') + + y, m, d = convert_date(pub_date) + if os.path.exists(os.path.join(output_root, f'{y}-{m}-{d}.pdf')): + if not force_overwrite: + print(f'{y}-{m}-{d}.pdf already exists, not continuing') + return True + else: + os.unlink(os.path.join(output_root, f'{y}-{m}-{d}.pdf')) + print(f'{y}-{m}-{d}.pdf already exists, deleting and continuing (force_overwrite=on)') + + output_dir = os.path.join(output_root, pub_date) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + # fetch pages + for page in range(pages): + _pages_queue.put(page+1) + + pool = [] + for i in range(PAGE_FETCHING_POOL_SIZE): + pool.append(PageFetchWorker(working_dir=output_dir, + number=i+1, + collection_id=collection_id, + doc_id=doc_id, + probe_pages=probe_pages, + probe_all=probe_all)) + for worker in pool: + worker.start() + + for worker in pool: + worker.join() + if worker.is_failed(): + with open(os.path.join(output_dir, 'error.txt'), 'w') as f: + f.write(f'error: {worker.get_error()}') + print(f'ERROR: failed to download {pub_date} magazine') + return False + + if only_fetch: + return True + + # merge tiles + for page in range(pages): + page += 1 + _merging_queue.put(page) + + pool = [] + for i in range(TILE_MERGING_POOL_SIZE): + pool.append(TileMergeWorker(working_dir=output_dir, number=i+1)) + for worker in pool: + worker.start() + try: + for 
worker in pool: + worker.join() + + # merge images into pdf + files = [str(page + 1) + '.jpg' for page in range(pages)] + run(['convert', *files, os.path.join(output_root, f'{y}-{m}-{d}.pdf')], cwd=output_dir) + shutil.rmtree(output_dir) + except: + traceback.print_exc() + + return True + + +def set_tile_merging_pool_size(size): + global TILE_MERGING_POOL_SIZE + TILE_MERGING_POOL_SIZE = size + + +def set_page_fetching_pool_size(size): + global PAGE_FETCHING_POOL_SIZE + PAGE_FETCHING_POOL_SIZE = size diff --git a/mdf/util/__init__.py b/mdf/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mdf/util/util.py b/mdf/util/util.py new file mode 100644 index 0000000..b233d88 --- /dev/null +++ b/mdf/util/util.py @@ -0,0 +1,44 @@ +import subprocess +import urllib.request +import urllib.error + +from time import sleep +from threading import Lock +import http.client + + +_print_lock = Lock() + + +def safe_print(*args, **kwargs): + with _print_lock: + print(*args, **kwargs) + + +def run(args: list, **kwargs): + p = subprocess.run(args, **kwargs) + if p.returncode != 0: + raise OSError(f'convert returned {p.returncode} ('+' '.join(args)+')') + + +def download_file(url, output, handle_http_errors=True) -> bool: + tries_left = 3 + ok = False + while tries_left > 0: + try: + urllib.request.urlretrieve(url, output) + ok = True + break + except http.client.RemoteDisconnected: + ok = False + print(' caught an exception, sleeping for 2 seconds and retrying...') + sleep(2) + tries_left -= 1 + except urllib.error.HTTPError as e: + if not handle_http_errors: + raise e + else: + print(f' failed to download {url}: {str(e)}') + return False + return ok + diff --git a/retronews/__init__.py b/retronews/__init__.py deleted file mode 100644 index ae3b518..0000000 --- a/retronews/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -from .retronews import ( - convert_date, - parse_url, - doc_info, - page_info, - thumbnail_url, - tile_url -) \ No newline at end of file diff --git a/retronews/retronews.py b/retronews/retronews.py deleted file mode 100644 index 9e80c58..0000000 --- a/retronews/retronews.py +++ /dev/null @@ -1,50 +0,0 @@ -import re -import requests - -MONTHS = dict( - jan=1, - feb=2, - mar=3, - apr=4, - may=5, - jun=6, - jul=7, - juillet=7, - aout=8, - aug=8, - sep=9, - oct=10, - nov=11, - novembre=11, # https://www.retronews.fr/journal/mercure-de-france/15-novembre-1905/118/2617647/1 - dec=12 -) - - -def convert_date(s: str) -> tuple[str, str, str]: - m = re.match(r'^(\d{2})-(.*?)-(\d{4})$', s).groups() - year = m[2] - month = '%02d' % MONTHS[m[1]] - day = m[0] - return year, month, day - - -def parse_url(url: str) -> tuple: - return re.search(r'/(?:[\-\d\w]+)/([^/]+)/(\d+)/(\d+)/', url).groups() - - -def doc_info(collection_id, doc_id): - r = requests.get(f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}') - return r.json() - - -def page_info(collection_id, doc_id, page): - r = requests.get(f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}/page/{page}/') - return r.json() - - -def thumbnail_url(collection_id, doc_id, page) -> str: - return f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}/page/{page}/thumbnail' - - -def tile_url(collection_id, doc_id, page, v_tile, h_tile) -> str: - return f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}/page/{page}/tile/{h_tile}/{v_tile}/0' -- cgit v1.2.3
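
For reference, a minimal sketch of driving the refactored mdf package directly, mirroring the calls the new dl-from-db.py makes; the date range and output directory below are placeholder values for illustration, not taken from this patch:

#!/usr/bin/env python3
# Sketch only: uses the API introduced in this patch (Database.get_documents,
# retronews.set_*_pool_size, retronews.grab_magazine). Paths and dates are
# placeholders.
import logging

from mdf import Database, retronews

# Same knobs as the --fetch-threads / --merge-threads CLI options.
retronews.set_page_fetching_pool_size(8)
retronews.set_tile_merging_pool_size(8)

db = Database()
for doc in db.get_documents(('1905-01-01', '1905-12-31')):
    url = doc['url']
    print(f'grabbing {url}...')
    if not retronews.grab_magazine(url, output_root='/tmp/mdf'):
        logging.error(f'failed to grab {url}')

The CLI scripts remain the intended entry points; this only illustrates how the pool-size setters and grab_magazine are meant to be combined after the move into the mdf package.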