225 lines
9.3 KiB
Python
Executable File
225 lines
9.3 KiB
Python
Executable File
import discord, re
|
|
from discord.ext import commands
|
|
from utils.sql_commands import DatabaseManager
|
|
|
|
|
|
class CustomCommandsCog(commands.Cog):
|
|
def __init__(self, client):
|
|
self.client = client
|
|
self.db = DatabaseManager("")
|
|
self.command_cache = {} # {guild_id: [commands_dicts]}
|
|
|
|
def get_guild_commands(self, guild_id):
|
|
if guild_id not in self.command_cache:
|
|
self.command_cache[guild_id] = self.db.fetch_all(
|
|
"SELECT * FROM custom_commands WHERE GUILDID = %s", (guild_id,)
|
|
)
|
|
return self.command_cache[guild_id]
|
|
|
|
def invalidate_cache(self, guild_id):
|
|
if guild_id in self.command_cache:
|
|
del self.command_cache[guild_id]
|
|
|
|
@commands.hybrid_command(name="addcommand")
|
|
@commands.has_permissions(manage_guild=True)
|
|
@commands.cooldown(1, 10, commands.BucketType.user)
|
|
async def add_command(self, ctx, command_name: str, *, response: str):
|
|
"""Add a new custom command"""
|
|
guild_id = str(ctx.guild.id)
|
|
command_name = command_name.lower()
|
|
|
|
existing_command = self.db.fetch_one(
|
|
"SELECT 1 FROM custom_commands WHERE GUILDID = %s AND COMMANDNAME = %s",
|
|
(guild_id, command_name),
|
|
)
|
|
if existing_command:
|
|
await ctx.send(f"A command with the name `{command_name}` already exists.")
|
|
return
|
|
|
|
try:
|
|
# Insert the new command into the database
|
|
self.db.execute_query(
|
|
"INSERT INTO custom_commands (GUILDID, COMMANDNAME, RESPONSE) VALUES (%s, %s, %s)",
|
|
(guild_id, command_name, response),
|
|
)
|
|
self.invalidate_cache(guild_id) # Invalidate cache after change
|
|
await ctx.send(f"Custom command `{command_name}` has been added!")
|
|
except Exception as e:
|
|
await ctx.send(f"Failed to add command `{command_name}`. Error: {e}")
|
|
|
|
@commands.hybrid_command(name="delcommand")
|
|
@commands.has_permissions(manage_guild=True)
|
|
@commands.cooldown(1, 10, commands.BucketType.user)
|
|
async def delete_command(self, ctx, command_name: str):
|
|
"""Delete a custom command"""
|
|
guild_id = str(ctx.guild.id)
|
|
command_name = command_name.lower()
|
|
|
|
try:
|
|
# Delete the command from the database
|
|
deleted_rows = self.db.execute_query(
|
|
"DELETE FROM custom_commands WHERE GUILDID = %s AND COMMANDNAME = %s",
|
|
(guild_id, command_name),
|
|
)
|
|
self.invalidate_cache(guild_id) # Invalidate cache after change
|
|
|
|
if deleted_rows is not None and len(deleted_rows) > 0:
|
|
await ctx.send(f"Custom command `{command_name}` has been deleted!")
|
|
else:
|
|
await ctx.send(f"Custom command `{command_name}` not found.")
|
|
except Exception as e:
|
|
await ctx.send(f"Failed to delete command `{command_name}`. Error: {e}")
|
|
|
|
@commands.hybrid_command(name="listcommands")
|
|
async def list_commands(self, ctx):
|
|
"""List all custom commands for this guild"""
|
|
guild_id = str(ctx.guild.id)
|
|
|
|
try:
|
|
# Retrieve all commands for the guild
|
|
commands = self.db.fetch_all(
|
|
"SELECT COMMANDNAME, RESPONSE FROM custom_commands WHERE GUILDID = %s",
|
|
(guild_id,),
|
|
)
|
|
|
|
if commands:
|
|
commands_list = "\n".join(
|
|
[f"`{cmd['COMMANDNAME']}`: {cmd['RESPONSE']}" for cmd in commands]
|
|
)
|
|
await ctx.send(f"**Custom Commands:**\n{commands_list}")
|
|
else:
|
|
await ctx.send("No custom commands found for this server.")
|
|
except Exception as e:
|
|
await ctx.send(f"Failed to retrieve commands. Error: {e}")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_message(self, message):
|
|
"""Listen for custom command invocations"""
|
|
if message.author.bot or not message.guild:
|
|
return
|
|
|
|
guild_id = str(message.guild.id)
|
|
command_name = message.content.strip().lower()
|
|
|
|
# Use cache instead of DB call
|
|
data: list = self.get_guild_commands(guild_id)
|
|
if not data:
|
|
return
|
|
|
|
result = None
|
|
|
|
for cmd in data:
|
|
if cmd["MATCHTYPE"] == "exact" and command_name == cmd["COMMANDNAME"]:
|
|
result = cmd
|
|
break
|
|
elif cmd["MATCHTYPE"] == "contains" and cmd["COMMANDNAME"] in command_name:
|
|
result = cmd
|
|
break
|
|
elif cmd["MATCHTYPE"] == "startswith" and command_name.startswith(
|
|
cmd["COMMANDNAME"]
|
|
):
|
|
result = cmd
|
|
break
|
|
elif cmd["MATCHTYPE"] == "endswith" and command_name.endswith(
|
|
cmd["COMMANDNAME"]
|
|
):
|
|
result = cmd
|
|
break
|
|
elif cmd["MATCHTYPE"] == "regex" and re.match(
|
|
re.compile(cmd["REGEX"]), command_name
|
|
):
|
|
result = cmd
|
|
break
|
|
else:
|
|
return
|
|
|
|
if result:
|
|
response = result["RESPONSE"]
|
|
|
|
# Build a dictionary of variables to replace
|
|
variables = {
|
|
"{USER}": message.author.name,
|
|
"{USER_MENTION}": message.author.mention,
|
|
"{USER_ID}": str(message.author.id),
|
|
"{USER_TAG}": str(message.author),
|
|
"{USER_AVATAR}": str(message.author.display_avatar.url),
|
|
"{USER_TOP_ROLE}": getattr(message.author.top_role, "name", ""),
|
|
"{USER_CREATED}": message.author.created_at.strftime(
|
|
"%Y-%m-%d %H:%M:%S"
|
|
),
|
|
"{USER_JOINED}": (
|
|
message.author.joined_at.strftime("%Y-%m-%d %H:%M:%S")
|
|
if message.author.joined_at
|
|
else ""
|
|
),
|
|
"{USER_NICK}": message.author.nick or message.author.name,
|
|
"{USER_COLOR}": str(
|
|
getattr(message.author.color, "to_rgb", lambda: "")()
|
|
),
|
|
"{USER_STATUS}": str(message.author.status),
|
|
"{USER_IS_BOT}": str(message.author.bot),
|
|
"{USER_DISCRIMINATOR}": message.author.discriminator,
|
|
"{CHANNEL}": message.channel.name,
|
|
"{CHANNEL_MENTION}": message.channel.mention,
|
|
"{CHANNEL_ID}": str(message.channel.id),
|
|
"{CHANNEL_TOPIC}": getattr(message.channel, "topic", ""),
|
|
"{CHANNEL_TYPE}": str(message.channel.type),
|
|
"{CHANNEL_CREATED}": message.channel.created_at.strftime(
|
|
"%Y-%m-%d %H:%M:%S"
|
|
),
|
|
"{GUILD}": message.guild.name,
|
|
"{GUILD_ID}": str(message.guild.id),
|
|
"{GUILD_OWNER}": str(message.guild.owner),
|
|
"{GUILD_OWNER_MENTION}": (
|
|
message.guild.owner.mention if message.guild.owner else ""
|
|
),
|
|
"{GUILD_MEMBERCOUNT}": str(message.guild.member_count),
|
|
"{GUILD_ICON}": (
|
|
str(message.guild.icon.url) if message.guild.icon else ""
|
|
),
|
|
"{GUILD_CREATED}": message.guild.created_at.strftime(
|
|
"%Y-%m-%d %H:%M:%S"
|
|
),
|
|
"{GUILD_BOOSTS}": str(
|
|
getattr(message.guild, "premium_subscription_count", "")
|
|
),
|
|
"{GUILD_BOOST_LEVEL}": str(getattr(message.guild, "premium_tier", "")),
|
|
"{MESSAGE}": message.content,
|
|
"{MESSAGE_ID}": str(message.id),
|
|
"{MESSAGE_LINK}": message.jump_url,
|
|
"{MESSAGE_TIMESTAMP}": message.created_at.strftime("%Y-%m-%d %H:%M:%S"),
|
|
"{TIME}": discord.utils.utcnow().strftime("%H:%M:%S UTC"),
|
|
"{DATE}": discord.utils.utcnow().strftime("%Y-%m-%d"),
|
|
"{DAY}": discord.utils.utcnow().strftime("%A"),
|
|
"{MONTH}": discord.utils.utcnow().strftime("%B"),
|
|
"{YEAR}": discord.utils.utcnow().strftime("%Y"),
|
|
"{BOT}": (
|
|
message.guild.me.name if message.guild else self.client.user.name
|
|
),
|
|
"{BOT_MENTION}": (
|
|
message.guild.me.mention
|
|
if message.guild
|
|
else self.client.user.mention
|
|
),
|
|
"{BOT_ID}": str(
|
|
message.guild.me.id if message.guild else self.client.user.id
|
|
),
|
|
"{BOT_AVATAR}": str(
|
|
message.guild.me.display_avatar.url
|
|
if message.guild
|
|
else self.client.user.display_avatar.url
|
|
),
|
|
}
|
|
|
|
# Replace all variables in the response
|
|
for var, value in variables.items():
|
|
if value is None:
|
|
continue
|
|
response = response.replace(var, value)
|
|
|
|
await message.channel.send(response)
|
|
|
|
|
|
async def setup(client):
|
|
await client.add_cog(CustomCommandsCog(client))
|