dragonxd1 commited on
Commit
1d5e89e
·
verified ·
1 Parent(s): e43c069

Update DragMusic/platforms/Youtube.py

Browse files
Files changed (1) hide show
  1. DragMusic/platforms/Youtube.py +149 -147
DragMusic/platforms/Youtube.py CHANGED
@@ -2,7 +2,7 @@ import asyncio
2
  import os
3
  import re
4
  import json
5
- import httpx # Make sure to import httpx
6
  from typing import Union
7
  import traceback
8
  import yt_dlp
@@ -11,6 +11,9 @@ from pyrogram.types import Message
11
  from youtubesearchpython.__future__ import VideosSearch
12
  from DragMusic.utils.database import is_on_off
13
  from DragMusic.utils.formatters import time_to_seconds
 
 
 
14
  async def shell_cmd(cmd):
15
  proc = await asyncio.create_subprocess_shell(
16
  cmd,
@@ -24,20 +27,21 @@ async def shell_cmd(cmd):
24
  else:
25
  return errorz.decode("utf-8")
26
  return out.decode("utf-8")
 
 
27
  class YouTubeAPI:
28
  def __init__(self):
29
  self.base = "https://www.youtube.com/watch?v="
30
  self.regex = r"(?:youtube\.com|youtu\.be)"
31
  self.status = "https://www.youtube.com/oembed?url="
32
  self.listbase = "https://youtube.com/playlist?list="
33
- self.reg = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
 
34
  async def exists(self, link: str, videoid: Union[bool, str] = None):
35
  if videoid:
36
  link = self.base + link
37
- if re.search(self.regex, link):
38
- return True
39
- else:
40
- return False
41
  async def url(self, message_1: Message) -> Union[str, None]:
42
  messages = [message_1]
43
  if message_1.reply_to_message:
@@ -61,6 +65,7 @@ class YouTubeAPI:
61
  if offset in (None,):
62
  return None
63
  return text[offset : offset + length]
 
64
  async def details(self, link: str, videoid: Union[bool, str] = None):
65
  if videoid:
66
  link = self.base + link
@@ -72,11 +77,9 @@ class YouTubeAPI:
72
  duration_min = result["duration"]
73
  thumbnail = result["thumbnails"][0]["url"].split("?")[0]
74
  vidid = result["id"]
75
- if str(duration_min) == "None":
76
- duration_sec = 0
77
- else:
78
- duration_sec = int(time_to_seconds(duration_min))
79
  return title, duration_min, duration_sec, thumbnail, vidid
 
80
  async def title(self, link: str, videoid: Union[bool, str] = None):
81
  if videoid:
82
  link = self.base + link
@@ -84,8 +87,8 @@ class YouTubeAPI:
84
  link = link.split("&")[0]
85
  results = VideosSearch(link, limit=1)
86
  for result in (await results.next())["result"]:
87
- title = result["title"]
88
- return title
89
  async def duration(self, link: str, videoid: Union[bool, str] = None):
90
  if videoid:
91
  link = self.base + link
@@ -93,8 +96,8 @@ class YouTubeAPI:
93
  link = link.split("&")[0]
94
  results = VideosSearch(link, limit=1)
95
  for result in (await results.next())["result"]:
96
- duration = result["duration"]
97
- return duration
98
  async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
99
  if videoid:
100
  link = self.base + link
@@ -102,8 +105,8 @@ class YouTubeAPI:
102
  link = link.split("&")[0]
103
  results = VideosSearch(link, limit=1)
104
  for result in (await results.next())["result"]:
105
- thumbnail = result["thumbnails"][0]["url"].split("?")[0]
106
- return thumbnail
107
  async def video(self, link: str, videoid: Union[bool, str] = None):
108
  if videoid:
109
  link = self.base + link
@@ -123,6 +126,7 @@ class YouTubeAPI:
123
  return 1, stdout.decode().split("\n")[0]
124
  else:
125
  return 0, stderr.decode()
 
126
  async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
127
  if videoid:
128
  link = self.listbase + link
@@ -133,12 +137,11 @@ class YouTubeAPI:
133
  )
134
  try:
135
  result = playlist.split("\n")
