Lommds / app.py
understanding's picture
Create app.py
06b39f0 verified
raw
history blame
18.2 kB
import os
import asyncio
import json
from telethon import TelegramClient, events
from telethon.tl import types # For DocumentAttributeFilename
from PIL import Image, ImageDraw, ImageFont
# --- Configuration ---
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
API_ID = os.environ.get('API_ID')
API_HASH = os.environ.get('API_HASH')
BOT_TOKEN = os.environ.get('BOT_TOKEN')
if not all([API_ID, API_HASH, BOT_TOKEN]):
print("CRITICAL: API_ID, API_HASH, and BOT_TOKEN environment variables must be set.")
exit(1)
try:
API_ID = int(API_ID)
except ValueError:
print("CRITICAL: API_ID must be an integer.")
exit(1)
# Adjusted session path
SESSION_NAME = 'session/image_bot_session' # Store session file in 'session' subdirectory
bot = TelegramClient(SESSION_NAME, API_ID, API_HASH)
# --- Paths and Constants ---
DATA_DIR = "data"
DOWNLOADS_DIR = "downloads" # Changed from TEMP_DIR
CONFIG_FILENAME = "config.json"
TEMPLATE_FILENAME = "user_template.png"
USER_FONT_FILENAME = "user_font.ttf"
CONFIG_PATH = os.path.join(DATA_DIR, CONFIG_FILENAME)
TEMPLATE_PATH = os.path.join(DATA_DIR, TEMPLATE_FILENAME)
USER_FONT_PATH = os.path.join(DATA_DIR, USER_FONT_FILENAME)
# Ensure base directories exist (Dockerfile should create them, but this is a good fallback)
# The Dockerfile now creates these, but os.makedirs with exist_ok=True is safe.
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(DOWNLOADS_DIR, exist_ok=True)
os.makedirs("session", exist_ok=True) # Ensure the 'session' directory for Telethon exists
# --- Default Configuration (if config.json is missing/invalid) ---
DEFAULT_CONFIG = {
"raw_image_box": {"x": 40, "y": 40, "width": 1000, "height": 780},
"caption_area": {"x": 60, "y": 870},
"caption_font_size": 45,
"caption_color": "white",
"line_spacing": 10
}
bot_config = {}
# --- Config Management ---
def load_bot_config():
global bot_config
try:
if os.path.exists(CONFIG_PATH):
with open(CONFIG_PATH, 'r') as f:
loaded_values = json.load(f)
bot_config = DEFAULT_CONFIG.copy()
bot_config.update(loaded_values)
else:
bot_config = DEFAULT_CONFIG.copy()
save_bot_config() # Ensure file exists with all keys
print(f"Configuration loaded/initialized: {bot_config}")
except Exception as e:
print(f"Error loading config, using defaults: {e}")
bot_config = DEFAULT_CONFIG.copy()
save_bot_config()
def save_bot_config():
global bot_config
try:
with open(CONFIG_PATH, 'w') as f:
json.dump(bot_config, f, indent=4)
except Exception as e:
print(f"Error saving config: {e}")
# --- User State Management ---
user_sessions = {}
# --- Helper Functions (Image Processing) ---
async def generate_image_with_frame(raw_image_path, frame_template_path, caption_text, chat_id):
output_filename = f"final_image_{chat_id}.png"
# Save final output to DOWNLOADS_DIR before sending, then clean up
output_path = os.path.join(DOWNLOADS_DIR, output_filename)
cfg_raw_box = bot_config.get("raw_image_box", DEFAULT_CONFIG["raw_image_box"])
cfg_caption_area = bot_config.get("caption_area", DEFAULT_CONFIG["caption_area"])
cfg_font_size = bot_config.get("caption_font_size", DEFAULT_CONFIG["caption_font_size"])
cfg_caption_color = bot_config.get("caption_color", DEFAULT_CONFIG["caption_color"])
cfg_line_spacing = bot_config.get("line_spacing", DEFAULT_CONFIG["line_spacing"])
try:
raw_img = Image.open(raw_image_path).convert("RGBA")
frame_template_img = Image.open(frame_template_path).convert("RGBA")
raw_img_copy = raw_img.copy()
raw_img_copy.thumbnail((cfg_raw_box['width'], cfg_raw_box['height']), Image.LANCZOS)
final_img = frame_template_img.copy()
paste_x = cfg_raw_box['x'] + (cfg_raw_box['width'] - raw_img_copy.width) // 2
paste_y = cfg_raw_box['y'] + (cfg_raw_box['height'] - raw_img_copy.height) // 2
final_img.paste(raw_img_copy, (paste_x, paste_y), raw_img_copy if raw_img_copy.mode == 'RGBA' else None)
draw = ImageDraw.Draw(final_img)
font_to_use = None
if os.path.exists(USER_FONT_PATH):
try:
font_to_use = ImageFont.truetype(USER_FONT_PATH, cfg_font_size)
except IOError:
print(f"User font at '{USER_FONT_PATH}' found but couldn't be loaded. Using fallback.")
if not font_to_use:
try:
font_to_use = ImageFont.truetype("arial.ttf", cfg_font_size)
except IOError:
print("Arial.ttf not found, using Pillow's load_default(). This may look very basic.")
font_to_use = ImageFont.load_default().font_variant(size=cfg_font_size)
lines = caption_text.split('\n')
current_y = cfg_caption_area['y']
for line in lines:
try:
line_bbox = draw.textbbox((0, 0), line, font=font_to_use)
line_height = line_bbox[3] - line_bbox[1]
except AttributeError:
(text_width, text_height) = draw.textsize(line, font=font_to_use)
line_height = text_height
draw.text((cfg_caption_area['x'], current_y), line, font=font_to_use, fill=cfg_caption_color)
current_y += line_height + cfg_line_spacing
final_img.convert("RGB").save(output_path, "PNG")
return output_path
except FileNotFoundError as fnf_err:
print(f"Error: Image file not found during processing. Template: {frame_template_path}, Raw: {raw_image_path}. Error: {fnf_err}")
return None
except Exception as e:
print(f"Error during image processing: {e}")
import traceback
traceback.print_exc()
return None
# --- Bot Event Handlers --- (Mostly same as before, checking paths for DOWNLOADS_DIR)
@bot.on(events.NewMessage(pattern='/start'))
async def start_handler(event):
chat_id = event.chat_id
user_sessions[chat_id] = {'state': 'idle', 'data': {}}
start_message = (
"Welcome! This bot helps you create images with a template.\n\n"
"**Initial Setup (if first time or after a reset):**\n"
"1. `/settemplate` - Upload your frame template image (PNG/JPG).\n"
"2. `/setfont` - Upload your .ttf font file for captions.\n"
"3. `/help_config` - Learn how to set layout parameters.\n"
"4. `/setconfig <key> <value>` - Adjust layout as needed.\n\n"
"**Regular Use:**\n"
"`/create` - Start creating a new image.\n"
"`/viewconfig` - See current layout settings.\n"
"`/cancel` - Cancel current operation.\n\n"
"**Note on Hosting:** The `data/` and `session/` directories should ideally use persistent storage on your hosting platform. "
"If using ephemeral storage (common on free tiers), your uploaded template, font, config, and session might be lost on restarts, requiring setup again."
)
await event.reply(start_message)
@bot.on(events.NewMessage(pattern='/settemplate'))
async def set_template_handler(event):
chat_id = event.chat_id
user_sessions[chat_id] = {'state': 'awaiting_template_image', 'data': {}}
await event.reply("Please send your frame template image (e.g., a PNG or JPG).")
@bot.on(events.NewMessage(pattern='/setfont'))
async def set_font_handler(event):
chat_id = event.chat_id
user_sessions[chat_id] = {'state': 'awaiting_font_file', 'data': {}}
await event.reply("Please send your `.ttf` font file as a document/file.")
@bot.on(events.NewMessage(pattern='/viewconfig'))
async def view_config_handler(event):
load_bot_config()
config_str = "Current Bot Configuration:\n"
config_str += f"- Font: {'User font set (' + USER_FONT_FILENAME + ')' if os.path.exists(USER_FONT_PATH) else 'Using fallback font (Arial or Pillow default)'}\n"
for key, value in bot_config.items():
config_str += f"- {key}: {json.dumps(value)}\n"
await event.reply(config_str)
@bot.on(events.NewMessage(pattern='/help_config'))
async def help_config_handler(event):
await event.reply(
"**Configuration Commands (`/setconfig <key> <value>`):**\n"
"`raw_image_box_x <num>` (e.g., 40)\n"
"`raw_image_box_y <num>`\n"
"`raw_image_box_width <num>`\n"
"`raw_image_box_height <num>`\n"
"`caption_area_x <num>`\n"
"`caption_area_y <num>`\n"
"`caption_font_size <num>` (e.g., 45)\n"
"`caption_color <name_or_hex>` (e.g., white or #FFFFFF)\n"
"`line_spacing <num>` (e.g., 10)\n\n"
"Example: `/setconfig caption_font_size 50`\n"
"Use `/viewconfig` to see current values. "
"These settings define how the raw image and caption are placed on your template."
)
@bot.on(events.NewMessage(pattern=r'/setconfig (\w+) (.+)'))
async def set_config_handler(event):
key_to_set = event.pattern_match.group(1).strip()
value_str = event.pattern_match.group(2).strip()
load_bot_config()
original_config_copy = json.dumps(bot_config)
updated = False
sub_key = None # Initialize sub_key
try:
if key_to_set.startswith("raw_image_box_"):
sub_key = key_to_set.replace("raw_image_box_", "")
if "raw_image_box" in bot_config and sub_key in bot_config["raw_image_box"]:
bot_config["raw_image_box"][sub_key] = int(value_str)
updated = True
elif key_to_set.startswith("caption_area_"):
sub_key = key_to_set.replace("caption_area_", "")
if "caption_area" in bot_config and sub_key in bot_config["caption_area"]:
bot_config["caption_area"][sub_key] = int(value_str)
updated = True
elif key_to_set in ["caption_font_size", "line_spacing"]:
bot_config[key_to_set] = int(value_str)
updated = True
elif key_to_set == "caption_color":
bot_config[key_to_set] = value_str
updated = True
if updated:
if json.dumps(bot_config) != original_config_copy:
save_bot_config()
# Construct reply value more carefully
reply_value = value_str
if sub_key: # if it was a nested key
if key_to_set.startswith("raw_image_box_"):
reply_value = bot_config.get("raw_image_box", {}).get(sub_key, value_str)
elif key_to_set.startswith("caption_area_"):
reply_value = bot_config.get("caption_area", {}).get(sub_key, value_str)
else:
reply_value = bot_config.get(key_to_set, value_str)
await event.reply(f"Configuration updated: {key_to_set} = {reply_value}")
else:
await event.reply(f"Value for {key_to_set} is already {value_str}. No change made.")
else:
await event.reply(f"Unknown or non-configurable key: '{key_to_set}'. See `/help_config`.")
except ValueError:
await event.reply(f"Invalid value for '{key_to_set}'. Please provide a number where expected (e.g., for sizes, coordinates).")
except Exception as e:
await event.reply(f"Error setting config: {e}")
@bot.on(events.NewMessage(pattern='/create'))
async def create_post_handler(event):
chat_id = event.chat_id
if not os.path.exists(TEMPLATE_PATH):
await event.reply("No template found. Please set one using `/settemplate` first.")
return
if not os.path.exists(USER_FONT_PATH):
await event.reply("Warning: No user font found (use `/setfont`). A fallback font will be attempted, but results may vary.")
user_sessions[chat_id] = {'state': 'awaiting_raw_image_for_create', 'data': {}}
await event.reply("Please send the raw image you want to use.")
@bot.on(events.NewMessage(pattern='/cancel'))
async def cancel_handler(event):
chat_id = event.chat_id
if chat_id in user_sessions and user_sessions[chat_id]['state'] != 'idle':
temp_raw_path = user_sessions[chat_id].get('data', {}).get('raw_image_path')
if temp_raw_path and os.path.exists(temp_raw_path):
try: os.remove(temp_raw_path)
except OSError as e: print(f"Error deleting temp file {temp_raw_path}: {e}")
user_sessions[chat_id] = {'state': 'idle', 'data': {}}
await event.reply("Operation cancelled.")
else:
await event.reply("Nothing to cancel.")
@bot.on(events.NewMessage)
async def message_handler(event): # pylint: disable=too-many-branches
chat_id = event.chat_id
if chat_id not in user_sessions:
user_sessions[chat_id] = {'state': 'idle', 'data': {}}
if event.text and event.text.startswith(('/', '#', '.')):
return # Let specific command handlers take precedence
current_state = user_sessions.get(chat_id, {}).get('state', 'idle')
session_data = user_sessions.get(chat_id, {}).get('data', {})
if event.document and current_state == 'awaiting_font_file':
is_ttf = False
if hasattr(event.document, 'mime_type') and \
('font' in event.document.mime_type or 'ttf' in event.document.mime_type or 'opentype' in event.document.mime_type):
is_ttf = True
if not is_ttf and hasattr(event.document, 'attributes'):
for attr in event.document.attributes:
if isinstance(attr, types.DocumentAttributeFilename) and attr.file_name.lower().endswith('.ttf'):
is_ttf = True
break
if is_ttf:
await event.reply("Downloading font file...")
try:
await bot.download_media(event.message.document, USER_FONT_PATH)
user_sessions[chat_id]['state'] = 'idle'
await event.reply(f"Font saved as '{USER_FONT_FILENAME}' in `data/` directory!")
except Exception as e:
await event.reply(f"Sorry, I couldn't save the font. Error: {e}")
user_sessions[chat_id]['state'] = 'idle'
else:
await event.reply("That doesn't look like a .ttf font file. Please send a valid .ttf font file, or use /cancel.")
return
if event.photo:
if current_state == 'awaiting_template_image':
await event.reply("Downloading template image...")
try:
await bot.download_media(event.message.photo, TEMPLATE_PATH)
user_sessions[chat_id]['state'] = 'idle'
await event.reply(f"Template saved as '{TEMPLATE_FILENAME}' in `data/` directory!")
except Exception as e:
await event.reply(f"Couldn't save template image. Error: {e}")
user_sessions[chat_id]['state'] = 'idle'
return
elif current_state == 'awaiting_raw_image_for_create':
raw_img_filename = f"raw_{chat_id}_{event.message.id}.jpg"
# Save raw images to DOWNLOADS_DIR
raw_img_temp_path = os.path.join(DOWNLOADS_DIR, raw_img_filename)
await event.reply("Downloading raw image...")
try:
await bot.download_media(event.message.photo, raw_img_temp_path)
session_data['raw_image_path'] = raw_img_temp_path
user_sessions[chat_id]['state'] = 'awaiting_caption_for_create'
await event.reply("Raw image received. Now, please send the caption text.")
except Exception as e:
await event.reply(f"Couldn't save raw image. Error: {e}")
user_sessions[chat_id]['state'] = 'idle'
return
if event.text and not event.text.startswith('/') and current_state == 'awaiting_caption_for_create':
caption_text = event.text
raw_image_path = session_data.get('raw_image_path')
if not raw_image_path or not os.path.exists(raw_image_path):
await event.reply("Error: Raw image data not found. Try `/create` again.")
user_sessions[chat_id]['state'] = 'idle'
return
processing_msg = await event.reply("⏳ Processing your image, please wait...")
final_image_path = await generate_image_with_frame(raw_image_path, TEMPLATE_PATH, caption_text, chat_id)
try: await bot.delete_messages(chat_id, processing_msg)
except Exception: pass # noqa
if final_image_path and os.path.exists(final_image_path):
try:
await bot.send_file(chat_id, final_image_path, caption="🖼️ Here's your generated image!")
except Exception as e:
await event.reply(f"Sorry, couldn't send the final image. Error: {e}")
finally:
if os.path.exists(final_image_path): os.remove(final_image_path)
else:
await event.reply("❌ Sorry, something went wrong while creating the image.")
if os.path.exists(raw_image_path): os.remove(raw_image_path)
user_sessions[chat_id]['state'] = 'idle'
user_sessions[chat_id]['data'] = {}
return
if event.text and not event.text.startswith('/') and current_state != 'idle':
await event.reply(f"I'm currently waiting for: `{current_state}`. If you're stuck, try `/cancel`.")
# --- Main Bot Execution ---
async def main():
print("Bot starting...")
load_bot_config()
try:
print(f"Connecting to Telegram with API_ID: {str(API_ID)[:4]}... (masked)")
await bot.start(bot_token=BOT_TOKEN)
print("Bot is connected and running!")
me = await bot.get_me()
print(f"Logged in as: {me.username} (ID: {me.id})")
await bot.run_until_disconnected()
except Exception as e:
print(f"CRITICAL: An error occurred while running the bot: {e}")
import traceback
traceback.print_exc()
finally:
print("Bot is stopping...")
if bot.is_connected():
await bot.disconnect()
print("Bot has disconnected.")
if __name__ == '__main__':
asyncio.run(main())