dragonxd1 commited on
Commit
265223e
·
verified ·
1 Parent(s): 1d5e89e

Update DragMusic/platforms/Youtube.py

Browse files
Files changed (1) hide show
  1. DragMusic/platforms/Youtube.py +169 -157
DragMusic/platforms/Youtube.py CHANGED
@@ -12,7 +12,11 @@ from youtubesearchpython.__future__ import VideosSearch
12
  from DragMusic.utils.database import is_on_off
13
  from DragMusic.utils.formatters import time_to_seconds
14
 
15
- SUPPORTED_FORMATS = {"mp4", "mkv", "webm", "avi", "mov"} # formats that ffmpeg can handle
 
 
 
 
16
 
17
  async def shell_cmd(cmd):
18
  proc = await asyncio.create_subprocess_shell(
@@ -29,18 +33,36 @@ async def shell_cmd(cmd):
29
  return out.decode("utf-8")
30
 
31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  class YouTubeAPI:
33
  def __init__(self):
34
  self.base = "https://www.youtube.com/watch?v="
35
  self.regex = r"(?:youtube\.com|youtu\.be)"
36
  self.status = "https://www.youtube.com/oembed?url="
37
  self.listbase = "https://youtube.com/playlist?list="
38
- self.reg = re.compile(r"\\x1B(?:[@-Z\\-_]|\\[[0-?]*[ -/]*[@-~])")
39
 
40
  async def exists(self, link: str, videoid: Union[bool, str] = None):
41
  if videoid:
42
  link = self.base + link
43
- return bool(re.search(self.regex, link))
 
 
 
44
 
45
  async def url(self, message_1: Message) -> Union[str, None]:
46
  messages = [message_1]
@@ -77,7 +99,10 @@ class YouTubeAPI:
77
  duration_min = result["duration"]
78
  thumbnail = result["thumbnails"][0]["url"].split("?")[0]
79
  vidid = result["id"]
80
- duration_sec = 0 if str(duration_min) == "None" else int(time_to_seconds(duration_min))
 
 
 
81
  return title, duration_min, duration_sec, thumbnail, vidid
82
 
83
  async def title(self, link: str, videoid: Union[bool, str] = None):
@@ -87,7 +112,8 @@ class YouTubeAPI:
87
  link = link.split("&")[0]
88
  results = VideosSearch(link, limit=1)
89
  for result in (await results.next())["result"]:
90
- return result["title"]
 
91
 
92
  async def duration(self, link: str, videoid: Union[bool, str] = None):
93
  if videoid:
@@ -96,7 +122,8 @@ class YouTubeAPI:
96
  link = link.split("&")[0]
97
  results = VideosSearch(link, limit=1)
98
  for result in (await results.next())["result"]:
99
- return result["duration"]
 
100
 
101
  async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
102
  if videoid:
@@ -105,27 +132,17 @@ class YouTubeAPI:
105
  link = link.split("&")[0]
106
  results = VideosSearch(link, limit=1)
107
  for result in (await results.next())["result"]:
108
- return result["thumbnails"][0]["url"].split("?")[0]
 
109
 
110
  async def video(self, link: str, videoid: Union[bool, str] = None):
111
  if videoid:
112
  link = self.base + link
113
  if "&" in link:
114
  link = link.split("&")[0]
115
- proc = await asyncio.create_subprocess_exec(
116
- "yt-dlp",
117
- "-g",
118
- "-f",
119
- "best[height<=?720][width<=?1280]",
120
- f"{link}",
121
- stdout=asyncio.subprocess.PIPE,
122
- stderr=asyncio.subprocess.PIPE,
123
- )
124
- stdout, stderr = await proc.communicate()
125
- if stdout:
126
- return 1, stdout.decode().split("\n")[0]
127
- else:
128
- return 0, stderr.decode()
129
 
130
  async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
131
  if videoid:
@@ -137,7 +154,9 @@ class YouTubeAPI:
137
  )
138
  try:
139
  result = playlist.split("\n")
