File size: 8,990 Bytes
bc7bf12
33d2419
bc7bf12
33d2419
bc7bf12
33d2419
bc7bf12
33d2419
bc7bf12
33d2419
bc7bf12
33d2419
bc7bf12
 
 
 
 
33d2419
bc7bf12
 
 
33d2419
bc7bf12
33d2419
bc7bf12
 
33d2419
bc7bf12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33d2419
bc7bf12
 
 
 
 
 
 
19fc4ac
bc7bf12
33d2419
bc7bf12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33d2419
bc7bf12
33d2419
bc7bf12
33d2419
bc7bf12
 
33d2419
bc7bf12
 
 
 
 
33d2419
bc7bf12
33d2419
bc7bf12
33d2419
bc7bf12
 
 
33d2419
bc7bf12
 
 
 
 
 
33d2419
ac6e69e
bc7bf12
 
33d2419
bc7bf12
 
 
ac6e69e
bc7bf12
 
 
33d2419
bc7bf12
 
 
 
 
 
ac6e69e
bc7bf12
 
 
 
 
 
ac6e69e
bc7bf12
33d2419
bc7bf12
33d2419
bc7bf12
33d2419
bc7bf12
33d2419
bc7bf12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33d2419
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import asyncio
import html
import logging
import os
import re
import uuid
from typing import Dict

from aiogram import Bot, Dispatcher, types, F
from aiogram.filters import CommandStart
from aiogram.types import Message, FSInputFile
from aiogram.enums import ParseMode
from aiogram.exceptions import TelegramBadRequest
from aiogram.client.default import DefaultBotProperties

import config
import terabox_utils as terabox
import db_utils

# --- Setup ---

# Working directories must exist before the file log handler opens its file.
for _required_dir in ("logs", "downloads"):
    os.makedirs(_required_dir, exist_ok=True)

# Log to the console and, in append mode, to a UTF-8 log file.
_console_handler = logging.StreamHandler()
_file_handler = logging.FileHandler("logs/bot.log", mode='a', encoding='utf-8')
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[_console_handler, _file_handler],
)
logger = logging.getLogger("bot")

# Every outgoing message defaults to HTML parse mode with link previews off.
_default_properties = DefaultBotProperties(
    parse_mode=ParseMode.HTML,
    link_preview_is_disabled=True,
)
bot = Bot(token=config.BOT_TOKEN, default=_default_properties)

dp = Dispatcher()
# Queue of {"batch_id", "original_link"} tasks consumed by the link workers.
TASK_QUEUE = asyncio.Queue()
# Per-batch bookkeeping, keyed by the batch's UUID string.
BATCH_JOBS: Dict[str, Dict] = {}

# --- Worker Logic ---

async def _resolve_link(batch_info: dict, original_link: str) -> dict:
    """Turn one Terabox link into a file description ready for sending.

    Args:
        batch_info: The batch record from ``BATCH_JOBS``; its source chat id
            and status message id are handed to the downloader.
        original_link: The link exactly as it was queued.

    Returns:
        ``{'cached': True, 'short_id': ..., **cache_row}`` on a cache hit,
        otherwise a dict describing the freshly downloaded local file
        (``cached``, ``path``, ``name``, ``size``, ``short_id``, ``thumb``).

    Raises:
        ValueError: If no short id can be extracted from the link, or one of
            the Terabox helpers reports an error.
    """
    short_id = await terabox.extract_terabox_short_id(original_link)
    if not short_id:
        raise ValueError("Invalid Terabox link format.")

    cached_file = await db_utils.get_cached_file(short_id)
    if cached_file:
        logger.info(f"Cache hit for {short_id}. Reusing file_id.")
        return {'cached': True, 'short_id': short_id, **cached_file}

    download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
    if error:
        raise ValueError(error)

    local_filepath, thumb_path, download_error = await terabox.download_terabox_file(
        bot,
        batch_info["source_chat_id"],
        batch_info["status_message"].message_id,
        download_url,
        raw_filename,
    )
    if download_error:
        raise ValueError(download_error)

    return {
        'cached': False,
        'path': local_filepath,
        'name': raw_filename,
        'size': os.path.getsize(local_filepath),
        'short_id': short_id,
        'thumb': thumb_path,
    }