136
- for key in result:
137
- if key == "":
138
- result.remove(key)
139
  except:
140
  result = []
141
  return result
 
142
  async def track(self, link: str, videoid: Union[bool, str] = None):
143
  if videoid:
144
  link = self.base + link
@@ -146,19 +149,15 @@ class YouTubeAPI:
146
  link = link.split("&")[0]
147
  results = VideosSearch(link, limit=1)
148
  for result in (await results.next())["result"]:
149
- title = result["title"]
150
- duration_min = result["duration"]
151
- vidid = result["id"]
152
- yturl = result["link"]
153
- thumbnail = result["thumbnails"][0]["url"].split("?")[0]
154
- track_details = {
155
- "title": title,
156
- "link": yturl,
157
- "vidid": vidid,
158
- "duration_min": duration_min,
159
- "thumb": thumbnail,
160
- }
161
- return track_details, vidid
162
  async def formats(self, link: str, videoid: Union[bool, str] = None):
163
  if videoid:
164
  link = self.base + link
@@ -174,32 +173,23 @@ class YouTubeAPI:
174
  str(format["format"])
175
  except:
176
  continue
177
- if not "dash" in str(format["format"]).lower():
178
  try:
179
- format["format"]
180
- format["filesize"]
181
- format["format_id"]
182
- format["ext"]
183
- format["format_note"]
 
 
 
 
 
184
  except:
185
  continue
186
- formats_available.append(
187
- {
188
- "format": format["format"],
189
- "filesize": format["filesize"],
190
- "format_id": format["format_id"],
191
- "ext": format["ext"],
192
- "format_note": format["format_note"],
193
- "yturl": link,
194
- }
195
- )
196
  return formats_available, link
197
- async def slider(
198
- self,
199
- link: str,
200
- query_type: int,
201
- videoid: Union[bool, str] = None,
202
- ):
203
  if videoid:
204
  link = self.base + link
205
  if "&" in link:
@@ -208,9 +198,9 @@ class YouTubeAPI:
208
  result = (await a.next()).get("result")
209
  title = result[query_type]["title"]
210
  duration_min = result[query_type]["duration"]
211
- vidid = result[query_type]["id"]
212
  thumbnail = result[query_type]["thumbnails"][0]["url"].split("?")[0]
213
- return title, duration_min, thumbnail, vidid
 
214
  async def get_video_info_from_bitflow(self, url: str, video: bool):
215
  api_url = "https://bitflow.in/api/youtube"
216
  params = {
@@ -219,107 +209,119 @@ class YouTubeAPI:
219
  "download": True,
220
  "api_key": "1spiderkey2"
221
  }
222
-
223
  async with httpx.AsyncClient() as client:
224
  response = await client.get(api_url, params=params, timeout=150)
225
  if response.status_code == 200:
226
  return response.json()
227
  else:
228
  return {"status": "error", "message": "Failed to fetch data from Bitflow API."}
229
- async def download(
230
- self,
231
- link: str,
232
- mystic,
233
- video: Union[bool, str] = None,
234
- videoid: Union[bool, str] = None,
235
- songaudio: Union[bool, str] = None,
236
- songvideo: Union[bool, str] = None,
237
- format_id: Union[bool, str] = None,
238
- title: Union[bool, str] = None,
239
- ) -> str:
240
  if videoid:
241
  link = self.base + link
242
  if "&" in link:
243
  link = link.split("&")[0]
244
  loop = asyncio.get_running_loop()
245
- bitflow_info = await self.get_video_info_from_bitflow(link, video)
246
- def audio_dl(bitflow_info):
247
- xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
248
- ydl_optssx = {
249
- "format": "bestaudio/best",
250
- "outtmpl": xyz,
251
- "geo_bypass": True,
252
- "nocheckcertificate": True,
253
- "quiet": True,
254
- "no_warnings": True,
255
- }
256
- x = yt_dlp.YoutubeDL(ydl_optssx)
257
- if os.path.exists(xyz):
 
 
 
 
258
  return xyz
