File size: 10,097 Bytes
33d2419
 
 
 
 
 
ac6e69e
 
33d2419
ac6e69e
 
33d2419
 
 
 
 
 
 
 
 
 
debb592
33d2419
 
 
 
 
ac6e69e
33d2419
 
ac6e69e
 
33d2419
 
 
 
 
ac6e69e
33d2419
 
 
ac6e69e
33d2419
 
 
ac6e69e
33d2419
 
 
 
 
 
 
 
 
 
 
 
ac6e69e
33d2419
 
 
 
 
 
 
 
debb592
ac6e69e
33d2419
 
 
 
 
 
 
 
 
 
ac6e69e
33d2419
 
 
 
 
ac6e69e
 
 
 
 
 
33d2419
 
 
 
 
 
 
 
 
 
 
 
ac6e69e
 
33d2419
 
ac6e69e
 
33d2419
 
 
 
 
debb592
33d2419
 
 
 
ac6e69e
33d2419
ac6e69e
 
 
33d2419
ac6e69e
 
33d2419
debb592
ac6e69e
33d2419
ac6e69e
 
33d2419
ac6e69e
33d2419
 
ac6e69e
33d2419
ac6e69e
33d2419
 
ac6e69e
33d2419
 
 
 
ac6e69e
33d2419
 
 
 
 
 
 
 
 
 
ac6e69e
33d2419
ac6e69e
33d2419
 
 
 
ac6e69e
33d2419
 
 
 
 
 
ac6e69e
33d2419
ac6e69e
33d2419
ac6e69e
33d2419
 
 
 
ac6e69e
33d2419
 
ac6e69e
 
 
 
33d2419
 
 
debb592
33d2419
ac6e69e
33d2419
 
 
ac6e69e
 
 
 
33d2419
 
 
ac6e69e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33d2419
 
ac6e69e
33d2419
 
 
 
 
ac6e69e
33d2419
 
ac6e69e
debb592
ac6e69e
 
 
 
 
 
 
 
 
 
debb592
 
33d2419
 
 
 
 
debb592
ac6e69e
debb592
33d2419
 
 
 
 
 
 
 
 
 
ac6e69e
33d2419
 
 
 
 
ac6e69e
33d2419
 
 
 
debb592
33d2419
ac6e69e
 
33d2419
 
ac6e69e
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# bot.py
import asyncio
import html
import logging
import os
import re
import time
import uuid
from typing import Dict

from aiogram import Bot, Dispatcher, F
from aiogram.filters import CommandStart, Command
from aiogram.types import Message, FSInputFile
from aiogram.enums import ParseMode
from aiogram.exceptions import TelegramBadRequest
from aiogram.client.default import DefaultBotProperties

import config
import terabox_utils as terabox
import db_utils

# --- Setup ---

# Create the working directories at import time. "logs" must exist before the
# FileHandler below opens logs/bot.log. "downloads" is not referenced anywhere
# else in this file — presumably terabox_utils writes downloaded files there
# (TODO confirm).
os.makedirs("logs", exist_ok=True)
os.makedirs("downloads", exist_ok=True)

# Root logging configuration: INFO and above go both to the console and,
# appended, to logs/bot.log.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("logs/bot.log", mode="a", encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)

# Shared bot client. HTML is the default parse mode, so every message and
# caption sent without an explicit parse_mode is interpreted as HTML.
bot = Bot(
    token=config.BOT_TOKEN,
    default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)

# Dispatcher the handlers in this module attach to.
dp = Dispatcher()

# Work queue consumed by link_processor_worker. Each item is a dict with the
# keys "batch_id" and "original_link" (see message_handler).
# NOTE(review): the queue is created at import time, before an event loop is
# running; on Python < 3.10 asyncio.Queue binds to the loop that is current at
# construction — confirm the supported Python version.
TASK_QUEUE = asyncio.Queue()
# In-flight batches keyed by batch id (a UUID string). Each value is the state
# dict built in message_handler: link counters, result lists, source chat and
# message ids, the status message and an asyncio.Lock.
BATCH_JOBS: Dict[str, Dict] = {}

