# app.py
import os
import random
import textwrap
import logging
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Tuple

from dotenv import load_dotenv
from PIL import Image, ImageDraw, ImageFont, ImageOps, ImageFilter, ImageEnhance, UnidentifiedImageError
import numpy as np
from telegram import Update, InputFile
from telegram.ext import Application, MessageHandler, filters, ContextTypes
from telegram.constants import ParseMode
from telegram.error import TelegramError


# --- Configuration ---
class Config:
    """Static settings for the bot, resolved once when the class body runs.

    Defining this class calls ``load_dotenv()`` and raises ``ValueError`` if
    ``TELEGRAM_TOKEN`` is missing or empty in the environment.
    """

    # Pull variables from a .env file (if any) into the process environment
    # before reading the token.
    load_dotenv()
    TELEGRAM_TOKEN: Optional[str] = os.environ.get('TELEGRAM_TOKEN')
    if not TELEGRAM_TOKEN:
        # Fail at definition time: nothing below is usable without a token.
        raise ValueError("TELEGRAM_TOKEN environment variable not set! Please set it in Hugging Face Space secrets.")

    # --- Paths (relative, so they resolve against the working directory) ---
    PREDEFINED_TEMPLATES_DIR: str = "templates"
    OUTPUT_DIR: str = "generated_images"
    # Font file used for captions; a fallback is used when it cannot be loaded.
    FONT_PATH: str = "arial.ttf"

    # --- Predefined template layout ---
    # Expected canvas size of a predefined template image.
    TEMPLATE_SIZE: Tuple[int, int] = (1200, 900)
    # Size the user image is fitted to before being pasted into a template.
    PLACEHOLDER_SIZE: Tuple[int, int] = (700, 500)
    # Top-left corner at which the user image is pasted into a template.
    PLACEHOLDER_POSITION: Tuple[int, int] = (50, 50)

    # --- Auto-generated template settings ---
    AUTO_TEMPLATES_COUNT: int = 5
    # Canvas size for auto-generated images.
    AUTO_TEMPLATE_SIZE: Tuple[int, int] = (1200, 900)
    # Size of the user image inside an auto-generated canvas.
    AUTO_USER_IMAGE_SIZE: Tuple[int, int] = (600, 450)
    MIN_FONT_SIZE: int = 30
    MAX_FONT_SIZE: int = 60
    TEXT_STROKE_WIDTH: int = 2
    # Scale of the Gaussian noise effect (on pixel values normalised to 0..1).
    NOISE_INTENSITY: float = 0.03

    # --- Miscellaneous ---
    # Maximum characters per line when wrapping the caption.
    MAX_CAPTION_WIDTH: int = 35
    # Quality setting used when saving JPEG output.
    JPEG_QUALITY: int = 85


# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Reduce verbosity of httpx logger used by python-telegram-bot
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

# --- Setup ---
# Ensure necessary directories exist within the container's filesystem
try:
    os.makedirs(Config.PREDEFINED_TEMPLATES_DIR, exist_ok=True)
    os.makedirs(Config.OUTPUT_DIR, exist_ok=True)
    logger.info(f"Ensured directories exist: {Config.PREDEFINED_TEMPLATES_DIR}, {Config.OUTPUT_DIR}")
except OSError as e:
    # Best effort: log and continue; later file operations will report their own errors.
    logger.error(f"Error creating directories: {e}", exc_info=True)

# Check for font file availability
if not os.path.exists(Config.FONT_PATH):
    logger.warning(f"Font file not found at '{Config.FONT_PATH}'. Text rendering might use default system font or fail if Pillow cannot find a fallback.")


