dragonxd1 commited on
Commit
07027a4
·
verified ·
1 Parent(s): 0d351fc

Update DragMusic/platforms/Youtube.py

Browse files
Files changed (1) hide show
  1. DragMusic/platforms/Youtube.py +216 -186
DragMusic/platforms/Youtube.py CHANGED
@@ -1,5 +1,5 @@
1
  import asyncio
2
- import os
3
  import re
4
  from typing import Dict, List, Optional, Tuple, Union
5
 
@@ -9,231 +9,261 @@ from pyrogram.types import Message
9
  from youtubesearchpython.__future__ import VideosSearch
10
 
11
  from DragMusic.utils.database import is_on_off
 
12
  from DragMusic.utils.errors import capture_internal_err
13
  from DragMusic.utils.formatters import time_to_seconds
14
 
 
15
  _cache = {}
16
 
17
- async def shell_cmd(cmd):
 
 
18
  proc = await asyncio.create_subprocess_shell(
19
- cmd,
20
- stdout=asyncio.subprocess.PIPE,
21
- stderr=asyncio.subprocess.PIPE,
22
  )
23
- out, errorz = await proc.communicate()
24
- if errorz:
25
- if "unavailable videos are hidden" in (errorz.decode("utf-8")).lower():
26
- return out.decode("utf-8")
27
- else:
28
- return errorz.decode("utf-8")
29
- return out.decode("utf-8")
 
 
 
 
 
 
 
30
 
31
 
32
  class YouTubeAPI:
33
- def __init__(self):
34
- self.base = "https://www.youtube.com/watch?v="
35
- self.opts = {"nocheckcertificate": True, "quiet": True, "no_warnings": True}
36
-
37
- cookie_file = "cookies.txt"
38
- if os.path.exists(cookie_file):
39
- self.opts["cookiefile"] = cookie_file
40
 
41
- @capture_internal_err
42
- async def exists(self, url: str):
43
- return bool(re.search(r"youtube\.com|youtu\.be", url))
 
 
 
 
 
44
 
45
  @capture_internal_err
46
- async def slider(self, query: str, query_type: int):
47
- search = VideosSearch(query, limit=10)
48
- results = await search.next()
49
- result_list = results.get("result", [])
50
- if not result_list:
51
- return None, None, None, None
52
-
53
- if query_type >= len(result_list):
54
- query_type = 0
55
-
56
- info = result_list[query_type]
57
- title = info.get("title", "Unknown Title")
58
- duration_min = info.get("duration", "0:00")
59
- thumbnail = info.get("thumbnails", [{}])[0].get("url", "").split("?")[0]
60
- vidid = info.get("id", "")
61
- return title, duration_min, thumbnail, vidid
62
 
63
  @capture_internal_err
64
- async def _fetch_info(self, query: str, use_cache: bool = True):
65
- if use_cache and query in _cache:
66
- return _cache[query]
67
-
68
- search = VideosSearch(query, limit=1)
69
- results = await search.next()
70
- result_data = results.get("result", [])
71
-
72
- if result_data:
73
- _cache[query] = result_data[0]
74
- return result_data[0]
75
  return None
76
 
77
  @capture_internal_err
78
- async def get_info(self, link: str):
79
- """Fetches all video information using yt-dlp."""
80
- full_url = self.base + link if not link.startswith("http") else link
81
- with yt_dlp.YoutubeDL(self.opts) as ydl:
82
- info = ydl.extract_info(full_url, download=False)
83
- return info
 
84
 
85
  @capture_internal_err
86
- async def details(self, link: str, videoid: bool = False):
87
- """Gets essential video details."""
88
- if videoid:
89
- link = self.base + link
90
-
91
- info = await self._fetch_info(link, use_cache=False)
92
- if not info:
93
- info = await self.get_info(link)
94
-
95
- title = info.get("title", "Unknown Title")
96
- duration_min = info.get("duration", "0:00")
97
- duration_sec = time_to_seconds(duration_min) if duration_min else 0
98
- thumbnail = info.get("thumbnails", [{}])[0].get("url", "").split("?")[0] or info.get("thumbnail", "")
99
- vidid = info.get("id", "")
100
- return title, duration_min, duration_sec, thumbnail, vidid
101
 
102
  @capture_internal_err
103
- async def search(self, query: str):
104
- """Searches for a video and returns its details."""
105
- info = await self._fetch_info(query)
106
  if not info:
107
- return None, None
108
-
109
- details = {
110
- "title": info.get("title", "Unknown Title"),
111
- "link": info.get("link", ""),
112
- "vidid": info.get("id", ""),
113
- "duration_min": info.get("duration", "0:00"),
114
- "thumb": info.get("thumbnails", [{}])[0].get("url", "").split("?")[0],
115
- }
116
- track_id = info.get("id", "")
117
- return details, track_id
118
 
119
  @capture_internal_err
120
- async def track(self, link: str, videoid: bool = False):
121
- """Gets track details for a single video."""
122
- if videoid:
123
- link = self.base + link
124
- info = await self.get_info(link)
125
- track_details = {
126
- "title": info.get("title", "Unknown Title"),
127
- "link": info.get("webpage_url", ""),
128
- "vidid": info.get("id", ""),
129
- "duration_min": info.get("duration_string", "0:00"),
130
- "thumb": info.get("thumbnail", ""),
131
- }
132
- return track_details, info.get("id", "")
133
 
134
  @capture_internal_err
135
- async def url(self, message: "Message") -> Union[str, None]:
136
- """Extracts a URL from a Pyrogram message."""
137
- for entity in message.entities or message.caption_entities or []:
138
- if entity.type in ["url", "text_link"]:
139
- return entity.url or message.text[entity.offset:entity.offset + entity.length]
140
- return None
141
 
142
  @capture_internal_err
143
- async def title(self, link: str):
144
- info = await self.get_info(link)
145
- return info.get("title", "Unknown Title")
146
 
147
  @capture_internal_err
148
- async def duration(self, link: str):
149
- info = await self.get_info(link)
150
- return info.get("duration_string", "0:00")
 
 
 
 
 
151
 
152
  @capture_internal_err
153
- async def thumbnail(self, link: str):
154
- info = await self.get_info(link)
155
- return info.get("thumbnail", "")
 
 
 
 
 
 
 
156
 
157
  @capture_internal_err
158
- async def get_best_audio_url(self, link: str):
159
- """Gets the URL of the best quality audio stream."""
160
- info = await self.get_info(link)
161
- best_audio = None
162
- for f in info.get("formats", []):
163
- if f.get("acodec") != "none" and f.get("vcodec") == "none":
164
- if best_audio is None or f.get("abr", 0) > best_audio.get("abr", 0):
165
- best_audio = f
166
- return best_audio["url"] if best_audio else None
 
 
 
 
 
 
 
 
 
167
 
168
- @capture_internal_err
169
- async def get_best_video_url(self, link: str, quality: str = "720"):
170
- """Gets the URL of the best video stream for a given quality."""
171
- info = await self.get_info(link)
172
- best_video = None
173
- for f in info.get("formats", []):
174
- if f.get("height") and f.get("height") <= int(quality) and f.get("vcodec") != "none":
175
- if best_video is None or f.get("height") > best_video.get("height"):
176
- best_video = f
177
- return best_video["url"] if best_video else None
178
 
179
  @capture_internal_err
180
- async def playlist(self, link: str, limit: int, user_id, videoid: bool = False):
181
- """Fetches video IDs from a playlist."""
182
- if videoid:
183
- link = f"https://www.youtube.com/playlist?list={link}"
184
-
185
- playlist_opts = self.opts.copy()
186
- playlist_opts["extract_flat"] = "in_playlist"
187
- playlist_opts["playlistend"] = limit
188
-
189
- with yt_dlp.YoutubeDL(playlist_opts) as ydl:
190
- info = ydl.extract_info(link, download=False)
191
- return [entry["id"] for entry in info.get("entries", []) if entry]
 
 
 
 
 
 
 
 
 
 
192
 
193
  @capture_internal_err
194
- async def formats(self, link: str):
195
- """Gets available formats for a video."""
196
- info = await self.get_info(link)
197
- formats_available = []
198
- for f in info.get("formats", []):
199
- if not "dash" in str(f.get("format_note", "")).lower():
200
- formats_available.append({
201
- "format": f.get("format", ""),
202
- "filesize": f.get("filesize"),
203
- "format_id": f.get("format_id", ""),
204
- "ext": f.get("ext", ""),
205
- "format_note": f.get("format_note", ""),
206
- "yturl": info.get("webpage_url", ""),
207
- })
208
- return formats_available, info.get("webpage_url", "")
209
-
210
  @capture_internal_err