259
- x.download([bitflow_info['url']])
260
- return xyz
261
- def video_dl(bitflow_info):
262
- xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
263
- ydl_optssx = {
264
- "format": "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])",
265
- "outtmpl": xyz,
266
- "geo_bypass": True,
267
- "nocheckcertificate": True,
268
- "quiet": True,
269
- "no_warnings": True,
270
- }
271
- x = yt_dlp.YoutubeDL(ydl_optssx)
272
- if os.path.exists(xyz):
 
273
  return xyz
274
- x.download([bitflow_info['url']])
275
- return xyz
276
- def song_video_dl():
277
- formats = f"{format_id}+140"
278
- fpath = f"downloads/{title}"
279
- ydl_optssx = {
280
- "format": formats,
281
- "outtmpl": fpath,
282
- "geo_bypass": True,
283
- "nocheckcertificate": True,
284
- "quiet": True,
285
- "no_warnings": True,
286
- "prefer_ffmpeg": True,
287
- "merge_output_format": "mp4",
288
- }
289
- x = yt_dlp.YoutubeDL(ydl_optssx)
290
- x.download([link])
291
- def song_audio_dl():
292
- fpath = f"downloads/{title}.%(ext)s"
293
- ydl_optssx = {
294
- "format": format_id,
295
- "outtmpl": fpath,
296
- "geo_bypass": True,
297
- "nocheckcertificate": True,
298
- "quiet": True,
299
- "no_warnings": True,
300
- "prefer_ffmpeg": True,
301
- "postprocessors": [
302
- {
303
- "key": "FFmpegExtractAudio",
304
- "preferredcodec": "mp3",
305
- "preferredquality": "192",
306
- }
307
- ],
308
- }
309
- x = yt_dlp.YoutubeDL(ydl_optssx)
310
- x.download([link])
311
- if songvideo:
312
- await loop.run_in_executor(None, song_video_dl)
313
- fpath = f"downloads/{title}.mp4"
314
- return fpath
315
- elif songaudio:
316
- await loop.run_in_executor(None, song_audio_dl)
317
- fpath = f"downloads/{title}.mp3"
318
- return fpath
319
- elif video:
320
- direct = True
321
- downloaded_file = await loop.run_in_executor(None, video_dl, bitflow_info)
322
- else:
323
- direct = True
324
- downloaded_file = await loop.run_in_executor(None, audio_dl, bitflow_info)
325
- return downloaded_file, direct
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  import os
3
  import re
4
  import json
5
+ import httpx
6
  from typing import Union
7
  import traceback
8
  import yt_dlp
 
11
  from youtubesearchpython.__future__ import VideosSearch
12
  from DragMusic.utils.database import is_on_off
13
  from DragMusic.utils.formatters import time_to_seconds
14
+
15
+ SUPPORTED_FORMATS = {"mp4", "mkv", "webm", "avi", "mov"} # formats that ffmpeg can handle
16
+
17
  async def shell_cmd(cmd):
