Spaces:
Running
Running
# Ultroid - UserBot | |
# Copyright (C) 2021-2025 TeamUltroid | |
# | |
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ > | |
# PLease read the GNU Affero General Public License in | |
# <https://www.github.com/TeamUltroid/Ultroid/blob/main/LICENSE/>. | |
import os | |
import re | |
try: | |
from PIL import Image | |
except ImportError: | |
Image = None | |
from telethon import Button | |
from telethon.errors.rpcerrorlist import FilePartLengthInvalidError, MediaEmptyError | |
from telethon.tl.types import DocumentAttributeAudio, DocumentAttributeVideo | |
from telethon.tl.types import InputWebDocument as wb | |
from xteam.fns.helper import ( | |
bash, | |
fast_download, | |
humanbytes, | |
numerize, | |
time_formatter, | |
) | |
from xteam.fns.ytdl import dler, get_buttons, get_formats | |
from . import LOGS, asst, callback, in_pattern, udB | |
try: | |
from youtubesearchpython import VideosSearch | |
except ImportError: | |
LOGS.info("'youtubesearchpython' not installed!") | |
VideosSearch = None | |
ytt = "https://graph.org/file/afd04510c13914a06dd03.jpg" | |
_yt_base_url = "https://www.youtube.com/watch?v=" | |
BACK_BUTTON = {} | |
async def _(event): | |
try: | |
string = event.text.split(" ", maxsplit=1)[1] | |
except IndexError: | |
fuk = event.builder.article( | |
title="Search Something", | |
thumb=wb(ytt, 0, "image/jpeg", []), | |
text="**Yα΄α΄Tα΄Κα΄ Sα΄α΄Κα΄Κ**\n\nYou didn't search anything", | |
buttons=Button.switch_inline( | |
"Sα΄α΄Κα΄Κ AΙ’α΄ΙͺΙ΄", | |
query="yt ", | |
same_peer=True, | |
), | |
) | |
await event.answer([fuk]) | |
return | |
results = [] | |
search = VideosSearch(string, limit=50) | |
nub = search.result() | |
nibba = nub["result"] | |
for v in nibba: | |
ids = v["id"] | |
link = _yt_base_url + ids | |
title = v["title"] | |
duration = v["duration"] | |
views = v["viewCount"]["short"] | |
publisher = v["channel"]["name"] | |
published_on = v["publishedTime"] | |
description = ( | |
v["descriptionSnippet"][0]["text"] | |
if v.get("descriptionSnippet") | |
and len(v["descriptionSnippet"][0]["text"]) < 500 | |
else "None" | |
) | |
thumb = f"https://i.ytimg.com/vi/{ids}/hqdefault.jpg" | |
text = f"**Title: [{title}]({link})**\n\n" | |
text += f"`Description: {description}\n\n" | |
text += f"γ Duration: {duration} γ\n" | |
text += f"γ Views: {views} γ\n" | |
text += f"γ Publisher: {publisher} γ\n" | |
text += f"γ Published on: {published_on} γ`" | |
desc = f"{title}\n{duration}" | |
file = wb(thumb, 0, "image/jpeg", []) | |
buttons = [ | |
[ | |
Button.inline("Audio", data=f"ytdl:audio:{ids}"), | |
Button.inline("Video", data=f"ytdl:video:{ids}"), | |
], | |
[ | |
Button.switch_inline( | |
"Sα΄α΄Κα΄Κ AΙ’α΄ΙͺΙ΄", | |
query="yt ", | |
same_peer=True, | |
), | |
Button.switch_inline( | |
"SΚα΄Κα΄", | |
query=f"yt {string}", | |
same_peer=False, | |
), | |
], | |
] | |
BACK_BUTTON.update({ids: {"text": text, "buttons": buttons}}) | |
results.append( | |
await event.builder.article( | |
type="photo", | |
title=title, | |
description=desc, | |
thumb=file, | |
content=file, | |
text=text, | |
include_media=True, | |
buttons=buttons, | |
), | |
) | |
await event.answer(results[:50]) | |
async def _(e): | |
_e = e.pattern_match.group(1).strip().decode("UTF-8") | |
_lets_split = _e.split(":") | |
_ytdl_data = await dler(e, _yt_base_url + _lets_split[1]) | |
_data = get_formats(_lets_split[0], _lets_split[1], _ytdl_data) | |
_buttons = get_buttons(_data) | |
_text = ( | |
"`Select Your Format.`" | |
if _buttons | |
else "`Error downloading from YouTube.\nTry Restarting your bot.`" | |
) | |
await e.edit(_text, buttons=_buttons) | |
async def _(event): | |
url = event.pattern_match.group(1).strip().decode("UTF-8") | |
lets_split = url.split(":") | |
vid_id = lets_split[2] | |
link = _yt_base_url + vid_id | |
format = lets_split[1] | |
try: | |
ext = lets_split[3] | |
except IndexError: | |
ext = "mp3" | |
if lets_split[0] == "audio": | |
opts = { | |
"format": "bestaudio", | |
"addmetadata": True, | |
"key": "FFmpegMetadata", | |
"prefer_ffmpeg": True, | |
"geo_bypass": True, | |
"outtmpl": f"%(id)s.{ext}", | |
"logtostderr": False, | |
"postprocessors": [ | |
{ | |
"key": "FFmpegExtractAudio", | |
"preferredcodec": ext, | |
"preferredquality": format, | |
}, | |
{"key": "FFmpegMetadata"}, | |
], | |
} | |
ytdl_data = await dler(event, link, opts, True) | |
title = ytdl_data["title"] | |
if ytdl_data.get("artist"): | |
artist = ytdl_data["artist"] | |
elif ytdl_data.get("creator"): | |
artist = ytdl_data["creator"] | |
elif ytdl_data.get("channel"): | |
artist = ytdl_data["channel"] | |
views = numerize(ytdl_data.get("view_count")) or 0 | |
thumb, _ = await fast_download(ytdl_data["thumbnail"], filename=f"{vid_id}.jpg") | |
likes = numerize(ytdl_data.get("like_count")) or 0 | |
duration = ytdl_data.get("duration") or 0 | |
description = ( | |
ytdl_data["description"] | |
if len(ytdl_data["description"]) < 100 | |
else ytdl_data["description"][:100] | |
) | |
description = description or "None" | |
filepath = f"{vid_id}.{ext}" | |
if not os.path.exists(filepath): | |
filepath = f"{filepath}.{ext}" | |
size = os.path.getsize(filepath) | |
file, _ = await event.client.fast_uploader( | |
filepath, | |
filename=f"{title}.{ext}", | |
show_progress=True, | |
event=event, | |
to_delete=True, | |
) | |
attributes = [ | |
DocumentAttributeAudio( | |
duration=int(duration), | |
title=title, | |
performer=artist, | |
), | |
] | |
elif lets_split[0] == "video": | |
opts = { | |
"format": str(format), | |
"addmetadata": True, | |
"key": "FFmpegMetadata", | |
"prefer_ffmpeg": True, | |
"geo_bypass": True, | |
"outtmpl": f"%(id)s.{ext}", | |
"logtostderr": False, | |
"postprocessors": [{"key": "FFmpegMetadata"}], | |
} | |
ytdl_data = await dler(event, link, opts, True) | |
title = ytdl_data["title"] | |
if ytdl_data.get("artist"): | |
artist = ytdl_data["artist"] | |
elif ytdl_data.get("creator"): | |
artist = ytdl_data["creator"] | |
elif ytdl_data.get("channel"): | |
artist = ytdl_data["channel"] | |
views = numerize(ytdl_data.get("view_count")) or 0 | |
thumb, _ = await fast_download(ytdl_data["thumbnail"], filename=f"{vid_id}.jpg") | |
try: | |
Image.open(thumb).save(thumb, "JPEG") | |
except Exception as er: | |
LOGS.exception(er) | |
thumb = None | |
description = ( | |
ytdl_data["description"] | |
if len(ytdl_data["description"]) < 100 | |
else ytdl_data["description"][:100] | |
) | |
likes = numerize(ytdl_data.get("like_count")) or 0 | |
hi, wi = ytdl_data.get("height") or 720, ytdl_data.get("width") or 1280 | |
duration = ytdl_data.get("duration") or 0 | |
filepath = f"{vid_id}.mkv" | |
if not os.path.exists(filepath): | |
filepath = f"{filepath}.webm" | |
size = os.path.getsize(filepath) | |
file, _ = await event.client.fast_uploader( | |
filepath, | |
filename=f"{title}.mkv", | |
show_progress=True, | |
event=event, | |
to_delete=True, | |
) | |
attributes = [ | |
DocumentAttributeVideo( | |
duration=int(duration), | |
w=wi, | |
h=hi, | |
supports_streaming=True, | |
), | |
] | |
description = description if description != "" else "None" | |
text = f"**Title: [{title}]({_yt_base_url}{vid_id})**\n\n" | |
text += f"`π Description: {description}\n\n" | |
text += f"γ Duration: {time_formatter(int(duration)*1000)} γ\n" | |
text += f"γ Artist: {artist} γ\n" | |
text += f"γ Views: {views} γ\n" | |
text += f"γ Likes: {likes} γ\n" | |
text += f"γ Size: {humanbytes(size)} γ`" | |
button = Button.switch_inline("Search More", query="yt ", same_peer=True) | |
try: | |
await event.edit( | |
text, | |
file=file, | |
buttons=button, | |
attributes=attributes, | |
thumb=thumb, | |
) | |
except (FilePartLengthInvalidError, MediaEmptyError): | |
file = await asst.send_message( | |
udB.get_key("LOG_CHANNEL"), | |
text, | |
file=file, | |
buttons=button, | |
attributes=attributes, | |
thumb=thumb, | |
) | |
await event.edit(text, file=file.media, buttons=button) | |
await bash(f"rm {vid_id}.jpg") | |
async def ytdl_back(event): | |
id_ = event.data_match.group(1).decode("utf-8") | |
if not BACK_BUTTON.get(id_): | |
return await event.answer("Query Expired! Search again π") | |
await event.edit(**BACK_BUTTON[id_]) | |