211
- async def download(self, link: str, mystic, video: bool = None, format_id: str = None) -> Union[str, None]:
212
- """Downloads a video or audio file and returns the path."""
213
- file_path = f"downloads/{link}.%(ext)s"
214
-
215
- dl_opts = self.opts.copy()
216
- dl_opts["outtmpl"] = file_path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
217
 
218
  if video:
219
- dl_opts["format"] = format_id if format_id else "best[height<=?720][width<=?1280]/best"
220
- else: # audio
221
- dl_opts["format"] = "bestaudio/best"
222
- dl_opts["postprocessors"] = [{
223
- "key": "FFmpegExtractAudio",
224
- "preferredcodec": "mp3",
225
- "preferredquality": "192",
226
- }]
227
-
228
- try:
229
- with yt_dlp.YoutubeDL(dl_opts) as ydl:
230
- ydl.download([link])
231
-
232
- # Find the downloaded file since the extension is dynamic
233
- for file in os.listdir("downloads"):
234
- if link in file:
235
- return os.path.join("downloads", file)
236
- return None
237
- except Exception as e:
238
- print(str(e))
239
- return None
 
 
 
 
 
 
 
1
  import asyncio
2
+ import json
3
  import re
4
  from typing import Dict, List, Optional, Tuple, Union
5
 
 
9
  from youtubesearchpython.__future__ import VideosSearch
10
 
11
  from DragMusic.utils.database import is_on_off
12
+ from DragMusic.utils.downloader import yt_dlp_download, download_audio_concurrent
13
  from DragMusic.utils.errors import capture_internal_err
14
  from DragMusic.utils.formatters import time_to_seconds
15
 
16
+ cookies_file = "cookies.txt"
17
  _cache = {}
18
 
19
+
20
+ @capture_internal_err
21
+ async def shell_cmd(cmd: str) -> str:
22
  proc = await asyncio.create_subprocess_shell(
23
+ cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
 
 
24
  )
25
+ out, err = await proc.communicate()
26
+ return (out or err).decode()
27
+
28
+
29
+ @capture_internal_err
30
+ async def cached_youtube_search(query: str) -> List[Dict]:
31
+ if query in _cache:
32
+ return _cache[query]
33
+ search = VideosSearch(query, limit=1)
34
+ results = await search.next()
35
+ result_data = results.get("result", [])
36
+ if result_data:
37
+ _cache[query] = result_data
38
+ return result_data
39
 
40
 
41
  class YouTubeAPI:
42
+ def __init__(self) -> None:
43
+ self.base_url = "https://www.youtube.com/watch?v="
44
+ self.playlist_url = "https://youtube.com/playlist?list="
45
+ self._url_pattern = re.compile(r"(?:youtube\.com|youtu\.be)")
 
 
 
46
 
47
+ def _prepare_link(self, link: str, videoid: Union[str, bool, None] = None) -> str:
48
+ if isinstance(videoid, str) and videoid.strip():
49
+ link = self.base_url + videoid.strip()
50
+ if "youtu.be" in link:
51
+ link = self.base_url + link.split("/")[-1].split("?")[0]
52
+ elif "youtube.com/shorts/" in link or "youtube.com/live/" in link:
53
+ link = self.base_url + link.split("/")[-1].split("?")[0]
54
+ return link.split("&")[0]
55
 
56
  @capture_internal_err
57
+ async def exists(self, link: str, videoid: Union[str, bool, None] = None) -> bool:
58
+ return bool(self._url_pattern.search(self._prepare_link(link, videoid)))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59
 
60
  @capture_internal_err
61
+ async def url(self, message: Message) -> Optional[str]:
62
+ msgs = [message] + ([message.reply_to_message] if message.reply_to_message else [])
63
+ for msg in msgs:
64
+ text = msg.text or msg.caption or ""
65
+ entities = msg.entities or msg.caption_entities or []
66
+ for ent in entities:
67
+ if ent.type == MessageEntityType.URL:
68
+ return text[ent.offset : ent.offset + ent.length]
69
+ if ent.type == MessageEntityType.TEXT_LINK:
70
+ return ent.url
 
71
  return None
72
 
73
  @capture_internal_err
74
+ async def _fetch_video_info(self, query: str, *, use_cache: bool = True) -> Optional[Dict]:
75
+ if use_cache and not query.startswith("http"):
76
+ result = await cached_youtube_search(query)
77
+ else:
78
+ search = VideosSearch(query, limit=1)
79
+ result = (await search.next()).get("result", [])
80
+ return result[0] if result else None
81
 