140
- result = [x for x in result if x.strip() != ""]
 
 
141
  except:
142
  result = []
143
  return result
@@ -149,14 +168,19 @@ class YouTubeAPI:
149
  link = link.split("&")[0]
150
  results = VideosSearch(link, limit=1)
151
  for result in (await results.next())["result"]:
152
- track_details = {
153
- "title": result["title"],
154
- "link": result["link"],
155
- "vidid": result["id"],
156
- "duration_min": result["duration"],
157
- "thumb": result["thumbnails"][0]["url"].split("?")[0],
158
- }
159
- return track_details, result["id"]
 
 
 
 
 
160
 
161
  async def formats(self, link: str, videoid: Union[bool, str] = None):
162
  if videoid:
@@ -173,23 +197,33 @@ class YouTubeAPI:
173
  str(format["format"])
174
  except:
175
  continue
176
- if "dash" not in str(format["format"]).lower():
177
  try:
178
- formats_available.append(
179
- {
180
- "format": format["format"],
181
- "filesize": format["filesize"],
182
- "format_id": format["format_id"],
183
- "ext": format["ext"],
184
- "format_note": format["format_note"],
185
- "yturl": link,
186
- }
187
- )
188
  except:
189
  continue
 
 
 
 
 
 
 
 
 
 
190
  return formats_available, link
191
 
192
- async def slider(self, link: str, query_type: int, videoid: Union[bool, str] = None):
 
 
 
 
 
193
  if videoid:
194
  link = self.base + link
195
  if "&" in link:
@@ -198,130 +232,108 @@ class YouTubeAPI:
198
  result = (await a.next()).get("result")
199
  title = result[query_type]["title"]
200
  duration_min = result[query_type]["duration"]
 
201
  thumbnail = result[query_type]["thumbnails"][0]["url"].split("?")[0]
202
- return title, duration_min, thumbnail, result[query_type]["id"]
203
 
204
- async def get_video_info_from_bitflow(self, url: str, video: bool):
205
- api_url = "https://bitflow.in/api/youtube"
206
- params = {
207
- "query": url,
208
- "format": "video" if video else "audio",
209
- "download": True,
210
- "api_key": "1spiderkey2"
211
- }
212
- async with httpx.AsyncClient() as client:
213
- response = await client.get(api_url, params=params, timeout=150)
214
- if response.status_code == 200:
215
- return response.json()
216
- else:
217
- return {"status": "error", "message": "Failed to fetch data from Bitflow API."}
218
-
219
- async def download(self, link: str, mystic, video: Union[bool, str] = None,
220
- videoid: Union[bool, str] = None, songaudio: Union[bool, str] = None,
221
- songvideo: Union[bool, str] = None, format_id: Union[bool, str] = None,
222
- title: Union[bool, str] = None) -> str:
223
  if videoid:
224
  link = self.base + link
225
- if "&" in link:
226
- link = link.split("&")[0]
227
  loop = asyncio.get_running_loop()
228
- try:
229
- bitflow_info = await self.get_video_info_from_bitflow(link, video)
230
 
231
- def audio_dl(bitflow_info):
232
- xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
233
- ydl_optssx = {
234
- "format": "bestaudio/best",
235
- "outtmpl": xyz,
236
- "geo_bypass": True,
237
- "nocheckcertificate": True,
238
- "quiet": True,
239
- "no_warnings": True,
240
- }
241
- x = yt_dlp.YoutubeDL(ydl_optssx)
242
- if os.path.exists(xyz):
243
- return xyz
244
- x.download([bitflow_info['url']])
245
  return xyz
 
 
246
 
247
- def video_dl(bitflow_info):
248
- xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
249
- ydl_optssx = {
250
- "format": "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])",
251
- "outtmpl": xyz,
252
- "geo_bypass": True,
253
- "nocheckcertificate": True,
254
- "quiet": True,
255
- "no_warnings": True,
256
- }
257
- x = yt_dlp.YoutubeDL(ydl_optssx)
258
- if os.path.exists(xyz):
259
- return xyz
260
- x.download([bitflow_info['url']])
261
  return xyz
 
 
