Files
DiscordBot/cogs/customCommands.py
T
Nobody2503 be89cc3acd Refactor database management and schema initialization
- Removed the old npc_memory.db file.
- Updated time.txt with a new timestamp.
- Refactored transaction recording in bank_functions.py to use parameterized queries.
- Enhanced DatabaseManager in sql_commands.py to support singleton pattern and improved table creation logic.
- Added methods for sanitizing SQL identifiers and parsing insert columns for upsert operations.
- Improved error handling and connection management in execute_query, fetch_one, fetch_all, and fetch_as_dataframe methods.
- Introduced a new bootstrap_database.py script for initializing the database schema.
- Updated app.py to use the new initialize_database function for database management.
2026-05-31 11:16:44 +00:00

225 lines
9.3 KiB
Python
Executable File

import discord, re
from discord.ext import commands
from utils.sql_commands import DatabaseManager
class CustomCommandsCog(commands.Cog):
    """Per-guild custom text commands backed by the ``custom_commands`` table.

    Rows are read with the columns GUILDID, COMMANDNAME, RESPONSE, MATCHTYPE
    and REGEX.  Management commands (``addcommand``, ``delcommand``,
    ``listcommands``) edit/list the rows; the ``on_message`` listener answers
    any guild message that matches a stored command, after expanding
    ``{PLACEHOLDER}`` variables in the stored response.

    NOTE(review): the ``self.db`` calls are made directly from coroutines; if
    ``DatabaseManager`` is synchronous they block the event loop -- confirm.
    """

    # Discord rejects message content longer than this many characters.
    _MESSAGE_LIMIT = 2000
    # A placeholder is an upper-case/underscore name wrapped in braces.
    _PLACEHOLDER_RE = re.compile(r"\{[A-Z_]+\}")

    def __init__(self, client):
        self.client = client
        self.db = DatabaseManager("")
        self.command_cache = {}  # {guild_id: [command row dicts]}

    # ------------------------------------------------------------------
    # Cache helpers
    # ------------------------------------------------------------------
    def get_guild_commands(self, guild_id):
        """Return the custom command rows of *guild_id*.

        The rows are fetched from the database on the first call for a guild
        and served from ``self.command_cache`` afterwards, until
        ``invalidate_cache`` is called for that guild.
        """
        cached = self.command_cache.get(guild_id)
        if cached is not None:
            return cached
        rows = self.db.fetch_all(
            "SELECT * FROM custom_commands WHERE GUILDID = %s", (guild_id,)
        )
        if rows is None:
            # Presumably signals a failed query (TODO confirm against
            # DatabaseManager.fetch_all).  Do not cache it, otherwise one
            # transient failure would disable the guild's commands until the
            # next add/delete.
            return []
        self.command_cache[guild_id] = rows
        return rows

    def invalidate_cache(self, guild_id):
        """Forget the cached rows of *guild_id* so the next lookup hits the DB."""
        self.command_cache.pop(guild_id, None)

    # ------------------------------------------------------------------
    # Management commands
    # ------------------------------------------------------------------
    @commands.hybrid_command(name="addcommand")
    @commands.has_permissions(manage_guild=True)
    @commands.cooldown(1, 10, commands.BucketType.user)
    async def add_command(self, ctx, command_name: str, *, response: str):
        """Add a new custom command"""
        # The docstring above is kept verbatim: discord.py exposes it as the
        # command's help text.
        guild_id = str(ctx.guild.id)
        command_name = command_name.lower()  # names are stored lower-cased

        already_exists = self.db.fetch_one(
            "SELECT 1 FROM custom_commands WHERE GUILDID = %s AND COMMANDNAME = %s",
            (guild_id, command_name),
        )
        if already_exists:
            await ctx.send(f"A command with the name `{command_name}` already exists.")
            return

        try:
            self.db.execute_query(
                "INSERT INTO custom_commands (GUILDID, COMMANDNAME, RESPONSE) VALUES (%s, %s, %s)",
                (guild_id, command_name, response),
            )
        except Exception as e:
            await ctx.send(f"Failed to add command `{command_name}`. Error: {e}")
            return

        self.invalidate_cache(guild_id)  # the cached rows are now stale
        await ctx.send(f"Custom command `{command_name}` has been added!")

    @commands.hybrid_command(name="delcommand")
    @commands.has_permissions(manage_guild=True)
    @commands.cooldown(1, 10, commands.BucketType.user)
    async def delete_command(self, ctx, command_name: str):
        """Delete a custom command"""
        # The docstring above is kept verbatim: discord.py exposes it as the
        # command's help text.
        guild_id = str(ctx.guild.id)
        command_name = command_name.lower()

        try:
            deleted_rows = self.db.execute_query(
                "DELETE FROM custom_commands WHERE GUILDID = %s AND COMMANDNAME = %s",
                (guild_id, command_name),
            )
        except Exception as e:
            await ctx.send(f"Failed to delete command `{command_name}`. Error: {e}")
            return

        self.invalidate_cache(guild_id)  # the cached rows are now stale
        if deleted_rows and deleted_rows > 0:
            await ctx.send(f"Custom command `{command_name}` has been deleted!")
        else:
            await ctx.send(f"Custom command `{command_name}` not found.")

    @commands.hybrid_command(name="listcommands")
    @commands.guild_only()
    async def list_commands(self, ctx):
        """List all custom commands for this guild"""
        # guild_only: ctx.guild is dereferenced below and is None in DMs.
        guild_id = str(ctx.guild.id)

        try:
            rows = self.db.fetch_all(
                "SELECT COMMANDNAME, RESPONSE FROM custom_commands WHERE GUILDID = %s",
                (guild_id,),
            )
        except Exception as e:
            await ctx.send(f"Failed to retrieve commands. Error: {e}")
            return

        if not rows:
            await ctx.send("No custom commands found for this server.")
            return

        lines = ["**Custom Commands:**"]
        lines.extend(f"`{row['COMMANDNAME']}`: {row['RESPONSE']}" for row in rows)
        # A long listing is spread over several messages instead of failing
        # on Discord's per-message length limit.
        for chunk in self._chunk_lines(lines):
            await ctx.send(chunk)

    # ------------------------------------------------------------------
    # Message listener
    # ------------------------------------------------------------------
    @commands.Cog.listener()
    async def on_message(self, message):
        """Answer a guild message with the first stored command it matches.

        Messages from bots and messages outside a guild are ignored.  The
        message content is stripped and lower-cased before matching.
        """
        if message.author.bot or not message.guild:
            return

        rows = self.get_guild_commands(str(message.guild.id))
        if not rows:
            return

        content = message.content.strip().lower()
        # Every stored command is considered, in row order; the first match wins.
        matched = next((row for row in rows if self._matches(row, content)), None)
        if not matched:
            return

        template = matched["RESPONSE"]
        if not template:
            return

        response = self._render(template, self._build_variables(message))
        if not response.strip():
            # Discord refuses empty messages.
            return
        # NOTE(review): {MESSAGE} echoes user-written text, including any
        # mentions it contains -- consider passing ``allowed_mentions``.
        await message.channel.send(response)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _matches(cmd, content):
        """Return True if the lower-cased message *content* triggers row *cmd*."""
        match_type = cmd["MATCHTYPE"]

        if match_type == "regex":
            pattern = cmd["REGEX"]
            if not pattern:
                # A missing/empty pattern must not match every message.
                return False
            try:
                return re.match(pattern, content) is not None
            except re.error:
                # An invalid stored pattern is treated as "no match" rather
                # than raising for every message in the guild.
                return False

        trigger = cmd["COMMANDNAME"]
        if match_type == "exact":
            return content == trigger
        if match_type == "contains":
            return trigger in content
        if match_type == "startswith":
            return content.startswith(trigger)
        if match_type == "endswith":
            return content.endswith(trigger)
        return False

    @staticmethod
    def _format_timestamp(moment):
        """Format a datetime as ``YYYY-MM-DD HH:MM:SS``; ``None`` becomes ``""``."""
        return moment.strftime("%Y-%m-%d %H:%M:%S") if moment else ""

    @staticmethod
    def _chunk_lines(lines, limit=_MESSAGE_LIMIT):
        """Join *lines* with newlines into chunks of at most *limit* characters.

        A single line longer than *limit* is hard-split into *limit*-sized
        pieces.
        """
        chunks = []
        current = ""
        for line in lines:
            while len(line) > limit:
                if current:
                    chunks.append(current)
                    current = ""
                chunks.append(line[:limit])
                line = line[limit:]
            candidate = f"{current}\n{line}" if current else line
            if len(candidate) > limit:
                chunks.append(current)
                current = line
            else:
                current = candidate
        if current:
            chunks.append(current)
        return chunks

    def _build_variables(self, message):
        """Map every supported ``{PLACEHOLDER}`` to its value for *message*.

        Values are strings, or ``None`` when unavailable (such placeholders
        are left untouched by ``_render``).  *message* must belong to a guild.
        """
        author = message.author
        channel = message.channel
        guild = message.guild
        owner = guild.owner
        bot_user = guild.me or self.client.user
        now = discord.utils.utcnow()  # one reading, so time fields agree
        stamp = self._format_timestamp

        return {
            "{USER}": author.name,
            "{USER_MENTION}": author.mention,
            "{USER_ID}": str(author.id),
            "{USER_TAG}": str(author),
            "{USER_AVATAR}": str(author.display_avatar.url),
            # top_role / joined_at / nick / status are read defensively in
            # case the author object does not carry member attributes.
            "{USER_TOP_ROLE}": getattr(getattr(author, "top_role", None), "name", ""),
            "{USER_CREATED}": stamp(author.created_at),
            "{USER_JOINED}": stamp(getattr(author, "joined_at", None)),
            "{USER_NICK}": getattr(author, "nick", None) or author.name,
            "{USER_COLOR}": str(getattr(author.color, "to_rgb", lambda: "")()),
            "{USER_STATUS}": str(getattr(author, "status", "")),
            "{USER_IS_BOT}": str(author.bot),
            "{USER_DISCRIMINATOR}": author.discriminator,
            "{CHANNEL}": channel.name,
            "{CHANNEL_MENTION}": channel.mention,
            "{CHANNEL_ID}": str(channel.id),
            "{CHANNEL_TOPIC}": getattr(channel, "topic", ""),
            "{CHANNEL_TYPE}": str(channel.type),
            "{CHANNEL_CREATED}": stamp(channel.created_at),
            "{GUILD}": guild.name,
            "{GUILD_ID}": str(guild.id),
            "{GUILD_OWNER}": str(owner),
            "{GUILD_OWNER_MENTION}": owner.mention if owner else "",
            "{GUILD_MEMBERCOUNT}": str(guild.member_count),
            "{GUILD_ICON}": str(guild.icon.url) if guild.icon else "",
            "{GUILD_CREATED}": stamp(guild.created_at),
            "{GUILD_BOOSTS}": str(getattr(guild, "premium_subscription_count", "")),
            "{GUILD_BOOST_LEVEL}": str(getattr(guild, "premium_tier", "")),
            "{MESSAGE}": message.content,
            "{MESSAGE_ID}": str(message.id),
            "{MESSAGE_LINK}": message.jump_url,
            "{MESSAGE_TIMESTAMP}": stamp(message.created_at),
            "{TIME}": now.strftime("%H:%M:%S UTC"),
            "{DATE}": now.strftime("%Y-%m-%d"),
            "{DAY}": now.strftime("%A"),
            "{MONTH}": now.strftime("%B"),
            "{YEAR}": now.strftime("%Y"),
            "{BOT}": bot_user.name,
            "{BOT_MENTION}": bot_user.mention,
            "{BOT_ID}": str(bot_user.id),
            "{BOT_AVATAR}": str(bot_user.display_avatar.url),
        }

    def _render(self, template, variables):
        """Expand the placeholders of *template* in a single pass.

        Unknown placeholders and placeholders whose value is ``None`` are
        left as written.  Because the template is scanned only once, text
        inserted for one placeholder (e.g. a user name or ``{MESSAGE}``) is
        never itself expanded.
        """

        def substitute(found):
            value = variables.get(found.group(0))
            return found.group(0) if value is None else str(value)

        return self._PLACEHOLDER_RE.sub(substitute, template)
async def setup(client):
    """Create a ``CustomCommandsCog`` bound to *client* and add it to the client.

    Presumably invoked by discord.py's extension loader -- confirm against
    the code that loads this module.
    """
    cog = CustomCommandsCog(client)
    await client.add_cog(cog)