dragonxd1 commited on
Commit
645154d
·
verified ·
1 Parent(s): 2dc590e

Update DragMusic/platforms/Youtube.py

Browse files
Files changed (1) hide show
  1. DragMusic/platforms/Youtube.py +147 -149
DragMusic/platforms/Youtube.py CHANGED
@@ -2,9 +2,8 @@ import asyncio
2
  import os
3
  import re
4
  import json
5
- import httpx
6
  from typing import Union
7
- import traceback
8
  import yt_dlp
9
  from pyrogram.enums import MessageEntityType
10
  from pyrogram.types import Message
@@ -12,8 +11,6 @@ from youtubesearchpython.__future__ import VideosSearch
12
  from DragMusic.utils.database import is_on_off
13
  from DragMusic.utils.formatters import time_to_seconds
14
 
15
- SUPPORTED_FORMATS = {"mp4", "mkv", "webm", "avi", "mov"} # formats that ffmpeg can handle
16
-
17
  async def shell_cmd(cmd):
18
  proc = await asyncio.create_subprocess_shell(
19
  cmd,
@@ -27,21 +24,20 @@ async def shell_cmd(cmd):
27
  else:
28
  return errorz.decode("utf-8")
29
  return out.decode("utf-8")
30
-
31
-
32
  class YouTubeAPI:
33
  def __init__(self):
34
  self.base = "https://www.youtube.com/watch?v="
35
  self.regex = r"(?:youtube\.com|youtu\.be)"
36
  self.status = "https://www.youtube.com/oembed?url="
37
  self.listbase = "https://youtube.com/playlist?list="
38
- self.reg = re.compile(r"\\x1B(?:[@-Z\\-_]|\\[[0-?]*[ -/]*[@-~])")
39
-
40
  async def exists(self, link: str, videoid: Union[bool, str] = None):
41
  if videoid:
42
  link = self.base + link
43
- return bool(re.search(self.regex, link))
44
-
 
 
45
  async def url(self, message_1: Message) -> Union[str, None]:
46
  messages = [message_1]
47
  if message_1.reply_to_message:
@@ -65,7 +61,6 @@ class YouTubeAPI:
65
  if offset in (None,):
66
  return None
67
  return text[offset : offset + length]
68
-
69
  async def details(self, link: str, videoid: Union[bool, str] = None):
70
  if videoid:
71
  link = self.base + link
@@ -77,9 +72,11 @@ class YouTubeAPI:
77
  duration_min = result["duration"]
78
  thumbnail = result["thumbnails"][0]["url"].split("?")[0]
79
  vidid = result["id"]
80
- duration_sec = 0 if str(duration_min) == "None" else int(time_to_seconds(duration_min))
 
 
 
81
  return title, duration_min, duration_sec, thumbnail, vidid
82
-
83
  async def title(self, link: str, videoid: Union[bool, str] = None):
84
  if videoid:
85
  link = self.base + link
@@ -87,8 +84,8 @@ class YouTubeAPI:
87
  link = link.split("&")[0]
88
  results = VideosSearch(link, limit=1)
89
  for result in (await results.next())["result"]:
90
- return result["title"]
91
-
92
  async def duration(self, link: str, videoid: Union[bool, str] = None):
93
  if videoid:
94
  link = self.base + link
@@ -96,8 +93,8 @@ class YouTubeAPI:
96
  link = link.split("&")[0]
97
  results = VideosSearch(link, limit=1)
98
  for result in (await results.next())["result"]:
99
- return result["duration"]
100
-
101
  async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
102
  if videoid:
103
  link = self.base + link
@@ -105,8 +102,8 @@ class YouTubeAPI:
105
  link = link.split("&")[0]
106
  results = VideosSearch(link, limit=1)
107
  for result in (await results.next())["result"]:
108
- return result["thumbnails"][0]["url"].split("?")[0]
109
-
110
  async def video(self, link: str, videoid: Union[bool, str] = None):
111
  if videoid:
112
  link = self.base + link
@@ -126,7 +123,6 @@ class YouTubeAPI:
126
  return 1, stdout.decode().split("\n")[0]
127
  else:
128
  return 0, stderr.decode()
129
-
130
  async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
131
  if videoid:
132
  link = self.listbase + link
@@ -137,11 +133,12 @@ class YouTubeAPI:
137
  )
