import time import logging from abc import ABC, abstractmethod from mysql.connector import connect, MySQLConnection, Error from typing import Optional from ..config import ConfigUnit logger = logging.getLogger(__name__) datetime_fmt = '%Y-%m-%d %H:%M:%S' class MySQLCredsConfig(ConfigUnit, ABC): @classmethod def schema(cls) -> Optional[dict]: schema = {} for k in ('host', 'database', 'user', 'password'): schema[k] = dict(type='string', required=True) return schema class MySQLHomeCredsConfig(MySQLCredsConfig): NAME = 'mysql_home_creds' class MySQLCloudCredsConfig(MySQLCredsConfig): NAME = 'mysql_cloud_creds' def mysql_now() -> str: return time.strftime('%Y-%m-%d %H:%M:%S') class MySQLDatabase(ABC): _enable_pings: bool _link: MySQLConnection _time_zone: Optional[str] @abstractmethod def creds(self) -> MySQLCredsConfig: pass def __init__(self, enable_pings=False, time_zone='+01:00'): self._enable_pings = enable_pings self._time_zone = time_zone self._connect() def _connect(self): c = self.creds() self._link = connect( host=c['host'], user=c['user'], password=c['password'], database=c['database'], ) if self._time_zone: self._link.time_zone = self._time_zone def cursor(self, **kwargs): if self._enable_pings: try: self._link.ping(reconnect=True, attempts=2) except Error as e: logger.exception(e) self._connect() return self._link.cursor(**kwargs) def commit(self): self._link.commit() class MySQLHomeDatabase(MySQLDatabase): def creds(self) -> MySQLCredsConfig: return MySQLHomeCredsConfig()