import re

import discord
from discord.ext import commands

from utils.sql_commands import DatabaseManager


class CustomCommandsCog(commands.Cog):
    """Per-guild custom commands: management commands plus a message listener.

    Command rows are read from the ``custom_commands`` table and cached per
    guild in ``command_cache``; the cache entry is dropped whenever a command
    is added or deleted through this cog.
    """

    # strftime layout shared by every timestamp placeholder.
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    # One ``{PLACEHOLDER}`` token. Every key built in ``_build_variables``
    # has this shape, so a single regex pass can expand all of them.
    _PLACEHOLDER_RE = re.compile(r"\{[A-Z_]+\}")

    # MATCHTYPE -> predicate(trigger, content) for the plain-string modes.
    # ``regex`` is handled separately because it reads the REGEX column.
    _SIMPLE_MATCHERS = {
        "exact": lambda trigger, content: content == trigger,
        "contains": lambda trigger, content: trigger in content,
        "startswith": lambda trigger, content: content.startswith(trigger),
        "endswith": lambda trigger, content: content.endswith(trigger),
    }

    def __init__(self, client):
        self.client = client
        self.db = DatabaseManager("")
        self.command_cache = {}  # {guild_id: [command rows]}

    def get_guild_commands(self, guild_id):
        """Return the command rows for ``guild_id``, querying the DB on a miss."""
        if guild_id not in self.command_cache:
            self.command_cache[guild_id] = self.db.fetch_all(
                "SELECT * FROM custom_commands WHERE GUILDID = %s", (guild_id,)
            )
        return self.command_cache[guild_id]

    def invalidate_cache(self, guild_id):
        """Forget the cached rows for ``guild_id`` (no-op if nothing is cached)."""
        self.command_cache.pop(guild_id, None)

    # NOTE: the docstrings of the hybrid commands below double as the
    # user-visible help/description text, so they are kept short on purpose.

    @commands.hybrid_command(name="addcommand")
    @commands.has_permissions(manage_guild=True)
    @commands.cooldown(1, 10, commands.BucketType.user)
    async def add_command(
        self, ctx: commands.Context, command_name: str, *, response: str
    ) -> None:
        """Add a new custom command"""
        guild_id = str(ctx.guild.id)
        command_name = command_name.lower()  # triggers are stored lowercase

        existing_command = self.db.fetch_one(
            "SELECT 1 FROM custom_commands WHERE GUILDID = %s AND COMMANDNAME = %s",
            (guild_id, command_name),
        )
        if existing_command:
            await ctx.send(f"A command with the name `{command_name}` already exists.")
            return

        try:
            self.db.execute_query(
                "INSERT INTO custom_commands (GUILDID, COMMANDNAME, RESPONSE) VALUES (%s, %s, %s)",
                (guild_id, command_name, response),
            )
            # The cached rows for this guild are stale now.
            self.invalidate_cache(guild_id)
            await ctx.send(f"Custom command `{command_name}` has been added!")
        except Exception as e:
            await ctx.send(f"Failed to add command `{command_name}`. Error: {e}")

    @commands.hybrid_command(name="delcommand")
    @commands.has_permissions(manage_guild=True)
    @commands.cooldown(1, 10, commands.BucketType.user)
    async def delete_command(self, ctx: commands.Context, command_name: str) -> None:
        """Delete a custom command"""
        guild_id = str(ctx.guild.id)
        command_name = command_name.lower()

        try:
            deleted_rows = self.db.execute_query(
                "DELETE FROM custom_commands WHERE GUILDID = %s AND COMMANDNAME = %s",
                (guild_id, command_name),
            )
            self.invalidate_cache(guild_id)

            # NOTE(review): execute_query's return shape is not visible from
            # this file. Accept either an integer row count or a sized
            # collection so a count does not blow up in len() and get
            # reported as a failed delete - confirm against DatabaseManager.
            if isinstance(deleted_rows, int):
                was_deleted = deleted_rows > 0
            else:
                was_deleted = bool(deleted_rows)

            if was_deleted:
                await ctx.send(f"Custom command `{command_name}` has been deleted!")
            else:
                await ctx.send(f"Custom command `{command_name}` not found.")
        except Exception as e:
            await ctx.send(f"Failed to delete command `{command_name}`. Error: {e}")

    @commands.hybrid_command(name="listcommands")
    @commands.guild_only()  # the body dereferences ctx.guild, which is None in DMs
    async def list_commands(self, ctx: commands.Context) -> None:
        """List all custom commands for this guild"""
        guild_id = str(ctx.guild.id)

        try:
            # Named ``rows`` so the imported ``commands`` module is not shadowed.
            rows = self.db.fetch_all(
                "SELECT COMMANDNAME, RESPONSE FROM custom_commands WHERE GUILDID = %s",
                (guild_id,),
            )
            if rows:
                commands_list = "\n".join(
                    f"`{row['COMMANDNAME']}`: {row['RESPONSE']}" for row in rows
                )
                await ctx.send(f"**Custom Commands:**\n{commands_list}")
            else:
                await ctx.send("No custom commands found for this server.")
        except Exception as e:
            await ctx.send(f"Failed to retrieve commands. Error: {e}")

    @staticmethod
    def _regex_matches(pattern, content):
        """Return True if ``pattern`` matches at the start of ``content``.

        A malformed or missing pattern counts as "no match" so one bad row
        cannot make the listener raise on every message.
        """
        try:
            return re.match(pattern, content) is not None
        except (re.error, TypeError):
            return False

    @classmethod
    def _find_match(cls, rows, content):
        """Return the first row whose trigger matches ``content``, else None."""
        for row in rows:
            match_type = row["MATCHTYPE"]
            if match_type == "regex":
                if cls._regex_matches(row["REGEX"], content):
                    return row
                continue
            matcher = cls._SIMPLE_MATCHERS.get(match_type)
            # Rows with an unrecognised MATCHTYPE are skipped, not fatal.
            if matcher is not None and matcher(row["COMMANDNAME"], content):
                return row
        return None

    @classmethod
    def _format_timestamp(cls, moment):
        """Format a datetime with the shared layout; ``None`` becomes ``""``."""
        return "" if moment is None else moment.strftime(cls._TIMESTAMP_FORMAT)

    def _build_variables(self, message):
        """Build the ``{PLACEHOLDER}`` -> replacement mapping for ``message``.

        Expects a guild message (``message.guild`` is not None). A value of
        ``None`` means "leave that placeholder untouched".
        """
        author = message.author
        channel = message.channel
        guild = message.guild
        owner = guild.owner
        bot_user = guild.me or self.client.user
        now = discord.utils.utcnow()  # one reading so all date parts agree
        stamp = self._format_timestamp

        return {
            "{USER}": author.name,
            "{USER_MENTION}": author.mention,
            "{USER_ID}": str(author.id),
            "{USER_TAG}": str(author),
            "{USER_AVATAR}": str(author.display_avatar.url),
            # The author can be a plain User rather than a Member (see the
            # discord.py Message.author docs), so Member-only attributes
            # are read defensively.
            "{USER_TOP_ROLE}": getattr(getattr(author, "top_role", None), "name", ""),
            "{USER_CREATED}": stamp(author.created_at),
            "{USER_JOINED}": stamp(getattr(author, "joined_at", None)),
            "{USER_NICK}": getattr(author, "nick", None) or author.name,
            "{USER_COLOR}": str(getattr(author.color, "to_rgb", lambda: "")()),
            "{USER_STATUS}": str(getattr(author, "status", "")),
            "{USER_IS_BOT}": str(author.bot),
            "{USER_DISCRIMINATOR}": author.discriminator,
            "{CHANNEL}": channel.name,
            "{CHANNEL_MENTION}": channel.mention,
            "{CHANNEL_ID}": str(channel.id),
            "{CHANNEL_TOPIC}": getattr(channel, "topic", ""),
            "{CHANNEL_TYPE}": str(channel.type),
            # created_at may be None for some channel types (e.g. old threads).
            "{CHANNEL_CREATED}": stamp(channel.created_at),
            "{GUILD}": guild.name,
            "{GUILD_ID}": str(guild.id),
            "{GUILD_OWNER}": str(owner),
            "{GUILD_OWNER_MENTION}": owner.mention if owner else "",
            "{GUILD_MEMBERCOUNT}": str(guild.member_count),
            "{GUILD_ICON}": str(guild.icon.url) if guild.icon else "",
            "{GUILD_CREATED}": stamp(guild.created_at),
            "{GUILD_BOOSTS}": str(getattr(guild, "premium_subscription_count", "")),
            "{GUILD_BOOST_LEVEL}": str(getattr(guild, "premium_tier", "")),
            "{MESSAGE}": message.content,
            "{MESSAGE_ID}": str(message.id),
            "{MESSAGE_LINK}": message.jump_url,
            "{MESSAGE_TIMESTAMP}": stamp(message.created_at),
            "{TIME}": now.strftime("%H:%M:%S UTC"),
            "{DATE}": now.strftime("%Y-%m-%d"),
            "{DAY}": now.strftime("%A"),
            "{MONTH}": now.strftime("%B"),
            "{YEAR}": now.strftime("%Y"),
            "{BOT}": bot_user.name,
            "{BOT_MENTION}": bot_user.mention,
            "{BOT_ID}": str(bot_user.id),
            "{BOT_AVATAR}": str(bot_user.display_avatar.url),
        }

    @classmethod
    def _render(cls, template, variables):
        """Expand placeholders in ``template`` in a single pass.

        One pass (instead of chained ``str.replace`` calls) means text that
        was just inserted - such as the user's own message for ``{MESSAGE}`` -
        is never scanned again for further placeholders. Tokens that are
        unknown or map to ``None`` are left as they are.
        """

        def _swap(token_match):
            token = token_match.group(0)
            value = variables.get(token)
            return token if value is None else value

        return cls._PLACEHOLDER_RE.sub(_swap, template or "")

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message) -> None:
        """Reply with a custom command's response when a guild message triggers it.

        Bot authors and non-guild messages are ignored. The stripped,
        lowercased message content is compared against each cached row for
        the guild according to its MATCHTYPE, and the first matching row wins.
        """
        if message.author.bot or not message.guild:
            return

        rows = self.get_guild_commands(str(message.guild.id))
        if not rows:
            return

        content = message.content.strip().lower()
        matched = self._find_match(rows, content)
        if matched is None:
            return

        response = self._render(matched["RESPONSE"], self._build_variables(message))
        # Discord rejects empty messages, so skip the send for a blank result.
        if response.strip():
            await message.channel.send(response)


async def setup(client):
    """Extension entry point: register the cog on ``client``."""
    await client.add_cog(CustomCommandsCog(client))