dragonxd1 commited on
Commit
2dc590e
·
verified ·
1 Parent(s): 265223e

Update DragMusic/platforms/Youtube.py

Browse files
Files changed (1) hide show
  1. DragMusic/platforms/Youtube.py +157 -169
DragMusic/platforms/Youtube.py CHANGED
@@ -12,11 +12,7 @@ from youtubesearchpython.__future__ import VideosSearch
12
  from DragMusic.utils.database import is_on_off
13
  from DragMusic.utils.formatters import time_to_seconds
14
 
15
-
16
- def time_to_seconds(time):
17
- stringt = str(time)
18
- return sum(int(x) * 60**i for i, x in enumerate(reversed(stringt.split(":"))))
19
-
20
 
21
  async def shell_cmd(cmd):
22
  proc = await asyncio.create_subprocess_shell(
@@ -33,36 +29,18 @@ async def shell_cmd(cmd):
33
  return out.decode("utf-8")
34
 
35
 
36
-
37
- async def get_stream_url(query, video=False):
38
- api_url = "http://46.250.243.87:1470/youtube"
39
- api_key = "1a873582a7c83342f961cc0a177b2b26"
40
-
41
- async with httpx.AsyncClient(timeout=60) as client:
42
- params = {"query": query, "video": video, "api_key": api_key}
43
- response = await client.get(api_url, params=params)
44
- if response.status_code != 200:
45
- return ""
46
- info = response.json()
47
- return info.get("stream_url")
48
-
49
-
50
-
51
  class YouTubeAPI:
52
  def __init__(self):
53
  self.base = "https://www.youtube.com/watch?v="
54
  self.regex = r"(?:youtube\.com|youtu\.be)"
55
  self.status = "https://www.youtube.com/oembed?url="
56
  self.listbase = "https://youtube.com/playlist?list="
57
- self.reg = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
58
 
59
  async def exists(self, link: str, videoid: Union[bool, str] = None):
60
  if videoid:
61
  link = self.base + link
62
- if re.search(self.regex, link):
63
- return True
64
- else:
65
- return False
66
 
67
  async def url(self, message_1: Message) -> Union[str, None]:
68
  messages = [message_1]
@@ -99,10 +77,7 @@ class YouTubeAPI:
99
  duration_min = result["duration"]
100
  thumbnail = result["thumbnails"][0]["url"].split("?")[0]
101
  vidid = result["id"]
102
- if str(duration_min) == "None":
103
- duration_sec = 0
104
- else:
105
- duration_sec = int(time_to_seconds(duration_min))
106
  return title, duration_min, duration_sec, thumbnail, vidid
107
 
108
  async def title(self, link: str, videoid: Union[bool, str] = None):
@@ -112,8 +87,7 @@ class YouTubeAPI:
112
  link = link.split("&")[0]
113
  results = VideosSearch(link, limit=1)
114
  for result in (await results.next())["result"]:
115
- title = result["title"]
116
- return title
117
 
118
  async def duration(self, link: str, videoid: Union[bool, str] = None):
119
  if videoid:
@@ -122,8 +96,7 @@ class YouTubeAPI:
122
  link = link.split("&")[0]
123
  results = VideosSearch(link, limit=1)
124
  for result in (await results.next())["result"]:
125
- duration = result["duration"]
126
- return duration
127
 
128
  async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
129
  if videoid:
@@ -132,17 +105,27 @@ class YouTubeAPI:
132
  link = link.split("&")[0]
133
  results = VideosSearch(link, limit=1)
134
  for result in (await results.next())["result"]:
135
- thumbnail = result["thumbnails"][0]["url"].split("?")[0]
136
- return thumbnail
137
 
138
  async def video(self, link: str, videoid: Union[bool, str] = None):
139
  if videoid:
140
  link = self.base + link
141
  if "&" in link:
142
  link = link.split("&")[0]
143
-
144
- return await get_stream_url(link, True)
145
-
 
 
 
 
 
 
 
 
 
 
 
146
 
147
  async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
148
  if videoid:
@@ -154,9 +137,7 @@ class YouTubeAPI:
154
  )
