"""YouTube helpers built on yt-dlp and youtubesearchpython."""

import asyncio
import glob
import os
import random
import re
import shlex
from typing import Optional, Tuple, Union

from pyrogram.enums import MessageEntityType
from pyrogram.types import Message
from youtubesearchpython.__future__ import VideosSearch
from yt_dlp import YoutubeDL

import config
from Devine.utils.database import is_on_off
from Devine.utils.formatters import time_to_seconds


def cookies():
    """Pick a random ``*.txt`` cookie file from ``<cwd>/cookies``.

    Returns:
        The chosen file as a path relative to the working directory,
        in the form ``cookies/<file name>``.

    Raises:
        FileNotFoundError: if the folder contains no ``.txt`` files.
    """
    folder_path = os.path.join(os.getcwd(), "cookies")
    txt_files = glob.glob(os.path.join(folder_path, "*.txt"))
    if not txt_files:
        raise FileNotFoundError("No .txt files found in the specified folder.")
    # basename() instead of splitting on "/" so the result is also correct on
    # platforms whose path separator is not a forward slash.
    return f"cookies/{os.path.basename(random.choice(txt_files))}"


def get_ytdl_options(
    ytdl_opts: Union[str, dict, list], commandline: bool = True
) -> Union[str, dict, list]:
    """Add authentication options to a set of yt-dlp options.

    When the ``TOKEN_DATA`` environment variable is set, ``oauth2``
    username/password options are added; otherwise a cookie file chosen by
    :func:`cookies` is added.

    Args:
        ytdl_opts: an argv-style list, a command-line string, or a
            ``YoutubeDL`` options dict. Lists and dicts are modified in place.
        commandline: for list/str input, emit ``--flag`` style names when
            true and bare option names when false. Ignored for dicts.

    Returns:
        The augmented options, of the same type as ``ytdl_opts``. Any other
        input type is returned untouched.

    Raises:
        FileNotFoundError: propagated from :func:`cookies` when cookies are
            needed but none are available.
    """
    token_data = os.getenv("TOKEN_DATA")

    if isinstance(ytdl_opts, list):
        if token_data:
            # NOTE(review): the password element is the two-character string
            # "''"; it only becomes an empty string if the list is later
            # joined and parsed by a shell. Left as-is for compatibility.
            ytdl_opts += [
                "--username" if commandline else "username",
                "oauth2",
                "--password" if commandline else "password",
                "''",
            ]
        else:
            ytdl_opts += ["--cookies" if commandline else "cookiefile", cookies()]
    elif isinstance(ytdl_opts, str):
        if token_data:
            extra = (
                "--username oauth2 --password '' "
                if commandline
                else "username oauth2 password '' "
            )
        else:
            cookie_file = cookies()
            extra = (
                f"--cookies {cookie_file}"
                if commandline
                else f"cookiefile {cookie_file}"
            )
        # Without a separator the first added option would be glued onto the
        # last token of the incoming command (e.g. "<url>--cookies ...").
        needs_space = bool(ytdl_opts) and not ytdl_opts[-1].isspace()
        ytdl_opts += (" " if needs_space else "") + extra
    elif isinstance(ytdl_opts, dict):
        if token_data:
            ytdl_opts.update({"username": "oauth2", "password": ""})
        else:
            ytdl_opts["cookiefile"] = cookies()
    return ytdl_opts


async def shell_cmd(cmd):
    """Run ``cmd`` through the shell and return its decoded output.

    Returns:
        Decoded stderr when the process wrote anything to stderr, unless that
        text contains "unavailable videos are hidden" (case-insensitive), in
        which case stderr is ignored. Otherwise decoded stdout.
    """
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, errorz = await proc.communicate()
    if errorz:
        error_text = errorz.decode("utf-8")
        if "unavailable videos are hidden" not in error_text.lower():
            return error_text
    return out.decode("utf-8")


