# Spaces:
# Runtime error
# Runtime error
import asyncio
import copy
import json
import os

from PIL import Image, ImageColor, ImageDraw, ImageFont
from telethon import TelegramClient, events
from telethon.tl import types  # For DocumentAttributeFilename
# --- Configuration ---
# python-dotenv is optional: when it is installed, load_dotenv() is called
# before the environment lookups below; when it is missing we carry on.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

API_ID = os.environ.get('API_ID')
API_HASH = os.environ.get('API_HASH')
BOT_TOKEN = os.environ.get('BOT_TOKEN')

if not all([API_ID, API_HASH, BOT_TOKEN]):
    print("CRITICAL: API_ID, API_HASH, and BOT_TOKEN environment variables must be set.")
    # SystemExit rather than the exit() helper: exit() is injected by the
    # `site` module and is not guaranteed to exist in every interpreter setup.
    raise SystemExit(1)

try:
    API_ID = int(API_ID)
except ValueError:
    print("CRITICAL: API_ID must be an integer.")
    raise SystemExit(1) from None

# Adjusted session path
SESSION_NAME = 'session/image_bot_session'  # Store session file in 'session' subdirectory

# The session directory has to exist before the client is built: the
# file-based session is opened when TelegramClient is constructed, which
# fails if the parent directory is missing.
os.makedirs(os.path.dirname(SESSION_NAME), exist_ok=True)
bot = TelegramClient(SESSION_NAME, API_ID, API_HASH)
# --- Paths and Constants ---
# Working directories (relative to the process working directory).
DATA_DIR = "data"
DOWNLOADS_DIR = "downloads"  # Changed from TEMP_DIR

# Names of the files kept inside DATA_DIR.
CONFIG_FILENAME = "config.json"
TEMPLATE_FILENAME = "user_template.png"
USER_FONT_FILENAME = "user_font.ttf"

# Full paths of those files.
CONFIG_PATH, TEMPLATE_PATH, USER_FONT_PATH = (
    os.path.join(DATA_DIR, filename)
    for filename in (CONFIG_FILENAME, TEMPLATE_FILENAME, USER_FONT_FILENAME)
)

# Ensure base directories exist (Dockerfile should create them, but this is a good fallback).
# exist_ok=True makes these calls safe when the directories are already there.
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(DOWNLOADS_DIR, exist_ok=True)
os.makedirs("session", exist_ok=True)  # Ensure the 'session' directory for Telethon exists
# --- Default Configuration (if config.json is missing/invalid) ---
# Layout values used when no stored configuration overrides them.
DEFAULT_CONFIG = dict(
    raw_image_box=dict(x=40, y=40, width=1000, height=780),
    caption_area=dict(x=60, y=870),
    caption_font_size=45,
    caption_color="white",
    line_spacing=10,
)

# Live configuration; filled in by load_bot_config().
bot_config = dict()
# --- Config Management --- | |
def load_bot_config():
    """Populate the module-level ``bot_config`` from ``CONFIG_PATH``.

    Starts from a deep copy of ``DEFAULT_CONFIG`` and overlays the values
    stored in the config file. Nested sections (dict-valued defaults such as
    ``raw_image_box``) are merged key by key, so a stored section that lacks
    some keys still ends up complete. When the file does not exist the
    defaults are written out; when it cannot be read or parsed the defaults
    are restored and written out.
    """
    global bot_config
    # Deep copy: the defaults contain nested dicts, and callers edit
    # bot_config's nested dicts in place. A shallow copy would let those
    # edits leak into DEFAULT_CONFIG itself.
    merged = copy.deepcopy(DEFAULT_CONFIG)
    try:
        if os.path.exists(CONFIG_PATH):
            with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
                loaded_values = json.load(f)
            if not isinstance(loaded_values, dict):
                raise ValueError("config file must contain a JSON object")
            for key, value in loaded_values.items():
                default_value = merged.get(key)
                if isinstance(default_value, dict):
                    if isinstance(value, dict):
                        default_value.update(value)
                    else:
                        # A scalar where a section is expected would break
                        # every later lookup; keep the default section.
                        print(f"Ignoring invalid config value for '{key}': {value!r}")
                else:
                    merged[key] = value
            bot_config = merged
        else:
            bot_config = merged
            save_bot_config()  # Ensure file exists with all keys
        print(f"Configuration loaded/initialized: {bot_config}")
    except Exception as e:
        # Best effort at startup: any failure falls back to pristine defaults.
        print(f"Error loading config, using defaults: {e}")
        bot_config = copy.deepcopy(DEFAULT_CONFIG)
        save_bot_config()
