Spaces:
Sleeping
Sleeping
File size: 5,693 Bytes
3ec2de0 a1fa3ac a705074 c3ebde1 6190b71 c3ebde1 b0822c8 3ec2de0 c3ebde1 a705074 5b5c90e c3ebde1 b0822c8 c3ebde1 6190b71 3ec2de0 b0822c8 c3ebde1 a705074 c3ebde1 a705074 c3ebde1 a705074 c3ebde1 a705074 c3ebde1 a705074 3ec2de0 2da25e6 6190b71 b0822c8 c3ebde1 b0822c8 a705074 c3ebde1 b0822c8 c3ebde1 2da25e6 c3ebde1 b0822c8 c3ebde1 b0822c8 c3ebde1 b0822c8 a705074 3ec2de0 2da25e6 b0822c8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# terabox_utils.py
import re
import requests
import asyncio
import os
import time
import math
import logging
from functools import partial
from typing import Optional, Tuple
import config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Create the local "downloads" directory at import time; exist_ok avoids an
# error when it is already present.
os.makedirs("downloads", exist_ok=True)
# --- Utility Functions ---
def format_bytes(size_bytes: float) -> str:
    """Render a byte count as a human-readable string using 1024-based units.

    Args:
        size_bytes: Number of bytes. Integers and floats are both accepted
            (floats occur when formatting a transfer speed).

    Returns:
        ``"0 B"`` for zero or negative input; otherwise the value scaled to
        the largest unit among B/KB/MB/GB/TB that keeps it >= 1, rounded to
        two decimals (e.g. ``"1.5 KB"``). Values beyond TB stay in TB.
    """
    if size_bytes <= 0:
        return "0 B"
    units = ("B", "KB", "MB", "GB", "TB")
    value = float(size_bytes)
    unit_index = 0
    # Repeated division instead of int(math.log(...)): the logarithm is
    # negative for inputs below 1 (which indexed the unit table from the end
    # and reported "TB") and is subject to float error at exact boundaries.
    while value >= 1024 and unit_index < len(units) - 1:
        value /= 1024
        unit_index += 1
    return f"{round(value, 2)} {units[unit_index]}"
def format_eta(seconds_left: float) -> str:
    """Format a remaining-time estimate as ``"Hh Mm Ss"``, ``"Mm Ss"`` or ``"Ss"``.

    Args:
        seconds_left: Estimated seconds remaining; fractional part is dropped.

    Returns:
        The compact duration string. Negative, infinite or NaN estimates are
        reported as ``"0s"``.
    """
    try:
        # Clamp negatives: floor-based divmod would otherwise turn e.g. -5
        # into "59m 55s".
        total_seconds = max(int(seconds_left), 0)
    except (OverflowError, ValueError):
        # int() raises OverflowError for infinity and ValueError for NaN.
        total_seconds = 0
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    if hours > 0:
        return f"{hours}h {minutes}m {seconds}s"
    if minutes > 0:
        return f"{minutes}m {seconds}s"
    return f"{seconds}s"
# --- Short ID Extract ---
async def extract_terabox_short_id(full_url: str) -> Optional[str]:
    """Return the share short ID found in ``full_url``, or ``None``.

    Each known host/path layout is searched for case-insensitively, in the
    order listed; the first layout that matches supplies the ID (its first
    capture group).
    """
    share_link_layouts = (
        r'terabox\.com/s/([a-zA-Z0-9_-]+)',
        r'teraboxapp\.com/s/([a-zA-Z0-9_-]+)',
        r'1024tera\.com/s/([a-zA-Z0-9_-]+)',
        r'freeterabox\.com/s/([a-zA-Z0-9_-]+)',
        r'terabox\.com/sharing/link\?surl=([a-zA-Z0-9_-]+)',
        r'terasharelink\.com/s/([a-zA-Z0-9_-]+)',
        r'4funbox\.com/s/([a-zA-Z0-9_-]+)',
        r'box-links\.com/s/([a-zA-Z0-9_-]+)',
    )
    # Lazy generator: later layouts are not searched once one has matched.
    hits = (re.search(layout, full_url, re.I) for layout in share_link_layouts)
    return next((hit.group(1) for hit in hits if hit), None)
