Spaces:
Runtime error
Runtime error
File size: 24,769 Bytes
165c9f9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 |
# app.py
import os
import random
import textwrap
import logging
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Tuple
from dotenv import load_dotenv
from PIL import Image, ImageDraw, ImageFont, ImageOps, ImageFilter, ImageEnhance, UnidentifiedImageError
import numpy as np
from telegram import Update, InputFile
from telegram.ext import Application, MessageHandler, filters, ContextTypes
from telegram.constants import ParseMode
from telegram.error import TelegramError
# --- Configuration ---
class Config:
# Load environment variables - essential for Hugging Face secrets
load_dotenv()
TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN')
if not TELEGRAM_TOKEN:
# Use logger here if already configured, otherwise print/raise
# logger.critical("TELEGRAM_TOKEN environment variable not set!")
raise ValueError("TELEGRAM_TOKEN environment variable not set! Please set it in Hugging Face Space secrets.")
# Directories (relative paths for container compatibility)
PREDEFINED_TEMPLATES_DIR = "templates"
OUTPUT_DIR = "generated_images"
# Assumes arial.ttf is copied into the /app directory by the Dockerfile
FONT_PATH = "arial.ttf"
# Predefined Template Settings
# Assuming templates are e.g., 1200x900 and have a space for the user image
TEMPLATE_SIZE = (1200, 900) # Expected size of predefined template canvas
PLACEHOLDER_SIZE = (700, 500) # Size to fit user image into within the template
PLACEHOLDER_POSITION = (50, 50) # Top-left corner to paste user image in template
# Auto Template Settings
AUTO_TEMPLATES_COUNT = 5
AUTO_TEMPLATE_SIZE = (1200, 900) # Canvas size for auto-generated images
AUTO_USER_IMAGE_SIZE = (600, 450) # Size for user image within auto-template
MIN_FONT_SIZE = 30
MAX_FONT_SIZE = 60
TEXT_STROKE_WIDTH = 2
NOISE_INTENSITY = 0.03 # Intensity for numpy noise effect (0 to 1)
# Other
MAX_CAPTION_WIDTH = 35 # Characters per line for text wrapping
JPEG_QUALITY = 85 # Quality for saving JPG images
# --- Logging Setup ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Reduce verbosity of httpx logger used by python-telegram-bot
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
# --- Setup ---
# Ensure necessary directories exist within the container's filesystem
try:
os.makedirs(Config.PREDEFINED_TEMPLATES_DIR, exist_ok=True)
os.makedirs(Config.OUTPUT_DIR, exist_ok=True)
logger.info(f"Ensured directories exist: {Config.PREDEFINED_TEMPLATES_DIR}, {Config.OUTPUT_DIR}")
except OSError as e:
logger.error(f"Error creating directories: {e}", exc_info=True)
# Depending on the error, you might want to exit or handle differently
# Check for font file availability
if not os.path.exists(Config.FONT_PATH):
logger.warning(f"Font file not found at '{Config.FONT_PATH}'. Text rendering might use default system font or fail if Pillow cannot find a fallback.")
# Pillow might find system fonts installed by Dockerfile's `apt-get install fontconfig ttf-mscorefonts-installer`
# --- Helper Functions ---
def add_noise_to_image(img: Image.Image, intensity: float = 0.02) -> Image.Image:
"""Adds subtle noise to a PIL image using numpy."""
try:
# Ensure image is in RGB mode for numpy array processing
if img.mode != 'RGB':
img = img.convert('RGB')
img_array = np.array(img, dtype=np.float32) / 255.0
# Generate Gaussian noise matching image dimensions
noise = np.random.randn(*img_array.shape) * intensity
# Add noise and clip values to valid range [0, 1]
noisy_img_array = np.clip(img_array + noise, 0.0, 1.0)
# Convert back to PIL Image
noisy_img = Image.fromarray((noisy_img_array * 255).astype(np.uint8), 'RGB')
return noisy_img
except Exception as e:
logger.error(f"Error adding noise: {e}", exc_info=True)
return img # Return original image on error
# --- Image Processing Functions ---
def apply_template(user_image_path: str, caption: str, template_path: str) -> Optional[str]:
"""
Applies user image and caption to a predefined template using pasting.
Args:
user_image_path: Path to the downloaded user image.
caption: Text caption provided by the user.
template_path: Path to the predefined template image.
Returns:
Path to the generated image (in OUTPUT_DIR), or None if an error occurred.
"""
# Generate a unique-ish output filename
base_name = os.path.basename(template_path).split('.')[0]
output_filename = f"result_{base_name}_{random.randint(1000, 9999)}.jpg"
output_path = os.path.join(Config.OUTPUT_DIR, output_filename)
try:
# Use 'with' statement for automatic resource cleanup
with Image.open(template_path).convert("RGBA") as template, \
Image.open(user_image_path).convert("RGBA") as user_image_orig:
# Optional: Check if template size matches config (for consistency)
if template.size != Config.TEMPLATE_SIZE:
logger.warning(f"Template {os.path.basename(template_path)} size {template.size} differs from expected {Config.TEMPLATE_SIZE}. Results may vary.")
# Resize user image to fit the placeholder area using Lanczos resampling for quality
user_image_resized = ImageOps.fit(user_image_orig, Config.PLACEHOLDER_SIZE, Image.Resampling.LANCZOS)
# Create a working copy of the template to paste onto
combined = template.copy()
# Paste the resized user image into the placeholder position
# The third argument (mask) uses the alpha channel of the user image for smooth edges if it has transparency
combined.paste(user_image_resized, Config.PLACEHOLDER_POSITION, user_image_resized if user_image_resized.mode == 'RGBA' else None)
# --- Add Caption ---
draw = ImageDraw.Draw(combined)
try:
# Use a medium font size relative to config
font_size = Config.MAX_FONT_SIZE // 2
font = ImageFont.truetype(Config.FONT_PATH, font_size)
except IOError:
logger.warning(f"Failed to load font: {Config.FONT_PATH}. Using Pillow's default.")
font = ImageFont.load_default()
# Wrap text according to configured width
wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH)
# Calculate text position (e.g., centered below the placeholder area)
text_bbox = draw.textbbox((0, 0), wrapped_text, font=font, align="center")
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
# Center horizontally
text_x = (combined.width - text_width) // 2
# Position below placeholder, add some padding
text_y = Config.PLACEHOLDER_POSITION[1] + Config.PLACEHOLDER_SIZE[1] + 20
# Draw text with a simple shadow/stroke for better visibility
# Draw shadow slightly offset
shadow_offset = 1
draw.text((text_x + shadow_offset, text_y + shadow_offset), wrapped_text, font=font, fill="black", align="center")
# Draw main text
draw.text((text_x, text_y), wrapped_text, font=font, fill="white", align="center")
# Convert to RGB before saving as JPG (removes alpha channel)
combined_rgb = combined.convert("RGB")
combined_rgb.save(output_path, "JPEG", quality=Config.JPEG_QUALITY)
logger.info(f"Generated image using template '{os.path.basename(template_path)}': {output_path}")
return output_path
except FileNotFoundError:
logger.error(f"Template or user image not found. Template: {template_path}, User Image: {user_image_path}")
except UnidentifiedImageError:
logger.error(f"Could not identify image file (corrupted or unsupported format?). Template: {template_path}, User Image: {user_image_path}")
except Exception as e:
logger.error(f"Error applying template '{os.path.basename(template_path)}': {e}", exc_info=True)
# Explicitly return None on any error during the process
return None
def create_auto_template(user_image_path: str, caption: str, variant: int) -> Optional[str]:
"""
Generates a dynamic template with various effects.
Args:
user_image_path: Path to the downloaded user image.
caption: Text caption provided by the user.
variant: An integer index to introduce variation.
Returns:
Path to the generated image (in OUTPUT_DIR), or None if an error occurred.
"""
output_filename = f"auto_template_{variant}_{random.randint(1000, 9999)}.jpg"
output_path = os.path.join(Config.OUTPUT_DIR, output_filename)
try:
# --- Create Background ---
# Generate a random somewhat dark color for the background
bg_color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150))
bg = Image.new('RGB', Config.AUTO_TEMPLATE_SIZE, color=bg_color)
# --- Process User Image ---
with Image.open(user_image_path) as user_img_orig:
# Work on a copy
user_img = user_img_orig.copy()
# Resize to fit the designated area within the auto-template
user_img = ImageOps.fit(user_img, Config.AUTO_USER_IMAGE_SIZE, Image.Resampling.LANCZOS)
# Apply random effects based on the variant index
effect_choice = random.choice(['blur', 'noise', 'color', 'contrast', 'none']) # Add more variety
logger.debug(f"Auto template variant {variant}: Applying effect '{effect_choice}'")
if effect_choice == 'blur':
user_img = user_img.filter(ImageFilter.GaussianBlur(random.uniform(0.5, 1.5)))
elif effect_choice == 'noise':
user_img = add_noise_to_image(user_img, Config.NOISE_INTENSITY)
elif effect_choice == 'color': # Enhance or reduce color saturation
enhancer = ImageEnhance.Color(user_img)
user_img = enhancer.enhance(random.uniform(0.3, 1.7))
elif effect_choice == 'contrast': # Enhance or reduce contrast
enhancer = ImageEnhance.Contrast(user_img)
user_img = enhancer.enhance(random.uniform(0.7, 1.3))
# 'none' applies no extra filter
# Add a decorative border with a random light color
border_color = (random.randint(180, 255), random.randint(180, 255), random.randint(180, 255))
border_width = random.randint(8, 20)
user_img = ImageOps.expand(user_img, border=border_width, fill=border_color)
# --- Paste User Image onto Background ---
# Calculate position to center the (bordered) user image horizontally, and place it in the upper part vertically
paste_x = (bg.width - user_img.width) // 2
paste_y = (bg.height - user_img.height) // 3 # Position slightly above vertical center
bg.paste(user_img, (paste_x, paste_y))
# --- Add Styled Text ---
draw = ImageDraw.Draw(bg)
try:
# Random font size within configured range
font_size = random.randint(Config.MIN_FONT_SIZE, Config.MAX_FONT_SIZE)
font = ImageFont.truetype(Config.FONT_PATH, font_size)
except IOError:
logger.warning(f"Failed to load font: {Config.FONT_PATH}. Using Pillow's default.")
font = ImageFont.load_default() # Fallback font
# Wrap text
wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH)
# Calculate text position (centered horizontally, below the pasted image)
text_bbox = draw.textbbox((0, 0), wrapped_text, font=font, align="center")
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_x = (bg.width - text_width) // 2
# Position below the image + border, add padding
text_y = paste_y + user_img.height + 30
# Random bright text color and dark stroke color
text_color = (random.randint(200, 255), random.randint(200, 255), random.randint(200, 255))
stroke_color = (random.randint(0, 50), random.randint(0, 50), random.randint(0, 50))
# Draw text with stroke
draw.text((text_x, text_y), wrapped_text, font=font, fill=text_color,
stroke_width=Config.TEXT_STROKE_WIDTH, stroke_fill=stroke_color, align="center")
# Save the final image
bg.save(output_path, "JPEG", quality=Config.JPEG_QUALITY)
logger.info(f"Generated auto-template image (variant {variant}): {output_path}")
return output_path
except FileNotFoundError:
logger.error(f"User image not found during auto-template creation: {user_image_path}")
except UnidentifiedImageError:
logger.error(f"Could not identify user image file during auto-template creation: {user_image_path}")
except Exception as e:
logger.error(f"Error creating auto-template variant {variant}: {e}", exc_info=True)
return None
def load_predefined_templates() -> List[str]:
"""Loads paths of all valid template images from the predefined directory."""
templates = []
logger.debug(f"Searching for templates in: {os.path.abspath(Config.PREDEFINED_TEMPLATES_DIR)}")
try:
if not os.path.isdir(Config.PREDEFINED_TEMPLATES_DIR):
logger.warning(f"Predefined templates directory not found: {Config.PREDEFINED_TEMPLATES_DIR}")
return []
for file in os.listdir(Config.PREDEFINED_TEMPLATES_DIR):
# Check for common image extensions
if file.lower().endswith(('.png', '.jpg', '.jpeg')):
full_path = os.path.join(Config.PREDEFINED_TEMPLATES_DIR, file)
templates.append(full_path)
logger.info(f"Loaded {len(templates)} predefined templates.")
except Exception as e:
logger.error(f"Error loading predefined templates from {Config.PREDEFINED_TEMPLATES_DIR}: {e}", exc_info=True)
return templates
# This function orchestrates the image processing. It contains blocking Pillow calls,
# so it's designed to be run in a thread pool executor by the async handler.
def process_images(user_image_path: str, caption: str) -> List[str]:
"""
Processes the user image against predefined and auto-generated templates.
This is a SYNCHRONOUS function.
Args:
user_image_path: Path to the temporary user image file.
caption: The user's caption.
Returns:
A list of paths to the generated images.
"""
generated_image_paths: List[str] = []
predefined_templates = load_predefined_templates()
# 1. Process predefined templates
if predefined_templates:
logger.info(f"Processing {len(predefined_templates)} predefined templates...")
for template_path in predefined_templates:
result_path = apply_template(user_image_path, caption, template_path)
if result_path:
generated_image_paths.append(result_path)
else:
logger.info("No predefined templates found or loaded.")
# 2. Generate auto templates
logger.info(f"Generating {Config.AUTO_TEMPLATES_COUNT} auto-templates...")
for i in range(Config.AUTO_TEMPLATES_COUNT):
result_path = create_auto_template(user_image_path, caption, i)
if result_path:
generated_image_paths.append(result_path)
logger.info(f"Finished processing. Generated {len(generated_image_paths)} images in total.")
return generated_image_paths
# --- Telegram Bot Handler ---
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handles incoming messages with photos and captions."""
# Basic check for essential message components
if not update.message or not update.message.photo or not update.message.caption:
# This case should ideally be filtered out by MessageHandler filters, but check just in case
logger.warning("Handler invoked for message without photo or caption.")
# Optionally reply, but filter should prevent this
# await update.message.reply_text("⚠️ Please send an image with a caption text!")
return
user = update.message.from_user
user_id = user.id if user else "UnknownUser"
caption = update.message.caption
message_id = update.message.message_id
chat_id = update.message.chat_id
logger.info(f"Received photo with caption from user {user_id} in chat {chat_id}.")
# --- Download User Image ---
# Create a unique temporary path for the downloaded image
temp_user_image_path = os.path.join(Config.OUTPUT_DIR, f"user_{user_id}_{message_id}.jpg")
file_downloaded = False
try:
photo = update.message.photo[-1] # Get the highest resolution photo available
logger.info(f"Downloading photo (file_id: {photo.file_id}, size: {photo.width}x{photo.height})...")
photo_file = await photo.get_file()
await photo_file.download_to_drive(temp_user_image_path)
logger.info(f"Photo downloaded successfully to {temp_user_image_path}")
file_downloaded = True
except TelegramError as e:
logger.error(f"Telegram error downloading photo: {e}", exc_info=True)
await update.message.reply_text("❌ Sorry, there was a Telegram error downloading the image. Please try again.")
return
except Exception as e:
logger.error(f"Unexpected error downloading photo: {e}", exc_info=True)
await update.message.reply_text("❌ Sorry, I couldn't download the image due to an unexpected error.")
return
if not file_downloaded: # Should not happen if exceptions are caught, but as safety
return
# --- Process Images in Executor ---
# Notify user that processing has started
try:
processing_message = await update.message.reply_text("⏳ Processing your image with different styles...", quote=True)
message_to_delete = processing_message.message_id
except TelegramError as e:
logger.warning(f"Could not send 'Processing...' message: {e}")
message_to_delete = None # Cannot delete if sending failed
loop = asyncio.get_running_loop()
generated_images = []
start_time = loop.time()
try:
# Run the blocking image processing function in the default thread pool executor
generated_images = await loop.run_in_executor(
None, # Use default ThreadPoolExecutor
process_images, # The synchronous function to run
temp_user_image_path, # Argument 1 for process_images
caption # Argument 2 for process_images
)
processing_time = loop.time() - start_time
logger.info(f"Image processing completed in {processing_time:.2f} seconds.")
except Exception as e:
logger.error(f"Error during image processing executor call: {e}", exc_info=True)
# Try to edit the "Processing..." message to show error
if message_to_delete:
try:
await context.bot.edit_message_text(
chat_id=chat_id,
message_id=message_to_delete,
text="❌ An unexpected error occurred during processing."
)
message_to_delete = None # Don't try to delete it again later
except TelegramError as edit_err:
logger.warning(f"Could not edit processing message to show error: {edit_err}")
# Fallback reply if editing failed
await update.message.reply_text("❌ An unexpected error occurred during processing.")
else:
await update.message.reply_text("❌ An unexpected error occurred during processing.")
finally:
# Delete the "Processing..." message if it was sent and not edited to show error
if message_to_delete:
try:
await context.bot.delete_message(
chat_id=chat_id,
message_id=message_to_delete
)
except TelegramError as del_err:
logger.warning(f"Could not delete 'Processing...' message ({message_to_delete}): {del_err}")
# --- Send Results ---
if not generated_images:
logger.warning("No images were generated successfully.")
# Avoid sending redundant message if error was already reported
if 'e' not in locals(): # Check if processing failed with exception 'e'
await update.message.reply_text("😕 Sorry, I couldn't generate any styled images this time. Please check the templates or try again.")
else:
logger.info(f"Sending {len(generated_images)} generated images back to user {user_id}.")
sent_count = 0
for i, img_path in enumerate(generated_images):
caption_text = f"Style variant {i+1}" if len(generated_images) > 1 else "Here's your styled image!"
try:
# Send the photo from the generated path
with open(img_path, 'rb') as photo_data:
await update.message.reply_photo(
photo=InputFile(photo_data, filename=os.path.basename(img_path)),
caption=caption_text
)
sent_count += 1
logger.debug(f"Sent photo {os.path.basename(img_path)}")
except FileNotFoundError:
logger.error(f"Generated image file not found for sending: {img_path}")
# Try to inform user about partial failure if multiple images were expected
if len(generated_images) > 1:
await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} (file missing).")
except TelegramError as e:
logger.error(f"Telegram error sending photo {os.path.basename(img_path)}: {e}", exc_info=True)
if len(generated_images) > 1:
await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} due to a Telegram error.")
except Exception as e:
logger.error(f"Unexpected error sending photo {os.path.basename(img_path)}: {e}", exc_info=True)
if len(generated_images) > 1:
await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} due to an unexpected error.")
finally:
# Clean up the generated image file regardless of sending success
try:
if os.path.exists(img_path):
os.remove(img_path)
logger.debug(f"Cleaned up generated image: {os.path.basename(img_path)}")
except OSError as e:
logger.error(f"Error deleting generated image file {img_path}: {e}")
logger.info(f"Finished sending {sent_count}/{len(generated_images)} images.")
# --- Final Cleanup ---
# Clean up the originally downloaded user image
try:
if os.path.exists(temp_user_image_path):
os.remove(temp_user_image_path)
logger.info(f"Cleaned up temporary user image: {os.path.basename(temp_user_image_path)}")
except OSError as e:
logger.error(f"Error cleaning up user image {temp_user_image_path}: {e}", exc_info=True)
# --- Main Execution ---
if __name__ == "__main__":
logger.info("Starting Telegram Bot...")
if not Config.TELEGRAM_TOKEN:
logger.critical("TELEGRAM_TOKEN is not set in environment variables or .env file. Bot cannot start.")
exit(1) # Exit if token is missing
try:
# Build the application instance
app = Application.builder().token(Config.TELEGRAM_TOKEN).build()
# Add the handler for messages containing both a photo and a caption
# Filters.PHOTO checks for `message.photo` being non-empty
# Filters.CAPTION checks for `message.caption` being non-empty
app.add_handler(MessageHandler(filters.PHOTO & filters.CAPTION, handle_message))
logger.info("Bot application built. Starting polling...")
# Start the bot polling for updates
app.run_polling(allowed_updates=Update.ALL_TYPES) # Specify allowed updates if needed
except Exception as e:
logger.critical(f"Fatal error initializing or running the bot application: {e}", exc_info=True)
exit(1)
|