Spaces:
Sleeping
Sleeping
# bot.py | |
import asyncio | |
import logging | |
import os | |
import re | |
import uuid | |
import time | |
from typing import Dict | |
from aiogram import Bot, Dispatcher, F | |
from aiogram.filters import CommandStart, Command | |
from aiogram.types import Message, FSInputFile | |
from aiogram.enums import ParseMode | |
from aiogram.exceptions import TelegramBadRequest | |
from aiogram.client.default import DefaultBotProperties | |
import config | |
import terabox_utils as terabox | |
import db_utils | |
# --- Setup --- | |
# Create the runtime directories up front: the file log handler below opens
# "logs/bot.log" as soon as logging is configured.
# NOTE(review): "downloads" is presumably where terabox_utils stores files - confirm.
for _runtime_dir in ("logs", "downloads"):
    os.makedirs(_runtime_dir, exist_ok=True)

# Log to the console and, in append mode, to logs/bot.log.
_log_handlers = [
    logging.StreamHandler(),
    logging.FileHandler("logs/bot.log", mode="a", encoding="utf-8"),
]
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)

# Every message the bot sends is parsed as HTML unless a call overrides it.
_bot_defaults = DefaultBotProperties(parse_mode=ParseMode.HTML)
bot = Bot(token=config.BOT_TOKEN, default=_bot_defaults)
dp = Dispatcher()

# Queue of {"batch_id", "original_link"} tasks consumed by the link workers.
TASK_QUEUE = asyncio.Queue()
# batch_id -> per-batch state dict (counters, results, status message, lock).
BATCH_JOBS: Dict[str, Dict] = {}
# --- Worker --- | |
async def link_processor_worker(worker_id: int):
    """Consume link tasks from ``TASK_QUEUE`` forever.

    Each task is a dict with ``batch_id`` and ``original_link``. The worker
    resolves the link (cache lookup or download), records the outcome on the
    batch in ``BATCH_JOBS``, updates the batch status message and, when the
    last link of a batch has been processed, triggers
    ``handle_batch_completion``.

    An unexpected error while handling one task is logged and the worker
    moves on to the next task; ``TASK_QUEUE.task_done()`` is always called.

    Args:
        worker_id: Number used only to identify this worker in the log.
    """
    logger.info(f"Link worker #{worker_id} started.")
    while True:
        task = await TASK_QUEUE.get()
        try:
            await _process_link_task(task)
        except Exception:
            # One bad task must not kill the worker: a dead worker would
            # silently stop draining the queue and leave batches unfinished.
            logger.exception("Link worker #%s failed while handling task %r", worker_id, task)
        finally:
            TASK_QUEUE.task_done()


async def _process_link_task(task: dict) -> None:
    """Handle one queued task: resolve its link and record the result."""
    batch_id, original_link = task["batch_id"], task["original_link"]
    batch_info = BATCH_JOBS.get(batch_id)
    if not batch_info:
        # The batch is no longer registered, so there is nothing to update.
        return

    file_info = None
    error_msg = f"Unknown error for link: {original_link}"
    try:
        file_info = await _resolve_link(original_link, batch_info)
    except Exception as e:
        logger.warning("Link %s failed: %s", original_link, e)
        # Keep the generic text when the exception carries no message.
        error_msg = str(e) or error_msg

    await _record_link_result(batch_id, batch_info, original_link, file_info, error_msg)


async def _resolve_link(original_link: str, batch_info: dict) -> dict:
    """Return a file-info dict for the link, from the cache or a fresh download.

    Raises:
        ValueError: If the link has no short id, or resolving or downloading
            it reports an error.
    """
    short_id = await terabox.extract_terabox_short_id(original_link)
    if not short_id:
        raise ValueError("Invalid Terabox link format.")

    cached_file = await db_utils.get_cached_file(short_id)
    if cached_file:
        logger.info(f"Cache hit for {short_id}. Reusing file_id.")
        return {"cached": True, **cached_file}

    download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
    if error:
        raise ValueError(error)

    local_filepath, thumb_path, download_error = await terabox.download_terabox_file(
        bot,
        batch_info["source_chat_id"],
        batch_info["status_message"].message_id,
        download_url,
        raw_filename,
    )
    if download_error:
        raise ValueError(download_error)

    return {
        "cached": False,
        "path": local_filepath,
        "name": raw_filename,
        "size": os.path.getsize(local_filepath),
        "short_id": short_id,
        "thumb": thumb_path,
    }


