Spaces:
Runtime error
Runtime error
File size: 18,180 Bytes
06b39f0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 |
import os
import asyncio
import json
from telethon import TelegramClient, events
from telethon.tl import types # For DocumentAttributeFilename
from PIL import Image, ImageDraw, ImageFont
# --- Configuration ---
# python-dotenv is optional: when it is installed, variables from a local
# .env file are loaded into the environment before the credentials are read.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

API_ID = os.environ.get('API_ID')
API_HASH = os.environ.get('API_HASH')
BOT_TOKEN = os.environ.get('BOT_TOKEN')

if not all([API_ID, API_HASH, BOT_TOKEN]):
    print("CRITICAL: API_ID, API_HASH, and BOT_TOKEN environment variables must be set.")
    # SystemExit instead of exit(): exit() is a helper injected by the `site`
    # module and is not guaranteed to exist (e.g. under `python -S`).
    raise SystemExit(1)

try:
    API_ID = int(API_ID)
except ValueError:
    print("CRITICAL: API_ID must be an integer.")
    raise SystemExit(1) from None

# --- Paths and Constants ---
# Telethon session file lives in the 'session' subdirectory.
SESSION_NAME = 'session/image_bot_session'
DATA_DIR = "data"
DOWNLOADS_DIR = "downloads"  # Scratch space for raw and generated images
CONFIG_FILENAME = "config.json"
TEMPLATE_FILENAME = "user_template.png"
USER_FONT_FILENAME = "user_font.ttf"
CONFIG_PATH = os.path.join(DATA_DIR, CONFIG_FILENAME)
TEMPLATE_PATH = os.path.join(DATA_DIR, TEMPLATE_FILENAME)
USER_FONT_PATH = os.path.join(DATA_DIR, USER_FONT_FILENAME)

# Ensure base directories exist (the Dockerfile should create them, but
# os.makedirs with exist_ok=True is a safe fallback).
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(DOWNLOADS_DIR, exist_ok=True)
os.makedirs(os.path.dirname(SESSION_NAME) or ".", exist_ok=True)

# The client is created only AFTER the session directory exists: a string
# session name is backed by a file under that directory, so constructing the
# client first can fail when the directory is missing.
bot = TelegramClient(SESSION_NAME, API_ID, API_HASH)
# --- Default Configuration (if config.json is missing/invalid) ---
# raw_image_box: rectangle on the template that the raw image is fitted into.
# caption_area: top-left anchor where the first caption line is drawn.
DEFAULT_CONFIG = dict(
    raw_image_box=dict(x=40, y=40, width=1000, height=780),
    caption_area=dict(x=60, y=870),
    caption_font_size=45,
    caption_color="white",
    line_spacing=10,
)

# Live configuration; filled in by load_bot_config().
bot_config = {}
# --- Config Management ---
def load_bot_config():
    """Populate the global ``bot_config`` from ``CONFIG_PATH``.

    Values found in the JSON file are layered on top of a deep copy of
    ``DEFAULT_CONFIG``. Nested sections (e.g. ``raw_image_box``) are merged
    key by key, so a file holding only part of a section keeps the default
    values for the missing sub-keys.

    If the file does not exist it is created from the defaults. If it cannot
    be read or parsed, the error is printed, the defaults are used and the
    file is rewritten.
    """
    global bot_config
    # Function-scope import, matching the file's other local imports.
    import copy

    try:
        if os.path.exists(CONFIG_PATH):
            with open(CONFIG_PATH, 'r') as f:
                loaded_values = json.load(f)
            if not isinstance(loaded_values, dict):
                raise ValueError("config file must contain a JSON object")
            # Deep copy: a shallow copy would share the nested dicts with
            # DEFAULT_CONFIG, letting later edits to bot_config silently
            # rewrite the defaults.
            merged = copy.deepcopy(DEFAULT_CONFIG)
            for key, value in loaded_values.items():
                if isinstance(value, dict) and isinstance(merged.get(key), dict):
                    merged[key].update(value)
                else:
                    merged[key] = value
            bot_config = merged
        else:
            bot_config = copy.deepcopy(DEFAULT_CONFIG)
            save_bot_config()  # Ensure file exists with all keys
        print(f"Configuration loaded/initialized: {bot_config}")
    except Exception as e:
        print(f"Error loading config, using defaults: {e}")
        bot_config = copy.deepcopy(DEFAULT_CONFIG)
        save_bot_config()