# --- Worker ---
async def _fetch_file_info(original_link: str, batch_info: dict) -> dict:
    """Resolve one Terabox link into a file description ready for sending.

    Returns the cached database entry (with ``"cached": True`` added) when the
    link's short id is already known; otherwise downloads the file and
    returns a dict describing the local copy.

    Raises:
        ValueError: if the link has no short id, or if resolving or
            downloading reports an error.
    """
    short_id = await terabox.extract_terabox_short_id(original_link)
    if not short_id:
        raise ValueError("Invalid Terabox link format.")

    cached_file = await db_utils.get_cached_file(short_id)
    if cached_file:
        logger.info(f"Cache hit for {short_id}. Reusing file_id.")
        return {"cached": True, **cached_file}

    download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
    if error:
        raise ValueError(error)

    local_filepath, thumb_path, download_error = await terabox.download_terabox_file(
        bot,
        batch_info["source_chat_id"],
        batch_info["status_message"].message_id,
        download_url,
        raw_filename,
    )
    if download_error:
        raise ValueError(download_error)

    return {
        "cached": False,
        "path": local_filepath,
        "name": raw_filename,
        "size": os.path.getsize(local_filepath),
        "short_id": short_id,
        "thumb": thumb_path,
    }


async def _process_link_task(task: dict) -> None:
    """Process one queued link and record the outcome on its batch.

    Tasks whose batch is no longer in ``BATCH_JOBS`` are dropped. The task
    that brings the processed counter up to the batch total triggers
    ``handle_batch_completion``.
    """
    batch_id, original_link = task["batch_id"], task["original_link"]
    batch_info = BATCH_JOBS.get(batch_id)
    if not batch_info:
        return

    file_info = None
    error_msg = f"Unknown error for link: {original_link}"
    try:
        file_info = await _fetch_file_info(original_link, batch_info)
    except Exception as e:
        # A failed link is reported to the user in the batch summary; log it
        # as well so the cause is not lost.
        logger.warning("Failed to process link %s: %s", original_link, e, exc_info=True)
        error_msg = str(e) or error_msg

    # The lock keeps the counters and result lists consistent when several
    # workers finish links of the same batch concurrently.
    async with batch_info["lock"]:
        batch_info["processed_links"] += 1
        if file_info:
            batch_info["successful_downloads"].append(file_info)
        else:
            batch_info["failed_links"].append({"link": original_link, "error": error_msg})

        processed, total = batch_info["processed_links"], batch_info["total_links"]

        # The progress update is best-effort: a rejected or failed edit must
        # not prevent the batch from completing.
        try:
            await batch_info["status_message"].edit_text(
                f"⚙️ Batch <code>{batch_id[:6]}</code> progress: {processed}/{total} links processed.\n"
                f"ETA: {(total - processed) * 3} sec ⏳"
            )
        except TelegramBadRequest:
            pass
        except Exception:
            logger.warning(
                "Could not update progress for batch %s.", batch_id[:6], exc_info=True
            )

    # Exactly one task observes processed == total, so the final send runs
    # once; it does not need the lock, which only guards the counters.
    if processed == total:
        logger.info(f"Batch {batch_id[:6]} complete. Triggering final processing.")
        await handle_batch_completion(batch_id)


async def link_processor_worker(worker_id: int):
    """Consume link tasks from ``TASK_QUEUE`` forever.

    Each task is a dict with ``"batch_id"`` and ``"original_link"``. Any
    exception raised while handling a task is logged and the loop continues,
    so one bad task cannot stop the worker; ``task_done()`` is called for
    every task that was taken from the queue.

    Args:
        worker_id: Number used only to identify this worker in log messages.
    """
    logger.info(f"Link worker #{worker_id} started.")
    while True:
        task = await TASK_QUEUE.get()
        try:
            await _process_link_task(task)
        except Exception:
            logger.exception("Link worker #%s failed while handling task %r", worker_id, task)
        finally:
            TASK_QUEUE.task_done()

