import mysql.connector from mysql.connector import pooling from dotenv import load_dotenv import os import random import re import time from datetime import datetime, timedelta import logging import pandas as pd # Configure logging logging.basicConfig( level=logging.INFO, filename="database.log", format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) class DatabaseManager: _instances = {} def __new__(cls, env="development"): instance_key = env or "default" if instance_key in cls._instances: return cls._instances[instance_key] instance = super().__new__(cls) cls._instances[instance_key] = instance return instance def __init__(self, env="development"): if getattr(self, "_initialized", False): return self._initialized = True env_file = f".env.{env}" if env else ".env" if not os.path.exists(env_file): env_file = ".env" self.load_env(env_file) self.config = { "host": os.getenv("SQLHOST", "localhost"), "user": os.getenv("SQLUSER", "root"), "password": os.getenv("SQLPASS", ""), "database": os.getenv("SQLDB", "testdb"), "pool_reset_session": os.getenv("POOL_RESET_SESSION", "false").lower() in ("true", "1", "yes"), } self.pool = pooling.MySQLConnectionPool( pool_name="mypool", pool_size=5, **self.config ) logger.info("Database connection pool created.") # Ensure required tables exist self.create_table_if_not_exists( "feedback", """ ID INT AUTO_INCREMENT PRIMARY KEY, USER VARCHAR(100), GUILDID BIGINT, TIMESTAMP VARCHAR(32), CONTENT TEXT """, ) self.create_table_if_not_exists( "afk_status", """ USERID BIGINT, GUILDID BIGINT, TIMESTAMP VARCHAR(32), REASON TEXT, PRIMARY KEY (USERID, GUILDID) """, ) self.create_table_if_not_exists( "lottery_tickets", """ ID INT AUTO_INCREMENT PRIMARY KEY, USERID BIGINT NOT NULL, TIMESTAMP DATETIME NOT NULL, TICKET_TYPE VARCHAR(32) NOT NULL, group_id VARCHAR(32) DEFAULT NULL """, ) self.create_table_if_not_exists( "lottery_results", """ ID INT AUTO_INCREMENT PRIMARY KEY, WINNER_ID BIGINT NOT NULL, AMOUNT INT NOT NULL, DRAW_TIME DATETIME NOT NULL, CLAIMED BOOLEAN DEFAULT 0, WIN_TYPE VARCHAR(16) DEFAULT 'user' """, ) self.create_table_if_not_exists( "lottery_state", """ id INT PRIMARY KEY, jackpot INT NOT NULL """, ) self.create_table_if_not_exists( "lottery_draw_time", """ id INT PRIMARY KEY, last_draw DATETIME NOT NULL """, ) self.create_table_if_not_exists( "lottery_luck", """ USERID BIGINT PRIMARY KEY, LUCK INT NOT NULL DEFAULT 0 """, ) self.create_table_if_not_exists( "lottery_groups", """ group_id VARCHAR(32) PRIMARY KEY, creator_id BIGINT NOT NULL """, ) self.create_table_if_not_exists( "lottery_group_members", """ group_id VARCHAR(32), user_id BIGINT, PRIMARY KEY (group_id, user_id) """, ) self.create_table_if_not_exists( "gamble_rooms", """ channel_id BIGINT PRIMARY KEY, host_id BIGINT NOT NULL, invited TEXT NOT NULL, inactivity INT NOT NULL """, ) self.create_table_if_not_exists( "economy", """ ID BIGINT PRIMARY KEY, WALLET BIGINT NOT NULL DEFAULT 0, BANK BIGINT NOT NULL DEFAULT 0, DAILY DOUBLE DEFAULT 0 """, ) self.create_table_if_not_exists( "transactions", """ ID INT AUTO_INCREMENT PRIMARY KEY, USERID BIGINT NOT NULL, TYPE VARCHAR(50), AMOUNT DECIMAL(18,2), TIME DATETIME DEFAULT CURRENT_TIMESTAMP """, ) self.create_table_if_not_exists( "custom_commands", """ ID INT AUTO_INCREMENT PRIMARY KEY, GUILDID VARCHAR(32) NOT NULL, COMMANDNAME VARCHAR(100) NOT NULL, RESPONSE TEXT NOT NULL, MATCHTYPE VARCHAR(20) NOT NULL DEFAULT 'exact' """, ) self.create_table_if_not_exists( "guilds", """ GUILD BIGINT PRIMARY KEY, WELCOME BIGINT DEFAULT NULL, RULES BIGINT DEFAULT NULL, GUIDE BIGINT DEFAULT NULL, INTRODUCTIONS BIGINT DEFAULT NULL, EVENTS BIGINT DEFAULT NULL, MEMBERCOUNT BIGINT DEFAULT NULL, LOGGING BIGINT DEFAULT NULL, TICKETING BIGINT DEFAULT NULL """, ) self.create_table_if_not_exists( "rewards", """ ID INT AUTO_INCREMENT PRIMARY KEY, type VARCHAR(50) NOT NULL, amount INT NOT NULL DEFAULT 0, description TEXT DEFAULT NULL """, ) self.create_table_if_not_exists( "logs", """ ID INT AUTO_INCREMENT PRIMARY KEY, guild_id BIGINT NOT NULL, user_id BIGINT NOT NULL, type VARCHAR(50) NOT NULL, message TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP """, ) self.create_table_if_not_exists( "gamble_limits", """ USERID BIGINT PRIMARY KEY, DAILY_LIMIT BIGINT DEFAULT NULL, EXCLUDED_UNTIL DATETIME DEFAULT NULL """, ) self.create_table_if_not_exists( "users", """ ID BIGINT PRIMARY KEY, XP INT DEFAULT 0, LEVEL INT DEFAULT 0, birthday VARCHAR(10) DEFAULT NULL """, ) def load_env(self, env_file): load_dotenv(env_file) logger.info(f"Loaded environment variables from {env_file}") def get_connection(self): return self.pool.get_connection() def _sanitize_identifier(self, identifier: str) -> str: if not re.match(r"^[A-Za-z0-9_]+$", identifier): raise ValueError(f"Invalid SQL identifier: {identifier}") return identifier def _parse_insert_columns(self, query: str) -> list[str]: match = re.search( r"INSERT\s+INTO\s+\S+\s*\(([^)]+)\)\s*VALUES", query, re.IGNORECASE, ) if not match: raise ValueError( "Insert query must contain a column list for overwrite upsert support." ) return [col.strip() for col in match.group(1).split(",") if col.strip()] def execute_query(self, query, params=None, retries=3, delay=1): connection = None cursor = None for attempt in range(retries): try: connection = self.get_connection() cursor = connection.cursor(dictionary=True, buffered=True) cursor.execute(query, params or ()) connection.commit() logger.info(f"Executed query: {query} with params: {params}") return cursor.rowcount except mysql.connector.Error as err: logger.warning(f"Attempt {attempt + 1} failed: {err}") time.sleep(delay * (2**attempt)) finally: if cursor: cursor.close() if connection: connection.close() logger.error(f"All {retries} attempts failed for query: {query}") return None def insert(self, query: str, params: tuple, overwrite: bool = True) -> None: """ Inserts data into the database using the given query and parameters. Args: query (str): The SQL query to execute. params (tuple): The parameters to pass into the query. overwrite (bool, optional): Whether to perform an upsert operation. Defaults to True. Raises: ValueError: If no parameters are provided. """ if not params: raise ValueError("Params must be provided for the insert operation.") if overwrite: columns = self._parse_insert_columns(query) update_set = ", ".join(f"{col} = VALUES({col})" for col in columns) query = f"{query} ON DUPLICATE KEY UPDATE {update_set}" rowcount = self.execute_query(query, params) if rowcount is None: logger.error(f"Insert failed with query: {query}.") else: logger.info(f"Insert completed with query: {query}.") def bulk_insert(self, query, params=None): if not params: logger.warning("No data provided for bulk insert.") return if not isinstance(params, list) or not all(isinstance(d, dict) for d in params): raise ValueError("Params must be a list of dictionaries for bulk insert.") keys = list(params[0].keys()) placeholders = ", ".join(["%s"] * len(keys)) query = f"{query} ({', '.join(keys)}) VALUES ({placeholders})" values = [tuple(data[key] for key in keys) for data in params] connection = None cursor = None try: connection = self.get_connection() cursor = connection.cursor() cursor.executemany(query, values) connection.commit() logger.info( f"Bulk insert completed for {len(params)} records with query: {query}." ) except mysql.connector.Error as err: logger.error(f"Bulk insert failed: {err}") if connection: connection.rollback() finally: if cursor: cursor.close() if connection: connection.close() def delete(self, table_name: str, condition: dict) -> None: """Deletes a record from the specified table based on the condition provided.""" table_name = self._sanitize_identifier(table_name) condition_column, condition_value = next(iter(condition.items())) condition_column = self._sanitize_identifier(condition_column) query = f"DELETE FROM {table_name} WHERE {condition_column} = %s" self.execute_query(query, (condition_value,)) def fetch_one(self, query, params=None): connection = None cursor = None try: connection = self.get_connection() cursor = connection.cursor(dictionary=True, buffered=True) cursor.execute(query, params or ()) return cursor.fetchone() finally: if cursor: cursor.close() if connection: connection.close() def fetch_all(self, query, params=None): connection = None cursor = None try: connection = self.get_connection() cursor = connection.cursor(dictionary=True, buffered=True) cursor.execute(query, params or ()) return cursor.fetchall() finally: if cursor: cursor.close() if connection: connection.close() def fetch_as_dataframe(self, query, params=None): connection = None cursor = None try: connection = self.get_connection() cursor = connection.cursor(dictionary=True, buffered=True) cursor.execute(query, params or ()) if cursor.with_rows: results = cursor.fetchall() return pd.DataFrame(results) if results else pd.DataFrame() logger.warning("No result set to fetch from.") return pd.DataFrame() finally: if cursor: cursor.close() if connection: connection.close() def create_table_if_not_exists(self, table_name, schema): table_name = self._sanitize_identifier(table_name) query = f"CREATE TABLE IF NOT EXISTS {table_name} ({schema})" self.execute_query(query) logger.info(f"Ensured table {table_name} exists with schema: {schema}") def cleanup_old_transactions(self, days=365): cutoff_date = datetime.now() - timedelta(days=days) self.execute_query("DELETE FROM transactions WHERE TIME < %s", (cutoff_date,)) logger.info(f"Old transactions older than {days} days have been cleaned up.") @staticmethod def random_timestamp(start_date, end_date): time_between_dates = end_date - start_date random_date = start_date + timedelta( days=random.randint(0, time_between_dates.days) ) return random_date def build_query(self, base_query, filters): conditions = " AND ".join([f"{k} = %s" for k in filters.keys()]) return f"{base_query} WHERE {conditions}", list(filters.values()) def initialize_database(env="development"): """Initialize the database schema and return a shared DatabaseManager.""" return DatabaseManager(env) # SQL scripts to create tables create_feedback_table = """ CREATE TABLE IF NOT EXISTS feedback ( ID INT AUTO_INCREMENT PRIMARY KEY, USER VARCHAR(100), GUILDID BIGINT, TIMESTAMP VARCHAR(32), CONTENT TEXT ); """ create_afk_status_table = """ CREATE TABLE IF NOT EXISTS afk_status ( USERID BIGINT, GUILDID BIGINT, REASON TEXT, PRIMARY KEY (USERID, GUILDID) ); """ create_lottery_tickets_table = """ CREATE TABLE IF NOT EXISTS lottery_tickets ( ID INT AUTO_INCREMENT PRIMARY KEY, USERID BIGINT NOT NULL, TIMESTAMP DATETIME NOT NULL, TICKET_TYPE VARCHAR(32) NOT NULL, group_id VARCHAR(32) DEFAULT NULL ); """ create_lottery_results_table = """ CREATE TABLE IF NOT EXISTS lottery_results ( ID INT AUTO_INCREMENT PRIMARY KEY, WINNER_ID BIGINT NOT NULL, AMOUNT INT NOT NULL, DRAW_TIME DATETIME NOT NULL, CLAIMED BOOLEAN DEFAULT 0, WIN_TYPE VARCHAR(16) DEFAULT 'user' ); """ create_lottery_state_table = """ CREATE TABLE IF NOT EXISTS lottery_state ( id INT PRIMARY KEY, jackpot INT NOT NULL ); -- Insert initial row if not exists: INSERT IGNORE INTO lottery_state (id, jackpot) VALUES (1, 0); """ create_lottery_draw_time_table = """ CREATE TABLE IF NOT EXISTS lottery_draw_time ( id INT PRIMARY KEY, last_draw DATETIME NOT NULL ); -- Insert initial row if not exists: INSERT IGNORE INTO lottery_draw_time (id, last_draw) VALUES (1, NOW()); """ create_lottery_luck_table = """ CREATE TABLE IF NOT EXISTS lottery_luck ( USERID BIGINT PRIMARY KEY, LUCK INT NOT NULL DEFAULT 0 ); """ create_lottery_groups_table = """ CREATE TABLE IF NOT EXISTS lottery_groups ( group_id VARCHAR(32) PRIMARY KEY, creator_id BIGINT NOT NULL ); """ create_lottery_group_members_table = """ CREATE TABLE IF NOT EXISTS lottery_group_members ( group_id VARCHAR(32), user_id BIGINT, PRIMARY KEY (group_id, user_id) ); """ create_gamble_rooms_table = """ CREATE TABLE IF NOT EXISTS gamble_rooms ( channel_id BIGINT PRIMARY KEY, host_id BIGINT NOT NULL, invited TEXT NOT NULL, inactivity INT NOT NULL ); """