def save_bot_config():
    """Write the current ``bot_config`` to ``CONFIG_PATH`` as indented JSON.

    Any failure is printed and otherwise ignored; nothing is raised to the
    caller.
    """
    try:
        with open(CONFIG_PATH, 'w') as config_file:
            json.dump(bot_config, config_file, indent=4)
    except Exception as save_error:
        print(f"Error saving config: {save_error}")
# --- User State Management ---
# In-memory conversation state, keyed by the chat_id of the incoming event.
# Each value is a dict of the form {'state': <str>, 'data': <dict>}.
# States used by the handlers: 'idle', 'awaiting_template_image',
# 'awaiting_font_file', 'awaiting_raw_image_for_create' and
# 'awaiting_caption_for_create'. 'data' may hold 'raw_image_path' while an
# image is being created. Nothing here is written to disk.
user_sessions = {}
# --- Helper Functions (Image Processing) ---
def _load_caption_font(font_size):
    """Return the best available caption font at ``font_size``.

    Order of preference: the user-uploaded font, then "arial.ttf", then
    Pillow's built-in default font.
    """
    if os.path.exists(USER_FONT_PATH):
        try:
            return ImageFont.truetype(USER_FONT_PATH, font_size)
        except IOError:
            print(f"User font at '{USER_FONT_PATH}' found but couldn't be loaded. Using fallback.")
    try:
        return ImageFont.truetype("arial.ttf", font_size)
    except IOError:
        print("Arial.ttf not found, using Pillow's load_default(). This may look very basic.")
    try:
        # Newer Pillow releases accept a size for the built-in font.
        return ImageFont.load_default(size=font_size)
    except TypeError:
        # Older releases only offer a fixed-size bitmap font; use it as-is
        # rather than failing the whole render.
        return ImageFont.load_default()


def _measure_text_height(draw, text, font):
    """Return the rendered height of ``text`` when drawn with ``font``."""
    try:
        bbox = draw.textbbox((0, 0), text, font=font)
        return bbox[3] - bbox[1]
    except AttributeError:
        # Fallback for Pillow versions whose ImageDraw lacks textbbox().
        return draw.textsize(text, font=font)[1]