# --- Send & Cache ---
# File-name suffixes used to pick the Telegram send method when the file
# description carries no explicit "type".
_VIDEO_EXTS = (".mp4", ".mkv", ".mov", ".avi", ".webm")
_AUDIO_EXTS = (".mp3", ".flac", ".ogg", ".wav")


async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message:
    """Send one file to ``chat_id`` and cache its Telegram file id if it is new.

    The file is sent as video, audio or document, chosen from
    ``file_info["type"]`` or, failing that, from the file-name extension.
    When ``file_info`` is not marked ``"cached"``, the file id of the sent
    media is stored through ``db_utils.add_to_cache``.

    Args:
        chat_id: Destination chat.
        file_info: File description. Uses ``"file_id"`` when present,
            otherwise uploads ``"path"`` under the name ``"name"``; caching
            additionally reads ``"short_id"`` and ``"size"``.
        caption: Caption attached to the sent media.

    Returns:
        The message returned by the bot's send call.
    """
    # A description without "name"/"filename" falls through to the document
    # branch instead of failing on None.lower().
    filename = file_info.get("name") or file_info.get("filename") or ""
    file_path_or_id = file_info.get("file_id") or FSInputFile(file_info["path"], filename=file_info["name"])
    media_type = file_info.get("type")
    lowered_name = filename.lower()

    if media_type == "video" or lowered_name.endswith(_VIDEO_EXTS):
        sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
        media_type = "video"
    elif media_type == "audio" or lowered_name.endswith(_AUDIO_EXTS):
        sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
        media_type = "audio"
    else:
        sent_message = await bot.send_document(chat_id, file_path_or_id, caption=caption)
        media_type = "document"

    if not file_info.get("cached") and sent_message:
        # The attribute named after the send method can be empty on the
        # returned message — presumably when Telegram stores the upload as a
        # different media kind (TODO confirm). Skip caching rather than fail
        # a send that already succeeded.
        sent_media = getattr(sent_message, media_type, None)
        if sent_media is None:
            logger.warning(
                "Sent %r as %s but the message has no %s attachment; not caching.",
                filename, media_type, media_type,
            )
        else:
            await db_utils.add_to_cache(
                file_info["short_id"], sent_media.file_id, filename, media_type, file_info["size"]
            )

    return sent_message

# --- Batch Completion ---
def _format_failed_links(failed_links: list, limit: int = 3500) -> str:
    """Render failed links as HTML-escaped bullet lines within ``limit`` chars.

    Lines that would push the text past ``limit`` are replaced by a single
    "… and N more." line so the message stays below Telegram's message size
    limit without cutting an HTML entity in half.
    """
    lines = []
    used = 0
    for position, entry in enumerate(failed_links):
        line = f"- {html.escape(str(entry['link']))} → {html.escape(str(entry['error']))}"
        if used + len(line) + 1 > limit:
            lines.append(f"… and {len(failed_links) - position} more.")
            break
        lines.append(line)
        used += len(line) + 1
    return "\n".join(lines)


def _caption_for(item: dict) -> str:
    """Return the HTML caption (escaped file name in monospace) for ``item``."""
    return f"<code>{html.escape(str(item.get('name') or item.get('filename')))}</code>"


async def _edit_status_safely(status_msg: Message, text: str) -> None:
    """Edit the batch status message, logging instead of raising on failure."""
    try:
        await status_msg.edit_text(text)
    except Exception:
        logger.warning("Could not update the batch status message.", exc_info=True)