155
  try:
156
  result = playlist.split("\n")
157
- for key in result:
158
- if key == "":
159
- result.remove(key)
160
  except:
161
  result = []
162
  return result
@@ -168,19 +149,14 @@ class YouTubeAPI:
168
  link = link.split("&")[0]
169
  results = VideosSearch(link, limit=1)
170
  for result in (await results.next())["result"]:
171
- title = result["title"]
172
- duration_min = result["duration"]
173
- vidid = result["id"]
174
- yturl = result["link"]
175
- thumbnail = result["thumbnails"][0]["url"].split("?")[0]
176
- track_details = {
177
- "title": title,
178
- "link": yturl,
179
- "vidid": vidid,
180
- "duration_min": duration_min,
181
- "thumb": thumbnail,
182
- }
183
- return track_details, vidid
184
 
185
  async def formats(self, link: str, videoid: Union[bool, str] = None):
186
  if videoid:
@@ -197,33 +173,23 @@ class YouTubeAPI:
197
  str(format["format"])
198
  except:
199
  continue
200
- if not "dash" in str(format["format"]).lower():
201
  try:
202
- format["format"]
203
- format["filesize"]
204
- format["format_id"]
205
- format["ext"]
206
- format["format_note"]
 
 
 
 
 
207
  except:
208
  continue
209
- formats_available.append(
210
- {
211
- "format": format["format"],
212
- "filesize": format["filesize"],
213
- "format_id": format["format_id"],
214
- "ext": format["ext"],
215
- "format_note": format["format_note"],
216
- "yturl": link,
217
- }
218
- )
219
  return formats_available, link
220
 
221
- async def slider(
222
- self,
223
- link: str,
224
- query_type: int,
225
- videoid: Union[bool, str] = None,
226
- ):
227
  if videoid:
228
  link = self.base + link
229
  if "&" in link:
@@ -232,108 +198,130 @@ class YouTubeAPI:
232
  result = (await a.next()).get("result")
233
  title = result[query_type]["title"]
234
  duration_min = result[query_type]["duration"]
235
- vidid = result[query_type]["id"]
236
  thumbnail = result[query_type]["thumbnails"][0]["url"].split("?")[0]
237
- return title, duration_min, thumbnail, vidid
238
 
239
- async def download(
240
- self,
241
- link: str,
242
- mystic,
243
- video: Union[bool, str] = None,
244
- videoid: Union[bool, str] = None,
245
- songaudio: Union[bool, str] = None,
246
- songvideo: Union[bool, str] = None,
247
- format_id: Union[bool, str] = None,
248
- title: Union[bool, str] = None,
249
- ) -> str:
 
 
 
 
 
 
 
 
250
  if videoid:
251
  link = self.base + link
 
 
252
  loop = asyncio.get_running_loop()
 
 
253
 
254
- def audio_dl():
255
- ydl_optssx = {
256
- "format": "bestaudio/best",
257
- "outtmpl": "downloads/%(id)s.%(ext)s",
258
- "geo_bypass": True,
259
- "nocheckcertificate": True,
260
- "quiet": True,
261
- "no_warnings": True,
262
- }
263
- x = yt_dlp.YoutubeDL(ydl_optssx)
264
- info = x.extract_info(link, False)
265
- xyz = os.path.join("downloads", f"{info['id']}.{info['ext']}")
266
- if os.path.exists(xyz):
 
267
  return xyz
268
- x.download([link])
269
- return xyz
270
 
271
- def video_dl():
272
- ydl_optssx = {
273
- "format": "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])",
274
- "outtmpl": "downloads/%(id)s.%(ext)s",
275
- "geo_bypass": True,
276
- "nocheckcertificate": True,
277
- "quiet": True,
278
- "no_warnings": True,
279
- }
280
- x = yt_dlp.YoutubeDL(ydl_optssx)
281
- info = x.extract_info(link, False)
282
- xyz = os.path.join("downloads", f"{info['id']}.{info['ext']}")
283
- if os.path.exists(xyz):
 