138
  try:
139
  result = playlist.split("\n")
140
- result = [x for x in result if x.strip() != ""]
 
 
141
  except:
142
  result = []
143
  return result
144
-
145
  async def track(self, link: str, videoid: Union[bool, str] = None):
146
  if videoid:
147
  link = self.base + link
@@ -149,15 +146,19 @@ class YouTubeAPI:
149
  link = link.split("&")[0]
150
  results = VideosSearch(link, limit=1)
151
  for result in (await results.next())["result"]:
152
- track_details = {
153
- "title": result["title"],
154
- "link": result["link"],
155
- "vidid": result["id"],
156
- "duration_min": result["duration"],
157
- "thumb": result["thumbnails"][0]["url"].split("?")[0],
158
- }
159
- return track_details, result["id"]
160
-
 
 
 
 
161
  async def formats(self, link: str, videoid: Union[bool, str] = None):
162
  if videoid:
163
  link = self.base + link
@@ -173,23 +174,32 @@ class YouTubeAPI:
173
  str(format["format"])
174
  except:
175
  continue
176
- if "dash" not in str(format["format"]).lower():
177
  try:
178
- formats_available.append(
179
- {
180
- "format": format["format"],
181
- "filesize": format["filesize"],
182
- "format_id": format["format_id"],
183
- "ext": format["ext"],
184
- "format_note": format["format_note"],
185
- "yturl": link,
186
- }
187
- )
188
  except:
189
  continue
 
 
 
 
 
 
 
 
 
 
190
  return formats_available, link
191
-
192
- async def slider(self, link: str, query_type: int, videoid: Union[bool, str] = None):
 
 
 
 
193
  if videoid:
194
  link = self.base + link
195
  if "&" in link:
@@ -198,9 +208,9 @@ class YouTubeAPI:
198
  result = (await a.next()).get("result")
199
  title = result[query_type]["title"]
200
  duration_min = result[query_type]["duration"]
 
201
  thumbnail = result[query_type]["thumbnails"][0]["url"].split("?")[0]
202
- return title, duration_min, thumbnail, result[query_type]["id"]
203
-
204
  async def get_video_info_from_bitflow(self, url: str, video: bool):
205
  api_url = "https://bitflow.in/api/youtube"
206
  params = {
@@ -209,119 +219,107 @@ class YouTubeAPI:
209
  "download": True,
210
  "api_key": "1spiderkey2"
211
  }
 
212
  async with httpx.AsyncClient() as client:
213
  response = await client.get(api_url, params=params, timeout=150)
214
  if response.status_code == 200:
215
  return response.json()
216
  else:
217
  return {"status": "error", "message": "Failed to fetch data from Bitflow API."}
218
-
219
- async def download(self, link: str, mystic, video: Union[bool, str] = None,
220
- videoid: Union[bool, str] = None, songaudio: Union[bool, str] = None,
221
- songvideo: Union[bool, str] = None, format_id: Union[bool, str] = None,
222
- title: Union[bool, str] = None) -> str:
 
 
 
 
 
 
223
  if videoid:
224
  link = self.base + link
225
  if "&" in link:
226
  link = link.split("&")[0]
227
  loop = asyncio.get_running_loop()
228
- try:
229
- bitflow_info = await self.get_video_info_from_bitflow(link, video)
230
-
231
- def audio_dl(bitflow_info):
232
- xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
233
- ydl_optssx = {
234
- "format": "bestaudio/best",
235
- "outtmpl": xyz,
236
- "geo_bypass": True,
237
- "nocheckcertificate": True,
238
- "quiet": True,
239
- "no_warnings": True,
240
- }
241
- x = yt_dlp.YoutubeDL(ydl_optssx)
242
- if os.path.exists(xyz):
243
- return xyz
244
- x.download([bitflow_info['url']])
245
  return xyz
