From 5fd7512f903522a47c416ebcda3b6acc6b080e49 Mon Sep 17 00:00:00 2001 From: Evgeny Zinoviev Date: Sun, 16 Jun 2024 00:04:44 +0300 Subject: initial --- fb/database.py | 118 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 fb/database.py (limited to 'fb/database.py') diff --git a/fb/database.py b/fb/database.py new file mode 100644 index 0000000..3c31878 --- /dev/null +++ b/fb/database.py @@ -0,0 +1,118 @@ +import sqlite3 +import logging +import os +import threading + +from .util import stringify +from typing import Optional + + +class Database: + SCHEMA = 2 + + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + + config_dir = os.path.join( + os.getenv('HOME'), + '.config', + 'forgottenbooks' + ) + if not os.path.exists(config_dir): + os.makedirs(config_dir) + + file = os.path.join(config_dir, 'fb.sqlite3') + self.sqlite = sqlite3.connect(file, check_same_thread=False) + self.lock = threading.Lock() + + sqlite_version = self._get_sqlite_version() + self.logger.debug(f'SQLite version: {sqlite_version}') + + schema_version = self.schema_get_version() + self.logger.debug(f'Schema version: {schema_version}') + + self.schema_init(schema_version) + self.schema_set_version(self.SCHEMA) + + def __del__(self): + if self.sqlite: + self.sqlite.commit() + self.sqlite.close() + + def _get_sqlite_version(self) -> str: + cursor = self.sqlite.cursor() + cursor.execute("SELECT sqlite_version()") + return cursor.fetchone()[0] + + def schema_get_version(self) -> int: + cursor = self.sqlite.execute('PRAGMA user_version') + return int(cursor.fetchone()[0]) + + def schema_set_version(self, v) -> None: + self.sqlite.execute('PRAGMA user_version={:d}'.format(v)) + self.logger.info(f'Schema set to {v}') + + def cursor(self) -> sqlite3.Cursor: + return self.sqlite.cursor() + + def commit(self) -> None: + return self.sqlite.commit() + + def schema_init(self, version: int) -> None: + cursor = self.cursor() + + if version < 1: + cursor.execute("""CREATE TABLE IF NOT EXISTS books ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + author TEXT NOT NULL, + meta TEXT NOT NULL, + bc TEXT NO NULL, + details TEXT NOT NULL + )""") + + if version < 2: + cursor.execute("""CREATE TABLE IF NOT EXISTS books_failed (id INTEGER PRIMARY KEY)""") + + def add_book(self, + book_id: int, + name: str, + author: str, + meta: list, + bc: list, + details: dict): + with self.lock: + cur = self.cursor() + cur.execute("INSERT INTO books (id, name, author, meta, bc, details) VALUES (?, ?, ?, ?, ?, ?)", + (str(book_id), name, author, stringify(meta), stringify(bc), stringify(details))) + cur.close() + self.commit() + + def add_failed_book(self, book_id: int): + with self.lock: + cur = self.cursor() + cur.execute("INSERT INTO books_failed (id) VALUES (?)", (str(book_id),)) + cur.close() + + def get_max_book_id(self) -> Optional[int]: + cur = self.cursor() + cur.execute("SELECT MAX(id) FROM books") + res = cur.fetchone()[0] + cur.close() + return int(res) if res is not None else None + + def get_book(self, book_id) -> Optional[dict]: + cur = self.cursor() + cur.execute("SELECT * FROM books WHERE id=?", (book_id,)) + all = cur.fetchall() + cur.close() + return all[0] if len(all) > 0 else None + + def get_ids(self, id_from: int, id_to: int) -> list[int]: + cur = self.cursor() + cur.execute("SELECT id FROM books WHERE id BETWEEN ? AND ? ORDER BY id", (id_from, id_to,)) + l = [] + for row in cur.fetchall(): + l.append(int(row[0])) + cur.close() + return l -- cgit v1.2.3