262
 
263
- def song_video_dl():
264
- formats = f"{format_id}+140"
265
- fpath = f"downloads/{title}"
266
- ydl_optssx = {
267
- "format": formats,
268
- "outtmpl": fpath,
269
- "geo_bypass": True,
270
- "nocheckcertificate": True,
271
- "quiet": True,
272
- "no_warnings": True,
273
- "prefer_ffmpeg": True,
274
- "merge_output_format": "mp4",
275
- }
276
- x = yt_dlp.YoutubeDL(ydl_optssx)
277
- x.download([link])
278
-
279
- def song_audio_dl():
280
- fpath = f"downloads/{title}.%(ext)s"
281
- ydl_optssx = {
282
- "format": format_id,
283
- "outtmpl": fpath,
284
- "geo_bypass": True,
285
- "nocheckcertificate": True,
286
- "quiet": True,
287
- "no_warnings": True,
288
- "prefer_ffmpeg": True,
289
- "postprocessors": [
290
- {
291
- "key": "FFmpegExtractAudio",
292
- "preferredcodec": "mp3",
293
- "preferredquality": "192",
294
- }
295
- ],
296
- }
297
- x = yt_dlp.YoutubeDL(ydl_optssx)
298
- x.download([link])
299
-
300
- if songvideo:
301
- await loop.run_in_executor(None, song_video_dl)
302
- fpath = f"downloads/{title}.mp4"
303
- return fpath
304
- elif songaudio:
305
- await loop.run_in_executor(None, song_audio_dl)
306
- fpath = f"downloads/{title}.mp3"
307
- return fpath
308
- elif video:
309
- direct = True
310
- downloaded_file = await loop.run_in_executor(None, video_dl, bitflow_info)
311
- else:
312
- direct = True
313
- downloaded_file = await loop.run_in_executor(None, audio_dl, bitflow_info)
314
-
315
- # Format check before thumbnail
316
- ext = os.path.splitext(downloaded_file)[1].lower().replace(".", "")
317
- if ext not in SUPPORTED_FORMATS:
318
- print(f"[ERROR] Unsupported format for thumbnail: {ext}")
319
- print("[INFO] Skipping thumbnail generation.")
320
- return downloaded_file, direct
321
 
322
- return downloaded_file, direct
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
323
 
324
- except Exception as e:
325
- print(f"[ERROR] Failed to download video: {e}")
326
- print(traceback.format_exc())
327
- return None, False
 
 
 
 
 
 
 
 
 
 
 
 
12
  from DragMusic.utils.database import is_on_off
13
  from DragMusic.utils.formatters import time_to_seconds
14
 
15
+
16
+ def time_to_seconds(time):
17
+ stringt = str(time)
18
+ return sum(int(x) * 60**i for i, x in enumerate(reversed(stringt.split(":"))))
19
+
20
 
21
  async def shell_cmd(cmd):