18
  proc = await asyncio.create_subprocess_shell(
19
  cmd,
 
27
  else:
28
  return errorz.decode("utf-8")
29
  return out.decode("utf-8")
30
+
31
+
32
  class YouTubeAPI:
33
  def __init__(self):
34
  self.base = "https://www.youtube.com/watch?v="
35
  self.regex = r"(?:youtube\.com|youtu\.be)"
36
  self.status = "https://www.youtube.com/oembed?url="
37
  self.listbase = "https://youtube.com/playlist?list="
38
+ self.reg = re.compile(r"\\x1B(?:[@-Z\\-_]|\\[[0-?]*[ -/]*[@-~])")
39
+
40
  async def exists(self, link: str, videoid: Union[bool, str] = None):
41
  if videoid:
42
  link = self.base + link
43
+ return bool(re.search(self.regex, link))
44
+
 
 
45
  async def url(self, message_1: Message) -> Union[str, None]:
46
  messages = [message_1]
47
  if message_1.reply_to_message:
 
65
  if offset in (None,):
66
  return None
67
  return text[offset : offset + length]
68
+
69
  async def details(self, link: str, videoid: Union[bool, str] = None):
70
  if videoid:
71
  link = self.base + link
 
77
  duration_min = result["duration"]
78
  thumbnail = result["thumbnails"][0]["url"].split("?")[0]
79
  vidid = result["id"]
80
+ duration_sec = 0 if str(duration_min) == "None" else int(time_to_seconds(duration_min))
 
 
 
81
  return title, duration_min, duration_sec, thumbnail, vidid
82
+
83
  async def title(self, link: str, videoid: Union[bool, str] = None):
84
  if videoid:
85
  link = self.base + link
 
87
  link = link.split("&")[0]
88
  results = VideosSearch(link, limit=1)
89
  for result in (await results.next())["result"]:
90
+ return result["title"]
91
+
92
  async def duration(self, link: str, videoid: Union[bool, str] = None):
93
  if videoid:
94
  link = self.base + link
 
96
  link = link.split("&")[0]
97
  results = VideosSearch(link, limit=1)
98
  for result in (await results.next())["result"]:
99
+ return result["duration"]
100
+
101
  async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
102
  if videoid:
103
  link = self.base + link
 
105
  link = link.split("&")[0]
106
  results = VideosSearch(link, limit=1)
107
  for result in (await results.next())["result"]:
108
+ return result["thumbnails"][0]["url"].split("?")[0]
109
+
110
  async def video(self, link: str, videoid: Union[bool, str] = None):
111
  if videoid:
112
  link = self.base + link
 
126
  return 1, stdout.decode().split("\n")[0]
127
  else:
128
  return 0, stderr.decode()
129
+
130
  async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
131
  if videoid:
132
  link = self.listbase + link
 
137
  )
138
  try:
139
  result = playlist.split("\n")
140
+ result = [x for x in result if x.strip() != ""]
 
 
141
  except:
142
  result = []
143
  return result
144
+
145
  async def track(self, link: str, videoid: Union[bool, str] = None):
146
  if videoid:
147
  link = self.base + link
 
149
  link = link.split("&")[0]
150
  results = VideosSearch(link, limit=1)
151
  for result in (await results.next())["result"]:
152
+ track_details = {
153
+ "title": result["title"],
154
+ "link": result["link"],
155
+ "vidid": result["id"],
156
+ "duration_min": result["duration"],
157
+ "thumb": result["thumbnails"][0]["url"].split("?")[0],
158
+ }
159
+ return track_details, result["id"]
160
+
 
 
 
 
161
  async def formats(self, link: str, videoid: Union[bool, str] = None):
162
  if videoid:
163
  link = self.base + link
 
173
  str(format["format"])
174
  except:
175
  continue
176
+ if "dash" not in str(format["format"]).lower():
177
  try:
178
+ formats_available.append(
179
+ {
180
+ "format": format["format"],
181
+ "filesize": format["filesize"],
182
+ "format_id": format["format_id"],
183
+ "ext": format["ext"],
184
+ "format_note": format["format_note"],
185
+ "yturl": link,
186
+ }
187
+ )
188
  except:
189
  continue
 
 
 
 
 
 
 
 
 
 
190
  return formats_available, link
191
+
192
+ async def slider(self, link: str, query_type: int, videoid: Union[bool, str] = None):
 
 
 
 
193
  if videoid:
194
  link = self.base + link
195
  if "&" in link:
 
198
  result = (await a.next()).get("result")
199
  title = result[query_type]["title"]
200
  duration_min = result[query_type]["duration"]
 
201
  thumbnail = result[query_type]["thumbnails"][0]["url"].split("?")[0]
202
+ return title, duration_min, thumbnail, result[query_type]["id"]
203
+
204
  async def get_video_info_from_bitflow(self, url: str, video: bool):
205
  api_url = "https://bitflow.in/api/youtube"
206
  params = {
 
209
  "download": True,
210
  "api_key": "1spiderkey2"
211
  }
 
212
  async with httpx.AsyncClient() as client:
213
  response = await client.get(api_url, params=params, timeout=150)
214
  if response.status_code == 200:
215
  return response.json()
216
  else:
217
  return {"status": "error", "message": "Failed to fetch data from Bitflow API."}
