Spaces:
Paused
Paused
File size: 6,514 Bytes
eef327e be2dc0f eef327e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
import asyncio
import aiohttp
import aiofiles
import os
import re
from typing import Optional, Union, Dict
from yt_dlp import YoutubeDL
from config import API_URL, API_KEY
# The remote song API is used only when both its URL and its key are configured.
USE_API = bool(API_URL and API_KEY)
# Flipped to True after the "API config missing" notice is printed, so the
# notice is emitted only once per process.
_logged_api_skip = False
# Number of bytes read per chunk while streaming an API download to disk.
CHUNK_SIZE = 8192
# Seconds to sleep between polls while the API reports status "downloading".
RETRY_DELAY = 2
# Path passed to yt-dlp through its "cookiefile" option.
cookies_file = "cookies.txt"
# Directory that receives every downloaded file; created at import time below.
download_folder = "/tmp/downloads"
os.makedirs(download_folder, exist_ok=True)
def extract_video_id(link: str) -> str:
    """Return the video ID contained in a YouTube-style link.

    The ID is taken from the ``v`` query parameter when one is present,
    otherwise from the last non-empty path segment (``youtu.be/<id>``,
    ``/shorts/<id>``, or a bare ID).

    Compared with plain substring splitting this:
    * does not mistake other parameters ending in ``v=`` (e.g. ``rv=``) for
      the video parameter,
    * drops any ``#fragment``,
    * tolerates a trailing slash and surrounding whitespace.

    Args:
        link: A URL or bare video ID.

    Returns:
        The extracted ID, or an empty string when nothing usable is found.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import parse_qs, urlsplit

    link = link.strip()
    try:
        parts = urlsplit(link)
    except ValueError:
        # urlsplit rejects some malformed netlocs; fall back to the legacy
        # string splitting so this function never raises on bad input.
        if "v=" in link:
            return link.split("v=")[-1].split("&")[0]
        return link.split("/")[-1].split("?")[0]

    ids = parse_qs(parts.query).get("v")
    if ids:
        # Keep the last occurrence, matching the previous behaviour when the
        # parameter is repeated.
        return ids[-1]

    segments = [segment for segment in parts.path.split("/") if segment]
    return segments[-1] if segments else ""
def safe_filename(name: str) -> str:
    """Turn *name* into a string usable as a file name.

    Backslash, slash, ``*``, ``?``, double quote, ``<``, ``>`` and ``|`` are
    each replaced by an underscore; the result is then stripped of
    surrounding whitespace and cut to at most 100 characters.
    """
    replaced = re.sub(r"[\\/*?\"<>|]", "_", name)
    trimmed = replaced.strip()
    return trimmed[:100]
def file_exists(video_id: str) -> Optional[str]:
    """Look for an already-downloaded file for *video_id*.

    Checks ``<download_folder>/<video_id>.<ext>`` for the extensions mp3,
    m4a and webm, in that order. The first path that exists is logged and
    returned; ``None`` is returned when none of them exist.
    """
    candidates = (
        f"{download_folder}/{video_id}.{suffix}"
        for suffix in ("mp3", "m4a", "webm")
    )
    for candidate in candidates:
        if not os.path.exists(candidate):
            continue
        print(f"[CACHED] Using existing file: {candidate}")
        return candidate
    return None
async def api_download_song(link: str, *, max_polls: int = 150) -> Optional[str]:
    """Download a song through the remote API and return the local file path.

    The API endpoint ``{API_URL}/song/{video_id}?api={API_KEY}`` is polled
    until it reports status ``"done"``; the file at the returned ``link`` is
    then streamed to ``<download_folder>/<video_id>.<format>``.

    Args:
        link: URL (or ID) of the video whose audio should be fetched.
        max_polls: Maximum number of status polls before giving up. With
            ``RETRY_DELAY`` seconds between polls this bounds the wait, so a
            job stuck in ``"downloading"`` cannot block the caller forever.

    Returns:
        The path of the downloaded file, or ``None`` when the API is not
        configured, reports an error or unknown status, never finishes
        within ``max_polls`` polls, or the transfer fails.
    """
    global _logged_api_skip
    if not USE_API:
        if not _logged_api_skip:
            print("[SKIPPED] API config missing — using yt-dlp only.")
            _logged_api_skip = True
        return None

    video_id = extract_video_id(link)
    song_url = f"{API_URL}/song/{video_id}?api={API_KEY}"
    # Data is written to a ".part" file first so that an interrupted transfer
    # never leaves a truncated file under a name file_exists() would accept.
    part_path = None
    try:
        async with aiohttp.ClientSession() as session:
            data = None
            for _ in range(max_polls):
                async with session.get(song_url) as response:
                    if response.status != 200:
                        print(f"[API ERROR] Status {response.status}")
                        return None
                    data = await response.json()

                status = str(data.get("status") or "").lower()
                if status == "done":
                    break
                if status == "error":
                    print(f"[API ERROR] Status=error for {video_id}")
                    return None
                if status != "downloading":
                    print(f"[API ERROR] Unknown status: {status}")
                    return None
                # Sleep only after the poll response has been released.
                await asyncio.sleep(RETRY_DELAY)
            else:
                print(f"[API ERROR] Timed out waiting for {video_id}")
                return None

            download_url = data.get("link")
            if not download_url:
                print(f"[API ERROR] No download link for {video_id}")
                return None

            fmt = str(data.get("format") or "mp3").lower()
            path = f"{download_folder}/{video_id}.{fmt}"
            part_path = f"{path}.part"

            async with session.get(download_url) as file_response:
                if file_response.status != 200:
                    # Do not save an error page as if it were audio.
                    print(f"[API ERROR] Download status {file_response.status}")
                    return None
                async with aiofiles.open(part_path, "wb") as f:
                    while True:
                        chunk = await file_response.content.read(CHUNK_SIZE)
                        if not chunk:
                            break
                        await f.write(chunk)

            # Publish the finished file under its final name in one step.
            os.replace(part_path, path)
            part_path = None
            return path
    except Exception as e:
        print(f"[API Download Error] {e}")
        return None
    finally:
        # Runs on failure and on task cancellation: remove any leftover
        # partial download.
        if part_path is not None and os.path.exists(part_path):
            try:
                os.remove(part_path)
            except OSError:
                pass
def _download_ytdlp(link: str, opts: Dict) -> Optional[str]:
    """Blocking yt-dlp download of *link* using the options in *opts*.

    The video's metadata is fetched first (without downloading) to derive the
    expected output path ``<download_folder>/<id>.<ext>``; the extension
    defaults to ``webm`` when the metadata has none. The download itself is
    skipped when that path already exists.

    Returns:
        The expected output path, or ``None`` if any exception was raised
        (the exception is printed, not propagated).
    """
    try:
        with YoutubeDL(opts) as downloader:
            metadata = downloader.extract_info(link, download=False)
            extension = metadata.get("ext", "webm")
            media_id = metadata.get("id")
            target = f"{download_folder}/{media_id}.{extension}"
            if not os.path.exists(target):
                downloader.download([link])
            return target
    except Exception as err:
        print(f"[yt-dlp Error] {err}")
        return None
async def yt_dlp_download(link: str, type: str, format_id: str = None, title: str = None) -> Optional[str]:
    """Run a yt-dlp download in the default executor and return the file path.

    Supported values of *type*:

    * ``"audio"`` – best available audio, saved as ``<id>.<ext>``.
    * ``"video"`` – best single format limited to 720p / 1280 px wide, saved
      as ``<id>.<ext>``.
    * ``"song_video"`` – *format_id* merged with format ``140`` into
      ``<safe title>.mp4``; requires *format_id* and *title*.
    * ``"song_audio"`` – *format_id* extracted to MP3 (quality 192) as
      ``<safe title>.mp3``; requires *format_id* and *title*.

    Returns:
        The output path, or ``None`` for an unsupported *type*, for a
        ``song_*`` request missing *format_id* or *title*, or when the
        ``audio``/``video`` helper reports a failure. In the ``song_*`` modes
        exceptions from yt-dlp are not caught here and reach the caller.
    """
    loop = asyncio.get_running_loop()

    # Options every mode has in common.
    shared = {
        "quiet": True,
        "no_warnings": True,
        "cookiefile": cookies_file,
    }

    if type in ("audio", "video"):
        if type == "audio":
            selector = "bestaudio/best"
        else:
            selector = "best[height<=?720][width<=?1280]"
        by_id_opts = {
            **shared,
            "format": selector,
            "outtmpl": f"{download_folder}/%(id)s.%(ext)s",
            "noplaylist": True,
            "concurrent_fragment_downloads": 5,
        }
        return await loop.run_in_executor(None, _download_ytdlp, link, by_id_opts)

    # The remaining modes name the file after the title and need both extras.
    if type not in ("song_video", "song_audio") or not (format_id and title):
        return None

    safe_title = safe_filename(title)
    if type == "song_video":
        result_path = f"{download_folder}/{safe_title}.mp4"
        named_opts = {
            **shared,
            "format": f"{format_id}+140",
            "outtmpl": f"{download_folder}/{safe_title}.mp4",
            "prefer_ffmpeg": True,
            "merge_output_format": "mp4",
        }
    else:
        result_path = f"{download_folder}/{safe_title}.mp3"
        named_opts = {
            **shared,
            "format": format_id,
            "outtmpl": f"{download_folder}/{safe_title}.%(ext)s",
            "prefer_ffmpeg": True,
            "postprocessors": [{
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }],
        }

    def _run_download():
        # Executed in the worker thread; errors propagate to the awaiting caller.
        return YoutubeDL(named_opts).download([link])

    await loop.run_in_executor(None, _run_download)
    return result_path
async def download_audio_concurrent(link: str) -> Optional[str]:
    """Fetch the audio for *link*, racing yt-dlp against the remote API.

    A previously downloaded file for the same video ID is returned straight
    away. When the API is not configured only yt-dlp is used. Otherwise both
    downloaders are started as tasks; the first one to finish with a path
    wins, and if it produced nothing the remaining task is awaited instead.

    Returns:
        Path of the audio file, or ``None`` when every attempt failed.
    """
    cached = file_exists(extract_video_id(link))
    if cached:
        return cached

    if not USE_API:
        return await yt_dlp_download(link, type="audio")

    yt_task = asyncio.create_task(yt_dlp_download(link, type="audio"))
    api_task = asyncio.create_task(api_download_song(link))
    finished, _ = await asyncio.wait(
        [yt_task, api_task],
        return_when=asyncio.FIRST_COMPLETED,
    )

    # First pass: whichever task(s) completed first.
    for completed in finished:
        try:
            outcome = completed.result()
        except Exception as e:
            print(f"[Download Task Error] {e}")
            continue
        if outcome:
            return outcome

    # Second pass: the early finisher yielded nothing, so wait for the other.
    for remaining in (yt_task, api_task):
        if remaining.done():
            continue
        try:
            outcome = await remaining
        except Exception as e:
            print(f"[Fallback Task Error] {e}")
            continue
        if outcome:
            return outcome

    return None