async def generate_image_with_frame(raw_image_path, frame_template_path, caption_text, chat_id):
    """Compose the raw image and caption onto the frame template.

    The raw image is scaled down to fit ``raw_image_box`` (aspect ratio kept)
    and centred inside it; the caption is drawn line by line starting at
    ``caption_area``. Layout values come from ``bot_config`` with
    ``DEFAULT_CONFIG`` as fallback.

    Args:
        raw_image_path: Path of the user's image.
        frame_template_path: Path of the template image.
        caption_text: Caption; ``'\\n'`` separates lines.
        chat_id: Used to name the output file.

    Returns:
        Path of the generated PNG inside ``DOWNLOADS_DIR``, or ``None`` if
        anything went wrong (the error is printed).
    """
    output_filename = f"final_image_{chat_id}.png"
    # Save final output to DOWNLOADS_DIR; the caller sends it and cleans up.
    output_path = os.path.join(DOWNLOADS_DIR, output_filename)

    cfg_raw_box = bot_config.get("raw_image_box", DEFAULT_CONFIG["raw_image_box"])
    cfg_caption_area = bot_config.get("caption_area", DEFAULT_CONFIG["caption_area"])
    cfg_font_size = bot_config.get("caption_font_size", DEFAULT_CONFIG["caption_font_size"])
    cfg_caption_color = bot_config.get("caption_color", DEFAULT_CONFIG["caption_color"])
    cfg_line_spacing = bot_config.get("line_spacing", DEFAULT_CONFIG["line_spacing"])

    try:
        # convert() returns independent in-memory images, so the underlying
        # files can be closed as soon as both conversions are done.
        with Image.open(raw_image_path) as raw_source, \
                Image.open(frame_template_path) as template_source:
            raw_img = raw_source.convert("RGBA")
            final_img = template_source.convert("RGBA")

        raw_img.thumbnail((cfg_raw_box['width'], cfg_raw_box['height']), Image.LANCZOS)

        # Centre the (possibly smaller) thumbnail inside the configured box.
        paste_x = cfg_raw_box['x'] + (cfg_raw_box['width'] - raw_img.width) // 2
        paste_y = cfg_raw_box['y'] + (cfg_raw_box['height'] - raw_img.height) // 2
        # The image itself is the mask so its alpha channel is respected.
        final_img.paste(raw_img, (paste_x, paste_y), raw_img)

        draw = ImageDraw.Draw(final_img)
        font_to_use = _load_caption_font(cfg_font_size)

        # Blank lines measure as zero height; advance by a reference height
        # instead so intentional empty lines in the caption stay visible.
        blank_line_height = _measure_text_height(draw, "Ay", font_to_use)
        current_y = cfg_caption_area['y']
        for line in caption_text.split('\n'):
            if line.strip():
                line_height = _measure_text_height(draw, line, font_to_use)
                draw.text((cfg_caption_area['x'], current_y), line,
                          font=font_to_use, fill=cfg_caption_color)
            else:
                line_height = blank_line_height
            current_y += line_height + cfg_line_spacing

        final_img.convert("RGB").save(output_path, "PNG")
        return output_path
    except FileNotFoundError as fnf_err:
        print(f"Error: Image file not found during processing. Template: {frame_template_path}, Raw: {raw_image_path}. Error: {fnf_err}")
        return None
    except Exception as e:
        print(f"Error during image processing: {e}")
        import traceback
        traceback.print_exc()
        return None
# --- Bot Event Handlers ---
@bot.on(events.NewMessage(pattern='/start'))
async def start_handler(event):
    """Reset the chat's session to idle and reply with the usage overview."""
    user_sessions[event.chat_id] = dict(state='idle', data={})
    await event.reply(
        "Welcome! This bot helps you create images with a template.\n\n"
        "**Initial Setup (if first time or after a reset):**\n"
        "1. `/settemplate` - Upload your frame template image (PNG/JPG).\n"
        "2. `/setfont` - Upload your .ttf font file for captions.\n"
        "3. `/help_config` - Learn how to set layout parameters.\n"
        "4. `/setconfig <key> <value>` - Adjust layout as needed.\n\n"
        "**Regular Use:**\n"
        "`/create` - Start creating a new image.\n"
        "`/viewconfig` - See current layout settings.\n"
        "`/cancel` - Cancel current operation.\n\n"
        "**Note on Hosting:** The `data/` and `session/` directories should ideally use persistent storage on your hosting platform. "
        "If using ephemeral storage (common on free tiers), your uploaded template, font, config, and session might be lost on restarts, requiring setup again."
    )
@bot.on(events.NewMessage(pattern='/settemplate'))
async def set_template_handler(event):
    """Put the chat into the template-upload state and ask for the image."""
    user_sessions[event.chat_id] = dict(state='awaiting_template_image', data={})
    prompt = "Please send your frame template image (e.g., a PNG or JPG)."
    await event.reply(prompt)
@bot.on(events.NewMessage(pattern='/setfont'))
async def set_font_handler(event):
    """Put the chat into the font-upload state and ask for the .ttf file."""
    user_sessions[event.chat_id] = dict(state='awaiting_font_file', data={})
    prompt = "Please send your `.ttf` font file as a document/file."
    await event.reply(prompt)
@bot.on(events.NewMessage(pattern='/viewconfig'))
async def view_config_handler(event):
    """Reload the configuration and reply with one line per setting."""
    load_bot_config()
    if os.path.exists(USER_FONT_PATH):
        font_status = 'User font set (' + USER_FONT_FILENAME + ')'
    else:
        font_status = 'Using fallback font (Arial or Pillow default)'
    report_lines = ["Current Bot Configuration:", f"- Font: {font_status}"]
    report_lines.extend(
        f"- {name}: {json.dumps(setting)}" for name, setting in bot_config.items()
    )
    # Every line, including the last, is newline-terminated.
    await event.reply("\n".join(report_lines) + "\n")