async def _record_link_result(batch_id, batch_info, original_link, file_info, error_msg) -> None:
    """Store one link's outcome on its batch and refresh the status message."""
    # The lock serialises counter updates across workers of the same batch.
    async with batch_info["lock"]:
        batch_info["processed_links"] += 1
        if file_info:
            batch_info["successful_downloads"].append(file_info)
        else:
            batch_info["failed_links"].append({"link": original_link, "error": error_msg})

        processed, total = batch_info["processed_links"], batch_info["total_links"]
        try:
            # The ETA is a rough figure: 3 seconds per remaining link.
            await batch_info["status_message"].edit_text(
                f"βοΈ Batch `{batch_id[:6]}` progress: {processed}/{total} links processed.\n"
                f"ETA: {(total - processed) * 3} sec β³"
            )
        except TelegramBadRequest:
            # Progress updates are best-effort; a rejected edit is ignored.
            pass
        except Exception as e:
            # Any other failure of a cosmetic update must not abort the batch.
            logger.warning("Could not update progress for batch %s: %s", batch_id[:6], e)

        if processed == total:
            logger.info(f"Batch {batch_id[:6]} complete. Triggering final processing.")
            await handle_batch_completion(batch_id)
# --- Send & Cache --- | |
async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message:
    """Send one file to ``chat_id`` and cache its Telegram ``file_id`` when it is new.

    The file is sent as video, audio or document, chosen from the record's
    ``type`` field or, failing that, from the file-name extension. For a
    record that is not marked ``cached``, the ``file_id`` of the sent media
    is stored through ``db_utils.add_to_cache``.

    Args:
        chat_id: Destination chat.
        file_info: File record. Uses ``file_id`` when present, otherwise
            uploads ``path`` under ``name``. A non-cached record must also
            provide ``short_id`` and ``size``.
        caption: Caption attached to the message.

    Returns:
        The message returned by the send call.
    """
    file_path_or_id = file_info.get("file_id") or FSInputFile(file_info["path"], filename=file_info["name"])
    media_type = file_info.get("type")
    filename = file_info.get("name") or file_info.get("filename")
    # A record without any name must not crash the extension checks; it is
    # then sent as a plain document unless its type says otherwise.
    lowered_name = (filename or "").lower()
    video_exts = (".mp4", ".mkv", ".mov", ".avi", ".webm")
    audio_exts = (".mp3", ".flac", ".ogg", ".wav")

    if media_type == "video" or lowered_name.endswith(video_exts):
        sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
        media_type = "video"
    elif media_type == "audio" or lowered_name.endswith(audio_exts):
        sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
        media_type = "audio"
    else:
        sent_message = await bot.send_document(chat_id, file_path_or_id, caption=caption)
        media_type = "document"

    if not file_info.get("cached") and sent_message:
        # Telegram may deliver an upload as a different kind than requested
        # (sendVideo may send non-MPEG4 formats as a document), in which case
        # the expected attribute is None. The file was still delivered, so
        # only the caching step is skipped.
        sent_media = getattr(sent_message, media_type, None)
        if sent_media is None:
            logger.warning(
                "Sent %r as %s but the message carries no such media; not caching.",
                filename,
                media_type,
            )
        else:
            await db_utils.add_to_cache(
                file_info["short_id"], sent_media.file_id, filename, media_type, file_info["size"]
            )
    return sent_message
