"""MySQL access layer: a pooled ``DatabaseManager`` plus reference DDL scripts."""

import logging
import os
import random
import re
import time
from copy import copy
from datetime import datetime, timedelta

import mysql.connector
import pandas as pd
from dotenv import load_dotenv
from mysql.connector import pooling

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    filename="database.log",
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Environment values that switch a boolean flag off (compared case-insensitively).
_FALSY_ENV_VALUES = frozenset({"", "0", "false", "no", "off"})

# Helpers for deriving the upsert clause from an INSERT statement.
_VALUES_KEYWORD_RE = re.compile(r"\bVALUES\b", re.IGNORECASE)
_COLUMN_LIST_RE = re.compile(r"\(([^()]*)\)")
_UPSERT_CLAUSE_RE = re.compile(r"\bON\s+DUPLICATE\s+KEY\s+UPDATE\b", re.IGNORECASE)


def _env_flag(name, default=False):
    """Read a boolean flag from the environment.

    ``bool(os.getenv(...))`` is True for every non-empty string, including
    "false" and "0", so the value is compared against known "off" spellings.

    Args:
        name: Environment variable name.
        default: Value returned when the variable is not set at all.

    Returns:
        bool: False for "", "0", "false", "no", "off" (any case), True for any
        other value, ``default`` when the variable is unset.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in _FALSY_ENV_VALUES


class DatabaseManager:
    """Thin wrapper around a MySQL connection pool with query helpers."""

    # (table name, column/constraint definitions), created in this order by
    # __init__ via create_table_if_not_exists.
    _TABLE_SCHEMAS = (
        (
            "feedback",
            """
            ID INT AUTO_INCREMENT PRIMARY KEY,
            USER VARCHAR(100),
            GUILDID BIGINT,
            TIMESTAMP VARCHAR(32),
            CONTENT TEXT
            """,
        ),
        (
            "afk_status",
            """
            USERID BIGINT,
            GUILDID BIGINT,
            TIMESTAMP VARCHAR(32),
            REASON TEXT,
            PRIMARY KEY (USERID, GUILDID)
            """,
        ),
        (
            "lottery_tickets",
            """
            ID INT AUTO_INCREMENT PRIMARY KEY,
            USERID BIGINT NOT NULL,
            TIMESTAMP DATETIME NOT NULL,
            TICKET_TYPE VARCHAR(32) NOT NULL,
            group_id VARCHAR(32) DEFAULT NULL
            """,
        ),
        (
            "lottery_results",
            """
            ID INT AUTO_INCREMENT PRIMARY KEY,
            WINNER_ID BIGINT NOT NULL,
            AMOUNT INT NOT NULL,
            DRAW_TIME DATETIME NOT NULL,
            CLAIMED BOOLEAN DEFAULT 0,
            WIN_TYPE VARCHAR(16) DEFAULT 'user'
            """,
        ),
        (
            "lottery_state",
            """
            id INT PRIMARY KEY,
            jackpot INT NOT NULL
            """,
        ),
        (
            "lottery_draw_time",
            """
            id INT PRIMARY KEY,
            last_draw DATETIME NOT NULL
            """,
        ),
        (
            "lottery_luck",
            """
            USERID BIGINT PRIMARY KEY,
            LUCK INT NOT NULL DEFAULT 0
            """,
        ),
        (
            "lottery_groups",
            """
            group_id VARCHAR(32) PRIMARY KEY,
            creator_id BIGINT NOT NULL
            """,
        ),
        (
            "lottery_group_members",
            """
            group_id VARCHAR(32),
            user_id BIGINT,
            PRIMARY KEY (group_id, user_id)
            """,
        ),
        (
            "gamble_rooms",
            """
            channel_id BIGINT PRIMARY KEY,
            host_id BIGINT NOT NULL,
            invited TEXT NOT NULL,
            inactivity INT NOT NULL
            """,
        ),
    )

    def __init__(self, env="development"):
        """Load settings, create the connection pool and ensure tables exist.

        Connection settings are read from the environment variables SQLHOST,
        SQLUSER, SQLPASS, SQLDB and POOL_RESET_SESSION, with the defaults
        shown below.

        Args:
            env: Environment name, forwarded to ``load_env``.
        """
        self.load_env(env)
        self.config = {
            "host": os.getenv("SQLHOST", "localhost"),
            "user": os.getenv("SQLUSER", "root"),
            "password": os.getenv("SQLPASS", ""),
            "database": os.getenv("SQLDB", "testdb"),
            # Parsed as a real flag: "false"/"0" now disable it instead of
            # being treated as truthy non-empty strings.
            "pool_reset_session": _env_flag("POOL_RESET_SESSION", default=False),
        }
        self.pool = pooling.MySQLConnectionPool(
            pool_name="mypool", pool_size=5, **self.config
        )
        logger.info("Database connection pool created.")

        # Ensure required tables exist
        for table_name, schema in self._TABLE_SCHEMAS:
            self.create_table_if_not_exists(table_name, schema)

    def load_env(self, env):
        """Load environment variables from the ``.env`` file.

        Args:
            env: Currently unused; the same ``.env`` file is loaded for every
                environment name.
        """
        env_file = ".env"
        load_dotenv(env_file)
        logger.info(f"Loaded environment variables from {env_file}")

    def get_connection(self):
        """Return a connection checked out from the pool."""
        return self.pool.get_connection()

    def execute_query(self, query, params=None, retries=3, delay=1):
        """Execute one statement, committing on success and retrying on error.

        Args:
            query: SQL text, using ``%s`` placeholders.
            params: Placeholder values; an empty tuple is used when omitted.
            retries: Maximum number of attempts.
            delay: Base backoff in seconds; attempt ``i`` (0-based) is followed
                by a sleep of ``delay * 2**i`` unless it was the last attempt.

        Returns:
            A shallow copy of the executed dictionary cursor, or None when
            every attempt failed with ``mysql.connector.Error``.
        """
        for attempt in range(retries):
            # Fresh handles per attempt, so the cleanup below never touches
            # handles that belong to (and were already closed by) an earlier
            # attempt when get_connection() itself fails.
            connection = None
            cursor = None
            try:
                connection = self.get_connection()
                cursor = connection.cursor(dictionary=True, buffered=True)
                cursor.execute(query, params or ())
                logger.info(f"Executed query: {query} with params: {params}")
                connection.commit()
                # The original cursor is closed in `finally`, so callers get a
                # copy. NOTE(review): this relies on the buffered cursor
                # keeping its fetched rows in memory so the copy stays
                # readable - confirm against the connector version in use.
                return copy(cursor)
            except mysql.connector.Error as err:
                logger.warning(f"Attempt {attempt + 1} failed: {err}")
            finally:
                if cursor is not None:
                    cursor.close()
                if connection is not None:
                    connection.close()

            # Only reached after a failure. Back off once the connection has
            # been handed back, and do not wait after the final attempt.
            if attempt < retries - 1:
                time.sleep(delay * (2**attempt))

        logger.error(f"All {retries} attempts failed for query: {query}")
        return None

    @staticmethod
    def _upsert_assignments(query):
        """Build the ``col = VALUES(col)`` list from an INSERT's column list."""
        head = _VALUES_KEYWORD_RE.split(query, maxsplit=1)[0]
        match = _COLUMN_LIST_RE.search(head)
        columns = []
        if match is not None:
            columns = [
                part.split("=")[0].strip() for part in match.group(1).split(",")
            ]
            columns = [col for col in columns if col]
        if not columns:
            raise ValueError(
                "Cannot build an upsert clause: the insert query must list its "
                "columns explicitly, e.g. INSERT INTO t (a, b) VALUES (%s, %s)."
            )
        return ", ".join(f"{col} = VALUES({col})" for col in columns)

    def insert(self, query: str, params: tuple, overwrite: bool = True) -> None:
        """Insert data into the database using the given query and parameters.

        Args:
            query (str): The INSERT statement to execute.
            params (tuple): The parameters to pass into the query.
            overwrite (bool, optional): Whether to perform an upsert by
                appending ``ON DUPLICATE KEY UPDATE col = VALUES(col)`` for
                every column named in the query's column list. Defaults to
                True. Nothing is appended when the query already contains an
                ``ON DUPLICATE KEY UPDATE`` clause.

        Raises:
            ValueError: If no parameters are provided, or if ``overwrite`` is
                set and the query has no explicit column list.
        """
        if not params:
            raise ValueError("Params must be provided for the insert operation.")

        if overwrite and not _UPSERT_CLAUSE_RE.search(query):
            update_set = self._upsert_assignments(query)
            query = f"{query} ON DUPLICATE KEY UPDATE {update_set}"

        try:
            cursor = self.execute_query(query, params)
        except mysql.connector.Error as err:
            logger.error(f"Insert failed with query: {query}. Error: {err}")
            return

        if cursor is not None:
            logger.info(f"Insert completed with query: {query}.")
        else:
            # execute_query returns None once all of its retries have failed.
            logger.error(f"Insert failed with query: {query}.")

    def bulk_insert(self, query, params=None):
        """Insert many rows with a single ``executemany`` call.

        Args:
            query: Statement prefix such as ``INSERT INTO table``; the column
                list and ``VALUES`` placeholders are appended here.
            params: Non-empty list of dicts mapping column name to value. All
                dicts must have the same keys; key order may differ.

        Raises:
            ValueError: If ``params`` is not a list of dicts, the first record
                is empty, or the records do not share the same keys.
        """
        if not params:
            logger.warning("No data provided for bulk insert.")
            return

        # Assuming params is a list of dictionaries
        if not isinstance(params, list) or not all(
            isinstance(record, dict) for record in params
        ):
            raise ValueError("Params must be a list of dictionaries for bulk insert.")

        keys = list(params[0])
        if not keys:
            raise ValueError("Bulk insert records must contain at least one column.")
        expected = set(keys)
        if any(set(record) != expected for record in params):
            raise ValueError("All bulk insert records must have the same keys.")

        placeholders = ", ".join(["%s"] * len(keys))
        query = f"{query} ({', '.join(keys)}) VALUES ({placeholders})"
        # Build every row in the column order of the statement rather than in
        # each dict's own insertion order, so values cannot land in the wrong
        # column.
        values = [tuple(record[key] for key in keys) for record in params]

        connection = None
        cursor = None
        try:
            connection = self.get_connection()
            cursor = connection.cursor()
            cursor.executemany(query, values)
            connection.commit()
            logger.info(
                f"Bulk insert completed for {len(params)} records with query: {query}."
            )
        except mysql.connector.Error as err:
            logger.error(f"Bulk insert failed: {err}")
            if connection is not None:
                connection.rollback()  # Roll back on error
        finally:
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()

    def delete(self, table_name: str, condition: dict) -> None:
        """Delete the rows of ``table_name`` that match every condition entry.

        Args:
            table_name (str): Table to delete from (interpolated into the SQL
                text, so it must come from trusted code).
            condition (dict): Column name to required value; all entries are
                combined with ``AND``.

        Raises:
            ValueError: If ``condition`` is empty, to avoid building an
                invalid or unbounded DELETE.
        """
        if not condition:
            raise ValueError("A non-empty condition is required for delete.")
        where_clause = " AND ".join(f"{column} = %s" for column in condition)
        query = f"DELETE FROM {table_name} WHERE {where_clause}"
        self.execute_query(query, tuple(condition.values()))

    def fetch_one(self, query, params=None):
        """Run ``query`` and return ``cursor.fetchone()``, or ``{}`` on failure."""
        cursor = self.execute_query(query, params)
        return cursor.fetchone() if cursor else {}

    def fetch_all(self, query, params=None):
        """Run ``query`` and return ``cursor.fetchall()``, or ``[]`` on failure."""
        cursor = self.execute_query(query, params)
        return cursor.fetchall() if cursor else []

    def fetch_as_dataframe(self, query, params=None):
        """Run ``query`` and return its rows as a ``pandas.DataFrame``.

        Returns:
            A DataFrame built from the fetched rows; an empty DataFrame when
            the query failed, produced no result set, or returned no rows.
        """
        cursor = self.execute_query(query, params)
        if not cursor:
            return pd.DataFrame()
        try:
            # Ensure cursor has a result to fetch
            if not cursor.with_rows:
                logger.warning("No result set to fetch from.")
                return pd.DataFrame()
            results = cursor.fetchall()
            return pd.DataFrame(results) if results else pd.DataFrame()
        finally:
            cursor.close()

    def create_table_if_not_exists(self, table_name, schema):
        """Issue ``CREATE TABLE IF NOT EXISTS`` for ``table_name``.

        Args:
            table_name: Name of the table.
            schema: Column and constraint definitions placed inside the
                parentheses of the CREATE TABLE statement.
        """
        query = f"CREATE TABLE IF NOT EXISTS {table_name} ({schema})"
        cursor = self.execute_query(query)
        if cursor is not None:
            logger.info(f"Ensured table {table_name} exists with schema: {schema}")
        else:
            # Do not claim success when every attempt failed.
            logger.error(f"Could not ensure table {table_name} exists.")

    def cleanup_old_transactions(self, days=365):
        """Delete rows of ``transactions`` whose TIME is older than ``days`` days.

        NOTE(review): the ``transactions`` table is not created by this class;
        presumably it is managed elsewhere - confirm.
        """
        cutoff_date = datetime.now() - timedelta(days=days)
        cursor = self.execute_query(
            "DELETE FROM transactions WHERE TIME < %s", (cutoff_date,)
        )
        if cursor is not None:
            logger.info(
                f"Old transactions older than {days} days have been cleaned up."
            )
        else:
            logger.error(f"Failed to clean up transactions older than {days} days.")

    @staticmethod
    def random_timestamp(start_date, end_date):
        """Return ``start_date`` shifted by a random whole number of days.

        The offset is drawn uniformly from ``0`` to ``(end_date - start_date).days``
        inclusive, so the result keeps the time of day of ``start_date``.
        """
        span_days = (end_date - start_date).days
        return start_date + timedelta(days=random.randint(0, span_days))

    def build_query(self, base_query, filters):
        """Append an ``AND``-joined equality WHERE clause to ``base_query``.

        Args:
            base_query: SQL text without a WHERE clause.
            filters: Mapping of column name to required value.

        Returns:
            tuple: ``(query, params)`` where ``params`` is the list of filter
            values in the same order as the placeholders. With no filters the
            base query is returned unchanged with an empty parameter list.
        """
        if not filters:
            # A bare trailing "WHERE" would be invalid SQL.
            return base_query, []
        conditions = " AND ".join(f"{column} = %s" for column in filters)
        return f"{base_query} WHERE {conditions}", list(filters.values())