@bot.on(events.NewMessage(pattern='/help_config'))
async def help_config_handler(event):
    """Reply with the list of keys accepted by `/setconfig`."""
    help_text = (
        "**Configuration Commands (`/setconfig <key> <value>`):**\n"
        "`raw_image_box_x <num>` (e.g., 40)\n"
        "`raw_image_box_y <num>`\n"
        "`raw_image_box_width <num>`\n"
        "`raw_image_box_height <num>`\n"
        "`caption_area_x <num>`\n"
        "`caption_area_y <num>`\n"
        "`caption_font_size <num>` (e.g., 45)\n"
        "`caption_color <name_or_hex>` (e.g., white or #FFFFFF)\n"
        "`line_spacing <num>` (e.g., 10)\n\n"
        "Example: `/setconfig caption_font_size 50`\n"
        "Use `/viewconfig` to see current values. "
        "These settings define how the raw image and caption are placed on your template."
    )
    await event.reply(help_text)
@bot.on(events.NewMessage(pattern=r'/setconfig (\w+) (.+)'))
async def set_config_handler(event):
    """Handle `/setconfig <key> <value>`: validate, apply and persist one setting.

    Accepted keys are ``raw_image_box_<x|y|width|height>``,
    ``caption_area_<x|y>``, ``caption_font_size``, ``line_spacing`` (all
    integers) and ``caption_color`` (any colour string Pillow can parse).
    Width, height and font size must be greater than zero. The user always
    receives a reply describing the outcome.
    """
    key_to_set = event.pattern_match.group(1).strip()
    value_str = event.pattern_match.group(2).strip()
    load_bot_config()
    original_config_copy = json.dumps(bot_config)

    nested_sections = ("raw_image_box", "caption_area")
    # A zero or negative value here would make image generation fail later.
    must_be_positive = ("raw_image_box_width", "raw_image_box_height", "caption_font_size")

    try:
        section = next(
            (name for name in nested_sections if key_to_set.startswith(name + "_")),
            None,
        )
        sub_key = None
        new_value = None
        known_key = False

        if section is not None:
            # Slice instead of str.replace so only the leading prefix is removed.
            sub_key = key_to_set[len(section) + 1:]
            current_section = bot_config.get(section)
            if isinstance(current_section, dict) and sub_key in current_section:
                new_value = int(value_str)
                known_key = True
        elif key_to_set in ("caption_font_size", "line_spacing"):
            new_value = int(value_str)
            known_key = True
        elif key_to_set == "caption_color":
            # Validate now so a typo is reported here instead of breaking the
            # next image generation.
            from PIL import ImageColor
            try:
                ImageColor.getrgb(value_str)
            except ValueError:
                await event.reply(
                    f"Invalid value for '{key_to_set}'. Please provide a color name or hex code (e.g., white or #FFFFFF)."
                )
                return
            new_value = value_str
            known_key = True

        if not known_key:
            await event.reply(f"Unknown or non-configurable key: '{key_to_set}'. See `/help_config`.")
            return

        if key_to_set in must_be_positive and new_value <= 0:
            await event.reply(f"Invalid value for '{key_to_set}'. It must be greater than 0.")
            return

        if section is not None:
            # Rebind the section to a fresh dict rather than mutating it in
            # place: the existing dict may be shared with DEFAULT_CONFIG.
            bot_config[section] = {**bot_config[section], sub_key: new_value}
        else:
            bot_config[key_to_set] = new_value

        if json.dumps(bot_config) == original_config_copy:
            await event.reply(f"Value for {key_to_set} is already {value_str}. No change made.")
            return

        save_bot_config()
        await event.reply(f"Configuration updated: {key_to_set} = {new_value}")
    except ValueError:
        await event.reply(f"Invalid value for '{key_to_set}'. Please provide a number where expected (e.g., for sizes, coordinates).")
    except Exception as e:
        await event.reply(f"Error setting config: {e}")
