Spaces:
Paused
Paused
import asyncio | |
import aiohttp | |
import aiofiles | |
import os | |
import re | |
from typing import Optional, Union, Dict | |
from yt_dlp import YoutubeDL | |
from config import API_URL, API_KEY | |
USE_API = bool(API_URL and API_KEY) | |
_logged_api_skip = False | |
CHUNK_SIZE = 8192 | |
RETRY_DELAY = 2 | |
cookies_file = "cookies.txt" | |
download_folder = "/tmp/downloads" | |
os.makedirs(download_folder, exist_ok=True) | |
def extract_video_id(link: str) -> str: | |
if "v=" in link: | |
return link.split("v=")[-1].split("&")[0] | |
return link.split("/")[-1].split("?")[0] | |
def safe_filename(name: str) -> str: | |
return re.sub(r"[\\/*?\"<>|]", "_", name).strip()[:100] | |
def file_exists(video_id: str) -> Optional[str]: | |
for ext in ["mp3", "m4a", "webm"]: | |
path = f"{download_folder}/{video_id}.{ext}" | |
if os.path.exists(path): | |
print(f"[CACHED] Using existing file: {path}") | |
return path | |
return None | |
async def api_download_song(link: str) -> Optional[str]: | |
global _logged_api_skip | |
if not USE_API: | |
if not _logged_api_skip: | |
print("[SKIPPED] API config missing — using yt-dlp only.") | |
_logged_api_skip = True | |
return None | |
video_id = extract_video_id(link) | |
song_url = f"{API_URL}/song/{video_id}?api={API_KEY}" | |
try: | |
async with aiohttp.ClientSession() as session: | |
while True: | |
async with session.get(song_url) as response: | |
if response.status != 200: | |
print(f"[API ERROR] Status {response.status}") | |
return None | |
data = await response.json() | |
status = data.get("status", "").lower() | |
if status == "downloading": | |
await asyncio.sleep(RETRY_DELAY) | |
continue | |
elif status == "error": | |
print(f"[API ERROR] Status=error for {video_id}") | |
return None | |
elif status == "done": | |
download_url = data.get("link") | |
break | |
else: | |
print(f"[API ERROR] Unknown status: {status}") | |
return None | |
fmt = data.get("format", "mp3").lower() | |
path = f"{download_folder}/{video_id}.{fmt}" | |
async with session.get(download_url) as file_response: | |
async with aiofiles.open(path, "wb") as f: | |
while True: | |
chunk = await file_response.content.read(CHUNK_SIZE) | |
if not chunk: | |
break | |
await f.write(chunk) | |
return path | |
except Exception as e: | |
print(f"[API Download Error] {e}") | |
return None | |
def _download_ytdlp(link: str, opts: Dict) -> Optional[str]: | |
try: | |
with YoutubeDL(opts) as ydl: | |
info = ydl.extract_info(link, download=False) | |
ext = info.get("ext", "webm") | |
vid = info.get("id") | |
path = f"{download_folder}/{vid}.{ext}" | |
if os.path.exists(path): | |
return path | |
ydl.download([link]) | |
return path | |
except Exception as e: | |
print(f"[yt-dlp Error] {e}") | |
return None | |
async def yt_dlp_download(link: str, type: str, format_id: str = None, title: str = None) -> Optional[str]: | |
loop = asyncio.get_running_loop() | |
if type == "audio": | |
opts = { | |
"format": "bestaudio/best", | |
"outtmpl": f"{download_folder}/%(id)s.%(ext)s", | |
"quiet": True, | |
"no_warnings": True, | |
"cookiefile": cookies_file, | |
"noplaylist": True, | |
"concurrent_fragment_downloads": 5, | |
} | |
return await loop.run_in_executor(None, _download_ytdlp, link, opts) | |
elif type == "video": | |
opts = { | |
"format": "best[height<=?720][width<=?1280]", | |
"outtmpl": f"{download_folder}/%(id)s.%(ext)s", | |
"quiet": True, | |
"no_warnings": True, | |
"cookiefile": cookies_file, | |
"noplaylist": True, | |
"concurrent_fragment_downloads": 5, | |
} | |
return await loop.run_in_executor(None, _download_ytdlp, link, opts) | |
elif type == "song_video" and format_id and title: | |
safe_title = safe_filename(title) | |
opts = { | |
"format": f"{format_id}+140", | |
"outtmpl": f"{download_folder}/{safe_title}.mp4", | |
"quiet": True, | |
"no_warnings": True, | |
"prefer_ffmpeg": True, | |
"merge_output_format": "mp4", | |
"cookiefile": cookies_file, | |
} | |
await loop.run_in_executor(None, lambda: YoutubeDL(opts).download([link])) | |
return f"{download_folder}/{safe_title}.mp4" | |
elif type == "song_audio" and format_id and title: | |
safe_title = safe_filename(title) | |
opts = { | |
"format": format_id, | |
"outtmpl": f"{download_folder}/{safe_title}.%(ext)s", | |
"quiet": True, | |
"no_warnings": True, | |
"prefer_ffmpeg": True, | |
"cookiefile": cookies_file, | |
"postprocessors": [{ | |
"key": "FFmpegExtractAudio", | |
"preferredcodec": "mp3", | |
"preferredquality": "192", | |
}], | |
} | |
await loop.run_in_executor(None, lambda: YoutubeDL(opts).download([link])) | |
return f"{download_folder}/{safe_title}.mp3" | |
return None | |
async def download_audio_concurrent(link: str) -> Optional[str]: | |
video_id = extract_video_id(link) | |
existing = file_exists(video_id) | |
if existing: | |
return existing | |
if not USE_API: | |
return await yt_dlp_download(link, type="audio") | |
yt_task = asyncio.create_task(yt_dlp_download(link, type="audio")) | |
api_task = asyncio.create_task(api_download_song(link)) | |
done, _ = await asyncio.wait([yt_task, api_task], return_when=asyncio.FIRST_COMPLETED) | |
for task in done: | |
try: | |
result = task.result() | |
if result: | |
return result | |
except Exception as e: | |
print(f"[Download Task Error] {e}") | |
for task in [yt_task, api_task]: | |
if not task.done(): | |
try: | |
result = await task | |
if result: | |
return result | |
except Exception as e: | |
print(f"[Fallback Task Error] {e}") | |
return None |