22
  proc = await asyncio.create_subprocess_shell(
 
33
  return out.decode("utf-8")
34
 
35
 
36
+
37
+ async def get_stream_url(query, video=False):
38
+ api_url = "http://46.250.243.87:1470/youtube"
39
+ api_key = "1a873582a7c83342f961cc0a177b2b26"
40
+
41
+ async with httpx.AsyncClient(timeout=60) as client:
42
+ params = {"query": query, "video": video, "api_key": api_key}
43
+ response = await client.get(api_url, params=params)
44
+ if response.status_code != 200:
45
+ return ""
46
+ info = response.json()
47
+ return info.get("stream_url")
48
+
49
+
50
+
51
  class YouTubeAPI:
52
  def __init__(self):
53
  self.base = "https://www.youtube.com/watch?v="
54
  self.regex = r"(?:youtube\.com|youtu\.be)"
55
  self.status = "https://www.youtube.com/oembed?url="
56
  self.listbase = "https://youtube.com/playlist?list="
57
+ self.reg = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
58
 
59
  async def exists(self, link: str, videoid: Union[bool, str] = None):
60
  if videoid:
61
  link = self.base + link
62
+ if re.search(self.regex, link):
63
+ return True
64
+ else:
65
+ return False
66
 
67
  async def url(self, message_1: Message) -> Union[str, None]:
68
  messages = [message_1]
 
99
  duration_min = result["duration"]
100
  thumbnail = result["thumbnails"][0]["url"].split("?")[0]
101
  vidid = result["id"]
102
+ if str(duration_min) == "None":
103
+ duration_sec = 0
104
+ else:
105
+ duration_sec = int(time_to_seconds(duration_min))
106
  return title, duration_min, duration_sec, thumbnail, vidid
107
 
108
  async def title(self, link: str, videoid: Union[bool, str] = None):
 
112
  link = link.split("&")[0]
113
  results = VideosSearch(link, limit=1)
114
  for result in (await results.next())["result"]:
115
+ title = result["title"]
116
+ return title
117
 
118
  async def duration(self, link: str, videoid: Union[bool, str] = None):
119
  if videoid:
 
122
  link = link.split("&")[0]
123
  results = VideosSearch(link, limit=1)
124
  for result in (await results.next())["result"]:
125
+ duration = result["duration"]
126
+ return duration
127
 
128
  async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
129
  if videoid:
 
132
  link = link.split("&")[0]
133
  results = VideosSearch(link, limit=1)
134
  for result in (await results.next())["result"]:
135
+ thumbnail = result["thumbnails"][0]["url"].split("?")[0]
136
+ return thumbnail
137
 
138
  async def video(self, link: str, videoid: Union[bool, str] = None):
139
  if videoid:
140
  link = self.base + link
141
  if "&" in link:
142
  link = link.split("&")[0]
143
+
144
+ return await get_stream_url(link, True)
145
+
 
 
 
 
 
 
 
 
 
 
 
146
 
147
  async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
148
  if videoid:
 
154
  )
155
  try:
156
  result = playlist.split("\n")
157
+ for key in result:
158
+ if key == "":
159
+ result.remove(key)
160
  except:
161
  result = []
162
  return result
 
168
  link = link.split("&")[0]
169
  results = VideosSearch(link, limit=1)
170
  for result in (await results.next())["result"]:
171
+ title = result["title"]
172
+ duration_min = result["duration"]
173
+ vidid = result["id"]
174
+ yturl = result["link"]
175
+ thumbnail = result["thumbnails"][0]["url"].split("?")[0]
176
+ track_details = {
177
+ "title": title,
178
+ "link": yturl,
179
+ "vidid": vidid,
180
+ "duration_min": duration_min,
181
+ "thumb": thumbnail,
182
+ }
183
+ return track_details, vidid
184
 
185
  async def formats(self, link: str, videoid: Union[bool, str] = None):
186
  if videoid:
 
197
  str(format["format"])
198
  except:
199
  continue
200
+ if not "dash" in str(format["format"]).lower():
201
  try:
202
+ format["format"]
203
+ format["filesize"]
204
+ format["format_id"]
205
+ format["ext"]
206
+ format["format_note"]
 
 
 
 
 
207
  except:
208
  continue
209
+ formats_available.append(
210
+ {
211
+ "format": format["format"],
212
+ "filesize": format["filesize"],
213
+ "format_id": format["format_id"],
214
+ "ext": format["ext"],
215
+ "format_note": format["format_note"],
216
+ "yturl": link,
217
+ }
218
+ )
219
  return formats_available, link
220
 
221
+ async def slider(
222
+ self,
223
+ link: str,
224
+ query_type: int,
225
+ videoid: Union[bool, str] = None,
226
+ ):
227
  if videoid:
228
  link = self.base + link
229
  if "&" in link:
 
232
  result = (await a.next()).get("result")
233
  title = result[query_type]["title"]
234
  duration_min = result[query_type]["duration"]
235
+ vidid = result[query_type]["id"]
236
  thumbnail = result[query_type]["thumbnails"][0]["url"].split("?")[0]
