Spaces:
Running
Running
File size: 3,918 Bytes
3ec2de0 a1fa3ac a705074 c3ebde1 6190b71 c3ebde1 a705074 c3ebde1 3ec2de0 c3ebde1 a705074 2da25e6 c3ebde1 6190b71 3ec2de0 c3ebde1 a705074 c3ebde1 a705074 c3ebde1 a705074 c3ebde1 a705074 c3ebde1 a705074 3ec2de0 2da25e6 6190b71 c3ebde1 2da25e6 a705074 c3ebde1 2da25e6 c3ebde1 2da25e6 c3ebde1 2da25e6 c3ebde1 2da25e6 a705074 3ec2de0 2da25e6 c3ebde1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# terabox_utils.py
import re
import requests
import asyncio
from functools import partial
import logging
import os
import time
import math
from typing import Optional, Tuple
import config
# Module-level logger, registered under the fixed name "terabox_utils".
logger = logging.getLogger("terabox_utils")
# Import-time side effect: create the relative "downloads" directory if it
# does not exist yet (exist_ok=True makes repeated imports harmless).
os.makedirs("downloads", exist_ok=True)
# --- Utility Functions ---
def format_bytes(size_bytes: int) -> str:
    """Return *size_bytes* as a human-readable string such as ``"1.5 MB"``.

    Units are binary (1 KB = 1024 B) and top out at TB, so anything larger is
    still expressed in TB. Zero and negative sizes are reported as ``"0 B"``.
    The numeric part is rounded to two decimals.
    """
    if size_bytes <= 0:
        return "0 B"

    units = ("B", "KB", "MB", "GB", "TB")
    last_unit = len(units) - 1

    # Divide by 1024 step by step instead of taking a floating-point
    # logarithm: division by a power of two is exact, so unit boundaries
    # cannot be misjudged by rounding error in math.log().
    value = float(size_bytes)
    unit = 0
    # Compare the *rounded* value so that e.g. 1048575 bytes is promoted to
    # "1.0 MB" rather than being displayed as "1024.0 KB".
    while unit < last_unit and round(value, 2) >= 1024:
        value /= 1024
        unit += 1

    return f"{round(value, 2)} {units[unit]}"
async def extract_terabox_short_id(full_url: str) -> Optional[str]:
    """Pull the share identifier out of a TeraBox-family link.

    The known link shapes are tried in a fixed order (case-insensitively,
    anywhere in *full_url*); the identifier captured by the first shape that
    matches is returned. ``None`` means no shape matched.
    """
    link_shapes = (
        r'terabox\.com/s/([a-zA-Z0-9_-]+)',
        r'teraboxapp\.com/s/([a-zA-Z0-9_-]+)',
        r'1024tera\.com/s/([a-zA-Z0-9_-]+)',
        r'freeterabox\.com/s/([a-zA-Z0-9_-]+)',
        r'terabox\.com/sharing/link\?surl=([a-zA-Z0-9_-]+)',
        r'terasharelink\.com/s/([a-zA-Z0-9_-]+)',
        r'4funbox\.com/s/([a-zA-Z0-9_-]+)',
        r'box-links\.com/s/([a-zA-Z0-9_-]+)',
    )
    # Lazily evaluate the searches so we stop at the first shape that hits.
    attempts = (re.search(shape, full_url, re.I) for shape in link_shapes)
    first_hit = next((hit for hit in attempts if hit is not None), None)
    if first_hit is None:
        return None
    return first_hit.group(1)