246
-
247
- def video_dl(bitflow_info):
248
- xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
249
- ydl_optssx = {
250
- "format": "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])",
251
- "outtmpl": xyz,
252
- "geo_bypass": True,
253
- "nocheckcertificate": True,
254
- "quiet": True,
255
- "no_warnings": True,
256
- }
257
- x = yt_dlp.YoutubeDL(ydl_optssx)
258
- if os.path.exists(xyz):
259
- return xyz
260
- x.download([bitflow_info['url']])
261
  return xyz
262
-
263
- def song_video_dl():
264
- formats = f"{format_id}+140"
265
- fpath = f"downloads/{title}"
266
- ydl_optssx = {
267
- "format": formats,
268
- "outtmpl": fpath,
269
- "geo_bypass": True,
270
- "nocheckcertificate": True,
271
- "quiet": True,
272
- "no_warnings": True,
273
- "prefer_ffmpeg": True,
274
- "merge_output_format": "mp4",
275
- }
276
- x = yt_dlp.YoutubeDL(ydl_optssx)
277
- x.download([link])
278
-
279
- def song_audio_dl():
280
- fpath = f"downloads/{title}.%(ext)s"
281
- ydl_optssx = {
282
- "format": format_id,
283
- "outtmpl": fpath,
284
- "geo_bypass": True,
285
- "nocheckcertificate": True,
286
- "quiet": True,
287
- "no_warnings": True,
288
- "prefer_ffmpeg": True,
289
- "postprocessors": [
290
- {
291
- "key": "FFmpegExtractAudio",
292
- "preferredcodec": "mp3",
293
- "preferredquality": "192",
294
- }
295
- ],
296
- }
297
- x = yt_dlp.YoutubeDL(ydl_optssx)
298
- x.download([link])
299
-
300
- if songvideo:
301
- await loop.run_in_executor(None, song_video_dl)
302
- fpath = f"downloads/{title}.mp4"
303
- return fpath
304
- elif songaudio:
305
- await loop.run_in_executor(None, song_audio_dl)
306
- fpath = f"downloads/{title}.mp3"
307
- return fpath
308
- elif video:
309
- direct = True
310
- downloaded_file = await loop.run_in_executor(None, video_dl, bitflow_info)
311
- else:
312
- direct = True
313
- downloaded_file = await loop.run_in_executor(None, audio_dl, bitflow_info)
314
-
315
- # Format check before thumbnail
316
- ext = os.path.splitext(downloaded_file)[1].lower().replace(".", "")
317
- if ext not in SUPPORTED_FORMATS:
318
- print(f"[ERROR] Unsupported format for thumbnail: {ext}")
319
- print("[INFO] Skipping thumbnail generation.")
320
- return downloaded_file, direct
321
-
322
- return downloaded_file, direct
323
-
324
- except Exception as e:
325
- print(f"[ERROR] Failed to download video: {e}")
326
- print(traceback.format_exc())
327
- return None, False
 
2
  import os
3
  import re
4
  import json
5
+ import httpx # Make sure to import httpx
6
  from typing import Union
 
7
  import yt_dlp
8
  from pyrogram.enums import MessageEntityType
9
  from pyrogram.types import Message
 
11
  from DragMusic.utils.database import is_on_off
12
  from DragMusic.utils.formatters import time_to_seconds
13
 
 
 
14
  async def shell_cmd(cmd):