# SQL scripts to create tables
create_feedback_table = """
CREATE TABLE IF NOT EXISTS feedback (
    ID INT AUTO_INCREMENT PRIMARY KEY,
    USER VARCHAR(100),
    GUILDID BIGINT,
    TIMESTAMP VARCHAR(32),
    CONTENT TEXT
);
"""

# Includes TIMESTAMP so the script matches the afk_status schema that
# DatabaseManager.__init__ creates.
create_afk_status_table = """
CREATE TABLE IF NOT EXISTS afk_status (
    USERID BIGINT,
    GUILDID BIGINT,
    TIMESTAMP VARCHAR(32),
    REASON TEXT,
    PRIMARY KEY (USERID, GUILDID)
);
"""

create_lottery_tickets_table = """
CREATE TABLE IF NOT EXISTS lottery_tickets (
    ID INT AUTO_INCREMENT PRIMARY KEY,
    USERID BIGINT NOT NULL,
    TIMESTAMP DATETIME NOT NULL,
    TICKET_TYPE VARCHAR(32) NOT NULL,
    group_id VARCHAR(32) DEFAULT NULL
);
"""

create_lottery_results_table = """
CREATE TABLE IF NOT EXISTS lottery_results (
    ID INT AUTO_INCREMENT PRIMARY KEY,
    WINNER_ID BIGINT NOT NULL,
    AMOUNT INT NOT NULL,
    DRAW_TIME DATETIME NOT NULL,
    CLAIMED BOOLEAN DEFAULT 0,
    WIN_TYPE VARCHAR(16) DEFAULT 'user'
);
"""