# --- Helper Functions ---
def add_noise_to_image(img: Image.Image, intensity: float = 0.02) -> Image.Image:
    """Return a copy of ``img`` with Gaussian noise added.

    Args:
        img: Source image; converted to RGB if it is in another mode.
        intensity: Multiplier applied to standard-normal noise, relative to
            pixel values normalised to the range 0..1.

    Returns:
        The noisy RGB image, or the (possibly RGB-converted) input image if
        anything goes wrong while generating the noise.
    """
    try:
        # Ensure image is in RGB mode for numpy array processing
        if img.mode != 'RGB':
            img = img.convert('RGB')
        img_array = np.array(img, dtype=np.float32) / 255.0
        # Generate Gaussian noise matching image dimensions
        noise = np.random.randn(*img_array.shape) * intensity
        # Add noise and clip values to valid range [0, 1]
        noisy_img_array = np.clip(img_array + noise, 0.0, 1.0)
        # Convert back to PIL Image
        return Image.fromarray((noisy_img_array * 255).astype(np.uint8), 'RGB')
    except Exception as e:
        logger.error(f"Error adding noise: {e}", exc_info=True)
        return img  # Return original image on error


def _load_font(font_size: int):
    """Load the configured TrueType font at ``font_size``, or Pillow's default font on failure."""
    try:
        return ImageFont.truetype(Config.FONT_PATH, font_size)
    except OSError:
        logger.warning(f"Failed to load font: {Config.FONT_PATH}. Using Pillow's default.")
        return ImageFont.load_default()


def _wrap_and_center(draw, caption: str, font, canvas_width: int) -> Tuple[str, int]:
    """Wrap ``caption`` to the configured width and return ``(wrapped_text, x)``.

    ``x`` is the left coordinate that horizontally centres the wrapped text on
    a canvas ``canvas_width`` pixels wide.
    """
    wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH)
    left, _top, right, _bottom = draw.textbbox((0, 0), wrapped_text, font=font, align="center")
    return wrapped_text, (canvas_width - (right - left)) // 2