def save_bot_config():
    """Write the current ``bot_config`` to ``CONFIG_PATH`` as indented JSON.

    Any failure is printed and otherwise swallowed, so callers never see an
    exception from this function.
    """
    try:
        with open(CONFIG_PATH, 'w') as config_file:
            json.dump(bot_config, config_file, indent=4)
    except Exception as err:
        print(f"Error saving config: {err}")
# --- User State Management ---
# Per-chat conversation state, keyed by chat_id. The handlers in this file
# store values of the form {'state': <str>, 'data': <dict>}; 'state' is
# 'idle' or one of the 'awaiting_*' step names, and 'data' may hold
# 'raw_image_path' while an image is being created.
user_sessions: dict = {}
# --- Helper Functions (Image Processing) --- | |
async def generate_image_with_frame(raw_image_path, frame_template_path, caption_text, chat_id):
    """Paste the raw image into the frame template and draw the caption on it.

    Layout values (image box, caption position, font size, colour and line
    spacing) are read from ``bot_config``, falling back to ``DEFAULT_CONFIG``
    for any missing top-level key. The raw image is shrunk to fit the
    configured box (aspect ratio kept) and centred inside it.

    Args:
        raw_image_path: Path of the user-supplied image.
        frame_template_path: Path of the frame template image.
        caption_text: Caption; each newline-separated line is drawn on its
            own row, and blank lines keep the height of a normal row.
        chat_id: Used only to build the output filename.

    Returns:
        Path of the PNG written into ``DOWNLOADS_DIR``, or ``None`` when any
        step failed (the error is printed).
    """
    # Save final output to DOWNLOADS_DIR before sending; the caller cleans up.
    output_path = os.path.join(DOWNLOADS_DIR, f"final_image_{chat_id}.png")

    cfg_raw_box = bot_config.get("raw_image_box", DEFAULT_CONFIG["raw_image_box"])
    cfg_caption_area = bot_config.get("caption_area", DEFAULT_CONFIG["caption_area"])
    cfg_font_size = bot_config.get("caption_font_size", DEFAULT_CONFIG["caption_font_size"])
    cfg_caption_color = bot_config.get("caption_color", DEFAULT_CONFIG["caption_color"])
    cfg_line_spacing = bot_config.get("line_spacing", DEFAULT_CONFIG["line_spacing"])

    try:
        # The context managers close the underlying files; convert() returns
        # an independent in-memory image that stays valid afterwards.
        with Image.open(raw_image_path) as raw_file:
            raw_img = raw_file.convert("RGBA")
        with Image.open(frame_template_path) as template_file:
            final_img = template_file.convert("RGBA")

        raw_img.thumbnail((cfg_raw_box['width'], cfg_raw_box['height']), Image.LANCZOS)
        paste_x = cfg_raw_box['x'] + (cfg_raw_box['width'] - raw_img.width) // 2
        paste_y = cfg_raw_box['y'] + (cfg_raw_box['height'] - raw_img.height) // 2
        # The image is RGBA, so it doubles as its own transparency mask.
        final_img.paste(raw_img, (paste_x, paste_y), raw_img)

        draw = ImageDraw.Draw(final_img)
        font_to_use = _load_caption_font(cfg_font_size)

        # An empty string measures as zero height, which would collapse blank
        # caption lines; give them the height of a reference string instead.
        blank_line_height = _caption_line_height(draw, "Ag", font_to_use)
        current_y = cfg_caption_area['y']
        for line in caption_text.split('\n'):
            draw.text((cfg_caption_area['x'], current_y), line, font=font_to_use, fill=cfg_caption_color)
            if line.strip():
                line_height = _caption_line_height(draw, line, font_to_use)
            else:
                line_height = blank_line_height
            current_y += line_height + cfg_line_spacing

        final_img.convert("RGB").save(output_path, "PNG")
        return output_path
    except FileNotFoundError as fnf_err:
        print(f"Error: Image file not found during processing. Template: {frame_template_path}, Raw: {raw_image_path}. Error: {fnf_err}")
        return None
    except Exception as e:
        print(f"Error during image processing: {e}")
        import traceback
        traceback.print_exc()
        return None