create_lottery_state_table = """
CREATE TABLE IF NOT EXISTS lottery_state (
    id INT PRIMARY KEY,
    jackpot INT NOT NULL
);
-- Insert initial row if not exists:
INSERT IGNORE INTO lottery_state (id, jackpot) VALUES (1, 0);
"""

create_lottery_draw_time_table = """
CREATE TABLE IF NOT EXISTS lottery_draw_time (
    id INT PRIMARY KEY,
    last_draw DATETIME NOT NULL
);
-- Insert initial row if not exists:
INSERT IGNORE INTO lottery_draw_time (id, last_draw) VALUES (1, NOW());
"""

create_lottery_luck_table = """
CREATE TABLE IF NOT EXISTS lottery_luck (
    USERID BIGINT PRIMARY KEY,
    LUCK INT NOT NULL DEFAULT 0
);
"""

create_lottery_groups_table = """
CREATE TABLE IF NOT EXISTS lottery_groups (
    group_id VARCHAR(32) PRIMARY KEY,
    creator_id BIGINT NOT NULL
);
"""

create_lottery_group_members_table = """
CREATE TABLE IF NOT EXISTS lottery_group_members (
    group_id VARCHAR(32),
    user_id BIGINT,
    PRIMARY KEY (group_id, user_id)
);
"""

create_gamble_rooms_table = """
CREATE TABLE IF NOT EXISTS gamble_rooms (
    channel_id BIGINT PRIMARY KEY,
    host_id BIGINT NOT NULL,
    invited TEXT NOT NULL,
    inactivity INT NOT NULL
);
"""