# --- Image Processing Functions ---
def apply_template(user_image_path: str, caption: str, template_path: str) -> Optional[str]:
    """Paste the user image and caption onto a predefined template.

    Args:
        user_image_path: Path to the downloaded user image.
        caption: Text caption provided by the user.
        template_path: Path to the predefined template image.

    Returns:
        Path to the generated JPEG (in ``Config.OUTPUT_DIR``), or ``None`` if
        an error occurred.
    """
    # Generate a unique-ish output filename. splitext keeps dotted template
    # names intact ("my.template.png" -> "my.template").
    base_name = os.path.splitext(os.path.basename(template_path))[0]
    output_filename = f"result_{base_name}_{random.randint(1000, 9999)}.jpg"
    output_path = os.path.join(Config.OUTPUT_DIR, output_filename)

    try:
        # Manage the images returned by Image.open directly so they are closed
        # on exit; the converted copies are independent of them.
        with Image.open(template_path) as template_src, Image.open(user_image_path) as user_src:
            combined = template_src.convert("RGBA")
            user_image = user_src.convert("RGBA")

        # Optional: Check if template size matches config (for consistency)
        if combined.size != Config.TEMPLATE_SIZE:
            logger.warning(f"Template {os.path.basename(template_path)} size {combined.size} differs from expected {Config.TEMPLATE_SIZE}. Results may vary.")

        # Resize user image to fit the placeholder area using Lanczos resampling for quality
        user_image_resized = ImageOps.fit(user_image, Config.PLACEHOLDER_SIZE, Image.Resampling.LANCZOS)

        # The image is RGBA after the conversion above, so it serves as its
        # own mask: its alpha channel controls the paste.
        combined.paste(user_image_resized, Config.PLACEHOLDER_POSITION, user_image_resized)

        # --- Add Caption ---
        draw = ImageDraw.Draw(combined)
        # Use a medium font size relative to config
        font = _load_font(Config.MAX_FONT_SIZE // 2)
        wrapped_text, text_x = _wrap_and_center(draw, caption, font, combined.width)
        # Position below placeholder, add some padding
        text_y = Config.PLACEHOLDER_POSITION[1] + Config.PLACEHOLDER_SIZE[1] + 20

        # Draw a one-pixel offset shadow first, then the main text on top
        shadow_offset = 1
        draw.text((text_x + shadow_offset, text_y + shadow_offset), wrapped_text, font=font, fill="black", align="center")
        draw.text((text_x, text_y), wrapped_text, font=font, fill="white", align="center")

        # Convert to RGB before saving as JPG (removes alpha channel)
        combined.convert("RGB").save(output_path, "JPEG", quality=Config.JPEG_QUALITY)
        logger.info(f"Generated image using template '{os.path.basename(template_path)}': {output_path}")
        return output_path

    except FileNotFoundError:
        logger.error(f"Template or user image not found. Template: {template_path}, User Image: {user_image_path}")
    except UnidentifiedImageError:
        logger.error(f"Could not identify image file (corrupted or unsupported format?). Template: {template_path}, User Image: {user_image_path}")
    except Exception as e:
        logger.error(f"Error applying template '{os.path.basename(template_path)}': {e}", exc_info=True)

    # Explicitly return None on any error during the process
    return None


def create_auto_template(user_image_path: str, caption: str, variant: int) -> Optional[str]:
    """Generate an image on a random background with a random effect and the caption.

    Args:
        user_image_path: Path to the downloaded user image.
        caption: Text caption provided by the user.
        variant: Index used in the output filename and log messages. The
            visual variation itself is chosen randomly, not from this value.

    Returns:
        Path to the generated JPEG (in ``Config.OUTPUT_DIR``), or ``None`` if
        an error occurred.
    """
    output_filename = f"auto_template_{variant}_{random.randint(1000, 9999)}.jpg"
    output_path = os.path.join(Config.OUTPUT_DIR, output_filename)

    try:
        # --- Create Background ---
        # Generate a random somewhat dark color for the background
        bg_color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150))
        bg = Image.new('RGB', Config.AUTO_TEMPLATE_SIZE, color=bg_color)

        # --- Process User Image ---
        with Image.open(user_image_path) as user_img_orig:
            # Normalise to RGB so the RGB border fill and the enhancers below
            # behave the same for any input mode; convert() returns a new image.
            user_img = user_img_orig.convert("RGB")

        # Resize to fit the designated area within the auto-template
        user_img = ImageOps.fit(user_img, Config.AUTO_USER_IMAGE_SIZE, Image.Resampling.LANCZOS)

        # Pick one random effect for this image
        effect_choice = random.choice(['blur', 'noise', 'color', 'contrast', 'none'])
        logger.debug(f"Auto template variant {variant}: Applying effect '{effect_choice}'")
        if effect_choice == 'blur':
            user_img = user_img.filter(ImageFilter.GaussianBlur(random.uniform(0.5, 1.5)))
        elif effect_choice == 'noise':
            user_img = add_noise_to_image(user_img, Config.NOISE_INTENSITY)
        elif effect_choice == 'color':
            # Enhance or reduce color saturation
            user_img = ImageEnhance.Color(user_img).enhance(random.uniform(0.3, 1.7))
        elif effect_choice == 'contrast':
            # Enhance or reduce contrast
            user_img = ImageEnhance.Contrast(user_img).enhance(random.uniform(0.7, 1.3))
        # 'none' applies no extra filter

        # Add a decorative border with a random light color
        border_color = (random.randint(180, 255), random.randint(180, 255), random.randint(180, 255))
        border_width = random.randint(8, 20)
        user_img = ImageOps.expand(user_img, border=border_width, fill=border_color)

        # --- Paste User Image onto Background ---
        # Centre horizontally; place at one third of the free vertical space
        paste_x = (bg.width - user_img.width) // 2
        paste_y = (bg.height - user_img.height) // 3
        bg.paste(user_img, (paste_x, paste_y))

        # --- Add Styled Text ---
        draw = ImageDraw.Draw(bg)
        # Random font size within configured range
        font = _load_font(random.randint(Config.MIN_FONT_SIZE, Config.MAX_FONT_SIZE))
        wrapped_text, text_x = _wrap_and_center(draw, caption, font, bg.width)
        # Position below the image + border, add padding
        text_y = paste_y + user_img.height + 30

        # Random bright text color and dark stroke color
        text_color = (random.randint(200, 255), random.randint(200, 255), random.randint(200, 255))
        stroke_color = (random.randint(0, 50), random.randint(0, 50), random.randint(0, 50))
        draw.text(
            (text_x, text_y),
            wrapped_text,
            font=font,
            fill=text_color,
            stroke_width=Config.TEXT_STROKE_WIDTH,
            stroke_fill=stroke_color,
            align="center",
        )

        # Save the final image
        bg.save(output_path, "JPEG", quality=Config.JPEG_QUALITY)
        logger.info(f"Generated auto-template image (variant {variant}): {output_path}")
        return output_path

    except FileNotFoundError:
        logger.error(f"User image not found during auto-template creation: {user_image_path}")
    except UnidentifiedImageError:
        logger.error(f"Could not identify user image file during auto-template creation: {user_image_path}")
    except Exception as e:
        logger.error(f"Error creating auto-template variant {variant}: {e}", exc_info=True)

    return None