# --- Get Final URL and Filename ---
async def get_final_url_and_filename(original_link: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Resolve a share link to a download URL and file name via the worker.

    POSTs ``{"link": original_link}`` as JSON to ``config.TERABOX_WORKER_URL``
    (in the default executor, since ``requests`` is blocking) and reads the
    ``proxy_url``, ``file_name`` and ``error`` keys of the JSON reply.

    Args:
        original_link: The share link to resolve.

    Returns:
        ``(download_url, file_name, None)`` on success, or
        ``(None, None, error_message)`` on any failure. Request, HTTP-status
        and JSON-decoding errors are caught and reported through the third
        element rather than raised.
    """
    incomplete_msg = 'Worker returned incomplete data.'
    payload = {"link": original_link}
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        loop = asyncio.get_running_loop()
        r = await loop.run_in_executor(
            None,
            partial(requests.post, config.TERABOX_WORKER_URL, headers=headers, json=payload, timeout=30)
        )
        r.raise_for_status()
        data = r.json()
        if not isinstance(data, dict):
            # Valid JSON that is not an object has none of the expected keys.
            return None, None, incomplete_msg
        dl = data.get("proxy_url")
        fn = data.get("file_name")
        error = data.get("error")
        if error or not dl or not fn:
            # Fall back when "error" is absent OR present-but-empty/null, so
            # a failure is never reported with a None error message.
            return None, None, str(error) if error else incomplete_msg
        return dl, fn, None
    except Exception as e:
        # Boundary handler: callers rely on the tuple contract, not exceptions.
        return None, None, str(e)
# --- Download with Progress ---
async def download_terabox_file(bot_instance, chat_id, msg_id, url, filename) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Stream ``url`` to a file under ``downloads/`` while reporting progress.

    The target path is ``downloads/<chat_id>_<timestamp>_<sanitized filename>``.
    While downloading, the message ``msg_id`` in ``chat_id`` is edited through
    ``bot_instance.edit_message_text`` at most once every 2.5 seconds with
    size, percentage, speed and ETA. If the sanitized name has a video
    extension, ``generate_video_thumbnail`` is called afterwards.

    Args:
        bot_instance: Object exposing an awaitable ``edit_message_text``.
        chat_id: Chat identifier, used for the progress edit and the file name.
        msg_id: Identifier of the message to edit with progress.
        url: Direct download URL.
        filename: Original file name (shown to the user, sanitized for disk).

    Returns:
        ``(download_path, thumb_path_or_None, None)`` on success, or
        ``(None, None, error_message)`` on failure, in which case any partial
        file is removed.
    """
    # Function-scope import, as this module does elsewhere for rarely used
    # stdlib modules.
    import html

    safe_fn = re.sub(r'[\\/*?:"<>|]', "_", filename)[:200]
    download_path = os.path.join("downloads", f"{chat_id}_{time.time()}_{safe_fn}")
    # The name is placed inside HTML-parsed text; a raw <, > or & would make
    # the progress edit fail, so escape those characters.
    shown_name = html.escape(filename, quote=False)
    r = None
    try:
        loop = asyncio.get_running_loop()
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
            "Accept": "*/*",
            "Referer": "https://teraboxapp.com/"
        }
        r = await loop.run_in_executor(
            None,
            partial(requests.get, url, headers=headers, stream=True, timeout=(10, 300), allow_redirects=True)
        )
        r.raise_for_status()
        total_size = int(r.headers.get('content-length', 0))
        dl_size = 0
        start_time = time.time()
        last_update = start_time
        chunks = r.iter_content(chunk_size=1024 * 1024)
        with open(download_path, 'wb') as f:
            while True:
                # Network reads and disk writes are blocking; run them in the
                # executor so the event loop stays responsive. The None
                # default marks exhaustion (StopIteration cannot cross a
                # Future).
                chunk = await loop.run_in_executor(None, next, chunks, None)
                if chunk is None:
                    break
                if not chunk:
                    continue
                await loop.run_in_executor(None, f.write, chunk)
                dl_size += len(chunk)

                now = time.time()
                if now - last_update <= 2.5:
                    continue  # throttle message edits

                elapsed = now - start_time
                speed = dl_size / elapsed if elapsed > 0 else 0
                # Without a content-length the remaining size is unknown;
                # clamp so the ETA never goes negative.
                remaining = max(total_size - dl_size, 0)
                eta = remaining / speed if speed > 0 else 0
                pct = (dl_size / total_size * 100) if total_size > 0 else 0
                prog_text = (
                    f"📥 <b>Downloading:</b> <code>{shown_name}</code>\n"
                    f"Progress: {format_bytes(dl_size)}/{format_bytes(total_size)} ({pct:.1f}%)\n"
                    f"Speed: {format_bytes(speed)}/s\n"
                    f"ETA: {format_eta(eta)}"
                )
                try:
                    await bot_instance.edit_message_text(
                        chat_id=chat_id,
                        message_id=msg_id,
                        text=prog_text,
                        parse_mode="HTML"
                    )
                except Exception as edit_err:
                    # Progress display is best-effort; never abort the
                    # download because an edit was rejected.
                    logger.debug("Progress update failed: %s", edit_err)
                last_update = time.time()
        # Optional thumbnail generation — skip if not a video
        video_exts = ('.mp4', '.mkv', '.mov', '.avi', '.webm')
        thumb_path = None
        if safe_fn.lower().endswith(video_exts):
            thumb_path = await generate_video_thumbnail(download_path, chat_id)
        return download_path, thumb_path, None
    except Exception as e:
        try:
            if os.path.exists(download_path):
                os.remove(download_path)
        except OSError as cleanup_err:
            # Keep reporting the original failure even if cleanup fails.
            logger.warning("Could not remove partial download %s: %s", download_path, cleanup_err)
        return None, None, str(e)
    finally:
        if r is not None:
            # Release the streamed connection back to the pool.
            r.close()