async def _process_task(task: dict) -> None:
    """Process one queued link and record its outcome on the owning batch.

    Tasks whose batch is no longer present in ``BATCH_JOBS`` are dropped.
    When the last link of a batch has been processed, the batch is handed to
    ``handle_batch_completion``.
    """
    batch_id, original_link = task["batch_id"], task["original_link"]
    batch_info = BATCH_JOBS.get(batch_id)
    if not batch_info:
        return

    file_info = None
    error_msg = f"An unknown error occurred for link: {original_link}"
    try:
        file_info = await _resolve_link(batch_info, original_link)
    except Exception as e:
        # Some exceptions stringify to "", so fall back to repr() to keep the
        # failure report informative.
        error_msg = str(e) or repr(e)
        logger.warning(f"Link {original_link} in batch {batch_id[:6]} failed: {error_msg}")

    async with batch_info["lock"]:
        batch_info["processed_links"] += 1
        if file_info:
            batch_info["successful_downloads"].append(file_info)
        else:
            batch_info["failed_links"].append({"link": original_link, "error": error_msg})

        processed, total = batch_info['processed_links'], batch_info['total_links']
        try:
            await batch_info["status_message"].edit_text(
                f"⚙️ Batch <code>{batch_id[:6]}</code> in progress... Processed {processed}/{total} links."
            )
        except TelegramBadRequest:
            # The progress display is best-effort; a rejected edit is ignored.
            pass
        except Exception as e:
            # Any other API/network failure must not prevent the completion
            # check below, otherwise the batch would never be finalised.
            logger.warning(f"Could not update progress for batch {batch_id[:6]}: {e}")

        batch_finished = processed == total

    # Run completion after releasing the lock: every link of the batch has
    # been counted, so no other worker will touch this batch again.
    if batch_finished:
        logger.info(f"Batch {batch_id[:6]} complete. Triggering final processing.")
        await handle_batch_completion(batch_id)


async def link_processor_worker(worker_id: int):
    """Consume link tasks from ``TASK_QUEUE`` forever.

    Each task is a dict with ``batch_id`` and ``original_link``. A failure
    while handling one task is logged and the worker moves on to the next
    task; ``task_done()`` is always called so the queue's unfinished-task
    counter stays accurate.

    Args:
        worker_id: Number used only to identify this worker in log output.
    """
    logger.info(f"Link worker #{worker_id} started.")
    while True:
        task = await TASK_QUEUE.get()
        try:
            await _process_task(task)
        except Exception:
            # Keep the worker alive: a single bad task must not permanently
            # reduce the number of running workers.
            logger.exception(f"Link worker #{worker_id} failed while handling task {task!r}.")
        finally:
            TASK_QUEUE.task_done()

async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message:
    """Send one file to a chat and cache its Telegram ``file_id`` after a fresh upload.

    The send method is chosen from ``file_info['type']`` when present,
    otherwise from the file name's extension: video, audio, or (fallback)
    document.

    Args:
        chat_id: Destination chat.
        file_info: File description. Either carries a ``file_id`` (re-send of
            an existing Telegram file) or a local ``path`` plus ``name``.
            After a fresh upload has been cached, ``file_id`` and ``type``
            are written back into this dict so that a later send of the same
            item (for example to a second chat) reuses the upload.
        caption: Caption attached to the message.

    Returns:
        The message returned by the bot's send call.
    """
    existing_file_id = file_info.get('file_id')
    file_path_or_id = existing_file_id or FSInputFile(file_info['path'], filename=file_info['name'])
    media_type = file_info.get('type')
    filename = file_info.get('name') or file_info.get('filename')
    # A record without any stored name must not crash the extension check.
    lowered_name = (filename or "").lower()

    video_exts = ('.mp4', '.mkv', '.mov', '.avi', '.webm')
    audio_exts = ('.mp3', '.flac', '.ogg', '.wav')

    if media_type == 'video' or lowered_name.endswith(video_exts):
        sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
        media_type = 'video'
    elif media_type == 'audio' or lowered_name.endswith(audio_exts):
        sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
        media_type = 'audio'
    else:
        sent_message = await bot.send_document(chat_id, file_path_or_id, caption=caption)
        media_type = 'document'

    # Only a fresh upload needs caching; re-sends by file_id are already known.
    if file_info.get('cached') or existing_file_id or not sent_message:
        return sent_message

    media = getattr(sent_message, media_type, None)
    if media is None:
        # The returned message may carry the upload under a different media
        # kind than requested; skip caching instead of raising AttributeError
        # after the file has already been delivered.
        logger.warning(f"Sent message has no '{media_type}' attachment for {filename!r}; not caching.")
        return sent_message

    await db_utils.add_to_cache(
        file_info['short_id'], media.file_id, filename, media_type, file_info['size']
    )
    # Remember the upload so the next send of this item reuses the file_id
    # instead of uploading the local file (and writing the cache row) again.
    file_info['file_id'] = media.file_id
    file_info['type'] = media_type

    return sent_message