284
  return xyz
285
- x.download([link])
286
- return xyz
287
 
288
- def song_video_dl():
289
- formats = f"{format_id}+140"
290
- fpath = f"downloads/{title}"
291
- ydl_optssx = {
292
- "format": formats,
293
- "outtmpl": fpath,
294
- "geo_bypass": True,
295
- "nocheckcertificate": True,
296
- "quiet": True,
297
- "no_warnings": True,
298
- "prefer_ffmpeg": True,
299
- "merge_output_format": "mp4",
300
- }
301
- x = yt_dlp.YoutubeDL(ydl_optssx)
302
- x.download([link])
303
 
304
- def song_audio_dl():
305
- fpath = f"downloads/{title}.%(ext)s"
306
- ydl_optssx = {
307
- "format": format_id,
308
- "outtmpl": fpath,
309
- "geo_bypass": True,
310
- "nocheckcertificate": True,
311
- "quiet": True,
312
- "no_warnings": True,
313
- "prefer_ffmpeg": True,
314
- "postprocessors": [
315
- {
316
- "key": "FFmpegExtractAudio",
317
- "preferredcodec": "mp3",
318
- "preferredquality": "192",
319
- }
320
- ],
321
- }
322
- x = yt_dlp.YoutubeDL(ydl_optssx)
323
- x.download([link])
324
 
325
- if songvideo:
326
- await loop.run_in_executor(None, song_video_dl)
327
- fpath = f"downloads/{title}.mp4"
328
- return fpath
329
- elif songaudio:
330
- await loop.run_in_executor(None, song_audio_dl)
331
- fpath = f"downloads/{title}.mp3"
332
- return fpath
333
- elif video:
334
- downloaded_file = await get_stream_url(link, True)
335
- direct = None
336
- else:
337
- direct = None
338
- downloaded_file = await get_stream_url(link, False)
339
- return downloaded_file, direct
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
  from DragMusic.utils.database import is_on_off
13
  from DragMusic.utils.formatters import time_to_seconds
14
 
15
+ SUPPORTED_FORMATS = {"mp4", "mkv", "webm", "avi", "mov"} # formats that ffmpeg can handle
 
 
 
 
16
 
17
  async def shell_cmd(cmd):
18
  proc = await asyncio.create_subprocess_shell(
 
29
  return out.decode("utf-8")
30
 
31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  class YouTubeAPI:
33
  def __init__(self):
34
  self.base = "https://www.youtube.com/watch?v="
35
  self.regex = r"(?:youtube\.com|youtu\.be)"
36
  self.status = "https://www.youtube.com/oembed?url="
37
  self.listbase = "https://youtube.com/playlist?list="
38
+ self.reg = re.compile(r"\\x1B(?:[@-Z\\-_]|\\[[0-?]*[ -/]*[@-~])")
39
 
40
  async def exists(self, link: str, videoid: Union[bool, str] = None):
41
  if videoid:
42
  link = self.base + link
43
+ return bool(re.search(self.regex, link))
 
 
 
44
 
45
  async def url(self, message_1: Message) -> Union[str, None]:
46
  messages = [message_1]
 
77
  duration_min = result["duration"]
78
  thumbnail = result["thumbnails"][0]["url"].split("?")[0]
79
  vidid = result["id"]
80
+ duration_sec = 0 if str(duration_min) == "None" else int(time_to_seconds(duration_min))
 
 
 
81
  return title, duration_min, duration_sec, thumbnail, vidid
82
 
83
  async def title(self, link: str, videoid: Union[bool, str] = None):
 
87
  link = link.split("&")[0]
88
  results = VideosSearch(link, limit=1)
89
  for result in (await results.next())["result"]:
90
+ return result["title"]
 
91
 
92
  async def duration(self, link: str, videoid: Union[bool, str] = None):
93
  if videoid:
 
96
  link = link.split("&")[0]
97
  results = VideosSearch(link, limit=1)
98
  for result in (await results.next())["result"]:
99
+ return result["duration"]
 
100
 
101
  async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
102
  if videoid:
 
105
  link = link.split("&")[0]
106
  results = VideosSearch(link, limit=1)
107
  for result in (await results.next())["result"]:
108
+ return result["thumbnails"][0]["url"].split("?")[0]
 
109
 
110
  async def video(self, link: str, videoid: Union[bool, str] = None):
111
  if videoid:
112
  link = self.base + link
113
  if "&" in link:
114
  link = link.split("&")[0]
115
+ proc = await asyncio.create_subprocess_exec(
116
+ "yt-dlp",
117
+ "-g",
118
+ "-f",
119
+ "best[height<=?720][width<=?1280]",
120
+ f"{link}",
121
+ stdout=asyncio.subprocess.PIPE,
122
+ stderr=asyncio.subprocess.PIPE,
123
+ )
124
+ stdout, stderr = await proc.communicate()
125
+ if stdout:
126
+ return 1, stdout.decode().split("\n")[0]
127
+ else:
128
+ return 0, stderr.decode()
129
 
130
  async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
131
  if videoid:
 
137
  )