82
  @capture_internal_err
83
+ async def is_live(self, link: str) -> bool:
84
+ prepared = self._prepare_link(link)
85
+ proc = await asyncio.create_subprocess_exec(
86
+ "yt-dlp", "--cookies", cookies_file, "--dump-json", prepared,
87
+ stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
88
+ )
89
+ stdout, _ = await proc.communicate()
90
+ if not stdout:
91
+ return False
92
+ try:
93
+ info = json.loads(stdout.decode())
94
+ return bool(info.get("is_live"))
95
+ except json.JSONDecodeError:
96
+ return False
 
97
 
98
  @capture_internal_err
99
+ async def details(self, link: str, videoid: Union[str, bool, None] = None) -> Tuple[str, Optional[str], int, str, str]:
100
+ info = await self._fetch_video_info(self._prepare_link(link, videoid))
 
101
  if not info:
102
+ raise ValueError("Video not found")
103
+ duration_text = info.get("duration")
104
+ duration_sec = int(time_to_seconds(duration_text)) if duration_text else 0
105
+ thumb = (info.get("thumbnail") or info.get("thumbnails", [{}])[0].get("url", "")).split("?")[0]
106
+ return (
107
+ info.get("title", ""),
108
+ duration_text,
109
+ duration_sec,
110
+ thumb,
111
+ info.get("id", ""),
112
+ )
113
 
114
  @capture_internal_err
115
+ async def title(self, link: str, videoid: Union[str, bool, None] = None) -> str:
116
+ info = await self._fetch_video_info(self._prepare_link(link, videoid))
117
+ return info.get("title", "") if info else ""
 
 
 
 
 
 
 
 
 
 
118
 
119
  @capture_internal_err
120
+ async def duration(self, link: str, videoid: Union[str, bool, None] = None) -> Optional[str]:
121
+ info = await self._fetch_video_info(self._prepare_link(link, videoid))
122
+ return info.get("duration") if info else None
 
 
 
123
 
124
  @capture_internal_err
125
+ async def thumbnail(self, link: str, videoid: Union[str, bool, None] = None) -> str:
126
+ info = await self._fetch_video_info(self._prepare_link(link, videoid))
127
+ return (info.get("thumbnail") or info.get("thumbnails", [{}])[0].get("url", "")).split("?")[0] if info else ""
128
 
129
  @capture_internal_err
130
+ async def video(self, link: str, videoid: Union[str, bool, None] = None) -> Tuple[int, str]:
131
+ link = self._prepare_link(link, videoid)
132
+ proc = await asyncio.create_subprocess_exec(
133
+ "yt-dlp", "--cookies", cookies_file, "-g", "-f", "best[height<=?720][width<=?1280]",
134
+ link, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
135
+ )
136
+ stdout, stderr = await proc.communicate()
137
+ return (1, stdout.decode().split("\n")[0]) if stdout else (0, stderr.decode())
138
 
139
  @capture_internal_err
140
+ async def playlist(self, link: str, limit: int, user_id, videoid: Union[str, bool, None] = None) -> List[str]:
141
+ if videoid:
142
+ link = self.playlist_url + str(videoid)
143
+ link = link.split("&")[0]
144
+ cmd = (
145
+ f"yt-dlp --cookies {cookies_file} -i --get-id --flat-playlist "
146
+ f"--playlist-end {limit} --skip-download {link}"
147
+ )
148
+ data = await shell_cmd(cmd)
149
+ return [item for item in data.strip().split("\n") if item]
150
 
151
  @capture_internal_err
152
+ async def track(self, link: str, videoid: Union[str, bool, None] = None) -> Tuple[Dict, str]:
153
+ try:
154
+ info = await self._fetch_video_info(self._prepare_link(link, videoid))
155
+ if not info:
156
+ raise ValueError("Track not found via API")
157
+ except Exception:
158
+ prepared = self._prepare_link(link, videoid)
159
+ proc = await asyncio.create_subprocess_exec(
160
+ "yt-dlp", "--cookies", cookies_file, "--dump-json", prepared,
161
+ stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
162
+ )
163
+ stdout, _ = await proc.communicate()
164
+ if not stdout:
165
+ raise ValueError("Track not found (yt-dlp fallback)")
166
+ try:
167
+ info = json.loads(stdout.decode())
168
+ except json.JSONDecodeError:
169
+ raise ValueError("Failed to parse yt-dlp output")
170
 