class YouTubeAPI:
    """Search, inspect and download YouTube content."""

    def __init__(self):
        self.base = "https://www.youtube.com/watch?v="
        self.regex = r"(?:youtube\.com|youtu\.be)"
        self.status = "https://www.youtube.com/oembed?url="
        self.listbase = "https://youtube.com/playlist?list="
        self.reg = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

    def _clean_link(
        self,
        link: str,
        videoid: Union[bool, str] = None,
        base: Optional[str] = None,
    ) -> str:
        """Expand an id to a URL when ``videoid`` is truthy and drop everything from the first ``&``."""
        if videoid:
            link = (base or self.base) + link
        if "&" in link:
            link = link.split("&")[0]
        return link

    async def _lookup(self, link: str, videoid: Union[bool, str] = None) -> dict:
        """Return the search entry for ``link``; raise ``ValueError`` when the search is empty."""
        link = self._clean_link(link, videoid)
        results = (await VideosSearch(link, limit=1).next())["result"]
        if not results:
            raise ValueError(f"No YouTube search result for {link!r}")
        # limit=1 is requested, so at most one entry is expected.
        return results[-1]

    async def _stream_url(self, link: str) -> Tuple[bytes, bytes]:
        """Run ``yt-dlp -g`` for ``link`` and return the raw ``(stdout, stderr)``."""
        cmd = get_ytdl_options(
            ["yt-dlp", "-g", "-f", "best[height<=?720][width<=?1280]", link]
        )
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        return await proc.communicate()

    async def exists(self, link: str, videoid: Union[bool, str] = None):
        """Return True when ``link`` contains a youtube.com / youtu.be host."""
        if videoid:
            link = self.base + link
        return bool(re.search(self.regex, link))

    async def url(self, message_1: Message) -> Union[str, None]:
        """Extract the first link from a message or the message it replies to.

        Returns:
            The text of the first URL entity, the target of a text-link
            caption entity, or None when neither message carries a link.
        """
        messages = [message_1]
        if message_1.reply_to_message:
            messages.append(message_1.reply_to_message)

        text = ""
        offset = None
        length = None
        for message in messages:
            # Compare against None: a URL at the very start of the text has
            # offset 0, which is falsy but still a valid match.
            if offset is not None:
                break
            if message.entities:
                for entity in message.entities:
                    if entity.type == MessageEntityType.URL:
                        text = message.text or message.caption
                        offset, length = entity.offset, entity.length
                        break
            elif message.caption_entities:
                for entity in message.caption_entities:
                    if entity.type == MessageEntityType.TEXT_LINK:
                        return entity.url

        if offset is None:
            return None
        return text[offset : offset + length]

    async def details(self, link: str, videoid: Union[bool, str] = None):
        """Return ``(title, duration_min, duration_sec, thumbnail, vidid)``.

        ``duration_sec`` is 0 when the search entry has no duration.

        Raises:
            ValueError: if the search returns no entry.
        """
        result = await self._lookup(link, videoid)
        title = result["title"]
        duration_min = result["duration"]
        thumbnail = result["thumbnails"][0]["url"].split("?")[0]
        vidid = result["id"]
        if str(duration_min) == "None":
            duration_sec = 0
        else:
            duration_sec = int(time_to_seconds(duration_min))
        return title, duration_min, duration_sec, thumbnail, vidid

    async def title(self, link: str, videoid: Union[bool, str] = None):
        """Return the title of the search entry; raises ``ValueError`` if none."""
        return (await self._lookup(link, videoid))["title"]

    async def duration(self, link: str, videoid: Union[bool, str] = None):
        """Return the duration field of the search entry; raises ``ValueError`` if none."""
        return (await self._lookup(link, videoid))["duration"]

    async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
        """Return the first thumbnail URL without its query string; raises ``ValueError`` if none."""
        result = await self._lookup(link, videoid)
        return result["thumbnails"][0]["url"].split("?")[0]

    async def video(self, link: str, videoid: Union[bool, str] = None):
        """Resolve a direct stream URL with ``yt-dlp -g``.

        Returns:
            ``(1, url)`` with the first line of yt-dlp's stdout on success,
            ``(0, stderr_text)`` when yt-dlp printed nothing to stdout.
        """
        link = self._clean_link(link, videoid)
        stdout, stderr = await self._stream_url(link)
        if stdout:
            return 1, stdout.decode().split("\n")[0]
        return 0, stderr.decode()

    async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
        """Return the non-empty output lines of a flat playlist listing.

        Args:
            link: playlist URL, or playlist id when ``videoid`` is truthy.
            limit: value passed to ``--playlist-end``.
            user_id: unused; kept for interface compatibility.
            videoid: treat ``link`` as a playlist id.

        Note:
            The lines come from :func:`shell_cmd`, which returns stderr text
            instead of stdout when yt-dlp reports an error.
        """
        link = self._clean_link(link, videoid, self.listbase)
        # The link comes from user input and the command runs through a
        # shell, so both interpolated values are quoted.
        cmd = get_ytdl_options(
            "yt-dlp -i --get-id --flat-playlist "
            f"--playlist-end {shlex.quote(str(limit))} "
            f"--skip-download {shlex.quote(link)}"
        )
        output = await shell_cmd(cmd)
        return [entry for entry in output.split("\n") if entry]

    async def track(self, link: str, videoid: Union[bool, str] = None):
        """Return ``(track_details, vidid)`` for the search entry.

        Raises:
            ValueError: if the search returns no entry.
        """
        result = await self._lookup(link, videoid)
        vidid = result["id"]
        track_details = {
            "title": result["title"],
            "link": result["link"],
            "vidid": vidid,
            "duration_min": result["duration"],
            "thumb": result["thumbnails"][0]["url"].split("?")[0],
        }
        return track_details, vidid

    async def formats(self, link: str, videoid: Union[bool, str] = None):
        """List the non-DASH formats yt-dlp reports for ``link``.

        Returns:
            ``(formats_available, link)``. Each entry is a dict with the keys
            ``format``, ``filesize``, ``format_id``, ``ext``, ``format_note``
            and ``yturl``. Formats missing any of the first five keys are
            skipped.
        """
        link = self._clean_link(link, videoid)
        ytdl_opts = get_ytdl_options({"quiet": True}, False)
        required = ("format", "filesize", "format_id", "ext", "format_note")

        def extract():
            with YoutubeDL(ytdl_opts) as ydl:
                return ydl.extract_info(link, download=False)

        # extract_info blocks on network I/O; run it in the default executor
        # so the event loop stays responsive.
        info = await asyncio.get_running_loop().run_in_executor(None, extract)

        formats_available = []
        for fmt in info["formats"]:
            if any(key not in fmt for key in required):
                continue
            if "dash" in str(fmt["format"]).lower():
                continue
            entry = {key: fmt[key] for key in required}
            entry["yturl"] = link
            formats_available.append(entry)
        return formats_available, link

    async def slider(
        self,
        link: str,
        query_type: int,
        videoid: Union[bool, str] = None,
    ):
        """Return ``(title, duration_min, thumbnail, vidid)`` for entry ``query_type`` of a 10-result search."""
        link = self._clean_link(link, videoid)
        results = (await VideosSearch(link, limit=10).next()).get("result")
        entry = results[query_type]
        title = entry["title"]
        duration_min = entry["duration"]
        vidid = entry["id"]
        thumbnail = entry["thumbnails"][0]["url"].split("?")[0]
        return title, duration_min, thumbnail, vidid

    async def download(
        self,
        link: str,
        mystic,
        video: Union[bool, str] = None,
        videoid: Union[bool, str] = None,
        songaudio: Union[bool, str] = None,
        songvideo: Union[bool, str] = None,
        format_id: Union[bool, str] = None,
        title: Union[bool, str] = None,
    ) -> Union[str, Tuple[str, Optional[bool]], None]:
        """Download ``link`` or resolve a stream URL, depending on the flags.

        The flags are checked in the order ``songvideo``, ``songaudio``,
        ``video``; with none set, the best audio is downloaded.

        Args:
            link: video URL, or video id when ``videoid`` is truthy.
            mystic: unused here; kept for interface compatibility.
            video: fetch video instead of audio.
            videoid: treat ``link`` as a video id.
            songaudio: download ``format_id`` and convert it to mp3.
            songvideo: download ``format_id`` merged with format 140 as mp4.
            format_id: yt-dlp format id for the song modes.
            title: output file stem for the song modes.

        Returns:
            * song modes: the path ``downloads/<title>.mp4`` or ``.mp3``;
            * otherwise ``(path_or_url, direct)`` where ``direct`` is True
              for a downloaded file and None for a stream URL;
            * None when the stream URL lookup printed nothing to stdout.
        """
        if videoid:
            link = self.base + link
        loop = asyncio.get_running_loop()

        common_opts = {
            "geo_bypass": True,
            "nocheckcertificate": True,
            "quiet": True,
            "no_warnings": True,
        }

        def fetch_by_id(format_selector):
            """Download into ``downloads/<id>.<ext>`` unless that file already exists."""
            opts = get_ytdl_options(
                {
                    "format": format_selector,
                    "outtmpl": "downloads/%(id)s.%(ext)s",
                    **common_opts,
                },
                False,
            )
            ydl = YoutubeDL(opts)
            info = ydl.extract_info(link, False)
            path = os.path.join("downloads", f"{info['id']}.{info['ext']}")
            if not os.path.exists(path):
                ydl.download([link])
            return path

        def song_video_dl():
            opts = get_ytdl_options(
                {
                    "format": f"{format_id}+140",
                    "outtmpl": f"downloads/{title}",
                    **common_opts,
                    "prefer_ffmpeg": True,
                    "merge_output_format": "mp4",
                },
                False,
            )
            YoutubeDL(opts).download([link])

        def song_audio_dl():
            opts = get_ytdl_options(
                {
                    "format": format_id,
                    "outtmpl": f"downloads/{title}.%(ext)s",
                    **common_opts,
                    "prefer_ffmpeg": True,
                    "postprocessors": [
                        {
                            "key": "FFmpegExtractAudio",
                            "preferredcodec": "mp3",
                            "preferredquality": "192",
                        }
                    ],
                },
                False,
            )
            YoutubeDL(opts).download([link])

        if songvideo:
            await loop.run_in_executor(None, song_video_dl)
            return f"downloads/{title}.mp4"

        if songaudio:
            await loop.run_in_executor(None, song_audio_dl)
            return f"downloads/{title}.mp3"

        if video:
            if await is_on_off(config.YTDOWNLOADER):
                downloaded_file = await loop.run_in_executor(
                    None,
                    fetch_by_id,
                    "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])",
                )
                return downloaded_file, True
            stdout, _stderr = await self._stream_url(link)
            if not stdout:
                return None
            # direct=None marks the first element as a stream URL rather
            # than a local file.
            return stdout.decode().split("\n")[0], None

        downloaded_file = await loop.run_in_executor(
            None, fetch_by_id, "bestaudio/best"
        )
        return downloaded_file, True