Diffstat (limited to 'mdf')
-rw-r--r--   mdf/__init__.py              3
-rw-r--r--   mdf/database/__init__.py     1
-rw-r--r--   mdf/database/database.py   154
-rw-r--r--   mdf/retronews/__init__.py   15
-rw-r--r--   mdf/retronews/retronews.py 397
-rw-r--r--   mdf/util/__init__.py         0
-rw-r--r--   mdf/util/util.py            44
7 files changed, 614 insertions, 0 deletions
diff --git a/mdf/__init__.py b/mdf/__init__.py
new file mode 100644
index 0000000..9466436
--- /dev/null
+++ b/mdf/__init__.py
@@ -0,0 +1,3 @@
+from .retronews import retronews
+from .util import util
+from .database import Database
\ No newline at end of file
diff --git a/mdf/database/__init__.py b/mdf/database/__init__.py
new file mode 100644
index 0000000..ef3f969
--- /dev/null
+++ b/mdf/database/__init__.py
@@ -0,0 +1 @@
+from .database import Database
diff --git a/mdf/database/database.py b/mdf/database/database.py
new file mode 100644
index 0000000..fd08e38
--- /dev/null
+++ b/mdf/database/database.py
@@ -0,0 +1,154 @@
+import sqlite3
+import logging
+import os.path
+from ..retronews import retronews
+import threading
+
+from typing import Optional
+
+
+class Database:
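+    # Target schema version; schema_init() applies the migration steps below
+    # incrementally and the current version is persisted via PRAGMA user_version.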
+ SCHEMA = 6
+
+ def __init__(self):
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ file = os.path.join(os.path.dirname(__file__), '..', '..', 'mdf-retrobase.sqlite3')
+ self.sqlite = sqlite3.connect(file, check_same_thread=False)
+ self.lock = threading.Lock()
+
+ sqlite_version = self._get_sqlite_version()
+ self.logger.debug(f'SQLite version: {sqlite_version}')
+
+ schema_version = self.schema_get_version()
+ self.logger.debug(f'Schema version: {schema_version}')
+
+ self.schema_init(schema_version)
+ self.schema_set_version(self.SCHEMA)
+
+ def __del__(self):
+ if self.sqlite:
+ self.sqlite.commit()
+ self.sqlite.close()
+
+ def _get_sqlite_version(self) -> str:
+ cursor = self.sqlite.cursor()
+ cursor.execute("SELECT sqlite_version()")
+ return cursor.fetchone()[0]
+
+ def schema_get_version(self) -> int:
+ cursor = self.sqlite.execute('PRAGMA user_version')
+ return int(cursor.fetchone()[0])
+
+ def schema_set_version(self, v) -> None:
+ self.sqlite.execute('PRAGMA user_version={:d}'.format(v))
+ self.logger.info(f'Schema set to {v}')
+
+ def cursor(self) -> sqlite3.Cursor:
+ return self.sqlite.cursor()
+
+ def commit(self) -> None:
+ return self.sqlite.commit()
+
+ def schema_init(self, version: int) -> None:
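+        """Apply every migration step newer than the currently stored schema version."""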
+ cursor = self.cursor()
+
+ if version < 1:
+ # timestamps
+ cursor.execute("""CREATE TABLE IF NOT EXISTS mdf_links (
+ issue_date TEXT PRIMARY KEY,
+ url TEXT NOT NULL,
+ pages INTEGER NOT NULL
+ )""")
+
+ if version < 2:
+ cursor.execute("""CREATE TABLE IF NOT EXISTS mdf_pages (
+ collection_id INTEGER NOT NULL,
+ doc_id INTEGER NOT NULL,
+ page INTEGER NOT NULL,
+ height INTEGER NOT NULL,
+ width INTEGER NOT NULL,
+ dpi INTEGER NOT NULL
+ )""")
+ cursor.execute("""CREATE UNIQUE INDEX mdf_pages_idx ON mdf_pages (collection_id, doc_id, page)""")
+
+ if version < 3:
+ cursor.execute("ALTER TABLE mdf_pages ADD fail INTEGER NOT NULL")
+
+ if version < 4:
+ cursor.execute("""CREATE INDEX mdf_pages_fail_idx ON mdf_pages (fail)""")
+
+ if version < 5:
+ for col in ('collection_id', 'doc_id'):
+ cursor.execute(f"ALTER TABLE mdf_links ADD {col} INTEGER NOT NULL DEFAULT '0'")
+ cursor.execute("CREATE INDEX mdf_links_col_doc_idx ON mdf_links (collection_id, doc_id)")
+
+ if version < 6:
+ cursor.execute("DROP INDEX mdf_links_col_doc_idx")
+ cursor.execute("CREATE UNIQUE INDEX mdf_links_col_doc_idx ON mdf_links (collection_id, doc_id)")
+
+ self.commit()
+
+ def add_link(self, issue_date: str, url: str, pages: int):
+ with self.lock:
+ self.cursor().execute("REPLACE INTO mdf_links (issue_date, url, pages) VALUES (?, ?, ?)",
+ (issue_date, url, str(pages)))
+ self.commit()
+
+ def add_page(self, collection_id: int, doc_id: int, page: int, width: int, height: int, dpi: int):
+ with self.lock:
+ self.cursor().execute("INSERT INTO mdf_pages (collection_id, doc_id, page, width, height, dpi, fail) VALUES (?, ?, ?, ?, ?, ?, 0)",
+ (collection_id, doc_id, page, width, height, dpi))
+ self.commit()
+
+ def update_page(self, collection_id: int, doc_id: int, page: int, width: int, height: int, dpi: int):
+ with self.lock:
+ self.cursor().execute("UPDATE mdf_pages SET width=?, height=?, dpi=?, fail=0 WHERE collection_id=? AND doc_id=? AND page=?",
+ (width, height, dpi, collection_id, doc_id, page))
+ self.commit()
+
+ def add_page_failed(self, collection_id, doc_id, page):
+ with self.lock:
+ self.cursor().execute("INSERT INTO mdf_pages (collection_id, doc_id, page, width, height, dpi, fail) VALUES (?, ?, ?, 0, 0, 0, 1)",
+ (collection_id, doc_id, page))
+ self.commit()
+
+ def get_existing_pages(self, fail=0):
+ cur = self.cursor()
+ cur.execute("SELECT collection_id, doc_id, page FROM mdf_pages WHERE fail=?", (fail,))
+ return cur.fetchall()
+
+ def get_documents(self, range: Optional[tuple[str, str]] = None):
+ cur = self.cursor()
+ docs = []
+
+ sql = "SELECT issue_date, url, pages FROM mdf_links"
+ if range:
+ sql += f" WHERE issue_date BETWEEN '{range[0]}' AND '{range[1]}'"
+ sql += " ORDER BY issue_date"
+ cur.execute(sql)
+ for issue_date, url, pages in cur.fetchall():
+ pub_date, collection_id, doc_id = retronews.parse_url(url)
+ docs.append(dict(
+ url=url,
+ collection_id=collection_id,
+ doc_id=doc_id,
+ pages=pages
+ ))
+
+ return docs
+
+ def get_doc_pages(self, collection_id, doc_id):
+ cur = self.cursor()
+ cur.execute("SELECT page, width, height, dpi FROM mdf_pages WHERE collection_id=? AND doc_id=?",
+ (collection_id, doc_id))
+ return cur.fetchall()
+
+ def fix_documents(self):
+ cur = self.cursor()
+ cur.execute("SELECT issue_date, url FROM mdf_links")
+ for issue_date, url in cur.fetchall():
+ pub_date, cid, did = retronews.parse_url(url)
+ cur.execute("UPDATE mdf_links SET collection_id=?, doc_id=? WHERE issue_date=?",
+ (cid, did, issue_date))
+ self.commit()
diff --git a/mdf/retronews/__init__.py b/mdf/retronews/__init__.py
new file mode 100644
index 0000000..105ca70
--- /dev/null
+++ b/mdf/retronews/__init__.py
@@ -0,0 +1,15 @@
+from .retronews import (
+ convert_date,
+ parse_url,
+ _doc_info,
+ page_info,
+ thumbnail_url,
+ tile_url,
+ HTILES,
+ VTILES,
+ PAGE_FETCHING_POOL_SIZE,
+ TILE_MERGING_POOL_SIZE,
+ set_tile_merging_pool_size,
+ set_page_fetching_pool_size,
+ grab_magazine
+)
\ No newline at end of file
diff --git a/mdf/retronews/retronews.py b/mdf/retronews/retronews.py
new file mode 100644
index 0000000..4697e55
--- /dev/null
+++ b/mdf/retronews/retronews.py
@@ -0,0 +1,397 @@
+import re
+import requests
+import imghdr
+import json
+import os
+import queue
+import shutil
+import traceback
+
+from ..util.util import safe_print, download_file, run
+from typing import Optional
+from threading import Thread
+import urllib.error
+
+_pages_queue = queue.Queue()
+_merging_queue = queue.Queue()
+
+VTILES = 3
+HTILES = 2
+TILE_MERGING_POOL_SIZE = 8
+PAGE_FETCHING_POOL_SIZE = 8
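+# Default retronews tile grid (HTILES columns x VTILES rows) and worker pool sizes;
+# the pool sizes can be overridden with the set_*_pool_size() helpers at the bottom.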
+
+
+MONTHS = dict(
+ jan=1,
+ feb=2,
+ mar=3,
+ apr=4,
+ may=5,
+ jun=6,
+ jul=7,
+ juillet=7,
+ aout=8,
+ aug=8,
+ sep=9,
+ oct=10,
+ nov=11,
+ novembre=11, # https://www.retronews.fr/journal/mercure-de-france/15-novembre-1905/118/2617647/1
+ dec=12
+)
+
+
+def convert_date(s: str) -> tuple[str, str, str]:
+ m = re.match(r'^(\d{2})-(.*?)-(\d{4})$', s).groups()
+ year = m[2]
+ month = '%02d' % MONTHS[m[1]]
+ day = m[0]
+ return year, month, day
+
+
+def parse_url(url: str) -> tuple:
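+    """Extract (pub_date, collection_id, doc_id) from a document URL, e.g.
+    https://www.retronews.fr/journal/mercure-de-france/15-novembre-1905/118/2617647/1
+    -> ('15-novembre-1905', '118', '2617647')."""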
+ return re.search(r'/(?:[\-\d\w]+)/([^/]+)/(\d+)/(\d+)/', url).groups()
+
+
+def _doc_info(collection_id, doc_id):
+ r = requests.get(f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}')
+ return r.json()
+
+
+def page_info(collection_id, doc_id, page):
+ r = requests.get(f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}/page/{page}/')
+ return r.json()
+
+
+def thumbnail_url(collection_id, doc_id, page) -> str:
+ return f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}/page/{page}/thumbnail'
+
+
+def tile_url(collection_id, doc_id, page, v_tile, h_tile) -> str:
+ return f'https://pv5web.retronews.fr/api/document/{collection_id}/{doc_id}/page/{page}/tile/{h_tile}/{v_tile}/0'
+
+
+
+class DownloaderThread(Thread):
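+    """Downloads a single URL to a file in its own thread; is_downloaded() reports success."""
+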
+ _url: str
+ _save_as: str
+ _download_result: Optional[bool]
+ _handle_http: bool
+ user_info: dict
+
+ def __init__(self, url: str, save_as: str, thread_name=None, handle_http=False, user_info=None):
+ super().__init__()
+ if user_info is None:
+ user_info = {}
+ if thread_name:
+ self.name = thread_name
+
+ self._url = url
+ self._save_as = save_as
+ self._download_result = None
+ self._handle_http = handle_http
+ self.user_info = user_info
+
+ def run(self):
+ try:
+ self._download_result = download_file(self._url, self._save_as, handle_http_errors=not self._handle_http)
+ except urllib.error.HTTPError:
+ pass
+
+ def is_downloaded(self) -> bool:
+ return self._download_result is True
+
+
+class TileMergeWorker(Thread):
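+    """Pulls page numbers from _merging_queue and stitches each page's tiles into
+    <page>.jpg with ImageMagick convert, or copies the thumbnail if that is all we have."""
+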
+ _working_dir: str
+ _number: int
+
+ def __init__(self, working_dir: str, number: int):
+ super().__init__()
+ self._working_dir = working_dir
+ self._number = number
+
+ def run(self):
+ safe_print(f'[tile merger {self._number}] started')
+
+ while not _merging_queue.empty():
+ try:
+ page = _merging_queue.get_nowait()
+ page_dir = os.path.join(self._working_dir, str(page))
+ thumbnail_path = os.path.join(page_dir, 'thumbnail.jpg')
+ meta_path = os.path.join(page_dir, 'meta.json')
+
+ if os.path.exists(thumbnail_path):
+ shutil.copy(thumbnail_path, os.path.join(self._working_dir, f'{page}.jpg'))
+ continue
+
+ if os.path.exists(meta_path):
+ with open(meta_path, 'r') as f:
+ meta = json.loads(f.read())
+ htiles = meta['h']
+ vtiles = meta['v']
+ else:
+ htiles = HTILES
+ vtiles = VTILES
+
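+                    # -append stacks each column's tiles vertically, then +append joins
+                    # the resulting columns horizontally into the full page image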
+ hfiles = []
+ for h in range(htiles):
+ vfiles = []
+ for v in range(vtiles):
+ vfiles.append(f'v{v}_h{h}.jpg')
+ run(['convert', '-append', *vfiles, f'{h}.jpg'], cwd=page_dir)
+ hfiles.append(f'{h}.jpg')
+
+ run(['convert', '+append', *hfiles, os.path.join(self._working_dir, f'{page}.jpg')], cwd=page_dir)
+ # shutil.rmtree(page_dir)
+
+ safe_print(f'[tile merger {self._number}] page {page} done')
+
+ except queue.Empty:
+ break
+
+
+class PageFetchWorker(Thread):
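+    """Pulls page numbers from _pages_queue and downloads every tile of each page
+    into <working_dir>/<page>/."""
+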
+ _working_dir: str
+ _number: int
+ _failed: bool
+ _error: Optional[str]
+ _probe_pages: Optional[list[int]]
+ _probe_all: bool
+
+ def __init__(self, working_dir: str, number: int, collection_id, doc_id, probe_pages: Optional[list[int]] = None, probe_all=False):
+ super().__init__()
+ self._working_dir = working_dir
+ self._number = number
+ self._collection_id = collection_id
+ self._doc_id = doc_id
+ self._failed = False
+ self._error = None
+ self._probe_pages = probe_pages
+ self._probe_all = probe_all
+
+ def run(self):
+ safe_print(f'[pf-{self._number}] started')
+ page = 0
+
+ try:
+ while not _pages_queue.empty():
+ try:
+ page = _pages_queue.get_nowait()
+ safe_print(f'[pf-{self._number}] page {page} started')
+
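+                    # Pages listed in probe_pages (or all pages when probe_all is set) get
+                    # their real tile grid probed; otherwise assume the default
+                    # HTILES x VTILES grid and fall back to probing if that fails.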
+ if self._probe_all or (self._probe_pages is not None and page in self._probe_pages):
+ self.probe_dl(page)
+ else:
+ try:
+ self.normal_dl(page)
+ except OSError:
+ safe_print(f'[pf-{self._number}] normal_dl() failed, trying probe_dl()')
+ self.probe_dl(page)
+
+ except queue.Empty:
+ break
+
+ except Exception as e:
+ self._failed = True
+            self._error = f'while fetching page {page}: {str(e)}\n' + traceback.format_exc()
+
+ def _get_page_dir(self, page):
+ page_dir = os.path.join(self._working_dir, str(page))
+ if not os.path.exists(page_dir):
+ os.makedirs(page_dir)
+ return page_dir
+
+ def is_failed(self) -> bool:
+ return self._failed
+
+ def get_error(self) -> str:
+ return self._error if self._error is not None else ''
+
+ def normal_dl(self, page):
+ page_dir = self._get_page_dir(page)
+ dl_tasks = []
+ for horiz_tile in range(HTILES):
+ for vert_tile in range(VTILES):
+ url = tile_url(self._collection_id, self._doc_id, page, h_tile=horiz_tile, v_tile=vert_tile)
+                output_file = os.path.join(page_dir, f'v{vert_tile}_h{horiz_tile}.jpg')
+                if os.path.isfile(output_file):
+                    if os.path.getsize(output_file) < 4:
+                        # truncated/corrupt tile: remove it and download it again
+                        os.unlink(output_file)
+                    else:
+                        # tile already downloaded, skip it
+                        continue
+
+                dl_tasks.append(DownloaderThread(url=url,
+                                                 save_as=output_file,
+                                                 thread_name=f'p{page}-v{vert_tile}-h{horiz_tile}'))
+
+ for task in dl_tasks:
+ task.start()
+
+ data_error = False
+
+ for task in dl_tasks:
+ task.join()
+ if not task.is_downloaded():
+ # safe_print(f'failed to download file {task._url}')
+ raise OSError(f'network error, failed to download {task._url}')
+
+ elif not imghdr.what(task._save_as):
+ data_error = True
+
+ if data_error:
+ self.thumbnail_dl(page)
+ else:
+ safe_print(f'[pf-{self._number}] page {page}: all files saved')
+
+ def probe_dl(self, page):
+ page_dir = self._get_page_dir(page)
+ real_h = 0
+ real_v = 0
+ data_error = False
+ dl_tasks = []
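+        # Probe a generous 10x10 grid; the largest tile indices that actually download
+        # give the page's real dimensions, which are recorded in meta.json for the merger.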
+ for h in range(10):
+ for v in range(10):
+                url = tile_url(self._collection_id, self._doc_id, page, h_tile=h, v_tile=v)
+                # name the tiles the same way normal_dl() does so TileMergeWorker can find them
+                output_file = os.path.join(page_dir, f'v{v}_h{h}.jpg')
+                if os.path.isfile(output_file):
+                    safe_print(f'[pf-{self._number}] probing page {page}: v={v} h={h} ALREADY')
+                    if os.path.getsize(output_file) < 4:
+                        # truncated/corrupt tile: remove it and download it again
+                        os.unlink(output_file)
+                    else:
+                        continue
+
+                dl_tasks.append(DownloaderThread(url=url,
+                                                 save_as=output_file,
+                                                 handle_http=True,
+                                                 thread_name=f'p{page}-v{v}-h{h}',
+                                                 user_info=dict(h=h, v=v)))
+
+ for task in dl_tasks:
+ task.start()
+ for task in dl_tasks:
+ task.join()
+
+ if task.is_downloaded():
+ task_h = task.user_info['h']
+ task_v = task.user_info['v']
+ if task_h > real_h:
+ real_h = task_h
+ if task_v > real_v:
+ real_v = task_v
+
+ if not imghdr.what(task._save_as):
+ data_error = True
+
+ # try:
+ # if not download_file(url, output_file, handle_http_errors=False):
+ # raise OSError('network failure')
+ # if not imghdr.what(output_file):
+ # data_error = True
+ # break
+ # real_v = v
+ # real_h = h
+ # safe_print(f'[pf-{self._number}] probing page {page}: v={v} h={h} OK')
+ #
+ # except urllib.error.HTTPError:
+ # safe_print(f'[pf-{self._number}] probing page {page}: v={v} h={h} FAIL')
+ # break
+
+ if data_error:
+ self.thumbnail_dl(page)
+ else:
+ with open(os.path.join(page_dir, 'meta.json'), 'w') as f:
+ f.write(json.dumps(dict(v=real_v+1, h=real_h+1)))
+ safe_print(f'[pf-{self._number}] page {page}: all files saved (seemingly...)')
+
+ def thumbnail_dl(self, page):
+ page_dir = self._get_page_dir(page)
+ thumb_url = thumbnail_url(self._collection_id, self._doc_id, page)
+ if not download_file(thumb_url, os.path.join(page_dir, 'thumbnail.jpg')):
+ raise RuntimeError(f'network error, failed to download thumbnail ({thumb_url})')
+ safe_print(f'[pf-{self._number}] page {page}: corrupt files; replaced with a thumbnail')
+
+
+def grab_magazine(url: str,
+ output_root: str,
+ probe_pages: Optional[list[int]] = None,
+ probe_all=False, only_fetch=False, force_overwrite=False):
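+    """Download all pages of the issue at url into output_root/<pub_date>/, merge each
+    page's tiles into a JPEG, and assemble the JPEGs into <Y>-<m>-<d>.pdf.
+    Returns True on success or when the PDF already exists and force_overwrite is off."""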
+ try:
+ pub_date, collection_id, doc_id = parse_url(url)
+ except AttributeError:
+ return False
+
+ data = _doc_info(collection_id, doc_id)
+ pages = int(data['nbPages'])
+ print(f'found {pages} pages')
+
+ y, m, d = convert_date(pub_date)
+ if os.path.exists(os.path.join(output_root, f'{y}-{m}-{d}.pdf')):
+ if not force_overwrite:
+ print(f'{y}-{m}-{d}.pdf already exists, not continuing')
+ return True
+ else:
+ os.unlink(os.path.join(output_root, f'{y}-{m}-{d}.pdf'))
+ print(f'{y}-{m}-{d}.pdf already exists, deleting and continuing (force_overwrite=on)')
+
+ output_dir = os.path.join(output_root, pub_date)
+ if not os.path.exists(output_dir):
+ os.makedirs(output_dir)
+
+ # fetch pages
+ for page in range(pages):
+ _pages_queue.put(page+1)
+
+ pool = []
+ for i in range(PAGE_FETCHING_POOL_SIZE):
+ pool.append(PageFetchWorker(working_dir=output_dir,
+ number=i+1,
+ collection_id=collection_id,
+ doc_id=doc_id,
+ probe_pages=probe_pages,
+ probe_all=probe_all))
+ for worker in pool:
+ worker.start()
+
+ for worker in pool:
+ worker.join()
+ if worker.is_failed():
+ with open(os.path.join(output_dir, 'error.txt'), 'w') as f:
+ f.write(f'error: {worker.get_error()}')
+ print(f'ERROR: failed to download {pub_date} magazine')
+ return False
+
+ if only_fetch:
+ return True
+
+ # merge tiles
+    for page in range(pages):
+        _merging_queue.put(page + 1)
+
+ pool = []
+ for i in range(TILE_MERGING_POOL_SIZE):
+ pool.append(TileMergeWorker(working_dir=output_dir, number=i+1))
+ for worker in pool:
+ worker.start()
+ try:
+ for worker in pool:
+ worker.join()
+
+ # merge images into pdf
+ files = [str(page + 1) + '.jpg' for page in range(pages)]
+ run(['convert', *files, os.path.join(output_root, f'{y}-{m}-{d}.pdf')], cwd=output_dir)
+ shutil.rmtree(output_dir)
+    except Exception:
+ traceback.print_exc()
+
+ return True
+
+
+def set_tile_merging_pool_size(size):
+ global TILE_MERGING_POOL_SIZE
+ TILE_MERGING_POOL_SIZE = size
+
+
+def set_page_fetching_pool_size(size):
+ global PAGE_FETCHING_POOL_SIZE
+ PAGE_FETCHING_POOL_SIZE = size
diff --git a/mdf/util/__init__.py b/mdf/util/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/mdf/util/__init__.py
diff --git a/mdf/util/util.py b/mdf/util/util.py
new file mode 100644
index 0000000..b233d88
--- /dev/null
+++ b/mdf/util/util.py
@@ -0,0 +1,44 @@
+import subprocess
+import urllib.request
+import urllib.error
+
+from time import sleep
+from threading import Lock
+import http.client
+
+
+_print_lock = Lock()
+
+
+def safe_print(*args, **kwargs):
+ with _print_lock:
+ print(*args, **kwargs)
+
+
+def run(args: list, **kwargs):
+    p = subprocess.run(args, **kwargs)
+    if p.returncode != 0:
+        # report the command that actually failed, not just "convert"
+        raise OSError(f'{args[0]} returned {p.returncode} (' + ' '.join(args) + ')')
+
+
+def download_file(url, output, handle_http_errors=True) -> bool:
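+    """Fetch url into output, retrying up to 3 times on dropped connections.
+    Returns True on success; HTTP errors either return False or, when
+    handle_http_errors is False, are re-raised to the caller."""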
+ tries_left = 3
+ ok = False
+ while tries_left > 0:
+ try:
+ urllib.request.urlretrieve(url, output)
+ ok = True
+ break
+ except http.client.RemoteDisconnected:
+ ok = False
+ print(' caught an exception, sleeping for 2 seconds and retrying...')
+ sleep(2)
+ tries_left -= 1
+ except urllib.error.HTTPError as e:
+ if not handle_http_errors:
+ raise e
+ else:
+ print(f' failed to download {url}: {str(e)}')
+ return False
+ return ok
+