# bot.py
import asyncio
import logging
import os
import re
import uuid
import time
from typing import Dict

from aiogram import Bot, Dispatcher, types, F
from aiogram.filters import CommandStart
from aiogram.types import Message, FSInputFile
from aiogram.enums import ParseMode
from aiogram.exceptions import TelegramBadRequest
from aiogram.client.default import DefaultBotProperties

import config
import terabox_utils as terabox
import db_utils
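# NOTE: this module leans on three local helpers. The attribute and function names
# below are inferred from how they are called in this file; the actual definitions
# live in config.py, terabox_utils.py and db_utils.py.
#   config:        BOT_TOKEN, FORCE_SUB_CHANNEL_USERNAME, FORWARD_CHANNEL_ID, CONCURRENT_WORKERS
#   terabox_utils: extract_terabox_short_id(link), get_final_url_and_filename(link),
#                  download_terabox_file(url, filename)
#   db_utils:      initialize_database(), add_or_update_user_db(user_id, username, first_name),
#                  get_cached_file(short_id), add_to_cache(short_id, file_id, name, type, size)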
# --- Setup ---
os.makedirs("logs", exist_ok=True)
os.makedirs("downloads", exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("logs/bot.log", mode='a', encoding='utf-8')
    ]
)
logger = logging.getLogger("bot")

bot = Bot(
    token=config.BOT_TOKEN,
    default=DefaultBotProperties(
        parse_mode=ParseMode.HTML,
        link_preview_is_disabled=True
    )
)
dp = Dispatcher()

TASK_QUEUE = asyncio.Queue()
BATCH_JOBS: Dict[str, Dict] = {}
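# Shared state (illustrative shapes, matching how the dicts are built below):
#   TASK_QUEUE items:     {"batch_id": "<uuid>", "original_link": "https://..."}
#   BATCH_JOBS[batch_id]: {
#       "total_links": int, "processed_links": int,
#       "successful_downloads": [file_info, ...], "failed_links": [{"link", "error"}, ...],
#       "source_chat_id": int, "source_user_id": int, "source_message_id": int,
#       "status_message_id": int, "lock": asyncio.Lock(), "start_time": float,
#   }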
# --- Worker Logic ---
async def link_processor_worker(worker_id: int):
    """Consume links from TASK_QUEUE, resolve/download them, and update batch progress."""
    logger.info(f"Link worker #{worker_id} started.")
    while True:
        task = await TASK_QUEUE.get()
        batch_id, original_link = task["batch_id"], task["original_link"]
        batch_info = BATCH_JOBS.get(batch_id)
        if not batch_info:
            # Batch was already finalized or removed; drop the task.
            TASK_QUEUE.task_done()
            continue

        file_info = None
        error_msg = f"An unknown error occurred for link: {original_link}"
        try:
            short_id = await terabox.extract_terabox_short_id(original_link)
            if not short_id:
                raise ValueError("Invalid Terabox link format.")

            cached_file = await db_utils.get_cached_file(short_id)
            if cached_file:
                logger.info(f"Cache hit for {short_id}. Reusing file_id.")
                file_info = {'cached': True, 'short_id': short_id, **cached_file}
            else:
                download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
                if error:
                    raise ValueError(error)
                local_filepath, download_error = await terabox.download_terabox_file(download_url, raw_filename)
                if download_error:
                    raise ValueError(download_error)
                file_info = {
                    'cached': False,
                    'path': local_filepath,
                    'name': raw_filename,
                    'size': os.path.getsize(local_filepath),
                    'short_id': short_id
                }
        except Exception as e:
            error_msg = str(e)

        async with batch_info["lock"]:
            batch_info["processed_links"] += 1
            if file_info:
                batch_info["successful_downloads"].append(file_info)
            else:
                batch_info["failed_links"].append({"link": original_link, "error": error_msg})

            processed, total = batch_info['processed_links'], batch_info['total_links']
            eta_sec = max(1, int((time.time() - batch_info["start_time"]) / processed * (total - processed)))
            try:
                await bot.edit_message_text(
                    chat_id=batch_info["source_chat_id"],
                    message_id=batch_info["status_message_id"],
                    text=(
                        f"⚙️ Batch <code>{batch_id[:6]}</code> in progress... "
                        f"{processed}/{total} links processed.\n"
                        f"⏳ ETA: ~{eta_sec}s"
                    )
                )
            except TelegramBadRequest:
                # Telegram rejects an edit when the text is unchanged; safe to ignore.
                pass

            if processed == total:
                logger.info(f"Batch {batch_id[:6]} complete. Triggering final processing.")
                await handle_batch_completion(batch_id)

        TASK_QUEUE.task_done()
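# Illustrative file_info shapes produced above (the cached variant merges whatever
# row db_utils.get_cached_file returns, e.g. the stored Telegram file_id):
#   fresh download: {'cached': False, 'path': 'downloads/<name>', 'name': '<name>',
#                    'size': <bytes>, 'short_id': '<short_id>'}
#   cache hit:      {'cached': True, 'short_id': '<short_id>', 'file_id': '<telegram file_id>', ...}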
async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message:
    """Send a file as a document, reusing a cached Telegram file_id when available,
    and cache the new file_id after a fresh upload."""
    file_path_or_id = file_info.get('file_id') or FSInputFile(file_info['path'], filename=file_info['name'])
    filename = file_info.get('name') or file_info.get('filename')
    sent_message = await bot.send_document(chat_id, file_path_or_id, caption=caption)
    if not file_info.get('cached') and sent_message:
        # First successful upload: remember Telegram's file_id so future requests
        # for the same short_id can skip the download entirely.
        file_id_to_cache = sent_message.document.file_id
        await db_utils.add_to_cache(
            file_info['short_id'], file_id_to_cache, filename, 'document', file_info['size']
        )
    return sent_message
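# Illustrative call (assumes a file already saved to downloads/ by the worker):
#   await send_and_cache_file(
#       chat_id,
#       {'cached': False, 'path': 'downloads/video.mp4', 'name': 'video.mp4',
#        'size': 1048576, 'short_id': 'abc123'},
#       caption='<code>video.mp4</code>'
#   )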
async def handle_batch_completion(batch_id: str):
    """Send all successfully downloaded files for a batch, report failures,
    and clean up local files and the batch record."""
    batch = BATCH_JOBS.get(batch_id)
    if not batch:
        return
    successful_downloads = batch["successful_downloads"]
    try:
        if not successful_downloads:
            failed_links_text = "\n".join(
                [f"- {x['link']} → {x['error']}" for x in batch['failed_links']]
            )
            await bot.edit_message_text(
                chat_id=batch["source_chat_id"],
                message_id=batch["status_message_id"],
                text=(
                    f"❌ Batch <code>{batch_id[:6]}</code> failed. No files could be processed.\n"
                    f"<b>Details:</b>\n{failed_links_text}"
                )
            )
            return

        await bot.edit_message_text(
            chat_id=batch["source_chat_id"],
            message_id=batch["status_message_id"],
            text=(
                f"✅ Batch <code>{batch_id[:6]}</code> downloaded. Sending {len(successful_downloads)} files..."
            )
        )

        # Forward the original message and mirror the files if a channel is configured.
        if config.FORWARD_CHANNEL_ID:
            await bot.forward_message(
                config.FORWARD_CHANNEL_ID,
                batch["source_chat_id"],
                batch["source_message_id"]
            )
            for item in successful_downloads:
                # Default parse mode is HTML, so wrap the filename in <code> tags.
                caption = f"<code>{item.get('name') or item.get('filename')}</code>"
                await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, caption)
                await asyncio.sleep(1)

        for item in successful_downloads:
            caption = f"<code>{item.get('name') or item.get('filename')}</code>"
            await send_and_cache_file(batch["source_chat_id"], item, caption)

        summary = f"✅ Batch <code>{batch_id[:6]}</code> complete: {len(successful_downloads)} files sent."
        if batch["failed_links"]:
            summary += f"\n❌ {len(batch['failed_links'])} links failed."
        await bot.edit_message_text(
            chat_id=batch["source_chat_id"],
            message_id=batch["status_message_id"],
            text=summary
        )
    except Exception as e:
        logger.error(f"Error during batch completion for {batch_id}: {e}", exc_info=True)
        await bot.edit_message_text(
            chat_id=batch["source_chat_id"],
            message_id=batch["status_message_id"],
            text=(
                f"⚠️ A critical error occurred while sending files for batch <code>{batch_id[:6]}</code>."
            )
        )
    finally:
        # Remove freshly downloaded files; cached entries never touched the disk.
        for item in successful_downloads:
            if not item.get('cached') and os.path.exists(item['path']):
                os.remove(item['path'])
        del BATCH_JOBS[batch_id]
# --- Handlers ---
async def start_handler(message: Message):
    """Reply to /start with a short usage guide."""
    text = (
        "👋 <b>Welcome to the Terabox Downloader Bot!</b>\n\n"
        "📥 Send me any valid Terabox link and I will fetch the file and send it to you.\n\n"
        f"📢 Please make sure you are a member of: @{config.FORCE_SUB_CHANNEL_USERNAME}\n\n"
        "🔗 Supports batch links & auto-caching.\n"
        "💾 Fast & lightweight.\n\n"
        "✅ <i>Just send your link below ⬇️</i>"
    )
    await bot.send_message(
        chat_id=message.chat.id,
        text=text,
        reply_to_message_id=message.message_id
    )
async def message_handler(message: Message):
    """Extract Terabox links from a message, create a batch job, and queue each link."""
    if message.text and message.text.startswith('/'):
        return
    await db_utils.add_or_update_user_db(
        message.from_user.id,
        message.from_user.username,
        message.from_user.first_name
    )
    links = list(set(re.findall(r'https?://[^\s<>"\']+', message.text or message.caption or "")))
    terabox_links = [link for link in links if any(domain in link for domain in [
        "terabox.com", "teraboxapp.com", "terasharelink.com", "1024tera.com",
        "freeterabox.com", "4funbox.com", "box-links.com"
    ])]
    if not terabox_links:
        return

    batch_id = str(uuid.uuid4())
    status_msg = await bot.send_message(
        chat_id=message.chat.id,
        text=(
            f"✅ Found {len(terabox_links)} links. Queued as batch <code>{batch_id[:6]}</code>.\n"
            f"⏳ Please wait..."
        ),
        reply_to_message_id=message.message_id
    )
    BATCH_JOBS[batch_id] = {
        "total_links": len(terabox_links),
        "processed_links": 0,
        "successful_downloads": [],
        "failed_links": [],
        "source_chat_id": message.chat.id,
        "source_user_id": message.from_user.id,
        "source_message_id": message.message_id,
        "status_message_id": status_msg.message_id,
        "lock": asyncio.Lock(),
        "start_time": time.time()
    }
    for link in terabox_links:
        await TASK_QUEUE.put({"batch_id": batch_id, "original_link": link})
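# Note: links are de-duplicated with set(), so a link repeated in one message is
# downloaded only once, and the original ordering of links is not preserved.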
# --- Startup ---
async def on_startup(dispatcher: Dispatcher):
    await db_utils.initialize_database()
    for i in range(config.CONCURRENT_WORKERS):
        asyncio.create_task(link_processor_worker(i + 1))
    logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")

def start_bot():
    dp.startup.register(on_startup)
    # Register the /start handler before the catch-all handler: aiogram dispatches
    # to the first matching handler, so the order matters here.
    dp.message.register(start_handler, CommandStart())
    dp.message.register(message_handler)
    return dp, bot
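
# Minimal entry-point sketch (assumption: the real runner lives in another module
# that imports start_bot(); this guard only illustrates how it could be used).
if __name__ == "__main__":
    async def _main():
        dispatcher, bot_instance = start_bot()
        await dispatcher.start_polling(bot_instance)

    asyncio.run(_main())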