summaryrefslogtreecommitdiff
path: root/fb/database.py
blob: 3c31878d024e648dabb8b821d95c8a73488baeda (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import sqlite3
import logging
import os
import threading

from .util import stringify
from typing import Optional


class Database:
    """SQLite-backed store for downloaded book records.

    The database file lives at ``<home>/.config/forgottenbooks/fb.sqlite3``.
    A single connection is opened with ``check_same_thread=False`` and the
    write methods (`add_book`, `add_failed_book`) serialize on ``self.lock``.
    """

    # Schema revision written to ``PRAGMA user_version`` after initialisation.
    SCHEMA = 2

    def __init__(self):
        """Open (creating if necessary) the database and bring the schema up to date."""
        self.logger = logging.getLogger(self.__class__.__name__)

        # HOME may be unset (e.g. stripped-down service environments);
        # os.path.join(None, ...) would raise TypeError, so fall back.
        home = os.getenv('HOME') or os.path.expanduser('~')
        config_dir = os.path.join(home, '.config', 'forgottenbooks')
        # exist_ok avoids the check-then-create race of a separate exists().
        os.makedirs(config_dir, exist_ok=True)

        file = os.path.join(config_dir, 'fb.sqlite3')
        self.sqlite = sqlite3.connect(file, check_same_thread=False)
        self.lock = threading.Lock()

        sqlite_version = self._get_sqlite_version()
        self.logger.debug(f'SQLite version: {sqlite_version}')

        schema_version = self.schema_get_version()
        self.logger.debug(f'Schema version: {schema_version}')

        self.schema_init(schema_version)
        self.schema_set_version(self.SCHEMA)

    def __del__(self):
        """Commit pending work and close the connection, if one was opened."""
        # __init__ may have failed before self.sqlite was assigned; a plain
        # attribute access here would then raise AttributeError.
        connection = getattr(self, 'sqlite', None)
        if connection is not None:
            connection.commit()
            connection.close()

    def _get_sqlite_version(self) -> str:
        """Return the version string reported by ``SELECT sqlite_version()``."""
        cursor = self.sqlite.cursor()
        try:
            cursor.execute("SELECT sqlite_version()")
            return cursor.fetchone()[0]
        finally:
            cursor.close()

    def schema_get_version(self) -> int:
        """Return the schema revision stored in ``PRAGMA user_version``."""
        cursor = self.sqlite.execute('PRAGMA user_version')
        try:
            return int(cursor.fetchone()[0])
        finally:
            cursor.close()

    def schema_set_version(self, v) -> None:
        """Store ``v`` (an integer) in ``PRAGMA user_version``."""
        # PRAGMA does not accept bound parameters; the ``:d`` format spec
        # rejects anything that is not an integer.
        self.sqlite.execute('PRAGMA user_version={:d}'.format(v))
        self.logger.info(f'Schema set to {v}')

    def cursor(self) -> sqlite3.Cursor:
        """Return a new cursor on the underlying connection."""
        return self.sqlite.cursor()

    def commit(self) -> None:
        """Commit the current transaction on the underlying connection."""
        return self.sqlite.commit()

    def schema_init(self, version: int) -> None:
        """Create the tables that are missing for a database at ``version``.

        Args:
            version: schema revision currently stored in the database.
        """
        cursor = self.cursor()
        try:
            if version < 1:
                # ``bc`` was previously declared ``TEXT NO NULL`` (a typo that
                # SQLite accepts but that leaves the column nullable).
                # Databases created before this fix keep their old definition
                # because of IF NOT EXISTS.
                cursor.execute("""CREATE TABLE IF NOT EXISTS books (
                        id INTEGER PRIMARY KEY,
                        name TEXT NOT NULL,
                        author TEXT NOT NULL,
                        meta TEXT NOT NULL,
                        bc TEXT NOT NULL,
                        details TEXT NOT NULL
                    )""")

            if version < 2:
                cursor.execute("""CREATE TABLE IF NOT EXISTS books_failed (id INTEGER PRIMARY KEY)""")
        finally:
            cursor.close()

    def add_book(self,
                 book_id: int,
                 name: str,
                 author: str,
                 meta: list,
                 bc: list,
                 details: dict):
        """Insert one row into ``books`` and commit.

        ``meta``, ``bc`` and ``details`` are passed through ``stringify``
        before being stored in their TEXT columns.
        """
        with self.lock:
            cur = self.cursor()
            try:
                cur.execute("INSERT INTO books (id, name, author, meta, bc, details) VALUES (?, ?, ?, ?, ?, ?)",
                            (str(book_id), name, author, stringify(meta), stringify(bc), stringify(details)))
            finally:
                cur.close()
            self.commit()

    def add_failed_book(self, book_id: int):
        """Insert ``book_id`` into ``books_failed`` and commit."""
        with self.lock:
            cur = self.cursor()
            try:
                cur.execute("INSERT INTO books_failed (id) VALUES (?)", (str(book_id),))
            finally:
                cur.close()
            # Commit like add_book does; otherwise the row stays in an open
            # transaction until some later commit happens.
            self.commit()

    def get_max_book_id(self) -> Optional[int]:
        """Return the largest ``id`` in ``books``, or None if the table is empty."""
        cur = self.cursor()
        try:
            cur.execute("SELECT MAX(id) FROM books")
            res = cur.fetchone()[0]
        finally:
            cur.close()
        return int(res) if res is not None else None

    def get_book(self, book_id) -> Optional[tuple]:
        """Return the ``books`` row with the given id, or None if absent.

        The row is returned as fetched from the cursor (all columns of
        ``SELECT *``).
        """
        cur = self.cursor()
        try:
            cur.execute("SELECT * FROM books WHERE id=?", (book_id,))
            # fetchone() already yields None when there is no matching row.
            return cur.fetchone()
        finally:
            cur.close()

    def get_ids(self, id_from: int, id_to: int) -> list[int]:
        """Return the ids in ``books`` between ``id_from`` and ``id_to`` (inclusive), ascending."""
        cur = self.cursor()
        try:
            cur.execute("SELECT id FROM books WHERE id BETWEEN ? AND ? ORDER BY id", (id_from, id_to,))
            return [int(row[0]) for row in cur.fetchall()]
        finally:
            cur.close()