138
  try:
139
  result = playlist.split("\n")
140
+ result = [x for x in result if x.strip() != ""]
 
 
141
  except:
142
  result = []
143
  return result
 
149
  link = link.split("&")[0]
150
  results = VideosSearch(link, limit=1)
151
  for result in (await results.next())["result"]:
152
+ track_details = {
153
+ "title": result["title"],
154
+ "link": result["link"],
155
+ "vidid": result["id"],
156
+ "duration_min": result["duration"],
157
+ "thumb": result["thumbnails"][0]["url"].split("?")[0],
158
+ }
159
+ return track_details, result["id"]
 
 
 
 
 
160
 
161
  async def formats(self, link: str, videoid: Union[bool, str] = None):
162
  if videoid:
 
173
  str(format["format"])
174
  except:
175
  continue
176
+ if "dash" not in str(format["format"]).lower():
177
  try:
178
+ formats_available.append(
179
+ {
180
+ "format": format["format"],
181
+ "filesize": format["filesize"],
182
+ "format_id": format["format_id"],
183
+ "ext": format["ext"],
184
+ "format_note": format["format_note"],
185
+ "yturl": link,
186
+ }
187
+ )
188
  except:
189
  continue
 
 
 
 
 
 
 
 
 
 
190
  return formats_available, link
191
 
192
+ async def slider(self, link: str, query_type: int, videoid: Union[bool, str] = None):
 
 
 
 
 
193
  if videoid:
194
  link = self.base + link
195
  if "&" in link:
 
198
  result = (await a.next()).get("result")
199
  title = result[query_type]["title"]
200
  duration_min = result[query_type]["duration"]
 
201
  thumbnail = result[query_type]["thumbnails"][0]["url"].split("?")[0]
202
+ return title, duration_min, thumbnail, result[query_type]["id"]
203
 
204
+ async def get_video_info_from_bitflow(self, url: str, video: bool):
205
+ api_url = "https://bitflow.in/api/youtube"
206
+ params = {
207
+ "query": url,
208
+ "format": "video" if video else "audio",
209
+ "download": True,
210
+ "api_key": "1spiderkey2"
211
+ }
212
+ async with httpx.AsyncClient() as client:
213
+ response = await client.get(api_url, params=params, timeout=150)
214
+ if response.status_code == 200:
215
+ return response.json()
216
+ else:
217
+ return {"status": "error", "message": "Failed to fetch data from Bitflow API."}
218
+
219
+ async def download(self, link: str, mystic, video: Union[bool, str] = None,
220
+ videoid: Union[bool, str] = None, songaudio: Union[bool, str] = None,
221
+ songvideo: Union[bool, str] = None, format_id: Union[bool, str] = None,
222
+ title: Union[bool, str] = None) -> str:
223
  if videoid:
