Spaces:
Paused
Paused
# Ultroid - UserBot | |
# Copyright (C) 2021-2022 TeamUltroid | |
# | |
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ > | |
# PLease read the GNU Affero General Public License in | |
# <https://www.github.com/TeamUltroid/Ultroid/blob/main/LICENSE/>. | |
import base64 | |
import inspect | |
from datetime import datetime | |
from random import choice | |
from re import compile as re_compile | |
from bs4 import BeautifulSoup as bs | |
from pyUltroid.functions.misc import google_search | |
from pyUltroid.functions.tools import ( | |
_webupload_cache, | |
async_searcher, | |
get_ofox, | |
saavn_search, | |
webuploader, | |
) | |
from telethon import Button | |
from telethon.tl.alltlobjects import LAYER, tlobjects | |
from telethon.tl.types import DocumentAttributeAudio as Audio | |
from telethon.tl.types import InputWebDocument as wb | |
from . import * | |
from . import _ult_cache | |
SUP_BUTTONS = [ | |
[ | |
Button.url("• Repo •", url="https://github.com/TeamUltroid/Ultroid"), | |
Button.url("• Support •", url="t.me/UltroidSupport"), | |
], | |
] | |
ofox = "https://telegra.ph/file/231f0049fcd722824f13b.jpg" | |
gugirl = "https://telegra.ph/file/0df54ae4541abca96aa11.jpg" | |
ultpic = "https://telegra.ph/file/4136aa1650bc9d4109cc5.jpg" | |
apis = [ | |
"QUl6YVN5QXlEQnNZM1dSdEI1WVBDNmFCX3c4SkF5NlpkWE5jNkZV", | |
"QUl6YVN5QkYwenhMbFlsUE1wOXh3TVFxVktDUVJxOERnZHJMWHNn", | |
"QUl6YVN5RGRPS253blB3VklRX2xiSDVzWUU0Rm9YakFLSVFWMERR", | |
] | |
async def _(e): | |
try: | |
match = e.text.split(" ", maxsplit=1)[1] | |
except IndexError: | |
kkkk = e.builder.article( | |
title="Enter Device Codename", | |
thumb=wb(ofox, 0, "image/jpeg", []), | |
text="**OFᴏx🦊Rᴇᴄᴏᴠᴇʀʏ**\n\nYou didn't search anything", | |
buttons=Button.switch_inline("Sᴇᴀʀᴄʜ Aɢᴀɪɴ", query="ofox ", same_peer=True), | |
) | |
return await e.answer([kkkk]) | |
device, releases = await get_ofox(match) | |
if device.get("detail") is None: | |
fox = [] | |
fullname = device["full_name"] | |
codename = device["codename"] | |
str(device["supported"]) | |
maintainer = device["maintainer"]["name"] | |
link = f"https://orangefox.download/device/{codename}" | |
for data in releases["data"]: | |
release = data["type"] | |
version = data["version"] | |
size = humanbytes(data["size"]) | |
release_date = datetime.utcfromtimestamp(data["date"]).strftime("%Y-%m-%d") | |
text = f"[\xad]({ofox})**OʀᴀɴɢᴇFᴏx Rᴇᴄᴏᴠᴇʀʏ Fᴏʀ**\n\n" | |
text += f"` Fᴜʟʟ Nᴀᴍᴇ: {fullname}`\n" | |
text += f"` Cᴏᴅᴇɴᴀᴍᴇ: {codename}`\n" | |
text += f"` Mᴀɪɴᴛᴀɪɴᴇʀ: {maintainer}`\n" | |
text += f"` Bᴜɪʟᴅ Tʏᴘᴇ: {release}`\n" | |
text += f"` Vᴇʀsɪᴏɴ: {version}`\n" | |
text += f"` Sɪᴢᴇ: {size}`\n" | |
text += f"` Bᴜɪʟᴅ Dᴀᴛᴇ: {release_date}`" | |
fox.append( | |
await e.builder.article( | |
title=f"{fullname}", | |
description=f"{version}\n{release_date}", | |
text=text, | |
thumb=wb(ofox, 0, "image/jpeg", []), | |
link_preview=True, | |
buttons=[ | |
Button.url("Dᴏᴡɴʟᴏᴀᴅ", url=f"{link}"), | |
Button.switch_inline( | |
"Sᴇᴀʀᴄʜ Aɢᴀɪɴ", query="ofox ", same_peer=True | |
), | |
], | |
) | |
) | |
await e.answer( | |
fox, switch_pm="OrangeFox Recovery Search.", switch_pm_param="start" | |
) | |
else: | |
await e.answer( | |
[], switch_pm="OrangeFox Recovery Search.", switch_pm_param="start" | |
) | |
async def _(e): | |
match = e.pattern_match.group(1) | |
chat_id, msg_id = match.split(":") | |
filename = _webupload_cache[int(chat_id)][int(msg_id)] | |
if "/" in filename: | |
filename = filename.split("/")[-1] | |
__cache = f"{chat_id}:{msg_id}" | |
buttons = [ | |
[ | |
Button.inline("anonfiles", data=f"flanonfiles//{__cache}"), | |
Button.inline("transfer", data=f"fltransfer//{__cache}"), | |
], | |
[ | |
Button.inline("bayfiles", data=f"flbayfiles//{__cache}"), | |
Button.inline("x0.at", data=f"flx0.at//{__cache}"), | |
], | |
[ | |
Button.inline("file.io", data=f"flfile.io//{__cache}"), | |
Button.inline("siasky", data=f"flsiasky//{__cache}"), | |
], | |
] | |
try: | |
lnk = [ | |
await e.builder.article( | |
title=f"Upload {filename}", | |
text=f"**File:**\n{filename}", | |
buttons=buttons, | |
) | |
] | |
except BaseException as er: | |
LOGS.exception(er) | |
lnk = [ | |
await e.builder.article( | |
title="fl2lnk", | |
text="File not found", | |
) | |
] | |
await e.answer(lnk, switch_pm="File to Link.", switch_pm_param="start") | |
async def _(e): | |
t = (e.data).decode("UTF-8") | |
data = t[2:] | |
host = data.split("//")[0] | |
chat_id, msg_id = data.split("//")[1].split(":") | |
filename = _webupload_cache[int(chat_id)][int(msg_id)] | |
if "/" in filename: | |
filename = filename.split("/")[-1] | |
await e.edit(f"Uploading `{filename}` on {host}") | |
link = (await webuploader(chat_id, msg_id, host)).strip().replace("\n", "") | |
await e.edit(f"Uploaded `{filename}` on {host}.", buttons=Button.url("View", link)) | |
async def repo(e): | |
res = [ | |
await e.builder.article( | |
title="Ultroid Userbot", | |
description="Userbot | Telethon", | |
thumb=wb(ultpic, 0, "image/jpeg", []), | |
text="• **ULTROID USERBOT** •", | |
buttons=SUP_BUTTONS, | |
), | |
] | |
await e.answer(res, switch_pm="Ultroid Repo.", switch_pm_param="start") | |
async def gsearch(q_event): | |
try: | |
match = q_event.text.split(" ", maxsplit=1)[1] | |
except IndexError: | |
return await q_event.answer( | |
[], switch_pm="Google Search. Enter a query!", switch_pm_param="start" | |
) | |
searcher = [] | |
gresults = await google_search(match) | |
for i in gresults: | |
try: | |
title = i["title"] | |
link = i["link"] | |
desc = i["description"] | |
searcher.append( | |
await q_event.builder.article( | |
title=title, | |
description=desc, | |
thumb=wb(gugirl, 0, "image/jpeg", []), | |
text=f"**Gᴏᴏɢʟᴇ Sᴇᴀʀᴄʜ**\n\n**••Tɪᴛʟᴇ••**\n`{title}`\n\n**••Dᴇsᴄʀɪᴘᴛɪᴏɴ••**\n`{desc}`", | |
link_preview=False, | |
buttons=[ | |
[Button.url("Lɪɴᴋ", url=f"{link}")], | |
[ | |
Button.switch_inline( | |
"Sᴇᴀʀᴄʜ Aɢᴀɪɴ", | |
query="go ", | |
same_peer=True, | |
), | |
Button.switch_inline( | |
"Sʜᴀʀᴇ", | |
query=f"go {match}", | |
same_peer=False, | |
), | |
], | |
], | |
), | |
) | |
except IndexError: | |
break | |
await q_event.answer(searcher, switch_pm="Google Search.", switch_pm_param="start") | |
async def _(e): | |
try: | |
quer = e.text.split(" ", maxsplit=1)[1] | |
except IndexError: | |
return await e.answer( | |
[], switch_pm="Mod Apps Search. Enter app name!", switch_pm_param="start" | |
) | |
page = 1 | |
start = (page - 1) * 3 + 1 | |
da = base64.b64decode(choice(apis)).decode("ascii") | |
url = f"https://www.googleapis.com/customsearch/v1?key={da}&cx=25b3b50edb928435b&q={quer}&start={start}" | |
data = await async_searcher(url, re_json=True) | |
search_items = data.get("items") | |
modss = [] | |
for a in search_items: | |
title = a.get("title") | |
desc = a.get("snippet") | |
link = a.get("link") | |
text = f"**••Tɪᴛʟᴇ••** `{title}`\n\n" | |
text += f"**Dᴇsᴄʀɪᴘᴛɪᴏɴ** `{desc}`" | |
modss.append( | |
await e.builder.article( | |
title=title, | |
description=desc, | |
text=text, | |
link_preview=True, | |
buttons=[ | |
[Button.url("Dᴏᴡɴʟᴏᴀᴅ", url=f"{link}")], | |
[ | |
Button.switch_inline( | |
"Mᴏʀᴇ Mᴏᴅs", | |
query="mods ", | |
same_peer=True, | |
), | |
Button.switch_inline( | |
"Sʜᴀʀᴇ", | |
query=f"mods {quer}", | |
same_peer=False, | |
), | |
], | |
], | |
), | |
) | |
await e.answer(modss, switch_pm="Search Mod Applications.", switch_pm_param="start") | |
# Inspired by @FindXDaBot | |
async def xda_dev(event): | |
QUERY = event.text.split(" ", maxsplit=1) | |
try: | |
query = QUERY[1] | |
except IndexError: | |
return await event.answer( | |
[], switch_pm=get_string("instu_3"), switch_pm_param="start" | |
) | |
le = "https://www.xda-developers.com/search/" + query.replace(" ", "+") | |
ct = await async_searcher(le, re_content=True) | |
ml = bs(ct, "html.parser", from_encoding="utf-8") | |
ml = ml.find_all("div", re_compile("layout_post_"), id=re_compile("post-")) | |
out = [] | |
for on in ml: | |
data = on.find_all("img", "xda_image")[0] | |
title = data["alt"] | |
thumb = data["src"] | |
hre = on.find_all("div", "item_content")[0].find("h4").find("a")["href"] | |
desc = on.find_all("div", "item_meta clearfix")[0].text | |
thumb = wb(thumb, 0, "image/jpeg", []) | |
text = f"[{title}]({hre})" | |
out.append( | |
await event.builder.article( | |
title=title, description=desc, url=hre, thumb=thumb, text=text | |
) | |
) | |
uppar = "No Results Found :(" if not out else "|| XDA Search Results ||" | |
await event.answer(out, switch_pm=uppar, switch_pm_param="start") | |
APP_CACHE = {} | |
RECENTS = {} | |
async def _(e): | |
try: | |
f = e.text.split(" ", maxsplit=1)[1].lower() | |
except IndexError: | |
get_string("instu_1") | |
res = [] | |
if APP_CACHE and RECENTS.get(e.sender_id): | |
for a in RECENTS[e.sender_id]: | |
if APP_CACHE.get(a): | |
res.append(APP_CACHE[a][0]) | |
return await e.answer( | |
res, switch_pm=get_string("instu_2"), switch_pm_param="start" | |
) | |
try: | |
return await e.answer( | |
APP_CACHE[f], switch_pm="Application Searcher.", switch_pm_param="start" | |
) | |
except KeyError: | |
pass | |
foles = [] | |
base_uri = "https://play.google.com" | |
url = f"{base_uri}/store/search?q={f.replace(' ', '%20')}&c=apps" | |
aap = await async_searcher(url, re_content=True) | |
b_ = bs(aap, "html.parser", from_encoding="utf-8") | |
aap = b_.find_all("div", "Vpfmgd") | |
for z in aap[:10]: | |
url = base_uri + z.find("a")["href"] | |
scra = await async_searcher(url, re_content=True) | |
bp = bs(scra, "html.parser", from_encoding="utf-8") | |
name = z.find("div", "WsMG1c nnK0zc")["title"] | |
desc = ( | |
str(bp.find("div", jsname="sngebd")) | |
.replace('<div jsname="sngebd">', "") | |
.replace("<br/>", "\n") | |
.replace("</div>", "")[:300] | |
+ "..." | |
) | |
dev = z.find("div", "KoLSrc").text | |
icon = z.find("img", "T75of QNCnCf")["data-src"] | |
text = f"**••Aᴘᴘ Nᴀᴍᴇ••** [{name}]({icon})\n" | |
text += f"**••Dᴇᴠᴇʟᴏᴘᴇʀ••** `{dev}`\n" | |
text += f"**••Dᴇsᴄʀɪᴘᴛɪᴏɴ••**\n`{desc}`" | |
foles.append( | |
await e.builder.article( | |
title=name, | |
description=dev, | |
thumb=wb(icon, 0, "image/jpeg", []), | |
text=text, | |
link_preview=True, | |
buttons=[ | |
[Button.url("Lɪɴᴋ", url=url)], | |
[ | |
Button.switch_inline( | |
"Mᴏʀᴇ Aᴘᴘs", | |
query="app ", | |
same_peer=True, | |
), | |
Button.switch_inline( | |
"Sʜᴀʀᴇ", | |
query=f"app {f}", | |
same_peer=False, | |
), | |
], | |
], | |
), | |
) | |
APP_CACHE.update({f: foles}) | |
if RECENTS.get(e.sender_id): | |
RECENTS[e.sender_id].append(f) | |
else: | |
RECENTS.update({e.sender_id: [f]}) | |
await e.answer(foles, switch_pm="Application Searcher.", switch_pm_param="start") | |
PISTON_URI = "https://emkc.org/api/v2/piston/" | |
PISTON_LANGS = {} | |
async def piston_run(event): | |
try: | |
lang = event.text.split()[1] | |
code = event.text.split(maxsplit=2)[2] | |
except IndexError: | |
result = await event.builder.article( | |
title="Bad Query", | |
description="Usage: [Language] [code]", | |
thumb=wb( | |
"https://telegra.ph/file/e33c57fc5f1044547e4d8.jpg", 0, "image/jpeg", [] | |
), | |
text=f'**Inline Usage**\n\n`@{asst.me.username} run python print("hello world")`\n\n[Language List](https://telegra.ph/Ultroid-09-01-6)', | |
) | |
return await event.answer([result]) | |
if not PISTON_LANGS: | |
se = await async_searcher(PISTON_URI + "runtimes", re_json=True) | |
PISTON_LANGS.update({lang.pop("language"): lang for lang in se}) | |
if lang in PISTON_LANGS.keys(): | |
version = PISTON_LANGS[lang]["version"] | |
else: | |
result = await event.builder.article( | |
title="Unsupported Language", | |
description="Usage: [Language] [code]", | |
thumb=wb( | |
"https://telegra.ph/file/e33c57fc5f1044547e4d8.jpg", 0, "image/jpeg", [] | |
), | |
text=f'**Inline Usage**\n\n`@{asst.me.username} run python print("hello world")`\n\n[Language List](https://telegra.ph/Ultroid-09-01-6)', | |
) | |
return await event.answer([result]) | |
output = await async_searcher( | |
PISTON_URI + "execute", | |
post=True, | |
json={"language": lang, "version": version, "files": [{"content": code}]}, | |
re_json=True, | |
) | |
output = output["run"]["output"] or get_string("instu_4") | |
if len(output) > 3000: | |
output = output[:3000] + "..." | |
result = await event.builder.article( | |
title="Result", | |
description=output, | |
text=f"• **Language:**\n`{lang}`\n\n• **Code:**\n`{code}`\n\n• **Result:**\n`{output}`", | |
thumb=wb( | |
"https://telegra.ph/file/871ee4a481f58117dccc4.jpg", 0, "image/jpeg", [] | |
), | |
buttons=Button.switch_inline("Fork", query=event.text, same_peer=True), | |
) | |
await event.answer([result], switch_pm="• Piston •", switch_pm_param="start") | |
FDROID_ = {} | |
async def do_magic(event): | |
try: | |
match = event.text.split(" ", maxsplit=1)[1].lower() | |
except IndexError: | |
return await event.answer( | |
[], switch_pm="Enter Query to Search", switch_pm_param="start" | |
) | |
if FDROID_.get(match): | |
return await event.answer( | |
FDROID_[match], switch_pm=f"• Results for {match}", switch_pm_param="start" | |
) | |
link = "https://search.f-droid.org/?q=" + match.replace(" ", "+") | |
content = await async_searcher(link, re_content=True) | |
BSC = bs(content, "html.parser", from_encoding="utf-8") | |
ress = [] | |
for dat in BSC.find_all("a", "package-header")[:10]: | |
image = dat.find("img", "package-icon")["src"] | |
if image.endswith("/"): | |
image = "https://telegra.ph/file/a8dd4a92c5a53a89d0eff.jpg" | |
title = dat.find("h4", "package-name").text.strip() | |
desc = dat.find("span", "package-summary").text.strip() | |
text = f"• **Name :** `{title}`\n\n" | |
text += f"• **Description :** `{desc}`\n" | |
text += f"• **License :** `{dat.find('span', 'package-license').text.strip()}`" | |
imga = wb(image, 0, "image/jpeg", []) | |
ress.append( | |
await event.builder.article( | |
title=title, | |
type="photo", | |
description=desc, | |
text=text, | |
content=imga, | |
thumb=imga, | |
include_media=True, | |
buttons=[ | |
Button.inline( | |
"• Download •", "fd" + dat["href"].split("packages/")[-1] | |
), | |
Button.switch_inline("• Share •", query=event.text), | |
], | |
) | |
) | |
msg = f"Showing {len(ress)} Results!" if ress else "No Results Found" | |
FDROID_.update({match: ress}) | |
await event.answer(ress, switch_pm=msg, switch_pm_param="start") | |
_koo_ = {} | |
async def koo_search(ult): | |
"""Search Users on koo with API""" | |
try: | |
match = ult.text.split(maxsplit=1)[1].lower() | |
match_ = match | |
except IndexError: | |
return await ult.answer( | |
[], switch_pm="Enter Query to Search..", switch_pm_param="start" | |
) | |
if _koo_.get(match): | |
return await ult.answer( | |
_koo_[match], switch_pm="• Koo Search •", switch_pm_param="start" | |
) | |
res = [] | |
se_ = None | |
key_count = None | |
if " | " in match: | |
match = match.split(" | ", maxsplit=1) | |
try: | |
key_count = int(match[1]) | |
except ValueError: | |
pass | |
match = match[0] | |
match = match.replace(" ", "+") | |
req = await async_searcher( | |
f"https://www.kooapp.com/apiV1/search?query={match}&searchType=EXPLORE", | |
re_json=True, | |
) | |
if key_count: | |
try: | |
se_ = [req["feed"][key_count - 1]] | |
except KeyError: | |
pass | |
if not se_: | |
se_ = req["feed"] | |
for count, feed in enumerate(se_[:10]): | |
if feed["uiItemType"] == "search_profile": | |
count += 1 | |
item = feed["items"][0] | |
profileImage = ( | |
item["profileImageBaseUrl"] | |
if item.get("profileImageBaseUrl") | |
else "https://telegra.ph/file/dc28e69bd7ea2c0f25329.jpg" | |
) | |
extra = await async_searcher( | |
"https://www.kooapp.com/apiV1/users/handle/" + item["userHandle"], | |
re_json=True, | |
) | |
img = wb(profileImage, 0, "image/jpeg", []) | |
text = f"‣ **Name :** `{item['name']}`" | |
if extra.get("title"): | |
text += f"\n‣ **Title :** `{extra['title']}`" | |
text += f"\n‣ **Username :** `@{item['userHandle']}`" | |
if extra.get("description"): | |
text += f"\n‣ **Description :** `{extra['description']}`" | |
text += f"\n‣ **Followers :** `{extra['followerCount']}` ‣ **Following :** {extra['followingCount']}" | |
if extra.get("socialProfile") and extra["socialProfile"].get("website"): | |
text += f"\n‣ **Website :** {extra['socialProfile']['website']}" | |
res.append( | |
await ult.builder.article( | |
title=item["name"], | |
description=item.get("title") or f"@{item['userHandle']}", | |
type="photo", | |
content=img, | |
thumb=img, | |
include_media=True, | |
text=text, | |
buttons=[ | |
Button.url( | |
"View", "https://kooapp.com/profile/" + item["userHandle"] | |
), | |
Button.switch_inline( | |
"• Share •", | |
query=ult.text if key_count else ult.text + f" | {count}", | |
), | |
], | |
) | |
) | |
if not res: | |
switch = "No Results Found :(" | |
else: | |
_koo_.update({match_: res}) | |
switch = f"Showing {len(res)} Results!" | |
await ult.answer(res, switch_pm=switch, switch_pm_param="start") | |
# Thanks to OpenSource | |
_bearer_collected = [ | |
"AAAAAAAAAAAAAAAAAAAAALIKKgEAAAAA1DRuS%2BI7ZRKiagD6KHYmreaXomo%3DP5Vaje4UTtEkODg0fX7nCh5laSrchhtLxeyEqxXpv0w9ZKspLD", | |
"AAAAAAAAAAAAAAAAAAAAAL5iUAEAAAAAmo6FYRjqdKlI3cNziIm%2BHUQB9Xs%3DS31pj0mxARMTOk2g9dvQ1yP9wknvY4FPBPUlE00smJcncw4dPR", | |
"AAAAAAAAAAAAAAAAAAAAAN6sVgEAAAAAMMjMMWrwgGyv7YQOWN%2FSAsO5SGM%3Dg8MG9Jq93Rlllaok6eht7HvRCruN4Vpzp4NaVsZaaHHWSTzKI8", | |
] | |
async def twitter_search(event): | |
try: | |
match = event.text.split(maxsplit=1)[1].lower() | |
except IndexError: | |
return await event.answer( | |
[], switch_pm="Enter Query to Search", switch_pm_param="start" | |
) | |
try: | |
return await event.answer( | |
_ult_cache["twitter"][match], | |
switch_pm="• Twitter Search •", | |
switch_pm_param="start", | |
) | |
except KeyError: | |
pass | |
headers = {"Authorization": "bearer " + choice(_bearer_collected)} | |
res = await async_searcher( | |
f"https://api.twitter.com/1.1/users/search.json?q={match}", | |
headers=headers, | |
re_json=True, | |
) | |
reso = [] | |
for user in res: | |
thumb = wb(user["profile_image_url_https"], 0, "image/jpeg", []) | |
if user.get("profile_banner_url"): | |
url = user["profile_banner_url"] | |
text = f"[\xad]({url})• **Name :** `{user['name']}`\n" | |
else: | |
text = f"• **Name :** `{user['name']}`\n" | |
text += f"• **Description :** `{user['description']}`\n" | |
text += f"• **Username :** `@{user['screen_name']}`\n" | |
text += f"• **Followers :** `{user['followers_count']}` • **Following :** `{user['friends_count']}`\n" | |
pro_ = "https://twitter.com/" + user["screen_name"] | |
text += f"• **Link :** [Click Here]({pro_})\n_" | |
reso.append( | |
await event.builder.article( | |
title=user["name"], | |
description=user["description"], | |
url=pro_, | |
text=text, | |
thumb=thumb, | |
) | |
) | |
swi_ = "No User Found :(" if not reso else f"🐦 Showing {len(reso)} Results!" | |
await event.answer(reso, switch_pm=swi_, switch_pm_param="start") | |
if _ult_cache.get("twitter"): | |
_ult_cache["twitter"].update({match: reso}) | |
else: | |
_ult_cache.update({"twitter": {match: reso}}) | |
_savn_cache = {} | |
async def savn_s(event): | |
try: | |
query = event.text.split(maxsplit=1)[1].lower() | |
except IndexError: | |
return await event.answer( | |
[], switch_pm="Enter Query to search 🔍", switch_pm_param="start" | |
) | |
if query in _savn_cache: | |
return await event.answer( | |
_savn_cache[query], | |
switch_pm=f"Showing Results for {query}", | |
switch_pm_param="start", | |
) | |
results = await saavn_search(query) | |
swi = "No Results Found!" if not results else "🎵 Saavn Search" | |
res = [] | |
for song in results: | |
thumb = wb(song["image"], 0, "image/jpeg", []) | |
text = f"• **Title :** {song['song']}" | |
text += f"\n• **Year :** {song['year']}" | |
text += f"\n• **Lang :** {song['language']}" | |
text += f"\n• **Artist :** {song['primary_artists']}" | |
text += f"\n• **Release Date :** {song['release_date']}" | |
res.append( | |
await event.builder.article( | |
title=song["song"], | |
type="audio", | |
text=text, | |
include_media=True, | |
buttons=Button.switch_inline( | |
"Search Again 🔍", query="saavn", same_peer=True | |
), | |
thumb=thumb, | |
content=wb( | |
song["media_url"], | |
0, | |
"audio/mp4", | |
[ | |
Audio( | |
title=song["song"], | |
duration=int(song["duration"]), | |
performer=song["primary_artists"], | |
) | |
], | |
), | |
) | |
) | |
await event.answer(res, switch_pm=swi, switch_pm_param="start") | |
_savn_cache.update({query: res}) | |
_OMG = {} | |
async def omgubuntu(ult): | |
try: | |
match = ult.text.split(maxsplit=1)[1].lower() | |
except IndexError: | |
return await ult.answer( | |
[], switch_pm="Enter Query to search...", switch_pm_param="start" | |
) | |
if _OMG.get(match): | |
return await ult.answer( | |
_OMG[match], switch_pm="OMG Ubuntu Search :]", switch_pm_param="start" | |
) | |
get_web = "https://www.omgubuntu.co.uk/?s=" + match.replace(" ", "+") | |
get_ = await async_searcher(get_web, re_content=True) | |
BSC = bs(get_, "html.parser", from_encoding="utf-8") | |
res = [] | |
for cont in BSC.find_all("div", "sbs-layout__item"): | |
img = cont.find("div", "sbs-layout__image") | |
url = img.find("a")["href"] | |
src = img.find("img")["src"] | |
con = cont.find("div", "sbs-layout__content") | |
tit = con.find("a", "layout__title-link") | |
title = tit.text.strip() | |
desc = con.find("p", "layout__description").text.strip() | |
text = f"[{title.strip()}]({url})\n\n{desc}" | |
img = wb(src, 0, "image/jpeg", []) | |
res.append( | |
await ult.builder.article( | |
title=title, | |
type="photo", | |
description=desc, | |
url=url, | |
text=text, | |
buttons=Button.switch_inline( | |
"Search Again", query=ult.text, same_peer=True | |
), | |
include_media=True, | |
content=img, | |
thumb=img, | |
) | |
) | |
await ult.answer( | |
res, | |
switch_pm=f"Showing {len(res)} results!" if res else "No Results Found :[", | |
switch_pm_param="start", | |
) | |
_OMG[match] = res | |
async def inline_tl(ult): | |
try: | |
match = ult.text.split(maxsplit=1)[1] | |
except IndexError: | |
text = f"**It is Telegram TlObjects Searcher.**\n__(Don't use if you don't know what it is!)__\n\n• Example Usage\n`@{asst.me.username} tl GetUserRequest`" | |
return await ult.answer( | |
[ | |
await ult.builder.article( | |
title="How to Use?", | |
description="Tl Searcher by Ultroid", | |
url="https://t.me/TheUltroid", | |
text=text, | |
) | |
], | |
switch_pm="Tl Search 🔍", | |
switch_pm_param="start", | |
) | |
res = [] | |
for key in tlobjects.values(): | |
if match.lower() in key.__name__.lower(): | |
tyyp = "Function" if "tl.functions." in str(key) else "Type" | |
text = f"**Name:** `{key.__name__}`\n" | |
text += f"**Category:** `{tyyp}`\n" | |
text += f"\n`from {key.__module__} import {key.__name__}`\n\n" | |
args = str(inspect.signature(key))[1:][:-1] | |
if args: | |
text += "**Parameter:**\n" | |
for para in args.split(","): | |
text += " " * 4 + "`" + para + "`\n" | |
text += f"\n**Layer:** `{LAYER}`" | |
res.append( | |
await ult.builder.article( | |
title=key.__name__, | |
description=tyyp, | |
url="https://t.me/TheUltroid", | |
text=text[:4000], | |
) | |
) | |
if not res: | |
mo = f"No Results for {match}!" | |
else: | |
mo = f"Showing {len(res)} results!" | |
await ult.answer(res[:50], switch_pm=mo, switch_pm_param="start") | |
async def gh_feeds(ult): | |
try: | |
username = ult.text.split(maxsplit=1)[1] | |
except IndexError: | |
return await ult.answer( | |
[], | |
switch_pm="Enter Github Username to see feeds...", | |
switch_pm_param="start", | |
) | |
if not username.endswith("."): | |
return await ult.answer( | |
[], switch_pm="End your query with . to search...", switch_pm_param="start" | |
) | |
username = username[:-1] | |
data = await async_searcher( | |
f"https://api.github.com/users/{username}/events", re_json=True | |
) | |
if not isinstance(data, list): | |
msg = "" | |
for ak in list(data.keys()): | |
msg += ak + ": `" + data[ak] + "`\n" | |
return await ult.answer( | |
[ | |
await ult.builder.article( | |
title=data["message"], text=msg, link_preview=False | |
) | |
], | |
cache_time=300, | |
switch_pm="Error!!!", | |
switch_pm_param="start", | |
) | |
res = [] | |
res_ids = [] | |
for cont in data[:50]: | |
text = f"<b><a href='https://github.com/{username}'>@{username}</a></b>" | |
title = f"@{username}" | |
extra = None | |
if cont["type"] == "PushEvent": | |
text += " pushed in" | |
title += " pushed in" | |
dt = cont["payload"]["commits"][-1] | |
url = "https://github.com/" + dt["url"].split("/repos/")[-1] | |
extra = f"\n-> <b>message:</b> <code>{dt['message']}</code>" | |
elif cont["type"] == "IssueCommentEvent": | |
title += " commented at" | |
text += " commented at" | |
url = cont["payload"]["comment"]["html_url"] | |
elif cont["type"] == "CreateEvent": | |
title += " created" | |
text += " created" | |
url = "https://github.com/" + cont["repo"]["name"] | |
elif cont["type"] == "PullRequestEvent": | |
if ( | |
cont["payload"]["pull_request"].get("user", {}).get("login") | |
!= username.lower() | |
): | |
continue | |
url = cont["payload"]["pull_request"]["html_url"] | |
text += " created a pull request in" | |
title += " created a pull request in" | |
elif cont["type"] == "ForkEvent": | |
text += " forked" | |
title += " forked" | |
url = cont["payload"]["forkee"]["html_url"] | |
else: | |
continue | |
repo = cont["repo"]["name"] | |
repo_url = "https://github.com/" + repo | |
title += " " + repo | |
text += f" <b><a href='{repo_url}'>{repo}</a></b>" | |
if extra: | |
text += extra | |
thumb = wb(cont["actor"]["avatar_url"], 0, "image/jpeg", []) | |
article = await ult.builder.article( | |
title=title, | |
text=text, | |
url=repo_url, | |
parse_mode="html", | |
link_preview=False, | |
thumb=thumb, | |
buttons=[ | |
Button.url("View", url), | |
Button.switch_inline("Search again", query=ult.text, same_peer=True), | |
], | |
) | |
if article.id not in res_ids: | |
res_ids.append(article.id) | |
res.append(article) | |
if res: | |
msg = f"Showing {len(res)} feeds!" | |
else: | |
msg = "Nothing Found" | |
await ult.answer(res, cache_time=5000, switch_pm=msg, switch_pm_param="start") | |