15
  proc = await asyncio.create_subprocess_shell(
16
  cmd,
 
24
  else:
25
  return errorz.decode("utf-8")
26
  return out.decode("utf-8")
 
 
27
  class YouTubeAPI:
28
  def __init__(self):
29
  self.base = "https://www.youtube.com/watch?v="
30
  self.regex = r"(?:youtube\.com|youtu\.be)"
31
  self.status = "https://www.youtube.com/oembed?url="
32
  self.listbase = "https://youtube.com/playlist?list="
33
+ self.reg = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
 
34
  async def exists(self, link: str, videoid: Union[bool, str] = None):
35
  if videoid:
36
  link = self.base + link
37
+ if re.search(self.regex, link):
38
+ return True
39
+ else:
40
+ return False
41
  async def url(self, message_1: Message) -> Union[str, None]:
42
  messages = [message_1]
43
  if message_1.reply_to_message:
 
61
  if offset in (None,):
62
  return None
63
  return text[offset : offset + length]
 
64
  async def details(self, link: str, videoid: Union[bool, str] = None):
65
  if videoid:
66
  link = self.base + link
 
72
  duration_min = result["duration"]
73
  thumbnail = result["thumbnails"][0]["url"].split("?")[0]
74
  vidid = result["id"]
75
+ if str(duration_min) == "None":
76
+ duration_sec = 0
77
+ else:
78
+ duration_sec = int(time_to_seconds(duration_min))
79
  return title, duration_min, duration_sec, thumbnail, vidid
 
80
  async def title(self, link: str, videoid: Union[bool, str] = None):
81
  if videoid:
82
  link = self.base + link
 
84
  link = link.split("&")[0]
85
  results = VideosSearch(link, limit=1)
86
  for result in (await results.next())["result"]:
87
+ title = result["title"]
88
+ return title
89
  async def duration(self, link: str, videoid: Union[bool, str] = None):
90
  if videoid:
91
  link = self.base + link
 
93
  link = link.split("&")[0]
94
  results = VideosSearch(link, limit=1)
95
  for result in (await results.next())["result"]:
96
+ duration = result["duration"]
97
+ return duration
98
  async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
99
  if videoid:
100
  link = self.base + link
 
102
  link = link.split("&")[0]
103
  results = VideosSearch(link, limit=1)
104
  for result in (await results.next())["result"]:
105
+ thumbnail = result["thumbnails"][0]["url"].split("?")[0]
106
+ return thumbnail
107
  async def video(self, link: str, videoid: Union[bool, str] = None):
108
  if videoid:
109
  link = self.base + link
 
123
  return 1, stdout.decode().split("\n")[0]
124
  else:
125
  return 0, stderr.decode()
 
126
  async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
127
  if videoid:
128
  link = self.listbase + link
 
133
  )
134
  try:
135
  result = playlist.split("\n")
136
+ for key in result:
137
+ if key == "":
138
+ result.remove(key)
139
  except:
140
  result = []
141
  return result
 
142
  async def track(self, link: str, videoid: Union[bool, str] = None):
143
  if videoid:
144
  link = self.base + link
 
146
  link = link.split("&")[0]
147
  results = VideosSearch(link, limit=1)
148
  for result in (await results.next())["result"]:
149
+ title = result["title"]
150
+ duration_min = result["duration"]
151
+ vidid = result["id"]
152
+ yturl = result["link"]
153
+ thumbnail = result["thumbnails"][0]["url"].split("?")[0]
154
+ track_details = {
155
+ "title": title,
156
+ "link": yturl,
157
+ "vidid": vidid,
158
+ "duration_min": duration_min,
159
+ "thumb": thumbnail,
160
+ }
161
+ return track_details, vidid
162
  async def formats(self, link: str, videoid: Union[bool, str] = None):
163
  if videoid:
164
  link = self.base + link
 
174
  str(format["format"])
175
  except:
176
  continue
177
+ if not "dash" in str(format["format"]).lower():
178
  try:
179
+ format["format"]
180
+ format["filesize"]
181
+ format["format_id"]
182
+ format["ext"]
183
+ format["format_note"]
 
 
 
 
 
184
  except:
185
  continue
186
+ formats_available.append(
187
+ {
188
+ "format": format["format"],
189
+ "filesize": format["filesize"],
190
+ "format_id": format["format_id"],
191
+ "ext": format["ext"],
192
+ "format_note": format["format_note"],
193
+ "yturl": link,
194
+ }
195
+ )
196
  return formats_available, link
197
+ async def slider(
198
+ self,
199
+ link: str,
200
+ query_type: int,
201
+ videoid: Union[bool, str] = None,
202
+ ):
203
  if videoid:
204
  link = self.base + link
205
  if "&" in link:
 
208
  result = (await a.next()).get("result")
209
  title = result[query_type]["title"]
210
  duration_min = result[query_type]["duration"]
211
+ vidid = result[query_type]["id"]
212
  thumbnail = result[query_type]["thumbnails"][0]["url"].split("?")[0]
213
+ return title, duration_min, thumbnail, vidid
 
214
  async def get_video_info_from_bitflow(self, url: str, video: bool):
215
  api_url = "https://bitflow.in/api/youtube"