@bot.on(events.NewMessage(pattern='/create'))
async def create_post_handler(event):
    """Start the image-creation flow; requires a template to be present.

    A missing user font only produces a warning; the flow still continues.
    """
    template_present = os.path.exists(TEMPLATE_PATH)
    if not template_present:
        await event.reply("No template found. Please set one using `/settemplate` first.")
        return

    user_font_present = os.path.exists(USER_FONT_PATH)
    if not user_font_present:
        await event.reply("Warning: No user font found (use `/setfont`). A fallback font will be attempted, but results may vary.")

    user_sessions[event.chat_id] = dict(state='awaiting_raw_image_for_create', data={})
    await event.reply("Please send the raw image you want to use.")
@bot.on(events.NewMessage(pattern='/cancel'))
async def cancel_handler(event):
    """Abort the chat's pending operation and delete its downloaded raw image."""
    chat_id = event.chat_id
    session = user_sessions.get(chat_id)

    # Guard clause: unknown chat or already idle means there is nothing to undo.
    if session is None or session['state'] == 'idle':
        await event.reply("Nothing to cancel.")
        return

    pending_raw_path = session.get('data', {}).get('raw_image_path')
    if pending_raw_path and os.path.exists(pending_raw_path):
        try:
            os.remove(pending_raw_path)
        except OSError as remove_error:
            print(f"Error deleting temp file {pending_raw_path}: {remove_error}")

    user_sessions[chat_id] = dict(state='idle', data={})
    await event.reply("Operation cancelled.")
def _is_ttf_document(document):
    """Return True if ``document`` looks like a font file.

    Checks the MIME type for 'font', 'ttf' or 'opentype', then falls back to
    a '.ttf' filename attribute.
    """
    mime_type = getattr(document, 'mime_type', None) or ''
    if any(marker in mime_type for marker in ('font', 'ttf', 'opentype')):
        return True
    return any(
        isinstance(attr, types.DocumentAttributeFilename)
        and attr.file_name.lower().endswith('.ttf')
        for attr in (getattr(document, 'attributes', None) or ())
    )


def _is_image_document(document):
    """Return True if ``document`` is a file whose MIME type is ``image/*``."""
    mime_type = getattr(document, 'mime_type', None) or ''
    return mime_type.startswith('image/')


