#!/usr/bin/env python3
import threading
import queue
import sqlite3
import retronews

from database import Database
from argparse import ArgumentParser

db = Database()
print_lock = threading.Lock()
ok_lock = threading.Lock()
fail_lock = threading.Lock()
tasks_queue = queue.Queue()
done_ok = 0
done_fail = 0


def incr_ok():
    global done_ok
    with ok_lock:
        done_ok += 1
    print_state()


def incr_fail():
    global done_fail
    with fail_lock:
        done_fail += 1
    print_state()


def print_state():
    with print_lock:
        print(f'ok={done_ok} fail={done_fail}')


class PageWorker(threading.Thread):
    _do_update: bool

    def __init__(self, do_update: bool):
        super().__init__()
        self._do_update = do_update

    def run(self):
        # Drain the pre-filled task queue; workers exit once it is empty.
        while not tasks_queue.empty():
            try:
                collection_id, doc_id, page = tasks_queue.get_nowait()
                try:
                    info = retronews.page_info(collection_id, doc_id, page)
                    try:
                        f = db.update_page if self._do_update else db.add_page
                        f(collection_id, doc_id, page, info['width'], info['height'], info['dpi'])
                    except sqlite3.IntegrityError:
                        with print_lock:
                            print(f'error: unique constraint failed for ({collection_id}, {doc_id}, {page})')
                    # A duplicate row still means the page exists, so count it as ok.
                    incr_ok()
                except Exception:
                    if self._do_update:
                        with print_lock:
                            print(f'error: page ({collection_id}, {doc_id}, {page}) failed again, skipping update')
                    else:
                        db.add_page_failed(collection_id, doc_id, page)
                    incr_fail()
            except queue.Empty:
                break


if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('--threads', type=int, required=True)
    parser.add_argument('--fails', action='store_true')
    args = parser.parse_args()

    if args.fails:
        # Retry only the pages that failed on a previous run.
        pages = db.get_existing_pages(fail=1)
        for cid, did, page in pages:
            tasks_queue.put((cid, did, page))
        pages = None
    else:
        # Queue every page that is not in the database yet.
        ex_pages = db.get_existing_pages()
        ex_map = {}
        for cid, did, page in ex_pages:
            ex_map[f'{cid}_{did}_{page}'] = 1

        docs = db.get_documents()
        for doc in docs:
            # Page numbers are 1-based.
            for page in range(1, doc['pages'] + 1):
                if f"{doc['collection_id']}_{doc['doc_id']}_{page}" not in ex_map:
                    tasks_queue.put((doc['collection_id'], doc['doc_id'], page))

        # Drop references so the large intermediate structures can be freed.
        ex_pages = None
        ex_map = None
        docs = None

    pool = [PageWorker(do_update=args.fails) for _ in range(args.threads)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
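
# Example invocations (sketch only; the script filename is assumed here,
# adjust it to whatever this file is actually named in the repo):
#   ./fetch_pages.py --threads 8          # fetch pages missing from the database
#   ./fetch_pages.py --threads 4 --fails  # retry pages recorded as failed earlier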