async def get_final_url_and_filename(original_link: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Ask the worker service for a direct download URL and file name.

    POSTs ``{"link": original_link}`` as JSON to ``config.TERABOX_WORKER_URL``.
    The blocking ``requests`` call is run in the loop's default executor so
    the event loop stays responsive while waiting.

    Returns:
        ``(proxy_url, file_name, None)`` on success, or
        ``(None, None, error_message)`` on any failure. On failure the error
        message is always a non-empty string, so callers can test it for
        truthiness.
    """
    incomplete_msg = 'Worker returned incomplete data.'
    payload = {"link": original_link}
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        loop = asyncio.get_running_loop()
        r = await loop.run_in_executor(
            None,
            partial(requests.post, config.TERABOX_WORKER_URL, headers=headers, json=payload, timeout=30)
        )
        r.raise_for_status()
        data = r.json()
        if not isinstance(data, dict):
            # Valid JSON but not an object: there is nothing to look up.
            return None, None, incomplete_msg
        dl = data.get("proxy_url")
        fn = data.get("file_name")
        error = data.get("error")
        if error or not dl or not fn:
            # An "error" key that is present but null/empty must not leak out
            # as a falsy error value, or the failure would look like success.
            return None, None, (str(error) if error else incomplete_msg)
        return dl, fn, None
    except Exception as e:
        # Boundary handler: callers expect an error tuple, never an exception.
        logger.warning("Worker lookup failed for %s: %s", original_link, e)
        # Some exceptions stringify to "", so fall back to the class name.
        return None, None, (str(e) or type(e).__name__)
async def download_terabox_file(bot_instance, chat_id, msg_id, url, filename):
    """Stream *url* to a file under ``downloads/`` and report progress.

    The HTTP request, every chunk read and every disk write run in the loop's
    default executor, so the event loop is not blocked while the transfer is
    in flight. Roughly every 2.5 seconds the message identified by
    ``chat_id`` / ``msg_id`` is edited through
    ``bot_instance.edit_message_text`` with the current progress; failures of
    that edit are ignored (best effort).

    Args:
        bot_instance: Object exposing an awaitable ``edit_message_text``.
        chat_id: Chat holding the progress message; also part of the file name.
        msg_id: Identifier of the progress message to edit.
        url: Direct download URL.
        filename: Original file name; sanitised before use on disk.

    Returns:
        ``(download_path, None, None)`` on success, or
        ``(None, None, error_message)`` on failure. A partially written file
        is removed on failure or cancellation.
    """
    import html  # local import: only used to escape the name in the HTML progress text

    safe_fn = re.sub(r'[\\/*?:"<>|]', "_", filename)[:200]
    download_path = os.path.join("downloads", f"{chat_id}_{time.time()}_{safe_fn}")
    r = None
    finished = False
    try:
        loop = asyncio.get_running_loop()
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
            "Accept": "*/*",
            "Referer": "https://teraboxapp.com/"
        }
        r = await loop.run_in_executor(
            None,
            partial(requests.get, url, headers=headers, stream=True, timeout=(10, 300), allow_redirects=True)
        )
        r.raise_for_status()
        total_size = int(r.headers.get('content-length', 0))
        dl_size = 0
        # Monotonic clock: the throttle must not be affected by wall-clock jumps.
        last_update = time.monotonic()
        # The name ends up inside <code>…</code>; escape it so characters such
        # as "<" or "&" cannot break the HTML the bot sends.
        shown_name = html.escape(str(filename))
        chunks = r.iter_content(chunk_size=1024 * 1024)
        with open(download_path, 'wb') as f:

            def _pull_chunk():
                """Blocking step: read one chunk and write it; None signals end of stream."""
                # next() with a default: StopIteration must not escape into a Future.
                chunk = next(chunks, None)
                if chunk is None:
                    return None
                if chunk:
                    f.write(chunk)
                return len(chunk)

            while True:
                written = await loop.run_in_executor(None, _pull_chunk)
                if written is None:
                    break
                dl_size += written
                if time.monotonic() - last_update > 2.5:
                    pct = (dl_size / total_size * 100) if total_size > 0 else 0
                    prog_text = (
                        f"📥 <b>Downloading:</b> <code>{shown_name}</code>\n"
                        f"Progress: {format_bytes(dl_size)}/{format_bytes(total_size)} ({pct:.1f}%)"
                    )
                    try:
                        await bot_instance.edit_message_text(
                            chat_id, msg_id, prog_text, parse_mode="HTML"
                        )
                    except Exception as edit_err:
                        # Progress display is best effort; never fail the download over it.
                        logger.debug("Progress update failed: %s", edit_err)
                    last_update = time.monotonic()
        finished = True
        return download_path, None, None
    except Exception as e:
        logger.warning("Download failed for %s: %s", url, e)
        # Some exceptions stringify to "", so fall back to the class name to
        # keep the error value truthy for callers.
        return None, None, (str(e) or type(e).__name__)
    finally:
        # Always hand the streamed connection back, success or not.
        if r is not None:
            r.close()
        # Runs for ordinary errors and for cancellation alike, so no partial
        # file is left behind.
        if not finished:
            try:
                os.remove(download_path)
            except OSError:
                pass