# --- Generate Video Thumbnail (Optional) ---
async def generate_video_thumbnail(filepath: str, chat_id: int) -> Optional[str]:
    """Extract one frame from a video with ffmpeg and save it as a JPEG.

    Runs ``ffmpeg -y -i <filepath> -ss 00:00:01.000 -vframes 1 <thumb>`` where
    the thumbnail path is ``downloads/<chat_id>_<timestamp>_thumb.jpg``.

    Args:
        filepath: Path of the video file to read.
        chat_id: Chat identifier, used only to build the thumbnail file name.

    Returns:
        The thumbnail path if ffmpeg succeeded and the file exists, otherwise
        ``None``. Failures (ffmpeg missing, non-zero exit, timeout) are logged
        as warnings and never raised.
    """
    thumb_path = os.path.join("downloads", f"{chat_id}_{time.time()}_thumb.jpg")
    try:
        import subprocess
        ffmpeg_cmd = [
            "ffmpeg", "-y", "-i", filepath,
            "-ss", "00:00:01.000", "-vframes", "1", thumb_path
        ]
        loop = asyncio.get_running_loop()
        # subprocess.run blocks until ffmpeg exits, so run it in the executor
        # to keep the event loop free; the timeout stops a hung ffmpeg from
        # stalling the caller indefinitely.
        await loop.run_in_executor(
            None,
            partial(subprocess.run, ffmpeg_cmd, check=True, timeout=60)
        )
        if os.path.exists(thumb_path):
            return thumb_path
    except Exception as e:
        logger.warning("Thumbnail generation failed: %s", e)
        # Do not leave a partially written thumbnail behind.
        try:
            if os.path.exists(thumb_path):
                os.remove(thumb_path)
        except OSError:
            pass
    return None