Refactor bot setup and improve error handling; update command prefix handling and enhance session security in Flask app. Add profile and balance card generation features with image processing capabilities.

This commit is contained in:
2026-05-31 12:07:12 +00:00
parent 1b91cbcb2f
commit 55b16529b5
4 changed files with 356 additions and 66 deletions
+254
View File
@@ -0,0 +1,254 @@
import discord
from discord.ext import commands
from PIL import Image, ImageDraw, ImageFont, ImageOps
import io
import os
from dotenv import load_dotenv
import requests
from random import randint
# Bot setup
load_dotenv()
TOKEN = os.getenv("TOKEN")
intents = discord.Intents.all()
bot = commands.Bot(command_prefix="!?", intents=intents)
# Simple user data structure
user_data = {}
def get_user_data(user_id):
"""Get or initialize user data."""
if user_id not in user_data:
user_data[user_id] = {"xp": 0, "level": 53}
return user_data[user_id]
import io
from PIL import Image, ImageDraw, ImageFont
async def create_profile_card(user, background_image_path):
"""
Create a profile card image for a user with a custom background and text overlay.
Args:
user (discord.Member): The user whose profile card is being generated.
background_image_path (str): Path to the background image.
Returns:
io.BytesIO: A BytesIO object containing the profile card image.
"""
# Profile card dimensions
card_width, card_height = 400, 200
text_color = (255, 255, 255)
progress_bar_color = (0, 255, 0)
outline_color = (255, 255, 255)
tint_color = (50, 50, 50)
transparency = 25
opacity = int(255 * transparency / 100)
# Load and resize the background image
background_image = Image.open(background_image_path).resize((card_width, card_height))
# Create a new image with the background
card_image = Image.new("RGBA", (card_width, card_height))
card_image.paste(background_image, (0, 0))
# Create a semi-transparent overlay
overlay = Image.new("RGBA", card_image.size, tint_color + (opacity,))
card_image = Image.alpha_composite(card_image, overlay)
draw = ImageDraw.Draw(card_image)
# Load fonts
font_path = "arial.ttf"
font = ImageFont.truetype(font_path, 20)
# Draw user info
draw.text((10, 10), f"User: {user.display_name}", fill=text_color, font=font)
user_info = get_user_data(user.id)
draw.text((10, 40), f"Level: {user_info['level']}", fill=text_color, font=font)
draw.text((10, 70), f"XP: {user_info['xp']}/{user_info['level'] * 100}", fill=text_color, font=font)
# Draw the progress bar
max_xp = user_info['level'] * 100
progress_ratio = user_info['xp'] / max_xp if max_xp > 0 else 0
progress_width = int(progress_ratio * 200)
draw.rectangle([(10, 100), (210, 120)], outline=outline_color, width=2)
draw.rectangle([(10, 100), (10 + progress_width, 120)], fill=progress_bar_color)
# Draw the user's profile picture
try:
profile_size = 96
margin = 30
profile_picture_data = await user.avatar.read()
profile_image = Image.open(io.BytesIO(profile_picture_data)).resize((profile_size, profile_size))
card_image.paste(profile_image, (card_width - profile_size - margin, margin), profile_image.convert("RGBA"))
except Exception as error:
print(f"Error fetching profile picture: {error}")
# Save image to a BytesIO object
image_bytes = io.BytesIO()
card_image.save(image_bytes, format="PNG")
image_bytes.seek(0)
return image_bytes
async def create_balance_card(user, credits, tokens, bucks):
# Card dimensions
width, height = 450, 250
card = Image.new("RGB", (width, height), (255, 255, 255))
draw = ImageDraw.Draw(card)
# Gradient Background
gradient_color_1 = (38, 0, 77) # Dark purple
gradient_color_2 = (128, 0, 128) # Purple
for y in range(height):
blend = y / height
r = int(gradient_color_1[0] * (1 - blend) + gradient_color_2[0] * blend)
g = int(gradient_color_1[1] * (1 - blend) + gradient_color_2[1] * blend)
b = int(gradient_color_1[2] * (1 - blend) + gradient_color_2[2] * blend)
draw.line([(0, y), (width, y)], fill=(r, g, b))
# Avatar with circular border
try:
avatar_bytes = await user.avatar.read()
avatar = Image.open(io.BytesIO(avatar_bytes)).resize((60, 60))
except Exception:
# Fallback: blank avatar
avatar = Image.new("RGBA", (60, 60), (128, 128, 128, 255))
mask = Image.new("L", avatar.size, 0)
ImageDraw.Draw(mask).ellipse((0, 0, 60, 60), fill=255)
avatar = ImageOps.fit(avatar, mask.size, centering=(0.5, 0.5))
avatar.putalpha(mask)
# Draw avatar circle background
avatar_bg = Image.new("RGBA", (70, 70), (255, 255, 255, 0))
draw_bg = ImageDraw.Draw(avatar_bg)
draw_bg.ellipse((0, 0, 70, 70), fill=(255, 255, 255, 100)) # Border color
card.paste(avatar_bg, (20, height - 110), avatar_bg)
card.paste(avatar, (25, height - 105), avatar)
# Fonts
title_font = ImageFont.load_default()
balance_font = ImageFont.load_default()
# Header Text
draw.text((110, 20), "tatsu.", font=title_font, fill="white")
draw.text((width - 90, 25), "Member", font=title_font, fill="white")
# Balance Background Box
balance_box = (100, 70, width - 20, 200)
draw.rounded_rectangle(balance_box, radius=15, fill=(255, 255, 255, 50))
# Balance Items
icon_x = 120
text_x = icon_x + 35
# Credits
draw.ellipse((icon_x, 90, icon_x + 25, 115), fill=(255, 215, 0)) # Coin icon color
draw.text((text_x, 90), f"{credits:,} Credits", font=balance_font, fill="black")
# Tokens
draw.ellipse(
(icon_x, 125, icon_x + 25, 150), fill=(0, 255, 127)
) # Token icon color
draw.text((text_x, 125), f"{tokens:,} Tokens", font=balance_font, fill="black")
# Bucks
draw.ellipse(
(icon_x, 160, icon_x + 25, 185), fill=(50, 205, 50)
) # Bucks icon color
draw.text((text_x, 160), f"{bucks:,} Guild Bucks", font=balance_font, fill="black")
# User tag
user_tag = f"{user.name}#{user.discriminator}"
draw.text((110, height - 35), user_tag, font=title_font, fill="white")
# Add Chip (Credit Card Style)
chip_width, chip_height = 50, 35
chip_x = width - 80
chip_y = 100
draw.rectangle(
[chip_x, chip_y, chip_x + chip_width, chip_y + chip_height],
fill=(192, 192, 192), # Light gray color for the chip
outline="white",
width=2,
)
# Chip lines for texture
line_spacing = 7
for i in range(1, chip_height // line_spacing):
y = chip_y + i * line_spacing
draw.line(
[(chip_x + 5, y), (chip_x + chip_width - 5, y)], fill="white", width=1
)
# Save to a BytesIO object to send on Discord
with io.BytesIO() as image_binary:
card.save(image_binary, "PNG")
image_binary.seek(0)
return discord.File(fp=image_binary, filename="balance_card.png")
@bot.event
async def on_ready():
if bot.user is not None:
print(f"Logged in as {bot.user.name}")
else:
print(f"Logged in")
@bot.command(name="profile")
async def profile(ctx):
"""Command to display user profile."""
user = ctx.author
user_info = get_user_data(user.id)
# Create profile card image
profile_image = create_profile_card(user, f"images/image_{randint(1,100)}.jpg")
# Send the image in the channel
await ctx.send(file=discord.File(await profile_image, "profile.png"))
@bot.command()
async def balance(ctx):
user = ctx.author
# Example balance data; replace with actual data retrieval
credits = 9493006
tokens = 58
bucks = 234328
file = await create_balance_card(user, credits, tokens, bucks)
await ctx.send(file=file)
# Example command to add XP for testing
@bot.command(name="addxp")
async def add_xp(ctx, amount: int):
"""Command to add XP for testing."""
user_info = get_user_data(ctx.author.id)
user_info["xp"] += amount
# Level up logic (example: every 100 XP earns a level)
if user_info["xp"] >= user_info["level"] * 100:
user_info["level"] += 1
user_info["xp"] = 0 # Reset XP after leveling up
await ctx.send(
f"Added {amount} XP to {ctx.author.mention}. Total XP: {user_info['xp']} | Level: {user_info['level']}"
)
if TOKEN is not None:
bot.run(TOKEN)
else:
print(f"{TOKEN=}")
+42
View File
@@ -0,0 +1,42 @@
import requests
import os
from PIL import Image
from io import BytesIO
from random import shuffle
# Step 1: Fetch JSON data from the URL
url = "https://picsum.photos/v2/list?limit=1000"
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()
except requests.RequestException as e:
print(f"Failed to fetch image list: {e}")
data = []
# Step 2: Extract image download links
image_urls = [item["download_url"] for item in data]
shuffle(image_urls)
# Step 3: Create a directory to save images
os.makedirs("images", exist_ok=True)
# Desired size for resizing
desired_size = (450, 250)
# Step 4: Download and resize each image
for i, image_url in enumerate(image_urls):
try:
img_response = requests.get(image_url, timeout=10)
img_response.raise_for_status()
# Open image from response content
img = Image.open(BytesIO(img_response.content))
# Resize the image
img = img.resize(desired_size, Image.Resampling.LANCZOS)
# Save the resized image
img.save(f"images/image_{17}.jpg")
print(f"Downloaded and resized: image_{17}.jpg")
else:
print(f"Failed to download image from {image_url}")
if input() !="":
break
+4 -19
View File
@@ -3,15 +3,7 @@ import discord
from discord.ext import commands
import os
from dotenv import load_dotenv
from web.app import app
from utils.sql_commands import initialize_database
import threading
import itertools
def run_web():
app.run(debug=False, host="0.0.0.0", port=8080)
return
class MyNewHelp(commands.MinimalHelpCommand):
@@ -25,17 +17,12 @@ class MyNewHelp(commands.MinimalHelpCommand):
class Client(commands.Bot):
def __init__(self):
super().__init__(
command_prefix=self.iterate_prefix("Vamoc")+self.iterate_prefix("!V"),
command_prefix=["Vamoc", "!V"],
strip_after_prefix=True,
case_insensitive=True,
intents=discord.Intents.all(),
help_command=MyNewHelp(),
)
def iterate_prefix(self, prefix):
prefixes = list(map(''.join, itertools.product(*zip(prefix.upper(), prefix.lower()))))
print(prefixes)
return prefixes
async def setup_hook(self): # overwriting a handler
cogs_folder = f"{os.path.abspath(os.path.dirname(__file__))}/cogs"
@@ -53,12 +40,10 @@ def main():
load_dotenv()
initialize_database()
client = Client()
token = os.getenv("TOKEN")
if token is not None:
threading.Thread(target=run_web, daemon=True).start()
token = os.getenv("DISCORD_TOKEN") or os.getenv("TOKEN")
if not token:
raise SystemExit("ERROR: Discord token not found. Set DISCORD_TOKEN or TOKEN in environment.")
client.run(token)
else:
print("Token is missing.")
if __name__ == "__main__":
+55 -46
View File
@@ -20,6 +20,13 @@ import json
load_dotenv()
app = Flask(__name__)
app.config.update(
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SECURE=os.getenv("FLASK_ENV") == "production",
SESSION_COOKIE_SAMESITE="Lax",
PERMANENT_SESSION_LIFETIME=datetime.timedelta(hours=1),
)
# Ensure a secret key is set for session security. In production this MUST be
# provided via the SECRET_KEY environment variable. In development we generate
# a temporary one (not suitable for production).
@@ -39,6 +46,19 @@ DISCORD_REDIRECT_URI = os.getenv("DISCORD_REDIRECT_URI")
DISCORD_API_BASE_URL = "https://discord.com/api"
DISCORD_BOT_TOKEN = os.getenv("TOKEN")
http = requests.Session()
http.headers.update({"User-Agent": "DiscordBotWeb/1.0"})
def discord_request(method, endpoint, headers=None, **kwargs):
url = f"{DISCORD_API_BASE_URL}{endpoint}"
try:
response = http.request(method, url, headers=headers, timeout=REQUEST_TIMEOUT, **kwargs)
response.raise_for_status()
return response
except requests.RequestException as exc:
app.logger.warning("Discord API request failed: %s %s %s", method, url, exc)
raise
# Default request timeout (seconds) for external HTTP calls
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "10"))
@@ -84,18 +104,13 @@ def callback():
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
try:
response = requests.post(
f"{DISCORD_API_BASE_URL}/oauth2/token",
response = discord_request(
"POST",
"/oauth2/token",
data=data,
headers=headers,
timeout=REQUEST_TIMEOUT,
)
response.raise_for_status()
except requests.RequestException:
flash("Failed to retrieve access token from Discord (network error).")
return redirect(url_for("home"))
if response.status_code != 200:
flash("Failed to retrieve access token from Discord.")
return redirect(url_for("home"))
@@ -104,66 +119,52 @@ def callback():
flash("Access token missing in the response.")
return redirect(url_for("home"))
# Store access token in session for later use
session["access_token"] = access_token
session.permanent = True
# Fetch user info
try:
user_resp = requests.get(
f"{DISCORD_API_BASE_URL}/users/@me",
user_resp = discord_request(
"GET",
"/users/@me",
headers={"Authorization": f"Bearer {access_token}"},
timeout=REQUEST_TIMEOUT,
)
user_resp.raise_for_status()
user_data = user_resp.json()
except requests.RequestException:
flash("Failed to fetch user info from Discord.")
return redirect(url_for("home"))
# Store user information in session
session["user"] = user_data
# Fetch guilds the user is in
try:
guilds_response = requests.get(
f"{DISCORD_API_BASE_URL}/users/@me/guilds",
guilds_response = discord_request(
"GET",
"/users/@me/guilds",
headers={"Authorization": f"Bearer {access_token}"},
timeout=REQUEST_TIMEOUT,
)
guilds_response.raise_for_status()
except requests.RequestException:
flash("Failed to retrieve guilds from Discord.")
return redirect(url_for("home"))
if guilds_response.status_code == 200:
guilds = guilds_response.json() # Store guilds in a variable
session["guilds"] = guilds # Store guilds in session
guilds = guilds_response.json()
session["guilds"] = guilds
# Fetch member count and permissions for each guild
for guild in guilds:
guild_id = guild["id"]
try:
guild_info_response = requests.get(
f"{DISCORD_API_BASE_URL}/guilds/{guild_id}",
guild_info_response = discord_request(
"GET",
f"/guilds/{guild_id}",
headers={"Authorization": f"Bearer {access_token}"},
timeout=REQUEST_TIMEOUT,
)
guild_info_response.raise_for_status()
guild_info = guild_info_response.json()
guild["approx_member_count"] = guild_info.get("member_count", "N/A")
guild["permissions"] = guild_info.get(
"permissions", 0
) # Store permissions
guild["permissions"] = guild_info.get("permissions", 0)
except requests.RequestException:
guild["approx_member_count"] = "N/A"
else:
guild["approx_member_count"] = "N/A" # Fallback if unable to fetch
# Filter guilds where user can add bots
guilds_with_bot_permission = [guild for guild in guilds if can_add_bot(guild)]
session["guilds_with_bot_permission"] = guilds_with_bot_permission
else:
except requests.RequestException:
flash("Failed to retrieve guilds from Discord.")
return redirect(url_for("home"))
return redirect(url_for("home"))
@@ -352,12 +353,11 @@ def guild_settings(guild_id):
bot_headers = {"Authorization": f"Bot {DISCORD_BOT_TOKEN}"}
try:
guild_info_response = requests.get(
f"{DISCORD_API_BASE_URL}/guilds/{guild_id}?with_counts=true",
guild_info_response = discord_request(
"GET",
f"/guilds/{guild_id}?with_counts=true",
headers=bot_headers,
timeout=REQUEST_TIMEOUT,
)
guild_info_response.raise_for_status()
guild_info = guild_info_response.json()
except requests.RequestException:
flash("Failed to retrieve guild information (bot may not be in this guild or network error).")
@@ -369,12 +369,11 @@ def guild_settings(guild_id):
owner_name = "Unknown"
if owner_id != "N/A":
try:
owner_response = requests.get(
f"{DISCORD_API_BASE_URL}/users/{owner_id}",
owner_response = discord_request(
"GET",
f"/users/{owner_id}",
headers=bot_headers,
timeout=REQUEST_TIMEOUT,
)
owner_response.raise_for_status()
owner_data = owner_response.json()
owner_name = f"{owner_data.get('username', 'Unknown')}#{owner_data.get('discriminator', '0000')}"
except requests.RequestException:
@@ -406,4 +405,14 @@ def profile():
if __name__ == "__main__":
# Running via the Flask development server for local testing.
# For production, run under a WSGI server (gunicorn, waitress, etc.).
app.run(debug=True, port=5000, host="0.0.0.0", use_reloader=False)
if os.getenv("FLASK_ENV") == "production":
raise RuntimeError(
"Do not use the Flask development server in production. "
"Please serve this app with a WSGI server like gunicorn or waitress."
)
app.run(
debug=True,
port=int(os.getenv("PORT", "5000")),
host="0.0.0.0",
use_reloader=False,
)