Spaces:
Sleeping
Sleeping
File size: 3,450 Bytes
3ec2de0 a1fa3ac a705074 c3ebde1 6190b71 c3ebde1 a705074 c3ebde1 3ec2de0 c3ebde1 a705074 5b5c90e c3ebde1 5b5c90e c3ebde1 6190b71 3ec2de0 5b5c90e c3ebde1 a705074 c3ebde1 a705074 c3ebde1 a705074 c3ebde1 a705074 c3ebde1 a705074 3ec2de0 2da25e6 6190b71 5b5c90e c3ebde1 5b5c90e a705074 c3ebde1 2da25e6 c3ebde1 5b5c90e c3ebde1 5b5c90e c3ebde1 5b5c90e a705074 3ec2de0 2da25e6 5b5c90e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# terabox_utils.py
import re
import requests
import asyncio
from functools import partial
import logging
import os
import time
import math
import config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Import-time side effect: create the relative "downloads" directory
# (resolved against the current working directory). exist_ok=True makes
# this a no-op when the directory is already present.
os.makedirs("downloads", exist_ok=True)
# --- Utility Functions ---
def format_bytes(size_bytes: int) -> str:
    """Return *size_bytes* as a human-readable string in 1024-based units.

    The value is scaled to the largest unit among B, KB, MB, GB and TB for
    which it is at least 1, rounded to two decimals, and rendered as
    ``"<number> <unit>"`` (for example ``"1.5 KB"``). Sizes beyond the TB
    range stay in TB. Zero and negative sizes are reported as ``"0 B"``.

    Args:
        size_bytes: Size in bytes.

    Returns:
        The formatted size string.
    """
    if size_bytes <= 0:
        return "0 B"

    units = ("B", "KB", "MB", "GB", "TB")
    last = len(units) - 1

    # Pick the unit by repeated division instead of int(math.log(x, 1024)):
    # dividing a float by 1024 is exact, whereas a floating-point logarithm
    # is not guaranteed to be exact at power-of-1024 boundaries.
    value = float(size_bytes)
    idx = 0
    while value >= 1024 and idx < last:
        value /= 1024
        idx += 1

    rounded = round(value, 2)
    # Rounding can push a value just under the boundary up to 1024.0
    # (e.g. 1048575 bytes -> 1023.999 KB -> 1024.0); carry into the next
    # unit so the result reads "1.0 MB" rather than "1024.0 KB".
    if rounded >= 1024 and idx < last:
        rounded = round(rounded / 1024, 2)
        idx += 1

    return f"{rounded} {units[idx]}"
async def extract_terabox_short_id(full_url: str):
    """Pull the share identifier out of a TeraBox-family link.

    Each known URL shape is tried in order (case-insensitively) and the
    captured identifier of the first shape that matches is returned.
    Returns ``None`` when no shape matches.
    """
    share_link_regexes = [
        r'terabox\.com/s/([a-zA-Z0-9_-]+)',
        r'teraboxapp\.com/s/([a-zA-Z0-9_-]+)',
        r'1024tera\.com/s/([a-zA-Z0-9_-]+)',
        r'freeterabox\.com/s/([a-zA-Z0-9_-]+)',
        r'terabox\.com/sharing/link\?surl=([a-zA-Z0-9_-]+)',
        r'terasharelink\.com/s/([a-zA-Z0-9_-]+)',
        r'4funbox\.com/s/([a-zA-Z0-9_-]+)',
        r'box-links\.com/s/([a-zA-Z0-9_-]+)',
    ]
    # Lazy generator: searching stops at the first regex that hits, exactly
    # like an early-returning loop.
    hits = (re.search(rx, full_url, re.I) for rx in share_link_regexes)
    return next((hit.group(1) for hit in hits if hit), None)