@bot.on(events.NewMessage)
async def message_handler(event):  # pylint: disable=too-many-branches
    """Drive the per-chat conversation for every non-command message.

    Depending on the chat's current state this saves an uploaded font, saves
    an uploaded template, stores the raw image for `/create`, or renders the
    final image once the caption arrives. Images are accepted both as
    Telegram photos and as image files sent as documents. Any other message
    received while an operation is pending gets a reminder of what the bot
    is waiting for.
    """
    chat_id = event.chat_id
    if chat_id not in user_sessions:
        user_sessions[chat_id] = {'state': 'idle', 'data': {}}

    if event.text and event.text.startswith(('/', '#', '.')):
        return  # Let specific command handlers take precedence

    current_state = user_sessions[chat_id].get('state', 'idle')
    session_data = user_sessions[chat_id].get('data', {})

    # --- Font upload ---
    if event.document and current_state == 'awaiting_font_file':
        if _is_ttf_document(event.document):
            await event.reply("Downloading font file...")
            try:
                await bot.download_media(event.message.document, USER_FONT_PATH)
                user_sessions[chat_id]['state'] = 'idle'
                await event.reply(f"Font saved as '{USER_FONT_FILENAME}' in `data/` directory!")
            except Exception as e:
                await event.reply(f"Sorry, I couldn't save the font. Error: {e}")
                user_sessions[chat_id]['state'] = 'idle'
        else:
            await event.reply("That doesn't look like a .ttf font file. Please send a valid .ttf font file, or use /cancel.")
        return

    # --- Image upload (template or raw image) ---
    # An image sent "as file" arrives as a document rather than a photo;
    # accept both so an uncompressed PNG template is not silently ignored.
    image_media = event.photo
    if not image_media and _is_image_document(event.document):
        image_media = event.document

    if image_media and current_state == 'awaiting_template_image':
        await event.reply("Downloading template image...")
        try:
            await bot.download_media(image_media, TEMPLATE_PATH)
            user_sessions[chat_id]['state'] = 'idle'
            await event.reply(f"Template saved as '{TEMPLATE_FILENAME}' in `data/` directory!")
        except Exception as e:
            await event.reply(f"Couldn't save template image. Error: {e}")
            user_sessions[chat_id]['state'] = 'idle'
        return

    if image_media and current_state == 'awaiting_raw_image_for_create':
        raw_img_filename = f"raw_{chat_id}_{event.message.id}.jpg"
        # Raw images are kept in DOWNLOADS_DIR until the caption arrives.
        raw_img_temp_path = os.path.join(DOWNLOADS_DIR, raw_img_filename)
        await event.reply("Downloading raw image...")
        try:
            await bot.download_media(image_media, raw_img_temp_path)
            session_data['raw_image_path'] = raw_img_temp_path
            user_sessions[chat_id]['state'] = 'awaiting_caption_for_create'
            await event.reply("Raw image received. Now, please send the caption text.")
        except Exception as e:
            await event.reply(f"Couldn't save raw image. Error: {e}")
            user_sessions[chat_id]['state'] = 'idle'
        return

    # --- Caption received: render and send the final image ---
    # Text starting with '/' was already filtered out by the early return.
    if event.text and current_state == 'awaiting_caption_for_create':
        caption_text = event.text
        raw_image_path = session_data.get('raw_image_path')
        if not raw_image_path or not os.path.exists(raw_image_path):
            await event.reply("Error: Raw image data not found. Try `/create` again.")
            user_sessions[chat_id]['state'] = 'idle'
            return

        processing_msg = await event.reply("⏳ Processing your image, please wait...")
        final_image_path = await generate_image_with_frame(raw_image_path, TEMPLATE_PATH, caption_text, chat_id)
        try:
            await bot.delete_messages(chat_id, processing_msg)
        except Exception as e:
            # Best effort only: a leftover status message is harmless.
            print(f"Could not delete processing message: {e}")

        try:
            if final_image_path and os.path.exists(final_image_path):
                try:
                    await bot.send_file(chat_id, final_image_path, caption="🖼️ Here's your generated image!")
                except Exception as e:
                    await event.reply(f"Sorry, couldn't send the final image. Error: {e}")
                finally:
                    if os.path.exists(final_image_path):
                        os.remove(final_image_path)
            else:
                await event.reply("❌ Sorry, something went wrong while creating the image.")
            if os.path.exists(raw_image_path):
                os.remove(raw_image_path)
        except OSError as e:
            print(f"Error deleting temp file: {e}")
        finally:
            # Always reset the session, even if cleanup failed, so the chat
            # cannot get stuck in the caption state.
            user_sessions[chat_id]['state'] = 'idle'
            user_sessions[chat_id]['data'] = {}
        return

    # --- Anything else while an operation is pending ---
    # Media is covered too, so an unsupported upload gets feedback, not silence.
    if current_state != 'idle':
        await event.reply(f"I'm currently waiting for: `{current_state}`. If you're stuck, try `/cancel`.")
# --- Main Bot Execution ---
async def main():
    """Load the configuration, log the bot in and run until disconnected.

    Any exception raised while running is printed with its traceback. The
    client is disconnected on the way out if it is still connected.
    """
    import traceback

    print("Bot starting...")
    load_bot_config()
    try:
        # Only the first four characters of the API ID are printed.
        api_id_prefix = str(API_ID)[:4]
        print(f"Connecting to Telegram with API_ID: {api_id_prefix}... (masked)")
        await bot.start(bot_token=BOT_TOKEN)
        print("Bot is connected and running!")
        identity = await bot.get_me()
        print(f"Logged in as: {identity.username} (ID: {identity.id})")
        await bot.run_until_disconnected()
    except Exception as run_error:
        print(f"CRITICAL: An error occurred while running the bot: {run_error}")
        traceback.print_exc()
    finally:
        print("Bot is stopping...")
        if bot.is_connected():
            await bot.disconnect()
        print("Bot has disconnected.")


if __name__ == '__main__':
    asyncio.run(main())
|