DragMusicV2 / DragMusic /utils /downloader.py
taslim19
fix: Use /tmp for downloads to prevent PermissionError
be2dc0f
raw
history blame
6.51 kB
import asyncio
import aiohttp
import aiofiles
import os
import re
from typing import Optional, Union, Dict
from yt_dlp import YoutubeDL
from config import API_URL, API_KEY
USE_API = bool(API_URL and API_KEY)
_logged_api_skip = False
CHUNK_SIZE = 8192
RETRY_DELAY = 2
cookies_file = "cookies.txt"
download_folder = "/tmp/downloads"
os.makedirs(download_folder, exist_ok=True)
def extract_video_id(link: str) -> str:
if "v=" in link:
return link.split("v=")[-1].split("&")[0]
return link.split("/")[-1].split("?")[0]
def safe_filename(name: str) -> str:
return re.sub(r"[\\/*?\"<>|]", "_", name).strip()[:100]
def file_exists(video_id: str) -> Optional[str]:
for ext in ["mp3", "m4a", "webm"]:
path = f"{download_folder}/{video_id}.{ext}"
if os.path.exists(path):
print(f"[CACHED] Using existing file: {path}")
return path
return None
async def api_download_song(link: str) -> Optional[str]:
global _logged_api_skip
if not USE_API:
if not _logged_api_skip:
print("[SKIPPED] API config missing — using yt-dlp only.")
_logged_api_skip = True
return None
video_id = extract_video_id(link)
song_url = f"{API_URL}/song/{video_id}?api={API_KEY}"
try:
async with aiohttp.ClientSession() as session:
while True:
async with session.get(song_url) as response:
if response.status != 200:
print(f"[API ERROR] Status {response.status}")
return None
data = await response.json()
status = data.get("status", "").lower()
if status == "downloading":
await asyncio.sleep(RETRY_DELAY)
continue
elif status == "error":
print(f"[API ERROR] Status=error for {video_id}")
return None
elif status == "done":
download_url = data.get("link")
break
else:
print(f"[API ERROR] Unknown status: {status}")
return None
fmt = data.get("format", "mp3").lower()
path = f"{download_folder}/{video_id}.{fmt}"
async with session.get(download_url) as file_response:
async with aiofiles.open(path, "wb") as f:
while True:
chunk = await file_response.content.read(CHUNK_SIZE)
if not chunk:
break
await f.write(chunk)
return path
except Exception as e:
print(f"[API Download Error] {e}")
return None
def _download_ytdlp(link: str, opts: Dict) -> Optional[str]:
try:
with YoutubeDL(opts) as ydl:
info = ydl.extract_info(link, download=False)
ext = info.get("ext", "webm")
vid = info.get("id")
path = f"{download_folder}/{vid}.{ext}"
if os.path.exists(path):
return path
ydl.download([link])
return path
except Exception as e:
print(f"[yt-dlp Error] {e}")
return None
async def yt_dlp_download(link: str, type: str, format_id: str = None, title: str = None) -> Optional[str]:
loop = asyncio.get_running_loop()
if type == "audio":
opts = {
"format": "bestaudio/best",
"outtmpl": f"{download_folder}/%(id)s.%(ext)s",
"quiet": True,
"no_warnings": True,
"cookiefile": cookies_file,
"noplaylist": True,
"concurrent_fragment_downloads": 5,
}
return await loop.run_in_executor(None, _download_ytdlp, link, opts)
elif type == "video":
opts = {
"format": "best[height<=?720][width<=?1280]",
"outtmpl": f"{download_folder}/%(id)s.%(ext)s",
"quiet": True,
"no_warnings": True,
"cookiefile": cookies_file,
"noplaylist": True,
"concurrent_fragment_downloads": 5,
}
return await loop.run_in_executor(None, _download_ytdlp, link, opts)
elif type == "song_video" and format_id and title:
safe_title = safe_filename(title)
opts = {
"format": f"{format_id}+140",
"outtmpl": f"{download_folder}/{safe_title}.mp4",
"quiet": True,
"no_warnings": True,
"prefer_ffmpeg": True,
"merge_output_format": "mp4",
"cookiefile": cookies_file,
}
await loop.run_in_executor(None, lambda: YoutubeDL(opts).download([link]))
return f"{download_folder}/{safe_title}.mp4"
elif type == "song_audio" and format_id and title:
safe_title = safe_filename(title)
opts = {
"format": format_id,
"outtmpl": f"{download_folder}/{safe_title}.%(ext)s",
"quiet": True,
"no_warnings": True,
"prefer_ffmpeg": True,
"cookiefile": cookies_file,
"postprocessors": [{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": "192",
}],
}
await loop.run_in_executor(None, lambda: YoutubeDL(opts).download([link]))
return f"{download_folder}/{safe_title}.mp3"
return None
async def download_audio_concurrent(link: str) -> Optional[str]:
video_id = extract_video_id(link)
existing = file_exists(video_id)
if existing:
return existing
if not USE_API:
return await yt_dlp_download(link, type="audio")
yt_task = asyncio.create_task(yt_dlp_download(link, type="audio"))
api_task = asyncio.create_task(api_download_song(link))
done, _ = await asyncio.wait([yt_task, api_task], return_when=asyncio.FIRST_COMPLETED)
for task in done:
try:
result = task.result()
if result:
return result
except Exception as e:
print(f"[Download Task Error] {e}")
for task in [yt_task, api_task]:
if not task.done():
try:
result = await task
if result:
return result
except Exception as e:
print(f"[Fallback Task Error] {e}")
return None