def _load_caption_font(font_size):
    """Return the caption font: the user font, else Arial, else Pillow's default."""
    if os.path.exists(USER_FONT_PATH):
        try:
            return ImageFont.truetype(USER_FONT_PATH, font_size)
        except IOError:
            print(f"User font at '{USER_FONT_PATH}' found but couldn't be loaded. Using fallback.")
    try:
        return ImageFont.truetype("arial.ttf", font_size)
    except IOError:
        print("Arial.ttf not found, using Pillow's load_default(). This may look very basic.")
    try:
        # Newer Pillow releases accept a size for the built-in font.
        return ImageFont.load_default(size=font_size)
    except TypeError:
        # Older releases only provide a fixed-size bitmap default.
        return ImageFont.load_default()


def _caption_line_height(draw, text, font):
    """Return the rendered pixel height of *text* drawn with *font*."""
    try:
        bbox = draw.textbbox((0, 0), text, font=font)
        return bbox[3] - bbox[1]
    except AttributeError:
        # Pillow versions without textbbox(): fall back to textsize().
        return draw.textsize(text, font=font)[1]
# --- Bot Event Handlers --- (Mostly same as before, checking paths for DOWNLOADS_DIR) | |
async def start_handler(event):
    """Reset the chat's session to idle and reply with the welcome text."""
    user_sessions[event.chat_id] = dict(state='idle', data={})
    welcome_parts = [
        "Welcome! This bot helps you create images with a template.\n\n",
        "**Initial Setup (if first time or after a reset):**\n",
        "1. `/settemplate` - Upload your frame template image (PNG/JPG).\n",
        "2. `/setfont` - Upload your .ttf font file for captions.\n",
        "3. `/help_config` - Learn how to set layout parameters.\n",
        "4. `/setconfig <key> <value>` - Adjust layout as needed.\n\n",
        "**Regular Use:**\n",
        "`/create` - Start creating a new image.\n",
        "`/viewconfig` - See current layout settings.\n",
        "`/cancel` - Cancel current operation.\n\n",
        "**Note on Hosting:** The `data/` and `session/` directories should ideally use persistent storage on your hosting platform. ",
        "If using ephemeral storage (common on free tiers), your uploaded template, font, config, and session might be lost on restarts, requiring setup again.",
    ]
    await event.reply("".join(welcome_parts))
async def set_template_handler(event):
    """Put the chat into the 'awaiting_template_image' state and ask for the image."""
    user_sessions[event.chat_id] = dict(state='awaiting_template_image', data={})
    await event.reply("Please send your frame template image (e.g., a PNG or JPG).")
async def set_font_handler(event):
    """Put the chat into the 'awaiting_font_file' state and ask for the font file."""
    user_sessions[event.chat_id] = dict(state='awaiting_font_file', data={})
    await event.reply("Please send your `.ttf` font file as a document/file.")
async def view_config_handler(event):
    """Reload the configuration and reply with a listing of every setting."""
    load_bot_config()

    if os.path.exists(USER_FONT_PATH):
        font_status = 'User font set (' + USER_FONT_FILENAME + ')'
    else:
        font_status = 'Using fallback font (Arial or Pillow default)'

    report_lines = ["Current Bot Configuration:\n", f"- Font: {font_status}\n"]
    report_lines.extend(
        f"- {name}: {json.dumps(setting)}\n" for name, setting in bot_config.items()
    )
    await event.reply("".join(report_lines))
