First Commit
This commit is contained in:
Executable
+430
@@ -0,0 +1,430 @@
|
||||
import discord
|
||||
import datetime
|
||||
import random
|
||||
from discord.ext import commands
|
||||
from utils.sql_commands import DatabaseManager
|
||||
import asyncio
|
||||
|
||||
|
||||
def random_gradient():
|
||||
"""
|
||||
Generate a random color in hexadecimal format.
|
||||
|
||||
Returns:
|
||||
int: A random color in hexadecimal format (e.g., 0xFFFFFF).
|
||||
"""
|
||||
return random.randint(0, 0xFFFFFF)
|
||||
|
||||
|
||||
def ordinal(number):
|
||||
"""
|
||||
Convert a number to its ordinal representation (e.g., 1st, 2nd, 3rd).
|
||||
|
||||
Args:
|
||||
number (int): The number to convert to an ordinal.
|
||||
|
||||
Returns:
|
||||
str: The ordinal representation of the given number.
|
||||
"""
|
||||
if 10 <= number % 100 <= 20:
|
||||
suffix = "th"
|
||||
else:
|
||||
suffix = {1: "st", 2: "nd", 3: "rd"}.get(number % 10, "th")
|
||||
return f"{number}{suffix}"
|
||||
|
||||
|
||||
async def get_channel_id(bot, ctx, allow_blank=False):
|
||||
"""
|
||||
Prompt the user to input a channel ID, allowing for options to skip or remove.
|
||||
|
||||
Args:
|
||||
bot (discord.Client): The bot instance.
|
||||
ctx (commands.Context): The context in which the command was invoked.
|
||||
allow_blank (bool, optional): If True, allows 'skip' or 'remove' as options.
|
||||
|
||||
Returns:
|
||||
int or str: The ID of the mentioned channel, "pass" if skipped, or None if removed.
|
||||
"""
|
||||
try:
|
||||
msg = await bot.wait_for(
|
||||
"message", check=lambda message: message.author == ctx.author, timeout=60
|
||||
)
|
||||
|
||||
# Handle blank input options
|
||||
if allow_blank:
|
||||
if msg.content.lower() == "skip":
|
||||
return "pass"
|
||||
if msg.content.lower() == "remove":
|
||||
return None
|
||||
|
||||
# Check if a channel was mentioned
|
||||
if msg.channel_mentions:
|
||||
return msg.channel_mentions[0].id
|
||||
|
||||
await ctx.send("Please mention a valid channel.")
|
||||
return await get_channel_id(bot, ctx, allow_blank)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
await ctx.send("You took too long to respond. Please run the command again.")
|
||||
return None
|
||||
|
||||
|
||||
async def server_join_embed(db: DatabaseManager, member: discord.Member):
|
||||
"""
|
||||
Create a dynamic welcome embed for a new server member.
|
||||
|
||||
Args:
|
||||
db (DatabaseManager): The database manager instance.
|
||||
member (discord.Member): The new member joining the server.
|
||||
|
||||
Returns:
|
||||
discord.Embed: A dynamically generated welcome embed for the new member.
|
||||
"""
|
||||
guild_channels = db.fetch_one(
|
||||
"SELECT * FROM guilds WHERE GUILD = %s", (member.guild.id,)
|
||||
)
|
||||
|
||||
# Prepare welcome messages
|
||||
welcome_messages = [
|
||||
f"🌟 Welcome to {member.guild.name}, {member.name}! 🌟",
|
||||
f"🎉 Greetings, {member.name}! Welcome to {member.guild.name}!",
|
||||
f"🚀 Ahoy, {member.name}! Set sail for adventure in {member.guild.name}!",
|
||||
]
|
||||
selected_message = random.choice(welcome_messages)
|
||||
|
||||
# Create embed
|
||||
embed = discord.Embed(
|
||||
title=selected_message,
|
||||
description="We're delighted to have you join our community. Enjoy your stay!",
|
||||
color=random_gradient(),
|
||||
)
|
||||
embed.set_thumbnail(url=member.display_avatar)
|
||||
|
||||
# Dynamic fields based on available channels
|
||||
fields = [
|
||||
(
|
||||
"➡️ Get Started",
|
||||
f"Check out the {get_channel_mention(guild_channels, 'RULES', member)} channel for server rules.",
|
||||
),
|
||||
(
|
||||
"📚 Server Guide",
|
||||
f"Explore the server guide in {get_channel_mention(guild_channels, 'GUIDE', member)}.",
|
||||
),
|
||||
(
|
||||
"👋 Introduce Yourself",
|
||||
f"Head over to {get_channel_mention(guild_channels, 'INTRODUCTIONS', member)} and let us know a bit about yourself.",
|
||||
),
|
||||
(
|
||||
"💼 Join our Events",
|
||||
f"Participate in {get_channel_mention(guild_channels, 'EVENTS', member)} for exciting events!",
|
||||
),
|
||||
]
|
||||
|
||||
for name, value in fields:
|
||||
if value:
|
||||
embed.add_field(name=name, value=value, inline=False)
|
||||
|
||||
# Member count update
|
||||
if guild_channels.get("MEMBERCOUNT"):
|
||||
member_count_channel = member.guild.get_channel(
|
||||
int(guild_channels["MEMBERCOUNT"])
|
||||
)
|
||||
if member_count_channel:
|
||||
await member_count_channel.edit(
|
||||
name=f"Members: {len(member.guild.members)}"
|
||||
)
|
||||
|
||||
# Joining position
|
||||
join_position = sorted(member.guild.members, key=lambda m: m.joined_at).index(member) + 1 # type: ignore
|
||||
embed.add_field(
|
||||
name="🏆 Joining Position",
|
||||
value=f"You are the {ordinal(join_position)} member to join!",
|
||||
inline=False,
|
||||
)
|
||||
|
||||
# Greeting based on time of day
|
||||
current_time = datetime.datetime.now().time()
|
||||
greeting = (
|
||||
"Good morning!"
|
||||
if 6 <= current_time.hour < 12
|
||||
else "Good afternoon!" if 12 <= current_time.hour < 18 else "Good evening!"
|
||||
)
|
||||
embed.insert_field_at(
|
||||
0, name="🌞 Time of Day Greeting", value=greeting, inline=False
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
|
||||
def get_channel_mention(guild_channels, key, member):
|
||||
"""
|
||||
Retrieve a mention for a channel based on a key in the guild channels data.
|
||||
|
||||
Args:
|
||||
guild_channels (dict): The dictionary containing channel IDs.
|
||||
key (str): The key for the channel to mention.
|
||||
member (discord.Member): The member for whom the mention is being created.
|
||||
|
||||
Returns:
|
||||
str or None: The mention for the channel if found, otherwise None.
|
||||
"""
|
||||
channel_id = guild_channels.get(key)
|
||||
if channel_id:
|
||||
channel = member.guild.get_channel(int(channel_id))
|
||||
return channel.mention if channel else None
|
||||
return None
|
||||
|
||||
|
||||
class Channels(commands.Cog):
|
||||
"""Cog that handles channel-related commands and events."""
|
||||
|
||||
def __init__(self, client):
|
||||
self.client = client
|
||||
self.db = DatabaseManager()
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_member_join(self, member: discord.Member):
|
||||
"""
|
||||
Event listener for new member joins. Sends a welcome embed if a welcome channel is set.
|
||||
|
||||
Args:
|
||||
member (discord.Member): The new member joining the server.
|
||||
"""
|
||||
embed = await server_join_embed(self.db, member)
|
||||
|
||||
# Fetch the welcome channel ID for the given guild
|
||||
result = self.db.fetch_one(
|
||||
"SELECT WELCOME FROM guilds WHERE GUILD = %s", (member.guild.id,)
|
||||
)
|
||||
|
||||
# Safely get the WELCOME value, defaulting to None if not found
|
||||
channel_id = result.get("WELCOME") if result else None
|
||||
|
||||
if channel_id is None:
|
||||
return # No welcome channel set
|
||||
|
||||
welcome_channel = self.client.get_channel(int(channel_id))
|
||||
if welcome_channel:
|
||||
await welcome_channel.send(embed=embed)
|
||||
|
||||
@commands.command(
|
||||
name="setchannels",
|
||||
brief="Setup channel IDs in the database",
|
||||
description="Adds or removes channel IDs to/from the database for use by the bot.",
|
||||
)
|
||||
@commands.has_permissions(manage_channels=True)
|
||||
async def _channel_setup(self, ctx, specific_channel: str | None = None):
|
||||
"""
|
||||
Command to set up channel IDs in the database by prompting the user.
|
||||
|
||||
Args:
|
||||
ctx (commands.Context): The context in which the command was invoked.
|
||||
specific_channel (str, optional): The specific channel to set up, or None to set up all channels.
|
||||
"""
|
||||
channels = {}
|
||||
|
||||
channel_types = [
|
||||
"Welcome",
|
||||
"Rules",
|
||||
"Guide",
|
||||
"Introductions",
|
||||
"Events",
|
||||
"MemberCount",
|
||||
"Logging",
|
||||
"Ticketing",
|
||||
]
|
||||
|
||||
if specific_channel and specific_channel.capitalize() in channel_types:
|
||||
channel_types = [specific_channel.capitalize()]
|
||||
|
||||
await ctx.reply(
|
||||
"Let's set up the channels.\nPlease provide the channel ID,\nskip to keep it unchanged or\nremove to unset the channel:\n\n"
|
||||
)
|
||||
|
||||
# Collect channel IDs
|
||||
for channel_type in channel_types:
|
||||
await ctx.send(f"{len(channels)}. {channel_type} Channel ID:")
|
||||
channels[channel_type] = await get_channel_id(
|
||||
self.client, ctx, allow_blank=True
|
||||
)
|
||||
|
||||
# Check if existing entries need updating
|
||||
overwrite = (self.db.fetch_one("SELECT GUILD FROM guilds WHERE GUILD = %s", (ctx.guild.id,))is not None)
|
||||
|
||||
# Prepare data for the insert or update operation
|
||||
data = tuple(
|
||||
{
|
||||
"GUILD": ctx.guild.id,
|
||||
"WELCOME": channels.get("Welcome"),
|
||||
"RULES": channels.get("Rules"),
|
||||
"GUIDE": channels.get("Guide"),
|
||||
"INTRODUCTIONS": channels.get("Introductions"),
|
||||
"EVENTS": channels.get("Events"),
|
||||
"MEMBERCOUNT": channels.get("MemberCount"),
|
||||
"LOGGING": channels.get("Logging"),
|
||||
"TICKETING": channels.get("Ticketing"),
|
||||
}.values()
|
||||
)
|
||||
|
||||
# Insert or update the channels in the database
|
||||
self.db.insert(
|
||||
"INSERT INTO guilds (GUILD, WELCOME, RULES, GUIDE, INTRODUCTIONS, EVENTS, MEMBERCOUNT, LOGGING, TICKETING) "
|
||||
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ",
|
||||
data,
|
||||
overwrite=overwrite,
|
||||
)
|
||||
|
||||
# Create a response message for the user
|
||||
response = "\n".join(
|
||||
[
|
||||
f"{channel_type} Channel ID: {get_channel_mention(channels, channel_type, ctx.author)}"
|
||||
for channel_type in channels
|
||||
if channel_type != "pass"
|
||||
]
|
||||
)
|
||||
await ctx.send(f"Channels set!\n{response}")
|
||||
|
||||
@commands.command(
|
||||
name="welcomeTest",
|
||||
brief="Test welcome message",
|
||||
description="Sends a welcome message to the welcome channel.",
|
||||
)
|
||||
@commands.has_permissions(manage_channels=True)
|
||||
async def _welcome_test(self, ctx):
|
||||
"""
|
||||
Command to test the welcome message in the designated welcome channel.
|
||||
|
||||
Args:
|
||||
ctx (commands.Context): The context in which the command was invoked.
|
||||
"""
|
||||
member = ctx.author
|
||||
embed = await server_join_embed(self.db, member)
|
||||
|
||||
channel_id = self.db.fetch_one(
|
||||
"SELECT WELCOME FROM guilds WHERE GUILD=%s", (ctx.guild.id,)
|
||||
).get("WELCOME")
|
||||
if channel_id is None:
|
||||
await ctx.reply("No Welcome Channel Set")
|
||||
return
|
||||
|
||||
welcome_channel = self.client.get_channel(int(channel_id))
|
||||
if welcome_channel:
|
||||
await welcome_channel.send(embed=embed)
|
||||
else:
|
||||
await ctx.reply(
|
||||
"The Welcome Channel is invalid. Please set a valid channel."
|
||||
)
|
||||
|
||||
@commands.command(
|
||||
name="autocreatechannels",
|
||||
brief="Auto-create all recommended channels and categories if missing.",
|
||||
description="Creates all recommended channels and categories if they do not exist.",
|
||||
)
|
||||
@commands.has_permissions(manage_channels=True)
|
||||
async def autocreatechannels(self, ctx):
|
||||
"""
|
||||
Command to auto-create all recommended channels and categories if missing.
|
||||
"""
|
||||
# Recommended categories and their channels
|
||||
categories_and_channels = {
|
||||
"Welcome & Info": [
|
||||
"welcome",
|
||||
"rules",
|
||||
"server-guide",
|
||||
"introductions",
|
||||
"announcements",
|
||||
],
|
||||
"Community": [
|
||||
"general",
|
||||
"off-topic",
|
||||
"media",
|
||||
"bot-commands",
|
||||
"suggestions",
|
||||
"polls",
|
||||
],
|
||||
"Support": ["help", "faq"],
|
||||
"Events": ["events", "event-signup", "giveaways"],
|
||||
"Moderation": ["mod-log", "reports"],
|
||||
"Voice": ["General Voice", "Music", "AFK"],
|
||||
"Management": ["member-count", "ticketing"],
|
||||
"Staff": ["staff", "staff-announcements", "staff-logs"],
|
||||
}
|
||||
|
||||
created = []
|
||||
for category_name, channel_names in categories_and_channels.items():
|
||||
category = await self.ensure_category(ctx.guild, category_name)
|
||||
for ch_name in channel_names:
|
||||
# For voice channels
|
||||
if (
|
||||
"voice" in ch_name.lower()
|
||||
or ch_name.lower() == "music"
|
||||
or ch_name.lower() == "afk"
|
||||
):
|
||||
ch_type = discord.ChannelType.voice
|
||||
else:
|
||||
ch_type = discord.ChannelType.text
|
||||
channel = discord.utils.get(ctx.guild.channels, name=ch_name)
|
||||
if not channel:
|
||||
channel = await self.ensure_channel(
|
||||
ctx.guild, ch_name, type=ch_type, category=category
|
||||
)
|
||||
created.append(channel.mention)
|
||||
# Set permissions template for some channels
|
||||
if ch_name.lower() == "rules" and isinstance(channel, discord.TextChannel):
|
||||
await self.set_channel_permissions(channel, "rules")
|
||||
if created:
|
||||
await ctx.send(f"Created channels: {', '.join(created)}")
|
||||
else:
|
||||
await ctx.send("All recommended channels already exist.")
|
||||
|
||||
async def ensure_channel(
|
||||
self, guild: discord.Guild, name, type=discord.ChannelType.text, category=None
|
||||
):
|
||||
existing = discord.utils.get(guild.channels, name=name)
|
||||
if existing:
|
||||
return existing
|
||||
if type == discord.ChannelType.voice:
|
||||
return await guild.create_voice_channel(name, category=category)
|
||||
else:
|
||||
return await guild.create_text_channel(name, category=category)
|
||||
|
||||
async def ensure_category(self, guild: discord.Guild, name):
|
||||
existing = discord.utils.get(guild.categories, name=name)
|
||||
if existing:
|
||||
return existing
|
||||
return await guild.create_category(name)
|
||||
|
||||
async def set_channel_permissions(self, channel: discord.TextChannel, template):
|
||||
if template == "rules":
|
||||
await channel.set_permissions(
|
||||
channel.guild.default_role, send_messages=False
|
||||
)
|
||||
# Add more permission logic as needed
|
||||
|
||||
@commands.command()
|
||||
@commands.has_permissions(manage_channels=True)
|
||||
async def cleanup_channels(self, ctx):
|
||||
unused = [ch for ch in ctx.guild.text_channels if ch.last_message_id is None]
|
||||
for ch in unused:
|
||||
await ch.delete(reason="Channel cleanup")
|
||||
await ctx.send(f"Deleted {len(unused)} unused channels.")
|
||||
|
||||
@commands.command()
|
||||
async def channelinfo(self, ctx, channel: discord.TextChannel | None = None):
|
||||
channel = channel or ctx.channel
|
||||
embed = discord.Embed(title=f"Info for #{channel.name}")
|
||||
embed.add_field(name="ID", value=channel.id)
|
||||
embed.add_field(name="Created", value=channel.created_at)
|
||||
embed.add_field(name="Topic", value=channel.topic or "None")
|
||||
embed.add_field(name="Slowmode", value=channel.slowmode_delay)
|
||||
await ctx.send(embed=embed)
|
||||
|
||||
|
||||
async def setup(client):
|
||||
"""
|
||||
Setup function to load the Channels cog.
|
||||
|
||||
Args:
|
||||
client (commands.Bot): The bot instance.
|
||||
"""
|
||||
await client.add_cog(Channels(client))
|
||||
Reference in New Issue
Block a user