218
+
219
+ async def download(self, link: str, mystic, video: Union[bool, str] = None,
220
+ videoid: Union[bool, str] = None, songaudio: Union[bool, str] = None,
221
+ songvideo: Union[bool, str] = None, format_id: Union[bool, str] = None,
222
+ title: Union[bool, str] = None) -> str:
 
 
 
 
 
 
223
  if videoid:
224
  link = self.base + link
225
  if "&" in link:
226
  link = link.split("&")[0]
227
  loop = asyncio.get_running_loop()
228
+ try:
229
+ bitflow_info = await self.get_video_info_from_bitflow(link, video)
230
+
231
+ def audio_dl(bitflow_info):
232
+ xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
233
+ ydl_optssx = {
234
+ "format": "bestaudio/best",
235
+ "outtmpl": xyz,
236
+ "geo_bypass": True,
237
+ "nocheckcertificate": True,
238
+ "quiet": True,
239
+ "no_warnings": True,
240
+ }
241
+ x = yt_dlp.YoutubeDL(ydl_optssx)
242
+ if os.path.exists(xyz):
243
+ return xyz
244
+ x.download([bitflow_info['url']])
245
  return xyz
246
+
247
+ def video_dl(bitflow_info):
248
+ xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
249
+ ydl_optssx = {
250
+ "format": "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])",
251
+ "outtmpl": xyz,
252
+ "geo_bypass": True,
253
+ "nocheckcertificate": True,
254
+ "quiet": True,
255
+ "no_warnings": True,
256
+ }
257
+ x = yt_dlp.YoutubeDL(ydl_optssx)
258
+ if os.path.exists(xyz):
259
+ return xyz
260
+ x.download([bitflow_info['url']])
261
  return xyz
262
+
263
+ def song_video_dl():
264
+ formats = f"{format_id}+140"
265
+ fpath = f"downloads/{title}"
266
+ ydl_optssx = {
267
+ "format": formats,
268
+ "outtmpl": fpath,
269
+ "geo_bypass": True,
270
+ "nocheckcertificate": True,
271
+ "quiet": True,
272
+ "no_warnings": True,
273
+ "prefer_ffmpeg": True,
274
+ "merge_output_format": "mp4",
275
+ }
276
+ x = yt_dlp.YoutubeDL(ydl_optssx)
277
+ x.download([link])
278
+
279
+ def song_audio_dl():
280
+ fpath = f"downloads/{title}.%(ext)s"
281
+ ydl_optssx = {
282
+ "format": format_id,
283
+ "outtmpl": fpath,
284
+ "geo_bypass": True,
285
+ "nocheckcertificate": True,
286
+ "quiet": True,
287
+ "no_warnings": True,
288
+ "prefer_ffmpeg": True,
289
+ "postprocessors": [
290
+ {
291
+ "key": "FFmpegExtractAudio",
292
+ "preferredcodec": "mp3",
293
+ "preferredquality": "192",
294
+ }
295
+ ],
296
+ }
297
+ x = yt_dlp.YoutubeDL(ydl_optssx)
298
+ x.download([link])
299
+
300
+ if songvideo:
301
+ await loop.run_in_executor(None, song_video_dl)
302
+ fpath = f"downloads/{title}.mp4"
303
+ return fpath
304
+ elif songaudio:
305
+ await loop.run_in_executor(None, song_audio_dl)
306
+ fpath = f"downloads/{title}.mp3"
307
+ return fpath
308
+ elif video:
309
+ direct = True
310
+ downloaded_file = await loop.run_in_executor(None, video_dl, bitflow_info)
311
+ else:
312
+ direct = True
313
+ downloaded_file = await loop.run_in_executor(None, audio_dl, bitflow_info)
314
+
315
+ # Format check before thumbnail
316
+ ext = os.path.splitext(downloaded_file)[1].lower().replace(".", "")
317
+ if ext not in SUPPORTED_FORMATS:
318
+ print(f"[ERROR] Unsupported format for thumbnail: {ext}")
319
+ print("[INFO] Skipping thumbnail generation.")
320
+ return downloaded_file, direct
321
+
322
+ return downloaded_file, direct
323
+
324
+ except Exception as e:
325
+ print(f"[ERROR] Failed to download video: {e}")
326
+ print(traceback.format_exc())
327
+ return None, False