def _remove_file_if_present(path) -> None:
    """Delete *path* from disk, ignoring empty values and already-missing files."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        # Cleanup is best-effort: one undeletable file must not stop the rest
        # of the cleanup or leave the batch registered.
        logger.warning(f"Could not remove temporary file {path}: {exc}")


async def _edit_status_best_effort(status_msg, text: str) -> None:
    """Edit the batch status message; log instead of raising when the edit fails."""
    try:
        await status_msg.edit_text(text)
    except Exception as exc:
        logger.warning(f"Could not update batch status message: {exc}")


def _file_caption(item: dict) -> str:
    """Build the HTML caption (file name in a ``<code>`` tag) for one file."""
    display_name = item.get('name') or item.get('filename')
    return f"<code>{html.escape(str(display_name), quote=False)}</code>"


async def handle_batch_completion(batch_id: str):
    """Deliver the results of a finished batch and release its resources.

    If nothing was downloaded, the status message is replaced by a failure
    report listing every failed link. Otherwise each file is sent to the
    forward channel (when ``config.FORWARD_CHANNEL_ID`` is set, preceded by a
    forward of the user's original message) and then to the source chat, and
    the status message is replaced by a summary.

    Downloaded files and thumbnails are always removed from disk and the
    batch is always dropped from ``BATCH_JOBS``, whether or not sending
    succeeded. Status-message edits are best-effort and never abort delivery.

    Args:
        batch_id: Key of the batch in ``BATCH_JOBS``; unknown ids are ignored.
    """
    batch = BATCH_JOBS.get(batch_id)
    if not batch:
        return

    status_msg = batch["status_message"]
    successful_downloads = batch["successful_downloads"]
    short_id = batch_id[:6]

    try:
        if not successful_downloads:
            # Links and error texts are untrusted; escape them because the
            # bot sends messages in HTML parse mode.
            failed_links_text = "\n".join(
                f"- {html.escape(str(x['link']), quote=False)} → {html.escape(str(x['error']), quote=False)}"
                for x in batch['failed_links']
            )
            await _edit_status_best_effort(
                status_msg,
                f"❌ Batch <code>{short_id}</code> failed. No files could be processed.\nDetails:\n{failed_links_text}"
            )
            return

        await _edit_status_best_effort(
            status_msg,
            f"✅ Batch <code>{short_id}</code> downloaded. Preparing to send {len(successful_downloads)} files..."
        )

        if config.FORWARD_CHANNEL_ID:
            await bot.forward_message(
                config.FORWARD_CHANNEL_ID,
                batch["source_chat_id"],
                batch["source_message_id"]
            )
            for item in successful_downloads:
                await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, _file_caption(item))
                await asyncio.sleep(1)

        for item in successful_downloads:
            await send_and_cache_file(batch["source_chat_id"], item, _file_caption(item))

        summary = f"✅ Batch <code>{short_id}</code> complete: {len(successful_downloads)} files sent."
        if batch["failed_links"]:
            summary += f"\n❌ {len(batch['failed_links'])} links failed."

        await _edit_status_best_effort(status_msg, summary)

    except Exception as e:
        logger.error(f"Error during batch completion for {batch_id}: {e}", exc_info=True)
        await _edit_status_best_effort(
            status_msg,
            f"A critical error occurred while sending files for batch <code>{short_id}</code>."
        )
    finally:
        for item in successful_downloads:
            # Cached items were never downloaded, so they own no local file.
            if not item.get('cached'):
                _remove_file_if_present(item.get('path'))
            _remove_file_if_present(item.get('thumb'))

        BATCH_JOBS.pop(batch_id, None)

# --- Handlers ---

@dp.message(CommandStart())
async def start_handler(message: Message):
    """Reply to ``/start`` with the bot's welcome text.

    The text is HTML-formatted and names the channel configured in
    ``config.FORCE_SUB_CHANNEL_USERNAME``.

    Args:
        message: The incoming ``/start`` message; the reply goes to its chat.
    """
    # Literal ampersands are written as &amp; because the text is sent in
    # HTML parse mode.
    text = (
        "👋 <b>Welcome to the Terabox Downloader Bot!</b>\n\n"
        "📥 Send me any valid Terabox link and I will fetch the file and send it to you.\n\n"
        f"📢 Please make sure you are a member of: @{config.FORCE_SUB_CHANNEL_USERNAME}\n\n"
        "🚀 Supports batch links &amp; auto-caching.\n"
        "💾 Fast &amp; lightweight.\n\n"
        "✅ <i>Just send your link below ⬇️</i>"
    )
    await bot.send_message(
        chat_id=message.chat.id,
        text=text,
        parse_mode=ParseMode.HTML,
        disable_web_page_preview=True
    )

@dp.message(F.text | F.caption)
async def message_handler(message: Message):
    """Queue every Terabox link found in a message as one download batch.

    Commands (text starting with ``/``) are ignored. The sender, when known,
    is recorded via ``db_utils.add_or_update_user_db``. Links are taken from
    the message text or caption, de-duplicated in the order they appear, and
    kept only if they contain one of the known Terabox domains. A status
    message is posted, the batch is registered in ``BATCH_JOBS``, and one
    task per link is put on ``TASK_QUEUE``.

    Args:
        message: The incoming message.
    """
    if message.text and message.text.startswith('/'):
        return

    # from_user is optional on a Message; skip the user record rather than
    # crash when it is absent.
    sender = message.from_user
    if sender is not None:
        await db_utils.add_or_update_user_db(
            sender.id,
            sender.username,
            sender.first_name
        )

    content = message.text or message.caption or ""
    # dict.fromkeys de-duplicates while keeping the order the links were
    # written in (a set would process them in arbitrary order).
    links = list(dict.fromkeys(re.findall(r'https?://[^\s<>\"\']+', content)))
    terabox_domains = (
        "terabox.com", "teraboxapp.com", "terasharelink.com", "1024tera.com",
        "freeterabox.com", "4funbox.com", "box-links.com",
    )
    terabox_links = [
        link for link in links
        if any(domain in link for domain in terabox_domains)
    ]

    if not terabox_links:
        return

    batch_id = str(uuid.uuid4())
    status_msg = await bot.send_message(
        chat_id=message.chat.id,
        text=f"✅ Found {len(terabox_links)} links. Queued as batch <code>{batch_id[:6]}</code>.",
        parse_mode=ParseMode.HTML,
        disable_web_page_preview=True
    )

    BATCH_JOBS[batch_id] = {
        "total_links": len(terabox_links),
        "processed_links": 0,
        "successful_downloads": [],
        "failed_links": [],
        "source_chat_id": message.chat.id,
        "source_user_id": sender.id if sender is not None else None,
        "source_message_id": message.message_id,
        "status_message": status_msg,
        "lock": asyncio.Lock()
    }

    for link in terabox_links:
        await TASK_QUEUE.put({"batch_id": batch_id, "original_link": link})

# --- Startup ---

# Strong references to the running worker tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_WORKER_TASKS: set = set()


async def on_startup(dispatcher: Dispatcher):
    """Initialise the database and launch the link-processing workers.

    Starts ``config.CONCURRENT_WORKERS`` background tasks running
    ``link_processor_worker`` with ids counting from 1.

    Args:
        dispatcher: The dispatcher that is starting up (not used here).
    """
    await db_utils.initialize_database()
    for i in range(config.CONCURRENT_WORKERS):
        worker_task = asyncio.create_task(link_processor_worker(i + 1))
        _WORKER_TASKS.add(worker_task)
        # Drop the reference once the task ends so the set cannot grow.
        worker_task.add_done_callback(_WORKER_TASKS.discard)
    logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")

def start_bot():
    """Attach the startup hook and message handlers, then hand back the bot objects.

    Returns:
        The module-level ``(dp, bot)`` pair.
    """
    dp.startup.register(on_startup)

    # NOTE(review): both handlers are also decorated with ``@dp.message``, so
    # these calls register them a second time; ``message_handler`` is added
    # here without any filter. Confirm this is intended.
    message_observer = dp.message
    message_observer.register(message_handler)
    message_observer.register(start_handler, CommandStart())

    return dp, bot