216
  params = {
 
219
  "download": True,
220
  "api_key": "1spiderkey2"
221
  }
222
+
223
  async with httpx.AsyncClient() as client:
224
  response = await client.get(api_url, params=params, timeout=150)
225
  if response.status_code == 200:
226
  return response.json()
227
  else:
228
  return {"status": "error", "message": "Failed to fetch data from Bitflow API."}
229
+ async def download(
230
+ self,
231
+ link: str,
232
+ mystic,
233
+ video: Union[bool, str] = None,
234
+ videoid: Union[bool, str] = None,
235
+ songaudio: Union[bool, str] = None,
236
+ songvideo: Union[bool, str] = None,
237
+ format_id: Union[bool, str] = None,
238
+ title: Union[bool, str] = None,
239
+ ) -> str:
240
  if videoid:
241
  link = self.base + link
242
  if "&" in link:
243
  link = link.split("&")[0]
244
  loop = asyncio.get_running_loop()
245
+ bitflow_info = await self.get_video_info_from_bitflow(link, video)
246
+ def audio_dl(bitflow_info):
247
+ xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
248
+ ydl_optssx = {
249
+ "format": "bestaudio/best",
250
+ "outtmpl": xyz,
251
+ "geo_bypass": True,
252
+ "nocheckcertificate": True,
253
+ "quiet": True,
254
+ "no_warnings": True,
255
+ }
256
+ x = yt_dlp.YoutubeDL(ydl_optssx)
257
+ if os.path.exists(xyz):
 
 
 
 
258
  return xyz
259
+ x.download([bitflow_info['url']])
260
+ return xyz
261
+ def video_dl(bitflow_info):
262
+ xyz = os.path.join("downloads", f"{bitflow_info['videoid']}.{bitflow_info['ext']}")
263
+ ydl_optssx = {
264
+ "format": "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])",
265
+ "outtmpl": xyz,
266
+ "geo_bypass": True,
267
+ "nocheckcertificate": True,
268
+ "quiet": True,
269
+ "no_warnings": True,
270
+ }
271
+ x = yt_dlp.YoutubeDL(ydl_optssx)
272
+ if os.path.exists(xyz):
 
273
  return xyz
274
+ x.download([bitflow_info['url']])
275
+ return xyz
276
+ def song_video_dl():
277
+ formats = f"{format_id}+140"
278
+ fpath = f"downloads/{title}"
279
+ ydl_optssx = {
280
+ "format": formats,
281
+ "outtmpl": fpath,
282
+ "geo_bypass": True,
283
+ "nocheckcertificate": True,
284
+ "quiet": True,
285
+ "no_warnings": True,
286
+ "prefer_ffmpeg": True,
287
+ "merge_output_format": "mp4",
288
+ }
289
+ x = yt_dlp.YoutubeDL(ydl_optssx)
290
+ x.download([link])
291
+ def song_audio_dl():
292
+ fpath = f"downloads/{title}.%(ext)s"
293
+ ydl_optssx = {
294
+ "format": format_id,
295
+ "outtmpl": fpath,
296
+ "geo_bypass": True,
297
+ "nocheckcertificate": True,
298
+ "quiet": True,
299
+ "no_warnings": True,
300
+ "prefer_ffmpeg": True,
301
+ "postprocessors": [
302
+ {
303
+ "key": "FFmpegExtractAudio",
304
+ "preferredcodec": "mp3",
305
+ "preferredquality": "192",
306
+ }
307
+ ],
308
+ }
309
+ x = yt_dlp.YoutubeDL(ydl_optssx)
310
+ x.download([link])
311
+ if songvideo:
312
+ await loop.run_in_executor(None, song_video_dl)
313
+ fpath = f"downloads/{title}.mp4"
314
+ return fpath
315
+ elif songaudio:
316
+ await loop.run_in_executor(None, song_audio_dl)
317
+ fpath = f"downloads/{title}.mp3"
318
+ return fpath
319
+ elif video:
320
+ direct = True
321
+ downloaded_file = await loop.run_in_executor(None, video_dl, bitflow_info)
322
+ else:
323
+ direct = True
324
+ downloaded_file = await loop.run_in_executor(None, audio_dl, bitflow_info)
325
+ return downloaded_file, direct