# app.py import os import random import textwrap import logging import asyncio from concurrent.futures import ThreadPoolExecutor from typing import List, Optional, Tuple # Attempt to load environment variables from .env file for local testing # In Hugging Face, secrets are injected directly as environment variables from dotenv import load_dotenv load_dotenv() # Load .env file if it exists from PIL import Image, ImageDraw, ImageFont, ImageOps, ImageFilter, ImageEnhance, UnidentifiedImageError import numpy as np from telegram import Update, InputFile from telegram.ext import Application, MessageHandler, filters, ContextTypes from telegram.constants import ParseMode from telegram.error import TelegramError # --- Configuration --- class Config: # Read Telegram token from environment variable (set in HF Secrets) TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN') if not TELEGRAM_TOKEN: # Log critical error if token is missing logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.critical("TELEGRAM_TOKEN environment variable not set! Please set it in Hugging Face Space secrets.") raise ValueError("TELEGRAM_TOKEN environment variable not set! Please set it in Hugging Face Space secrets.") # Directories (relative paths for container compatibility) PREDEFINED_TEMPLATES_DIR = "templates" OUTPUT_DIR = "generated_images" # UPDATED: Use the font name "Arial". Pillow should find the system-installed font. FONT_PATH = "Arial" # Predefined Template Settings TEMPLATE_SIZE = (1200, 900) # Expected size of predefined template canvas PLACEHOLDER_SIZE = (700, 500) # Size to fit user image into within the template PLACEHOLDER_POSITION = (50, 50) # Top-left corner to paste user image in template # Auto Template Settings AUTO_TEMPLATES_COUNT = 5 AUTO_TEMPLATE_SIZE = (1200, 900) # Canvas size for auto-generated images AUTO_USER_IMAGE_SIZE = (600, 450) # Size for user image within auto-template MIN_FONT_SIZE = 30 MAX_FONT_SIZE = 60 TEXT_STROKE_WIDTH = 2 NOISE_INTENSITY = 0.03 # Intensity for numpy noise effect (0 to 1) # Other MAX_CAPTION_WIDTH = 35 # Characters per line for text wrapping JPEG_QUALITY = 85 # Quality for saving JPG images # --- Logging Setup --- logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', ) # Reduce verbosity of underlying HTTP library logger used by python-telegram-bot logging.getLogger("httpx").setLevel(logging.WARNING) logger = logging.getLogger(__name__) # --- Setup --- # Ensure necessary directories exist within the container's filesystem # This runs when the script starts inside the container try: # These paths are relative to the WORKDIR /app set in the Dockerfile os.makedirs(Config.PREDEFINED_TEMPLATES_DIR, exist_ok=True) os.makedirs(Config.OUTPUT_DIR, exist_ok=True) logger.info(f"Ensured directories exist: {Config.PREDEFINED_TEMPLATES_DIR}, {Config.OUTPUT_DIR}") except OSError as e: logger.error(f"FATAL: Error creating essential directories: {e}", exc_info=True) # If directories cannot be created, the app likely cannot function raise SystemExit(f"FATAL: Cannot create directories {Config.PREDEFINED_TEMPLATES_DIR} or {Config.OUTPUT_DIR}") from e # Font check is now less critical as we rely on system fonts, but Pillow will log if "Arial" can't be found. logger.info(f"Application will attempt to use font '{Config.FONT_PATH}' via system font discovery (fontconfig).") # --- Helper Functions --- def add_noise_to_image(img: Image.Image, intensity: float = 0.02) -> Image.Image: """Adds subtle noise to a PIL image using numpy.""" try: # Ensure image is in RGB mode for numpy array processing if img.mode != 'RGB': img = img.convert('RGB') img_array = np.array(img, dtype=np.float32) / 255.0 # Generate Gaussian noise matching image dimensions noise = np.random.randn(*img_array.shape) * intensity # Add noise and clip values to valid range [0, 1] noisy_img_array = np.clip(img_array + noise, 0.0, 1.0) # Convert back to PIL Image noisy_img = Image.fromarray((noisy_img_array * 255).astype(np.uint8), 'RGB') return noisy_img except Exception as e: logger.error(f"Error adding noise: {e}", exc_info=True) return img # Return original image on error # --- Image Processing Functions --- def apply_template(user_image_path: str, caption: str, template_path: str) -> Optional[str]: """ Applies user image and caption to a predefined template using pasting. Args: user_image_path: Path to the downloaded user image. caption: Text caption provided by the user. template_path: Path to the predefined template image. Returns: Path to the generated image (in OUTPUT_DIR), or None if an error occurred. """ # Generate a unique-ish output filename to avoid conflicts base_name = os.path.basename(template_path).split('.')[0] # Include a random element to prevent overwriting if called rapidly output_filename = f"result_{base_name}_{random.randint(10000, 99999)}.jpg" output_path = os.path.join(Config.OUTPUT_DIR, output_filename) logger.debug(f"Applying template '{os.path.basename(template_path)}' to user image '{os.path.basename(user_image_path)}'. Output: {output_path}") try: # Use 'with' statement for automatic resource cleanup (file closing) with Image.open(template_path).convert("RGBA") as template, \ Image.open(user_image_path).convert("RGBA") as user_image_orig: # Optional: Check if template size matches config (for consistency) if template.size != Config.TEMPLATE_SIZE: logger.warning(f"Template {os.path.basename(template_path)} size {template.size} differs from expected {Config.TEMPLATE_SIZE}. Results may vary.") # Resize user image to fit the placeholder area using Lanczos resampling for quality logger.debug(f"Resizing user image to placeholder size {Config.PLACEHOLDER_SIZE}") user_image_resized = ImageOps.fit(user_image_orig, Config.PLACEHOLDER_SIZE, Image.Resampling.LANCZOS) # Create a working copy of the template to paste onto combined = template.copy() # Paste the resized user image into the placeholder position # The third argument (mask) uses the alpha channel of the user image for smooth edges if it has transparency logger.debug(f"Pasting resized user image at {Config.PLACEHOLDER_POSITION}") combined.paste(user_image_resized, Config.PLACEHOLDER_POSITION, user_image_resized if user_image_resized.mode == 'RGBA' else None) # --- Add Caption --- logger.debug("Adding caption to template image") draw = ImageDraw.Draw(combined) try: # Use a medium font size relative to config font_size = Config.MAX_FONT_SIZE // 2 logger.debug(f"Loading font '{Config.FONT_PATH}' with size {font_size}") font = ImageFont.truetype(Config.FONT_PATH, font_size) except IOError: # This can happen if font "Arial" is not found by Pillow logger.warning(f"Failed to load font '{Config.FONT_PATH}' by name. Using Pillow's default. Ensure 'ttf-mscorefonts-installer' worked in Docker and fontconfig is effective.") font = ImageFont.load_default() # Wrap text according to configured width wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH) logger.debug(f"Wrapped caption text: \"{wrapped_text[:50]}...\"") # Calculate text position (e.g., centered below the placeholder area) # Use textbbox for more accurate positioning text_bbox = draw.textbbox((0, 0), wrapped_text, font=font, align="center") text_width = text_bbox[2] - text_bbox[0] text_height = text_bbox[3] - text_bbox[1] # Center horizontally text_x = (combined.width - text_width) // 2 # Position below placeholder, add some padding text_y = Config.PLACEHOLDER_POSITION[1] + Config.PLACEHOLDER_SIZE[1] + 20 # Adjust padding as needed logger.debug(f"Calculated text position: ({text_x}, {text_y})") # Draw text with a simple shadow/stroke for better visibility # Draw shadow slightly offset shadow_offset = 1 draw.text((text_x + shadow_offset, text_y + shadow_offset), wrapped_text, font=font, fill="black", align="center") # Draw main text draw.text((text_x, text_y), wrapped_text, font=font, fill="white", align="center") # Convert to RGB before saving as JPG (removes alpha channel) logger.debug("Converting final image to RGB") combined_rgb = combined.convert("RGB") logger.debug(f"Saving final image to {output_path} with quality {Config.JPEG_QUALITY}") combined_rgb.save(output_path, "JPEG", quality=Config.JPEG_QUALITY) logger.info(f"Generated image using template '{os.path.basename(template_path)}': {output_path}") return output_path except FileNotFoundError: logger.error(f"Template or user image not found. Template: '{template_path}', User Image: '{user_image_path}'") except UnidentifiedImageError: logger.error(f"Could not identify image file (corrupted or unsupported format?). Template: '{template_path}', User Image: '{user_image_path}'") except Exception as e: # Log detailed error including traceback logger.error(f"Error applying template '{os.path.basename(template_path)}': {e}", exc_info=True) # Explicitly return None on any error during the process return None def create_auto_template(user_image_path: str, caption: str, variant: int) -> Optional[str]: """ Generates a dynamic template with various effects. Args: user_image_path: Path to the downloaded user image. caption: Text caption provided by the user. variant: An integer index to introduce variation. Returns: Path to the generated image (in OUTPUT_DIR), or None if an error occurred. """ output_filename = f"auto_template_{variant}_{random.randint(10000, 99999)}.jpg" output_path = os.path.join(Config.OUTPUT_DIR, output_filename) logger.debug(f"Creating auto template variant {variant}. Output: {output_path}") try: # --- Create Background --- # Generate a random somewhat dark color for the background bg_color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150)) logger.debug(f"Auto template {variant}: Background color {bg_color}") bg = Image.new('RGB', Config.AUTO_TEMPLATE_SIZE, color=bg_color) # --- Process User Image --- with Image.open(user_image_path) as user_img_orig: # Work on a copy user_img = user_img_orig.copy() # Resize to fit the designated area within the auto-template logger.debug(f"Auto template {variant}: Resizing user image to {Config.AUTO_USER_IMAGE_SIZE}") user_img = ImageOps.fit(user_img, Config.AUTO_USER_IMAGE_SIZE, Image.Resampling.LANCZOS) # Apply random effects based on a random choice (not just variant index) effect_choice = random.choice(['blur', 'noise', 'color', 'contrast', 'sharpness', 'none']) logger.debug(f"Auto template {variant}: Applying effect '{effect_choice}'") if effect_choice == 'blur': blur_radius = random.uniform(0.5, 1.8) logger.debug(f"Applying GaussianBlur with radius {blur_radius:.2f}") user_img = user_img.filter(ImageFilter.GaussianBlur(blur_radius)) elif effect_choice == 'noise': logger.debug(f"Applying noise with intensity {Config.NOISE_INTENSITY:.2f}") user_img = add_noise_to_image(user_img, Config.NOISE_INTENSITY) elif effect_choice == 'color': # Enhance or reduce color saturation color_factor = random.uniform(0.3, 1.7) logger.debug(f"Enhancing color with factor {color_factor:.2f}") enhancer = ImageEnhance.Color(user_img) user_img = enhancer.enhance(color_factor) elif effect_choice == 'contrast': # Enhance or reduce contrast contrast_factor = random.uniform(0.7, 1.4) logger.debug(f"Enhancing contrast with factor {contrast_factor:.2f}") enhancer = ImageEnhance.Contrast(user_img) user_img = enhancer.enhance(contrast_factor) elif effect_choice == 'sharpness': # Enhance sharpness sharpness_factor = random.uniform(1.1, 2.0) logger.debug(f"Enhancing sharpness with factor {sharpness_factor:.2f}") enhancer = ImageEnhance.Sharpness(user_img) user_img = enhancer.enhance(sharpness_factor) # 'none' applies no extra filter # Add a decorative border with a random light color border_color = (random.randint(180, 255), random.randint(180, 255), random.randint(180, 255)) border_width = random.randint(8, 20) logger.debug(f"Adding border width {border_width} color {border_color}") user_img = ImageOps.expand(user_img, border=border_width, fill=border_color) # --- Paste User Image onto Background --- # Calculate position to center the (bordered) user image horizontally, and place it in the upper part vertically paste_x = (bg.width - user_img.width) // 2 paste_y = (bg.height - user_img.height) // 3 # Position slightly above vertical center logger.debug(f"Pasting processed user image at ({paste_x}, {paste_y})") bg.paste(user_img, (paste_x, paste_y)) # --- Add Styled Text --- logger.debug("Adding caption to auto template") draw = ImageDraw.Draw(bg) try: # Random font size within configured range font_size = random.randint(Config.MIN_FONT_SIZE, Config.MAX_FONT_SIZE) logger.debug(f"Loading font '{Config.FONT_PATH}' with size {font_size}") font = ImageFont.truetype(Config.FONT_PATH, font_size) except IOError: # This can happen if font "Arial" is not found by Pillow logger.warning(f"Failed to load font '{Config.FONT_PATH}' by name. Using Pillow's default. Ensure 'ttf-mscorefonts-installer' worked in Docker and fontconfig is effective.") font = ImageFont.load_default() # Fallback font # Wrap text wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH) logger.debug(f"Wrapped caption text: \"{wrapped_text[:50]}...\"") # Calculate text position (centered horizontally, below the pasted image) text_bbox = draw.textbbox((0, 0), wrapped_text, font=font, align="center") text_width = text_bbox[2] - text_bbox[0] text_height = text_bbox[3] - text_bbox[1] text_x = (bg.width - text_width) // 2 # Position below the image + border, add padding text_y = paste_y + user_img.height + 30 logger.debug(f"Calculated text position: ({text_x}, {text_y})") # Random bright text color and dark stroke color text_color = (random.randint(200, 255), random.randint(200, 255), random.randint(200, 255)) stroke_color = (random.randint(0, 50), random.randint(0, 50), random.randint(0, 50)) logger.debug(f"Text color {text_color}, stroke color {stroke_color}") # Draw text with stroke draw.text((text_x, text_y), wrapped_text, font=font, fill=text_color, stroke_width=Config.TEXT_STROKE_WIDTH, stroke_fill=stroke_color, align="center") # Save the final image logger.debug(f"Saving final auto template image to {output_path} with quality {Config.JPEG_QUALITY}") bg.save(output_path, "JPEG", quality=Config.JPEG_QUALITY) logger.info(f"Generated auto-template image (variant {variant}): {output_path}") return output_path except FileNotFoundError: logger.error(f"User image not found during auto-template creation: '{user_image_path}'") except UnidentifiedImageError: logger.error(f"Could not identify user image file during auto-template creation: '{user_image_path}'") except Exception as e: logger.error(f"Error creating auto-template variant {variant}: {e}", exc_info=True) return None def load_predefined_templates() -> List[str]: """Loads paths of all valid template images from the predefined directory.""" templates = [] template_dir = Config.PREDEFINED_TEMPLATES_DIR logger.debug(f"Searching for templates in directory: {os.path.abspath(template_dir)}") try: if not os.path.isdir(template_dir): logger.warning(f"Predefined templates directory not found: '{template_dir}'") return [] # Return empty list if directory doesn't exist files = os.listdir(template_dir) logger.debug(f"Found {len(files)} files/dirs in template directory.") for file in files: # Check for common image extensions if file.lower().endswith(('.png', '.jpg', '.jpeg')): full_path = os.path.join(template_dir, file) if os.path.isfile(full_path): # Ensure it's actually a file templates.append(full_path) logger.debug(f"Found template: {full_path}") else: logger.warning(f"Found item with image extension but is not a file: {full_path}") if not templates: logger.warning(f"No valid template image files found in '{template_dir}'.") else: logger.info(f"Loaded {len(templates)} predefined templates.") except Exception as e: logger.error(f"Error loading predefined templates from '{template_dir}': {e}", exc_info=True) return templates # This function orchestrates the image processing. It contains blocking Pillow calls, # so it's designed to be run in a thread pool executor by the async handler. def process_images(user_image_path: str, caption: str) -> List[str]: """ Processes the user image against predefined and auto-generated templates. This is a SYNCHRONOUS function. Args: user_image_path: Path to the temporary user image file. caption: The user's caption. Returns: A list of paths to the generated images. """ logger.info("Starting image processing task...") generated_image_paths: List[str] = [] predefined_templates = load_predefined_templates() # 1. Process predefined templates if predefined_templates: logger.info(f"Processing {len(predefined_templates)} predefined templates...") for template_path in predefined_templates: result_path = apply_template(user_image_path, caption, template_path) if result_path: generated_image_paths.append(result_path) else: logger.warning(f"Failed to generate image for template: {os.path.basename(template_path)}") else: logger.info("Skipping predefined templates (none found or loaded).") # 2. Generate auto templates logger.info(f"Generating {Config.AUTO_TEMPLATES_COUNT} auto-templates...") for i in range(Config.AUTO_TEMPLATES_COUNT): result_path = create_auto_template(user_image_path, caption, i) if result_path: generated_image_paths.append(result_path) else: logger.warning(f"Failed to generate auto-template variant {i}") logger.info(f"Image processing task finished. Generated {len(generated_image_paths)} images in total.") return generated_image_paths # --- Telegram Bot Handler --- async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handles incoming messages with photos and captions.""" # Basic check for essential message components if not update.message or not update.message.photo or not update.message.caption: logger.warning("Handler invoked for message missing photo or caption. This shouldn't happen with the current filters.") return user = update.message.from_user # Use 'UnknownUser' or similar if user info isn't available user_id = user.id if user else "UnknownUser" user_info = f"user_id={user_id}" + (f", username={user.username}" if user and user.username else "") caption = update.message.caption message_id = update.message.message_id chat_id = update.message.chat_id logger.info(f"Received photo with caption from {user_info} in chat {chat_id} (message_id={message_id}).") # --- Download User Image --- # Create a unique temporary path for the downloaded image within the OUTPUT_DIR temp_user_image_path = os.path.join(Config.OUTPUT_DIR, f"user_{user_id}_{message_id}.jpg") file_downloaded = False download_start_time = asyncio.get_running_loop().time() try: photo = update.message.photo[-1] # Get the highest resolution photo available logger.info(f"Attempting download photo (file_id: {photo.file_id}, size: {photo.width}x{photo.height})...") photo_file = await photo.get_file() await photo_file.download_to_drive(temp_user_image_path) download_time = asyncio.get_running_loop().time() - download_start_time logger.info(f"Photo downloaded successfully to '{temp_user_image_path}' in {download_time:.2f} seconds.") file_downloaded = True except TelegramError as e: logger.error(f"Telegram error downloading photo for message {message_id}: {e}", exc_info=True) await update.message.reply_text("❌ Sorry, there was a Telegram error downloading the image. Please try sending it again.") return # Stop processing if download fails except Exception as e: logger.error(f"Unexpected error downloading photo for message {message_id}: {e}", exc_info=True) await update.message.reply_text("❌ Sorry, I couldn't download the image due to an unexpected error.") return # Stop processing if download fails # Safety check, though exceptions should prevent reaching here if download failed if not file_downloaded or not os.path.exists(temp_user_image_path): logger.error(f"Download reported success but file '{temp_user_image_path}' does not exist.") await update.message.reply_text("❌ An internal error occurred after downloading the image.") return # --- Process Images in Executor --- # Notify user that processing has started processing_message = None try: # Quote the original message for context processing_message = await update.message.reply_text("⏳ Processing your image with different styles...", quote=True) except TelegramError as e: logger.warning(f"Could not send 'Processing...' message to chat {chat_id}: {e}") # Continue processing even if status message fails message_to_delete = processing_message.message_id if processing_message else None loop = asyncio.get_running_loop() generated_images = [] processing_failed = False processing_start_time = loop.time() try: logger.info(f"Submitting image processing task to executor for user image '{os.path.basename(temp_user_image_path)}'") # Run the blocking image processing function in the default thread pool executor generated_images = await loop.run_in_executor( None, # Use default ThreadPoolExecutor process_images, # The synchronous function to run temp_user_image_path, # Argument 1 for process_images caption # Argument 2 for process_images ) processing_time = loop.time() - processing_start_time logger.info(f"Image processing task completed in {processing_time:.2f} seconds.") except Exception as e: processing_failed = True logger.error(f"Error during image processing executor call for message {message_id}: {e}", exc_info=True) error_message = "❌ An unexpected error occurred during processing. Please try again later." # Try to edit the "Processing..." message to show error if message_to_delete: try: await context.bot.edit_message_text( chat_id=chat_id, message_id=message_to_delete, text=error_message ) message_to_delete = None # Mark as handled, don't delete later except TelegramError as edit_err: logger.warning(f"Could not edit processing message {message_to_delete} to show error: {edit_err}") # Fallback reply if editing failed await update.message.reply_text(error_message) else: # If sending initial status failed, just send error as new message await update.message.reply_text(error_message) # Delete the "Processing..." message if it was sent and processing didn't fail or error wasn't edited in if message_to_delete: try: await context.bot.delete_message( chat_id=chat_id, message_id=message_to_delete ) logger.debug(f"Deleted 'Processing...' message {message_to_delete}") except TelegramError as del_err: # Log warning, but don't bother user if deleting status message fails logger.warning(f"Could not delete 'Processing...' message ({message_to_delete}): {del_err}") # --- Send Results --- # Only proceed if processing didn't explicitly fail with an exception if not processing_failed: if not generated_images: logger.warning(f"Image processing finished but generated 0 images for message {message_id}.") # Inform user if no images were created (e.g., no templates found, or all failed internally) await update.message.reply_text("😕 Sorry, I couldn't generate any styled images this time. There might be an issue with the templates.") else: send_start_time = loop.time() logger.info(f"Sending {len(generated_images)} generated images back to user {user_id} for message {message_id}.") sent_count = 0 for i, img_path in enumerate(generated_images): # Check if file exists before attempting to send if not os.path.exists(img_path): logger.error(f"Generated image file not found before sending: '{img_path}'") if len(generated_images) > 1: await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} (internal file missing).") continue # Skip to next image caption_text = f"Style variant {i+1}" if len(generated_images) > 1 else "🖼️ Here's your styled image!" try: # Send the photo from the generated path using InputFile await update.message.reply_photo( photo=InputFile(img_path), # PTB handles opening/closing when path is given caption=caption_text, # Consider adding reply_to_message_id=message_id for clearer context in group chats # reply_to_message_id=message_id ) sent_count += 1 logger.debug(f"Sent photo {os.path.basename(img_path)}") except TelegramError as e: # Log specific Telegram errors (e.g., file too large, chat not found, blocked by user) logger.error(f"Telegram error sending photo {os.path.basename(img_path)}: {e}", exc_info=True) if len(generated_images) > 1: await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} due to a Telegram error.") except Exception as e: # Catch other potential errors during sending logger.error(f"Unexpected error sending photo {os.path.basename(img_path)}: {e}", exc_info=True) if len(generated_images) > 1: await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} due to an unexpected error.") finally: # Clean up the generated image file after attempting to send it try: if os.path.exists(img_path): os.remove(img_path) logger.debug(f"Cleaned up generated image: {os.path.basename(img_path)}") except OSError as e: logger.error(f"Error deleting generated image file '{img_path}': {e}") send_time = loop.time() - send_start_time logger.info(f"Finished sending results for message {message_id}. Sent {sent_count}/{len(generated_images)} images in {send_time:.2f} seconds.") # --- Final Cleanup --- # Clean up the originally downloaded user image try: if os.path.exists(temp_user_image_path): os.remove(temp_user_image_path) logger.info(f"Cleaned up temporary user image: {os.path.basename(temp_user_image_path)}") except OSError as e: # Log error but don't bother user if temporary file cleanup fails logger.error(f"Error cleaning up user image '{temp_user_image_path}': {e}", exc_info=True) # --- Main Execution --- if __name__ == "__main__": logger.info("Initializing Telegram Bot Application...") # Token check is done in Config class now, exiting if it fails there. try: # Build the application instance app = Application.builder().token(Config.TELEGRAM_TOKEN).build() # Add the handler for messages containing both a photo and a caption # This ensures the 'handle_message' function only receives relevant updates app.add_handler(MessageHandler(filters.PHOTO & filters.CAPTION, handle_message)) logger.info("Bot application built successfully. Starting polling for updates...") # Start the bot polling for updates indefinitely app.run_polling(allowed_updates=Update.ALL_TYPES) # Consider specifying only needed updates except Exception as e: # Catch potential errors during application build or startup logger.critical(f"FATAL error initializing or running the bot application: {e}", exc_info=True) # Exit with a non-zero code to indicate failure exit(1)