Spaces:
Sleeping
Sleeping
File size: 1,685 Bytes
3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 a1fa3ac 3ec2de0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# terabox_utils.py
import aiohttp
import asyncio
import os
import logging
import config
async def extract_terabox_short_id(link: str) -> "str | None":
    """Extract the short share ID that follows ``/s/`` in a Terabox link.

    Declared ``async`` although it performs no I/O; the coroutine form is
    kept so call sites that ``await`` it continue to work.

    Args:
        link: A share URL expected to contain a ``/s/<id>`` path segment.

    Returns:
        The ID string, or ``None`` when ``link`` is empty/``None`` or does
        not contain a ``/s/<id>`` segment.
    """
    import re

    if not link:
        return None
    # Allow '-' and '_' as well as alphanumerics: stopping at the first
    # such character would silently return a truncated ID.
    match = re.search(r"/s/([A-Za-z0-9_-]+)", link)
    return match.group(1) if match else None
async def get_final_url_and_filename(link: str):
    """Resolve a share link to a direct download URL and filename.

    POSTs ``{"link": link}`` as JSON to ``config.TERABOX_WORKER_URL`` and
    reads the ``"url"`` and ``"name"`` fields from the JSON reply.

    Args:
        link: The share link to send to the worker.

    Returns:
        A ``(url, name, error)`` tuple. On success ``error`` is ``None``;
        on any failure ``url`` and ``name`` are ``None`` and ``error`` is a
        human-readable message. This function does not raise for request
        or parsing failures.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                config.TERABOX_WORKER_URL, json={"link": link}
            ) as resp:
                if resp.status != 200:
                    return None, None, f"Worker error: {resp.status}"
                data = await resp.json()
    except Exception as e:
        logging.exception("Error in get_final_url_and_filename")
        return None, None, str(e)

    # Validate the payload explicitly: a bare data["url"] lookup would
    # surface a missing field to the caller as the cryptic message "'url'".
    if not isinstance(data, dict):
        logging.error("Worker returned a non-object JSON payload")
        return None, None, "Worker error: unexpected response format"

    url = data.get("url")
    name = data.get("name")
    if not url or not name:
        logging.error("Worker response is missing 'url' or 'name'")
        return None, None, "Worker error: response missing 'url' or 'name'"

    return url, name, None
async def download_terabox_file(bot, chat_id, message_id, download_url, filename):
    """Stream ``download_url`` to ``downloads/<filename>`` in 1 MiB chunks.

    Args:
        bot: Unused by this function; kept for interface compatibility.
        chat_id: Unused by this function; kept for interface compatibility.
        message_id: Unused by this function; kept for interface compatibility.
        download_url: URL to fetch with an HTTP GET.
        filename: Name for the saved file. Only its final path component is
            used, so directory parts (including ``..``) are discarded.

    Returns:
        A ``(local_path, thumb_path, error)`` tuple. On success ``error`` is
        ``None`` and ``thumb_path`` is always ``None`` (no thumbnail is
        generated). On failure the first two items are ``None`` and
        ``error`` is a message string. Download failures are logged and
        reported through the tuple rather than raised.
    """
    # The filename originates outside this process; keep only the last path
    # component so it cannot escape the downloads directory.
    safe_name = os.path.basename(str(filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        return None, None, f"Invalid filename: {filename!r}"

    local_path = f"downloads/{safe_name}"
    file_opened = False
    try:
        os.makedirs("downloads", exist_ok=True)
        # aiohttp's default total timeout (5 minutes) would abort large
        # downloads; bound connection setup and per-read stalls instead.
        timeout = aiohttp.ClientTimeout(total=None, sock_connect=30, sock_read=60)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(download_url) as resp:
                # Without this, a 4xx/5xx error body would be saved to disk
                # and reported as a successful download.
                resp.raise_for_status()
                with open(local_path, "wb") as f:
                    file_opened = True
                    while True:
                        chunk = await resp.content.read(1024 * 1024)
                        if not chunk:
                            break
                        f.write(chunk)
        # Thumbnail creation is intentionally skipped (no FFMPEG encoding).
        thumb_path = None
        return local_path, thumb_path, None
    except Exception as e:
        logging.exception("Error in download_terabox_file")
        # Remove a partially written file, but only if this call created or
        # truncated it; an earlier file must survive a pre-open failure.
        if file_opened:
            try:
                os.remove(local_path)
            except OSError:
                pass
        return None, None, str(e)
|