def load_predefined_templates() -> List[str]:
    """Return the paths of all .png/.jpg/.jpeg files in the predefined templates directory.

    Returns an empty list when the directory does not exist; on any other
    error the error is logged and whatever was collected so far is returned.
    """
    templates: List[str] = []
    logger.debug(f"Searching for templates in: {os.path.abspath(Config.PREDEFINED_TEMPLATES_DIR)}")
    try:
        if not os.path.isdir(Config.PREDEFINED_TEMPLATES_DIR):
            logger.warning(f"Predefined templates directory not found: {Config.PREDEFINED_TEMPLATES_DIR}")
            return []

        templates = [
            os.path.join(Config.PREDEFINED_TEMPLATES_DIR, file)
            for file in os.listdir(Config.PREDEFINED_TEMPLATES_DIR)
            # Check for common image extensions
            if file.lower().endswith(('.png', '.jpg', '.jpeg'))
        ]
        logger.info(f"Loaded {len(templates)} predefined templates.")
    except Exception as e:
        logger.error(f"Error loading predefined templates from {Config.PREDEFINED_TEMPLATES_DIR}: {e}", exc_info=True)
    return templates


# This function orchestrates the image processing. It contains blocking Pillow calls,
# so it's designed to be run in a thread pool executor by the async handler.
def process_images(user_image_path: str, caption: str) -> List[str]:
    """Render the user image with every predefined template and the auto-templates.

    This is a SYNCHRONOUS function.

    Args:
        user_image_path: Path to the temporary user image file.
        caption: The user's caption.

    Returns:
        A list of paths to the generated images; renders that failed are
        omitted.
    """
    generated_image_paths: List[str] = []
    predefined_templates = load_predefined_templates()

    # 1. Process predefined templates
    if predefined_templates:
        logger.info(f"Processing {len(predefined_templates)} predefined templates...")
        for template_path in predefined_templates:
            result_path = apply_template(user_image_path, caption, template_path)
            if result_path:
                generated_image_paths.append(result_path)
    else:
        logger.info("No predefined templates found or loaded.")

    # 2. Generate auto templates
    logger.info(f"Generating {Config.AUTO_TEMPLATES_COUNT} auto-templates...")
    for i in range(Config.AUTO_TEMPLATES_COUNT):
        result_path = create_auto_template(user_image_path, caption, i)
        if result_path:
            generated_image_paths.append(result_path)

    logger.info(f"Finished processing. Generated {len(generated_image_paths)} images in total.")
    return generated_image_paths


# --- Telegram Bot Handler ---
def _remove_generated_image(img_path: str) -> None:
    """Delete a generated image if it still exists; log (never raise) on failure."""
    try:
        if os.path.exists(img_path):
            os.remove(img_path)
            logger.debug(f"Cleaned up generated image: {os.path.basename(img_path)}")
    except OSError as e:
        logger.error(f"Error deleting generated image file {img_path}: {e}")