async def help_config_handler(event):
    """Reply with the list of keys accepted by `/setconfig` and an example."""
    help_parts = (
        "**Configuration Commands (`/setconfig <key> <value>`):**\n",
        "`raw_image_box_x <num>` (e.g., 40)\n",
        "`raw_image_box_y <num>`\n",
        "`raw_image_box_width <num>`\n",
        "`raw_image_box_height <num>`\n",
        "`caption_area_x <num>`\n",
        "`caption_area_y <num>`\n",
        "`caption_font_size <num>` (e.g., 45)\n",
        "`caption_color <name_or_hex>` (e.g., white or #FFFFFF)\n",
        "`line_spacing <num>` (e.g., 10)\n\n",
        "Example: `/setconfig caption_font_size 50`\n",
        "Use `/viewconfig` to see current values. ",
        "These settings define how the raw image and caption are placed on your template.",
    )
    help_text = "".join(help_parts)
    await event.reply(help_text)
async def set_config_handler(event):
    """Handle `/setconfig <key> <value>`: validate and store one setting.

    The key and value come from groups 1 and 2 of the command's regex match.
    Keys prefixed with ``raw_image_box_`` or ``caption_area_`` address an
    existing entry of that nested section; ``caption_font_size`` and
    ``line_spacing`` are integers; ``caption_color`` is a colour name or hex
    code. Widths, heights and the font size must be positive, and colours
    must be understood by Pillow, so that a bad value is rejected here
    instead of breaking every later image generation.
    """
    key_to_set = event.pattern_match.group(1).strip()
    value_str = event.pattern_match.group(2).strip()
    load_bot_config()

    # Settings that make no sense as zero or negative numbers.
    positive_only = ("raw_image_box_width", "raw_image_box_height", "caption_font_size")

    # Resolve which dict and which key inside it the command addresses.
    target, target_key = None, None
    for prefix, section in (("raw_image_box_", "raw_image_box"), ("caption_area_", "caption_area")):
        if key_to_set.startswith(prefix):
            # Strip only the leading prefix (str.replace would also remove
            # later occurrences of the same text).
            sub_key = key_to_set[len(prefix):]
            section_values = bot_config.get(section)
            if isinstance(section_values, dict) and sub_key in section_values:
                target, target_key = section_values, sub_key
            break
    else:
        if key_to_set in ("caption_font_size", "line_spacing", "caption_color"):
            target, target_key = bot_config, key_to_set

    if target is None:
        await event.reply(f"Unknown or non-configurable key: '{key_to_set}'. See `/help_config`.")
        return

    try:
        if key_to_set == "caption_color":
            try:
                ImageColor.getrgb(value_str)
            except ValueError:
                await event.reply(
                    f"Invalid color '{value_str}'. Use a color name or hex code (e.g., white or #FFFFFF)."
                )
                return
            new_value = value_str
        else:
            new_value = int(value_str)
            if key_to_set in positive_only and new_value <= 0:
                await event.reply(f"Invalid value for '{key_to_set}'. It must be greater than zero.")
                return

        # Compare the serialized forms, i.e. what would end up in config.json.
        if json.dumps(target.get(target_key)) == json.dumps(new_value):
            await event.reply(f"Value for {key_to_set} is already {value_str}. No change made.")
            return

        target[target_key] = new_value
        save_bot_config()
        await event.reply(f"Configuration updated: {key_to_set} = {new_value}")
    except ValueError:
        await event.reply(f"Invalid value for '{key_to_set}'. Please provide a number where expected (e.g., for sizes, coordinates).")
    except Exception as e:
        await event.reply(f"Error setting config: {e}")