237
+ return title, duration_min, thumbnail, vidid
238
 
239
+ async def download(
240
+ self,
241
+ link: str,
242
+ mystic,
243
+ video: Union[bool, str] = None,
244
+ videoid: Union[bool, str] = None,
245
+ songaudio: Union[bool, str] = None,
246
+ songvideo: Union[bool, str] = None,
247
+ format_id: Union[bool, str] = None,
248
+ title: Union[bool, str] = None,
249
+ ) -> str:
 
 
 
 
 
 
 
 
250
  if videoid:
251
  link = self.base + link
 
 
252
  loop = asyncio.get_running_loop()
 
 
253
 
254
+ def audio_dl():
255
+ ydl_optssx = {
256
+ "format": "bestaudio/best",
257
+ "outtmpl": "downloads/%(id)s.%(ext)s",
258
+ "geo_bypass": True,
259
+ "nocheckcertificate": True,
260
+ "quiet": True,
261
+ "no_warnings": True,
262
+ }
263
+ x = yt_dlp.YoutubeDL(ydl_optssx)
264
+ info = x.extract_info(link, False)
265
+ xyz = os.path.join("downloads", f"{info['id']}.{info['ext']}")
266
+ if os.path.exists(xyz):
 
267
  return xyz
268
+ x.download([link])
269
+ return xyz
270
 
271
+ def video_dl():
272
+ ydl_optssx = {
273
+ "format": "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])",
274
+ "outtmpl": "downloads/%(id)s.%(ext)s",
275
+ "geo_bypass": True,
276
+ "nocheckcertificate": True,
277
+ "quiet": True,
278
+ "no_warnings": True,
279
+ }
280
+ x = yt_dlp.YoutubeDL(ydl_optssx)
281
+ info = x.extract_info(link, False)
282
+ xyz = os.path.join("downloads", f"{info['id']}.{info['ext']}")
283
+ if os.path.exists(xyz):
 
284
  return xyz
285
+ x.download([link])
286
+ return xyz
287
 
288
+ def song_video_dl():
289
+ formats = f"{format_id}+140"
290
+ fpath = f"downloads/{title}"
291
+ ydl_optssx = {
292
+ "format": formats,
293
+ "outtmpl": fpath,
294
+ "geo_bypass": True,
295
+ "nocheckcertificate": True,
296
+ "quiet": True,
297
+ "no_warnings": True,
298
+ "prefer_ffmpeg": True,
299
+ "merge_output_format": "mp4",
300
+ }
301
+ x = yt_dlp.YoutubeDL(ydl_optssx)
302
+ x.download([link])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
303
 
304
+ def song_audio_dl():
305
+ fpath = f"downloads/{title}.%(ext)s"
306
+ ydl_optssx = {
307
+ "format": format_id,
308
+ "outtmpl": fpath,
309
+ "geo_bypass": True,
310
+ "nocheckcertificate": True,
311
+ "quiet": True,
312
+ "no_warnings": True,
313
+ "prefer_ffmpeg": True,
314
+ "postprocessors": [
315
+ {
316
+ "key": "FFmpegExtractAudio",
317
+ "preferredcodec": "mp3",
318
+ "preferredquality": "192",
319
+ }
320
+ ],
321
+ }
322
+ x = yt_dlp.YoutubeDL(ydl_optssx)
323
+ x.download([link])
324
 
325
+ if songvideo:
326
+ await loop.run_in_executor(None, song_video_dl)
327
+ fpath = f"downloads/{title}.mp4"
328
+ return fpath
329
+ elif songaudio:
330
+ await loop.run_in_executor(None, song_audio_dl)
331
+ fpath = f"downloads/{title}.mp3"
332
+ return fpath
333
+ elif video:
334
+ downloaded_file = await get_stream_url(link, True)
335
+ direct = None
336
+ else:
337
+ direct = None
338
+ downloaded_file = await get_stream_url(link, False)
339
+ return downloaded_file, direct