#!/usr/bin/env python3
import queue
import sqlite3
import threading
from argparse import ArgumentParser

import retronews
from database import Database

db = Database()

# Lock serialising console output, plus one lock per shared counter.
print_lock = threading.Lock()
ok_lock = threading.Lock()
fail_lock = threading.Lock()

# The queue is fully populated before any worker starts.
tasks_queue = queue.Queue()
done_ok = 0
done_fail = 0


def incr_ok():
    global done_ok
    with ok_lock:
        done_ok += 1
    print_state()


def incr_fail():
    global done_fail
    with fail_lock:
        done_fail += 1
    print_state()


def print_state():
    with print_lock:
        print(f'ok={done_ok} fail={done_fail}')


class PageWorker(threading.Thread):
    _do_update: bool

    def __init__(self, do_update: bool):
        super().__init__()
        self._do_update = do_update

    def run(self):
        # Drain the queue; exit once no tasks are left.
        while not tasks_queue.empty():
            try:
                collection_id, doc_id, page = tasks_queue.get_nowait()
                try:
                    info = retronews.page_info(collection_id, doc_id, page)
                    try:
                        f = getattr(db, 'update_page' if self._do_update else 'add_page')
                        f(collection_id, doc_id, page, info['width'], info['height'], info['dpi'])
                    except sqlite3.IntegrityError:
                        with print_lock:
                            print(f'error: unique constraint failed for ({collection_id}, {doc_id}, {page})')
                    incr_ok()
                except Exception:
                    # traceback.print_exc()
                    if self._do_update:
                        with print_lock:
                            print(f'error: page ({collection_id}, {doc_id}, {page}) failed again, skipping update')
                    else:
                        db.add_page_failed(collection_id, doc_id, page)
                    incr_fail()
            except queue.Empty:
                break


if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('--threads', type=int, required=True)
    parser.add_argument('--fails', action='store_true')
    args = parser.parse_args()

    if args.fails:
        # Retry mode: re-queue only the pages that failed previously.
        pages = db.get_existing_pages(fail=1)
        for cid, did, page in pages:
            tasks_queue.put((cid, did, page))
        pages = None
    else:
        # Normal mode: queue every page not yet present in the database.
        ex_pages = db.get_existing_pages()
        ex_map = {}
        for cid, did, page in ex_pages:
            ex_map[f'{cid}_{did}_{page}'] = 1

        docs = db.get_documents()
        for doc in docs:
            # Page numbers are 1-based.
            for page in range(1, doc['pages'] + 1):
                if f"{doc['collection_id']}_{doc['doc_id']}_{page}" not in ex_map:
                    tasks_queue.put((doc['collection_id'], doc['doc_id'], page))

        # Drop the lookup structures before the workers start.
        ex_pages = None
        ex_map = None
        docs = None

    pool = []
    for i in range(args.threads):
        pool.append(PageWorker(do_update=args.fails))

    for t in pool:
        t.start()

    for t in pool:
        t.join()