async def create_post_handler(event):
    """Start the image-creation flow if a template has been uploaded.

    Without a template the user is told to set one and nothing else happens.
    A missing user font only produces a warning; the flow still starts and
    the chat moves to the 'awaiting_raw_image_for_create' state.
    """
    template_ready = os.path.exists(TEMPLATE_PATH)
    if not template_ready:
        await event.reply("No template found. Please set one using `/settemplate` first.")
        return

    font_ready = os.path.exists(USER_FONT_PATH)
    if not font_ready:
        await event.reply("Warning: No user font found (use `/setfont`). A fallback font will be attempted, but results may vary.")

    user_sessions[event.chat_id] = dict(state='awaiting_raw_image_for_create', data={})
    await event.reply("Please send the raw image you want to use.")
async def cancel_handler(event):
    """Abort the chat's current operation, deleting its downloaded raw image if any."""
    chat_id = event.chat_id

    # Nothing is in progress when the chat has no session or is already idle.
    if chat_id not in user_sessions or user_sessions[chat_id]['state'] == 'idle':
        await event.reply("Nothing to cancel.")
        return

    session = user_sessions[chat_id]
    temp_raw_path = session.get('data', {}).get('raw_image_path')
    if temp_raw_path and os.path.exists(temp_raw_path):
        try:
            os.remove(temp_raw_path)
        except OSError as err:
            print(f"Error deleting temp file {temp_raw_path}: {err}")

    user_sessions[chat_id] = dict(state='idle', data={})
    await event.reply("Operation cancelled.")
async def message_handler(event):  # pylint: disable=too-many-branches
    """Advance the chat's multi-step flow based on a non-command message.

    Depending on the chat's state this stores an uploaded font, stores an
    uploaded template image, stores the raw image for a new post, or takes
    the caption text and produces and sends the final image. Images are
    accepted both as Telegram photos and as files whose MIME type is
    ``image/*``. Messages starting with '/', '#' or '.' are ignored here.
    """
    chat_id = event.chat_id
    if chat_id not in user_sessions:
        user_sessions[chat_id] = {'state': 'idle', 'data': {}}

    if event.text and event.text.startswith(('/', '#', '.')):
        return  # Let specific command handlers take precedence

    current_state = user_sessions.get(chat_id, {}).get('state', 'idle')
    session_data = user_sessions.get(chat_id, {}).get('data', {})

    if event.document and current_state == 'awaiting_font_file':
        if _is_font_document(event.document):
            await event.reply("Downloading font file...")
            try:
                await bot.download_media(event.message.document, USER_FONT_PATH)
                user_sessions[chat_id]['state'] = 'idle'
                await event.reply(f"Font saved as '{USER_FONT_FILENAME}' in `data/` directory!")
            except Exception as e:
                await event.reply(f"Sorry, I couldn't save the font. Error: {e}")
                user_sessions[chat_id]['state'] = 'idle'
        else:
            await event.reply("That doesn't look like a .ttf font file. Please send a valid .ttf font file, or use /cancel.")
        return

    image_media = _incoming_image(event)
    if image_media is not None:
        if current_state == 'awaiting_template_image':
            await event.reply("Downloading template image...")
            try:
                await bot.download_media(image_media, TEMPLATE_PATH)
                user_sessions[chat_id]['state'] = 'idle'
                await event.reply(f"Template saved as '{TEMPLATE_FILENAME}' in `data/` directory!")
            except Exception as e:
                await event.reply(f"Couldn't save template image. Error: {e}")
                user_sessions[chat_id]['state'] = 'idle'
            return
        elif current_state == 'awaiting_raw_image_for_create':
            # Save raw images to DOWNLOADS_DIR
            raw_img_temp_path = os.path.join(DOWNLOADS_DIR, f"raw_{chat_id}_{event.message.id}.jpg")
            await event.reply("Downloading raw image...")
            try:
                await bot.download_media(image_media, raw_img_temp_path)
                session_data['raw_image_path'] = raw_img_temp_path
                user_sessions[chat_id]['state'] = 'awaiting_caption_for_create'
                await event.reply("Raw image received. Now, please send the caption text.")
            except Exception as e:
                # Do not leave a partially downloaded file behind.
                _remove_file_quietly(raw_img_temp_path)
                await event.reply(f"Couldn't save raw image. Error: {e}")
                user_sessions[chat_id]['state'] = 'idle'
            return

    if event.text and not event.text.startswith('/') and current_state == 'awaiting_caption_for_create':
        caption_text = event.text
        raw_image_path = session_data.get('raw_image_path')
        if not raw_image_path or not os.path.exists(raw_image_path):
            await event.reply("Error: Raw image data not found. Try `/create` again.")
            user_sessions[chat_id]['state'] = 'idle'
            return

        final_image_path = None
        try:
            processing_msg = await event.reply("⏳ Processing your image, please wait...")
            final_image_path = await generate_image_with_frame(raw_image_path, TEMPLATE_PATH, caption_text, chat_id)
            try:
                await bot.delete_messages(chat_id, processing_msg)
            except Exception as e:
                # Best effort only: a leftover status message is harmless.
                print(f"Could not delete processing message: {e}")

            if final_image_path and os.path.exists(final_image_path):
                try:
                    await bot.send_file(chat_id, final_image_path, caption="🖼️ Here's your generated image!")
                except Exception as e:
                    await event.reply(f"Sorry, couldn't send the final image. Error: {e}")
            else:
                await event.reply("❌ Sorry, something went wrong while creating the image.")
        finally:
            # Runs even if a reply above raises, so the chat never stays
            # stuck in the caption step and temp files never pile up.
            _remove_file_quietly(final_image_path)
            _remove_file_quietly(raw_image_path)
            user_sessions[chat_id]['state'] = 'idle'
            user_sessions[chat_id]['data'] = {}
        return

    if event.text and not event.text.startswith('/') and current_state != 'idle':
        await event.reply(f"I'm currently waiting for: `{current_state}`. If you're stuck, try `/cancel`.")