# --- Batch Completion --- | |
async def handle_batch_completion(batch_id: str):
    """Deliver the results of a finished batch and release its resources.

    If nothing was downloaded, the status message is replaced by a failure
    report. Otherwise the files are sent to ``config.FORWARD_CHANNEL_ID``
    (when set, after forwarding the user's original message there) and then
    to the source chat, and the status message becomes a summary.

    Downloaded files and thumbnails are deleted and the batch is removed
    from ``BATCH_JOBS`` whatever the outcome.

    Args:
        batch_id: Key of the batch in ``BATCH_JOBS``; unknown ids are ignored.
    """
    # Function-scope import: this name is not among the file's imports.
    from html import escape

    batch = BATCH_JOBS.get(batch_id)
    if not batch:
        return

    status_msg = batch["status_message"]
    successful_downloads = batch["successful_downloads"]
    # Kept below Telegram's 4096-character message limit, leaving room for
    # the header line.
    max_details_len = 3500

    def build_caption(item: dict) -> str:
        # The bot's default parse mode is HTML, so "<", ">" and "&" in a file
        # name must be escaped or Telegram rejects the message.
        shown_name = escape(str(item.get("name") or item.get("filename")), quote=False)
        return f"`{shown_name}`"

    try:
        if not successful_downloads:
            failed_links_text = "\n".join(
                f"- {x['link']} β {x['error']}" for x in batch["failed_links"]
            )
            # Truncate before escaping so an entity is never cut in half.
            if len(failed_links_text) > max_details_len:
                failed_links_text = failed_links_text[:max_details_len] + "\n..."
            failed_links_text = escape(failed_links_text, quote=False)
            await status_msg.edit_text(
                f"β Batch `{batch_id[:6]}` failed.\nDetails:\n{failed_links_text}"
            )
            return

        await status_msg.edit_text(
            f"β Batch `{batch_id[:6]}` downloaded. Sending {len(successful_downloads)} files..."
        )

        if config.FORWARD_CHANNEL_ID:
            await bot.forward_message(
                config.FORWARD_CHANNEL_ID,
                batch["source_chat_id"],
                batch["source_message_id"],
            )
            for item in successful_downloads:
                await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, build_caption(item))
                # Pause between channel posts, as the original code did.
                await asyncio.sleep(1)

        for item in successful_downloads:
            await send_and_cache_file(batch["source_chat_id"], item, build_caption(item))

        summary = f"β Batch `{batch_id[:6]}` complete: {len(successful_downloads)} files sent."
        if batch["failed_links"]:
            summary += f"\nβ {len(batch['failed_links'])} links failed."
        await status_msg.edit_text(summary)
    except Exception as e:
        logger.error(f"Error during batch completion for {batch_id}: {e}", exc_info=True)
        try:
            await status_msg.edit_text(
                f"A critical error occurred while sending files for batch `{batch_id[:6]}`."
            )
        except Exception as notify_error:
            # The notification is best-effort; the failure is already logged.
            logger.warning("Could not report failure of batch %s: %s", batch_id[:6], notify_error)
    finally:
        for item in successful_downloads:
            stale_paths = []
            if not item.get("cached"):
                stale_paths.append(item.get("path"))
            stale_paths.append(item.get("thumb"))
            for stale_path in stale_paths:
                if not stale_path or not os.path.exists(stale_path):
                    continue
                try:
                    os.remove(stale_path)
                except OSError as cleanup_error:
                    # One undeletable file must not stop the rest of the cleanup.
                    logger.warning("Could not remove %s: %s", stale_path, cleanup_error)
        BATCH_JOBS.pop(batch_id, None)
# --- Handlers --- | |
async def start_handler(message: Message):
    """Reply to the incoming message with the bot's welcome text."""
    welcome_parts = (
        "π <b>Welcome to Terabox Bot!</b>\n\n",
        "π₯ Send me any <b>Terabox</b> links.\n",
        "β I will download and send the files to you.\n\n",
        "βΉοΈ Use /status to check current batch progress.\n\n",
        "Happy sharing!",
    )
    welcome_text = "".join(welcome_parts)
    await message.reply(welcome_text)