224
  link = self.base + link
225
+ if "&" in link:
226
+ link = link.split("&")[0]
227
  loop = asyncio.get_running_loop()
228
+ try:
229
+ bitflow_info = await self.get_video_info_from_bitflow(link, video)
230
 
231
+ def audio_dl(bitflow_info):
232
+ xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
233
+ ydl_optssx = {
234
+ "format": "bestaudio/best",
235
+ "outtmpl": xyz,
236
+ "geo_bypass": True,
237
+ "nocheckcertificate": True,
238
+ "quiet": True,
239
+ "no_warnings": True,
240
+ }
241
+ x = yt_dlp.YoutubeDL(ydl_optssx)
242
+ if os.path.exists(xyz):
243
+ return xyz
244
+ x.download([bitflow_info['url']])
245
  return xyz
 
 
246
 
247
+ def video_dl(bitflow_info):
248
+ xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
249
+ ydl_optssx = {
250
+ "format": "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])",
251
+ "outtmpl": xyz,
252
+ "geo_bypass": True,
253
+ "nocheckcertificate": True,
254
+ "quiet": True,
255
+ "no_warnings": True,
256
+ }
257
+ x = yt_dlp.YoutubeDL(ydl_optssx)
258
+ if os.path.exists(xyz):
259
+ return xyz
260
+ x.download([bitflow_info['url']])
261
  return xyz
 
 
262
 
263
+ def song_video_dl():
264
+ formats = f"{format_id}+140"
265
+ fpath = f"downloads/{title}"
266
+ ydl_optssx = {
267
+ "format": formats,
268
+ "outtmpl": fpath,
269
+ "geo_bypass": True,
270
+ "nocheckcertificate": True,
271
+ "quiet": True,
272
+ "no_warnings": True,
273
+ "prefer_ffmpeg": True,
274
+ "merge_output_format": "mp4",
275
+ }
276
+ x = yt_dlp.YoutubeDL(ydl_optssx)
277
+ x.download([link])
278
 
279
+ def song_audio_dl():
280
+ fpath = f"downloads/{title}.%(ext)s"
281
+ ydl_optssx = {
282
+ "format": format_id,
283
+ "outtmpl": fpath,
284
+ "geo_bypass": True,
285
+ "nocheckcertificate": True,
286
+ "quiet": True,
287
+ "no_warnings": True,
288
+ "prefer_ffmpeg": True,
289
+ "postprocessors": [
290
+ {
291
+ "key": "FFmpegExtractAudio",
292
+ "preferredcodec": "mp3",
293
+ "preferredquality": "192",
294
+ }
295
+ ],
296
+ }
297
+ x = yt_dlp.YoutubeDL(ydl_optssx)
298
+ x.download([link])
299
 
300
+ if songvideo:
301
+ await loop.run_in_executor(None, song_video_dl)
302
+ fpath = f"downloads/{title}.mp4"
303
+ return fpath
304
+ elif songaudio:
305
+ await loop.run_in_executor(None, song_audio_dl)
306
+ fpath = f"downloads/{title}.mp3"
307
+ return fpath
308
+ elif video:
309
+ direct = True
310
+ downloaded_file = await loop.run_in_executor(None, video_dl, bitflow_info)
311
+ else:
312
+ direct = True
313
+ downloaded_file = await loop.run_in_executor(None, audio_dl, bitflow_info)
314
+
315
+ # Format check before thumbnail
316
+ ext = os.path.splitext(downloaded_file)[1].lower().replace(".", "")
317
+ if ext not in SUPPORTED_FORMATS:
318
+ print(f"[ERROR] Unsupported format for thumbnail: {ext}")
319
+ print("[INFO] Skipping thumbnail generation.")
320
+ return downloaded_file, direct
321
+
322
+ return downloaded_file, direct
323
+
324
+ except Exception as e:
325
+ print(f"[ERROR] Failed to download video: {e}")
326
+ print(traceback.format_exc())
327
+ return None, False