def _is_font_document(document):
    """Return True when *document* looks like a font by MIME type or '.ttf' filename."""
    mime_type = getattr(document, 'mime_type', None) or ''
    if any(marker in mime_type for marker in ('font', 'ttf', 'opentype')):
        return True
    return any(
        isinstance(attr, types.DocumentAttributeFilename) and attr.file_name.lower().endswith('.ttf')
        for attr in (getattr(document, 'attributes', None) or ())
    )


def _incoming_image(event):
    """Return the message's photo, or its document if that is an image file, else None."""
    if event.photo:
        return event.photo
    document = event.document
    if document and (getattr(document, 'mime_type', None) or '').startswith('image/'):
        return document
    return None


def _remove_file_quietly(path):
    """Delete *path* if it is set and exists; print, but do not raise, other OS errors."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as err:
        print(f"Error deleting temp file {path}: {err}")
# --- Main Bot Execution --- | |
async def main():
    """Load the configuration, log the bot in, and run until it disconnects.

    Any exception raised while connecting or running is printed together
    with its traceback; the client is disconnected on the way out if it is
    still connected.
    """
    # NOTE(review): no handler registration (``@bot.on(...)`` decorators or
    # ``bot.add_event_handler`` calls) is visible in this file — confirm the
    # handlers above are wired to the client somewhere.
    print("Bot starting...")
    load_bot_config()
    try:
        masked_id = str(API_ID)[:4]
        print(f"Connecting to Telegram with API_ID: {masked_id}... (masked)")
        await bot.start(bot_token=BOT_TOKEN)
        print("Bot is connected and running!")
        account = await bot.get_me()
        print(f"Logged in as: {account.username} (ID: {account.id})")
        await bot.run_until_disconnected()
    except Exception as err:
        print(f"CRITICAL: An error occurred while running the bot: {err}")
        import traceback
        traceback.print_exc()
    finally:
        print("Bot is stopping...")
        if bot.is_connected():
            await bot.disconnect()
        print("Bot has disconnected.")


if __name__ == '__main__':
    asyncio.run(main())