def _remove_file_quietly(path) -> None:
    """Delete ``path`` if it is set and exists; log any other OS error."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError:
        logger.warning("Could not remove temporary file %s.", path, exc_info=True)


async def handle_batch_completion(batch_id: str):
    """Send the results of a finished batch and release its resources.

    If nothing was downloaded, the status message is replaced by a failure
    report. Otherwise every file is sent to ``config.FORWARD_CHANNEL_ID``
    (when set, after forwarding the source message there) and then to the
    chat the batch came from, and the status message is replaced by a
    summary. Downloaded files and thumbnails are always deleted and the batch
    is removed from ``BATCH_JOBS``.

    Status-message edits are best-effort: a failed edit is logged and does
    not stop the files from being sent. Errors while sending are logged and
    reported on the status message; they are not re-raised.

    Args:
        batch_id: Key of the batch in ``BATCH_JOBS``. Unknown ids are ignored.
    """
    batch = BATCH_JOBS.get(batch_id)
    if not batch:
        return

    status_msg = batch["status_message"]
    successful_downloads = batch["successful_downloads"]
    short_batch_id = batch_id[:6]

    try:
        if not successful_downloads:
            failed_links_text = _format_failed_links(batch["failed_links"])
            await _edit_status_safely(
                status_msg,
                f"❌ Batch <code>{short_batch_id}</code> failed.\nDetails:\n{failed_links_text}",
            )
            return

        await _edit_status_safely(
            status_msg,
            f"✅ Batch <code>{short_batch_id}</code> downloaded. Sending {len(successful_downloads)} files...",
        )

        if config.FORWARD_CHANNEL_ID:
            await bot.forward_message(
                config.FORWARD_CHANNEL_ID,
                batch["source_chat_id"],
                batch["source_message_id"],
            )

            for item in successful_downloads:
                await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, _caption_for(item))
                await asyncio.sleep(1)

        for item in successful_downloads:
            await send_and_cache_file(batch["source_chat_id"], item, _caption_for(item))

        summary = f"✅ Batch <code>{short_batch_id}</code> complete: {len(successful_downloads)} files sent."
        if batch["failed_links"]:
            summary += f"\n❌ {len(batch['failed_links'])} links failed."

        await _edit_status_safely(status_msg, summary)

    except Exception as e:
        logger.error(f"Error during batch completion for {batch_id}: {e}", exc_info=True)
        await _edit_status_safely(
            status_msg,
            f"A critical error occurred while sending files for batch <code>{short_batch_id}</code>.",
        )
    finally:
        for item in successful_downloads:
            # Cached entries were never downloaded, so they have no local
            # file to delete; a thumbnail is removed whenever one is recorded.
            if not item.get("cached"):
                _remove_file_quietly(item.get("path"))
            _remove_file_quietly(item.get("thumb"))

        # pop() instead of del: the entry may already be gone.
        BATCH_JOBS.pop(batch_id, None)

# --- Handlers ---
@dp.message(CommandStart())
async def start_handler(message: Message):
    """Reply to /start with the welcome text and short usage instructions."""
    await message.reply(
        "👋 <b>Welcome to Terabox Bot!</b>\n\n"
        "📥 Send me any <b>Terabox</b> links.\n"
        "✅ I will download and send the files to you.\n\n"
        "ℹ️ Use /status to check current batch progress.\n\n"
        "Happy sharing!"
    )

@dp.message(Command("status"))
async def status_handler(message: Message):
    """Reply to /status with the progress of every batch in ``BATCH_JOBS``.

    All in-flight batches are listed, regardless of which chat started them.
    """
    active_batches = [
        f"Batch <code>{batch_id[:6]}</code>: {batch['processed_links']}/{batch['total_links']}"
        for batch_id, batch in BATCH_JOBS.items()
    ]

    if not active_batches:
        await message.reply("✅ No active batches. All clear.")
    else:
        await message.reply("📊 Active batches:\n" + "\n".join(active_batches))

# Matches http(s) URLs up to the next whitespace, angle bracket or quote.
_URL_PATTERN = re.compile(r"https?://[^\s<>'\"]+")

# A URL is treated as a Terabox link when it contains one of these domains.
_TERABOX_DOMAINS = (
    "terabox.com",
    "teraboxapp.com",
    "terasharelink.com",
    "1024tera.com",
    "freeterabox.com",
    "4funbox.com",
    "box-links.com",
)


@dp.message(F.text | F.caption)
async def message_handler(message: Message):
    """Queue every Terabox link found in a message as one batch.

    Commands (text starting with "/") are ignored. The sender is recorded
    through ``db_utils.add_or_update_user_db``. If the text or caption
    contains Terabox links, a status reply is sent, a batch entry is created
    in ``BATCH_JOBS`` and one task per link is put on ``TASK_QUEUE``.
    Messages without Terabox links get no reply.
    """
    if message.text and message.text.startswith("/"):
        return

    await db_utils.add_or_update_user_db(
        message.from_user.id,
        message.from_user.username,
        message.from_user.first_name,
    )

    content = message.text or message.caption or ""
    # dict.fromkeys drops duplicates while keeping the order in which the
    # links appear, so they are queued in message order.
    links = dict.fromkeys(_URL_PATTERN.findall(content))
    terabox_links = [
        link
        for link in links
        if any(domain in link for domain in _TERABOX_DOMAINS)
    ]

    if not terabox_links:
        return

    batch_id = str(uuid.uuid4())
    status_msg = await message.reply(
        f"✅ Found {len(terabox_links)} links. Queued as batch <code>{batch_id[:6]}</code>.\nProcessing... ⏳"
    )

    # The batch must be registered before its tasks are queued: workers drop
    # tasks whose batch id is not in BATCH_JOBS.
    BATCH_JOBS[batch_id] = {
        "total_links": len(terabox_links),
        "processed_links": 0,
        "successful_downloads": [],
        "failed_links": [],
        "source_chat_id": message.chat.id,
        "source_user_id": message.from_user.id,
        "source_message_id": message.message_id,
        "status_message": status_msg,
        "lock": asyncio.Lock(),
    }

    for link in terabox_links:
        await TASK_QUEUE.put({"batch_id": batch_id, "original_link": link})

# --- Startup ---
# Strong references to the running worker tasks. The event loop only keeps
# weak references to tasks, so without this set a worker could be
# garbage-collected while it is still running.
_WORKER_TASKS: set = set()


def _on_worker_done(task: asyncio.Task) -> None:
    """Forget a finished worker task and log it if it ended with an error."""
    _WORKER_TASKS.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logger.error("Worker task %s stopped unexpectedly.", task.get_name(), exc_info=error)


async def on_startup(dispatcher: Dispatcher):
    """Initialise the database and start the link-processing workers.

    Starts ``config.CONCURRENT_WORKERS`` instances of
    ``link_processor_worker`` as background tasks.

    Args:
        dispatcher: The dispatcher that is starting up; not used here.
    """
    await db_utils.initialize_database()
    for i in range(config.CONCURRENT_WORKERS):
        worker_task = asyncio.create_task(
            link_processor_worker(i + 1), name=f"link-worker-{i + 1}"
        )
        _WORKER_TASKS.add(worker_task)
        worker_task.add_done_callback(_on_worker_done)
    logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")

# --- Main ---
def setup_dispatcher():
    """Attach the startup hook to the module dispatcher and return it.

    The message handlers (``start_handler``, ``status_handler`` and
    ``message_handler``) are already registered on ``dp`` by their
    ``@dp.message(...)`` decorators when this module is imported, so they are
    not registered again here: a second registration would only add
    duplicate entries, and an unfiltered ``message_handler`` registration
    would bypass its ``F.text | F.caption`` filter.

    Returns:
        The module-level ``Dispatcher`` instance ``dp``.
    """
    dp.startup.register(on_startup)
    return dp