async def status_handler(message: Message):
    """Reply with the progress of every batch currently held in ``BATCH_JOBS``."""
    # One "processed/total" line per tracked batch, keyed by a short id prefix.
    progress_lines = [
        f"Batch `{job_id[:6]}`: {job['processed_links']}/{job['total_links']}"
        for job_id, job in BATCH_JOBS.items()
    ]
    if progress_lines:
        await message.reply("π Active batches:\n" + "\n".join(progress_lines))
        return
    await message.reply("β No active batches. All clear.")
async def message_handler(message: Message):
    """Queue every Terabox link found in a message as a new batch.

    Messages whose text starts with ``/`` are ignored. The sender, when
    present, is recorded through ``db_utils``. URLs are taken from the text
    or caption and de-duplicated in order of appearance; those mentioning a
    known Terabox domain are registered as one batch in ``BATCH_JOBS`` and
    pushed onto ``TASK_QUEUE`` one task per link. Messages without such
    links are ignored.
    """
    if message.text and message.text.startswith("/"):
        return

    # from_user is optional on a Message, so it must not be dereferenced blindly.
    sender = message.from_user
    if sender is not None:
        await db_utils.add_or_update_user_db(
            sender.id,
            sender.username,
            sender.first_name,
        )

    known_domains = (
        "terabox.com",
        "teraboxapp.com",
        "terasharelink.com",
        "1024tera.com",
        "freeterabox.com",
        "4funbox.com",
        "box-links.com",
    )
    body = message.text or message.caption or ""
    # dict.fromkeys drops duplicates while keeping first-seen order, so links
    # are queued in the order the user wrote them.
    links = list(dict.fromkeys(re.findall(r"https?://[^\s<>'\"]+", body)))
    terabox_links = [
        link for link in links if any(domain in link for domain in known_domains)
    ]
    if not terabox_links:
        return

    batch_id = str(uuid.uuid4())
    status_msg = await message.reply(
        f"β Found {len(terabox_links)} links. Queued as batch `{batch_id[:6]}`.\nProcessing... β³"
    )
    # Register the batch before queueing, so a worker always finds its state.
    BATCH_JOBS[batch_id] = {
        "total_links": len(terabox_links),
        "processed_links": 0,
        "successful_downloads": [],
        "failed_links": [],
        "source_chat_id": message.chat.id,
        "source_user_id": sender.id if sender is not None else None,
        "source_message_id": message.message_id,
        "status_message": status_msg,
        "lock": asyncio.Lock(),
    }
    for link in terabox_links:
        await TASK_QUEUE.put({"batch_id": batch_id, "original_link": link})
# --- Startup --- | |
async def on_startup(dispatcher: Dispatcher):
    """Initialise the database and start the link-processing workers.

    Starts ``config.CONCURRENT_WORKERS`` workers, numbered from 1.

    Args:
        dispatcher: The dispatcher being started; not used here.
    """
    await db_utils.initialize_database()
    for i in range(config.CONCURRENT_WORKERS):
        worker_task = asyncio.create_task(link_processor_worker(i + 1))
        # The event loop keeps only weak references to tasks; without a strong
        # reference a worker could be garbage-collected while running.
        _WORKER_TASKS.add(worker_task)
        worker_task.add_done_callback(_on_worker_done)
    logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")


# Strong references to the running worker tasks.
_WORKER_TASKS: set = set()


def _on_worker_done(task) -> None:
    """Drop a finished worker task and log it if it ended with an error."""
    _WORKER_TASKS.discard(task)
    if task.cancelled():
        return
    failure = task.exception()
    if failure is not None:
        logger.error("A link worker stopped unexpectedly.", exc_info=failure)
# --- Main --- | |
def setup_dispatcher():
    """Register the startup hook and the message handlers on ``dp`` and return it."""
    dp.startup.register(on_startup)

    # Registration order is preserved: the two command handlers come before
    # the unfiltered message handler.
    message_routes = (
        (start_handler, (CommandStart(),)),
        (status_handler, (Command("status"),)),
        (message_handler, ()),
    )
    for callback, filters in message_routes:
        dp.message.register(callback, *filters)
    return dp