async def _download_user_photo(message, destination: str) -> bool:
    """Download the last entry of ``message.photo`` to ``destination``.

    Replies to the user and returns ``False`` when the download fails.
    """
    try:
        photo = message.photo[-1]  # Get the highest resolution photo available
        logger.info(f"Downloading photo (file_id: {photo.file_id}, size: {photo.width}x{photo.height})...")
        photo_file = await photo.get_file()
        await photo_file.download_to_drive(destination)
        logger.info(f"Photo downloaded successfully to {destination}")
        return True
    except TelegramError as e:
        logger.error(f"Telegram error downloading photo: {e}", exc_info=True)
        await message.reply_text("❌ Sorry, there was a Telegram error downloading the image. Please try again.")
    except Exception as e:
        logger.error(f"Unexpected error downloading photo: {e}", exc_info=True)
        await message.reply_text("❌ Sorry, I couldn't download the image due to an unexpected error.")
    return False


async def _notify_quietly(message, text: str) -> None:
    """Send a best-effort notice; a Telegram failure is logged instead of raised."""
    try:
        await message.reply_text(text)
    except TelegramError as notify_err:
        logger.warning(f"Could not send notice to user: {notify_err}")


async def _send_generated_images(message, generated_images: List[str], user_id) -> None:
    """Reply with each generated image in turn and delete each file afterwards."""
    total = len(generated_images)
    logger.info(f"Sending {total} generated images back to user {user_id}.")
    sent_count = 0
    for i, img_path in enumerate(generated_images):
        caption_text = f"Style variant {i+1}" if total > 1 else "Here's your styled image!"
        try:
            # Send the photo from the generated path
            with open(img_path, 'rb') as photo_data:
                await message.reply_photo(
                    photo=InputFile(photo_data, filename=os.path.basename(img_path)),
                    caption=caption_text,
                )
            sent_count += 1
            logger.debug(f"Sent photo {os.path.basename(img_path)}")
        except FileNotFoundError:
            logger.error(f"Generated image file not found for sending: {img_path}")
            # Try to inform user about partial failure if multiple images were expected
            if total > 1:
                await _notify_quietly(message, f"⚠️ Couldn't send style variant {i+1} (file missing).")
        except TelegramError as e:
            logger.error(f"Telegram error sending photo {os.path.basename(img_path)}: {e}", exc_info=True)
            if total > 1:
                await _notify_quietly(message, f"⚠️ Couldn't send style variant {i+1} due to a Telegram error.")
        except Exception as e:
            logger.error(f"Unexpected error sending photo {os.path.basename(img_path)}: {e}", exc_info=True)
            if total > 1:
                await _notify_quietly(message, f"⚠️ Couldn't send style variant {i+1} due to an unexpected error.")
        finally:
            # Clean up the generated image file regardless of sending success
            _remove_generated_image(img_path)

    logger.info(f"Finished sending {sent_count}/{total} images.")


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle an incoming photo-with-caption message.

    Downloads the photo, renders the styled variants in the default executor,
    sends them back as replies, and removes every temporary file it created,
    including when the handler exits early or an exception escapes.
    """
    message = update.message
    # Basic check for essential message components; the MessageHandler filters
    # should already prevent this, but check just in case.
    if not message or not message.photo or not message.caption:
        logger.warning("Handler invoked for message without photo or caption.")
        return

    user = message.from_user
    user_id = user.id if user else "UnknownUser"
    caption = message.caption
    message_id = message.message_id
    chat_id = message.chat_id
    logger.info(f"Received photo with caption from user {user_id} in chat {chat_id}.")

    # Create a unique temporary path for the downloaded image
    temp_user_image_path = os.path.join(Config.OUTPUT_DIR, f"user_{user_id}_{message_id}.jpg")
    generated_images: List[str] = []

    try:
        # --- Download User Image ---
        if not await _download_user_photo(message, temp_user_image_path):
            return

        # --- Process Images in Executor ---
        # Notify user that processing has started
        try:
            processing_message = await message.reply_text("⏳ Processing your image with different styles...", quote=True)
            message_to_delete = processing_message.message_id
        except TelegramError as e:
            logger.warning(f"Could not send 'Processing...' message: {e}")
            message_to_delete = None  # Cannot delete if sending failed

        loop = asyncio.get_running_loop()
        # Explicit flag: the name bound by "except ... as e" is unbound when
        # the clause ends, so it cannot be inspected afterwards.
        processing_failed = False
        start_time = loop.time()
        try:
            # Run the blocking image processing function in the default executor
            generated_images = await loop.run_in_executor(
                None,
                process_images,
                temp_user_image_path,
                caption,
            )
            processing_time = loop.time() - start_time
            logger.info(f"Image processing completed in {processing_time:.2f} seconds.")
        except Exception as e:
            processing_failed = True
            logger.error(f"Error during image processing executor call: {e}", exc_info=True)
            # Try to edit the "Processing..." message to show error
            if message_to_delete:
                try:
                    await context.bot.edit_message_text(
                        chat_id=chat_id,
                        message_id=message_to_delete,
                        text="❌ An unexpected error occurred during processing.",
                    )
                    message_to_delete = None  # Don't try to delete it again later
                except TelegramError as edit_err:
                    logger.warning(f"Could not edit processing message to show error: {edit_err}")
                    # Fallback reply if editing failed
                    await message.reply_text("❌ An unexpected error occurred during processing.")
            else:
                await message.reply_text("❌ An unexpected error occurred during processing.")
        finally:
            # Delete the "Processing..." message if it was sent and not edited to show error
            if message_to_delete:
                try:
                    await context.bot.delete_message(chat_id=chat_id, message_id=message_to_delete)
                except TelegramError as del_err:
                    logger.warning(f"Could not delete 'Processing...' message ({message_to_delete}): {del_err}")

        # --- Send Results ---
        if not generated_images:
            logger.warning("No images were generated successfully.")
            # Avoid sending redundant message if error was already reported
            if not processing_failed:
                await message.reply_text("😕 Sorry, I couldn't generate any styled images this time. Please check the templates or try again.")
            return

        await _send_generated_images(message, generated_images, user_id)
    finally:
        # --- Final Cleanup ---
        # Remove generated files that were never reached by the send loop
        # (already-deleted ones are skipped by the existence check).
        for leftover_path in generated_images:
            _remove_generated_image(leftover_path)
        # Clean up the originally downloaded user image
        try:
            if os.path.exists(temp_user_image_path):
                os.remove(temp_user_image_path)
                logger.info(f"Cleaned up temporary user image: {os.path.basename(temp_user_image_path)}")
        except OSError as e:
            logger.error(f"Error cleaning up user image {temp_user_image_path}: {e}", exc_info=True)


# --- Main Execution ---
if __name__ == "__main__":
    logger.info("Starting Telegram Bot...")
    if not Config.TELEGRAM_TOKEN:
        logger.critical("TELEGRAM_TOKEN is not set in environment variables or .env file. Bot cannot start.")
        # SystemExit instead of exit(): exit() is injected by the site module
        # and is not guaranteed to exist.
        raise SystemExit(1)

    try:
        # Build the application instance
        app = Application.builder().token(Config.TELEGRAM_TOKEN).build()

        # Add the handler for messages containing both a photo and a caption
        app.add_handler(MessageHandler(filters.PHOTO & filters.CAPTION, handle_message))

        logger.info("Bot application built. Starting polling...")
        # Start the bot polling for updates
        app.run_polling(allowed_updates=Update.ALL_TYPES)
    except Exception as e:
        logger.critical(f"Fatal error initializing or running the bot application: {e}", exc_info=True)
        raise SystemExit(1)