async def get_final_url_and_filename(original_link: str):
    """Resolve a share link to a download URL and file name via the worker.

    POSTs ``{"link": original_link}`` as JSON to
    ``config.TERABOX_WORKER_URL``. The blocking ``requests`` call runs in the
    loop's default executor so the event loop stays responsive.

    Args:
        original_link: The share link to resolve.

    Returns:
        A 3-tuple ``(proxy_url, file_name, error)``. On success ``error`` is
        ``None``. On failure the first two items are ``None`` and ``error``
        is always truthy: the worker's own ``error`` value when it supplied
        one, otherwise a description of what went wrong. Request/response
        failures are reported through the tuple, not raised.
    """
    incomplete_msg = "Worker returned incomplete data."
    payload = {"link": original_link}
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() is deprecated for this use.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            partial(
                requests.post,
                config.TERABOX_WORKER_URL,
                headers=headers,
                json=payload,
                timeout=30,
            ),
        )
        response.raise_for_status()
        data = response.json()
        if not isinstance(data, dict):
            # Valid JSON that is not an object (list, string, ...) has no
            # fields to read; report it clearly instead of via AttributeError.
            return None, None, incomplete_msg

        dl = data.get("proxy_url")
        fn = data.get("file_name")
        worker_error = data.get("error")
        if worker_error or not dl or not fn:
            # `or` (not a .get default) so an "error" key that is present but
            # null/empty still yields a truthy message for the caller.
            return None, None, worker_error or incomplete_msg
        return dl, fn, None
    except Exception as e:
        # Boundary handler: callers rely on the tuple contract. Some
        # exceptions stringify to "", so fall back to the class name to keep
        # the error slot truthy.
        return None, None, str(e) or type(e).__name__
def _download_to_path(url: str, headers: dict, download_path: str, label: str) -> None:
    """Blocking helper: stream *url* into *download_path*, logging progress.

    Intended to run in a worker thread. Raises whatever ``requests`` or the
    file system raises; the caller is responsible for cleanup.
    """
    # Using the response as a context manager guarantees the streamed
    # connection is released even if writing fails midway.
    with requests.get(
        url, headers=headers, stream=True, timeout=(10, 300), allow_redirects=True
    ) as r:
        r.raise_for_status()
        total_size = int(r.headers.get('content-length', 0))
        dl_size = 0
        # Monotonic clock: progress throttling must not be affected by
        # wall-clock adjustments.
        last_update = time.monotonic()
        with open(download_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                if not chunk:
                    continue
                f.write(chunk)
                dl_size += len(chunk)
                # Debug progress line, at most once every 2.5 seconds.
                if time.monotonic() - last_update > 2.5:
                    pct = (dl_size / total_size * 100) if total_size > 0 else 0
                    logger.info(
                        f"Downloading: {label} → {format_bytes(dl_size)}/{format_bytes(total_size)} ({pct:.1f}%)"
                    )
                    last_update = time.monotonic()


async def download_terabox_file(url: str, filename: str):
    """Download *url* into the ``downloads`` directory.

    The target name is ``<timestamp>_<sanitized filename>``, where characters
    that are invalid in file names (``\\ / * ? : " < > |``) are replaced with
    ``_`` and the name is capped at 200 characters.

    The whole transfer (request, streaming and file writes) runs in the
    loop's default executor, so the event loop is not blocked while the file
    downloads. Note that cancelling this coroutine does not stop a transfer
    already running in the worker thread.

    Args:
        url: Direct download URL.
        filename: Suggested file name; sanitized before use.

    Returns:
        ``(download_path, None)`` on success, or ``(None, error_message)`` on
        failure, in which case any partially written file is removed and the
        message is always non-empty.
    """
    safe_fn = re.sub(r'[\\/*?:"<>|]', "_", filename)[:200]
    download_path = os.path.join("downloads", f"{time.time()}_{safe_fn}")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Accept": "*/*",
        "Referer": "https://teraboxapp.com/",
    }
    try:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            partial(_download_to_path, url, headers, download_path, safe_fn),
        )
        return download_path, None
    except Exception as e:
        # Best-effort removal of a partial file; a cleanup failure must not
        # mask the original error or escape the tuple contract.
        try:
            if os.path.exists(download_path):
                os.remove(download_path)
        except OSError:
            pass
        return None, str(e) or type(e).__name__