171
+ thumb = (info.get("thumbnail") or info.get("thumbnails", [{}])[0].get("url", "")).split("?")[0]
172
+ details = {
173
+ "title": info.get("title", ""),
174
+ "link": info.get("webpage_url", self._prepare_link(link, videoid)),
175
+ "vidid": info.get("id", ""),
176
+ "duration_min": info.get("duration") if isinstance(info.get("duration"), str) else None,
177
+ "thumb": thumb,
178
+ }
179
+ return details, info.get("id", "")
 
180
 
181
  @capture_internal_err
182
+ async def formats(self, link: str, videoid: Union[str, bool, None] = None) -> Tuple[List[Dict], str]:
183
+ link = self._prepare_link(link, videoid)
184
+ opts = {"quiet": True, "cookiefile": cookies_file}
185
+ formats: List[Dict] = []
186
+ try:
187
+ with yt_dlp.YoutubeDL(opts) as ydl:
188
+ info = ydl.extract_info(link, download=False)
189
+ for fmt in info.get("formats", []):
190
+ if "dash" in fmt.get("format", "").lower():
191
+ continue
192
+ if all(k in fmt for k in ("format", "filesize", "format_id", "ext", "format_note")):
193
+ formats.append({
194
+ "format": fmt["format"],
195
+ "filesize": fmt["filesize"],
196
+ "format_id": fmt["format_id"],
197
+ "ext": fmt["ext"],
198
+ "format_note": fmt["format_note"],
199
+ "yturl": link,
200
+ })
201
+ except Exception as e:
202
+ print(f"[formats()] yt-dlp error: {e}")
203
+ return formats, link
204
 
205
  @capture_internal_err
206
+ async def slider(self, link: str, query_type: int, videoid: Union[str, bool, None] = None) -> Tuple[str, Optional[str], str, str]:
207
+ search = VideosSearch(self._prepare_link(link, videoid), limit=10)
208
+ results = (await search.next()).get("result", [])
209
+ if not results or query_type >= len(results):
210
+ raise IndexError(f"Query type index {query_type} out of range (found {len(results)} results)")
211
+ res = results[query_type]
212
+ return (
213
+ res.get("title", ""),
214
+ res.get("duration"),
215
+ res.get("thumbnails", [{}])[0].get("url", "").split("?")[0],
216
+ res.get("id", ""),
217
+ )
218
+
 
 
 
219
  @capture_internal_err
220
+ async def download(
221
+ self,
222
+ link: str,
223
+ mystic,
224
+ *,
225
+ video: Union[bool, str, None] = None,
226
+ videoid: Union[str, bool, None] = None,
227
+ songaudio: Union[bool, str, None] = None,
228
+ songvideo: Union[bool, str, None] = None,
229
+ format_id: Union[bool, str, None] = None,
230
+ title: Union[bool, str, None] = None,
231
+ ) -> Union[Tuple[str, Optional[bool]], Tuple[None, None]]:
232
+ link = self._prepare_link(link, videoid)
233
+
234
+ if songvideo:
235
+ path = await yt_dlp_download(link, type="song_video", format_id=format_id, title=title)
236
+ return (path, True) if path else (None, None)
237
+
238
+ if songaudio:
239
+ path = await yt_dlp_download(link, type="song_audio", format_id=format_id, title=title)
240
+ return (path, True) if path else (None, None)
241
 
242
  if video:
243
+ if await self.is_live(link):
244
+ status, stream_url = await self.video(link)
245
+ if status == 1:
246
+ return stream_url, None
247
+ raise ValueError("Unable to fetch live stream link")
248
+ if await is_on_off(1):
249
+ path = await yt_dlp_download(link, type="video")
250
+ return (path, True) if path else (None, None)
251
+ else:
252
+ proc = await asyncio.create_subprocess_exec(
253
+ "yt-dlp",
254
+ "--cookies",
255
+ cookies_file,
256
+ "-g",
257
+ "-f",
258
+ "best[height<=?720][width<=?1280]",
259
+ link,
260
+ stdout=asyncio.subprocess.PIPE,
261
+ stderr=asyncio.subprocess.PIPE,
262
+ )
263
+ stdout, _ = await proc.communicate()
264
+ if stdout:
265
+ return stdout.decode().split("\n")[0], None
266
+ return None, None
267
+
268
+ path = await download_audio_concurrent(link)
269
+ return (path, True) if path else (None, None)