Spaces:
Paused
Paused
# Ultroid - UserBot | |
# Copyright (C) 2021-2023 TeamUltroid | |
# | |
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ > | |
# PLease read the GNU Affero General Public License in | |
# <https://www.github.com/TeamUltroid/Ultroid/blob/main/LICENSE/>. | |
import ast | |
import asyncio | |
import re | |
import sys | |
import time | |
from asyncio.exceptions import TimeoutError as AsyncTimeOut | |
from os import execl, remove | |
from random import choice | |
from bs4 import BeautifulSoup as bs | |
try: | |
from xteam.fns.gDrive import GDriveManager | |
except ImportError: | |
GDriveManager = None | |
from telegraph import upload_file as upl | |
from telethon import Button, events | |
from telethon.tl.types import MessageMediaWebPage | |
from telethon.utils import get_peer_id | |
from xteam.fns.helper import fast_download, progress | |
from xteam.fns.tools import Carbon, async_searcher, get_paste, telegraph_client | |
from xteam.startup.loader import Loader | |
from . import * | |
# --------------------------------------------------------------------# | |
telegraph = telegraph_client() | |
GDrive = GDriveManager() if GDriveManager else None | |
# --------------------------------------------------------------------# | |
def text_to_url(event): | |
"""function to get media url (with|without) Webpage""" | |
if isinstance(event.media, MessageMediaWebPage): | |
webpage = event.media.webpage | |
if not isinstance(webpage, types.WebPageEmpty) and webpage.type in ["photo"]: | |
return webpage.display_url | |
return event.text | |
# --------------------------------------------------------------------# | |
_buttons = { | |
"otvars": { | |
"text": "Other Variables to set for @TeamUltroid:", | |
"buttons": [ | |
[ | |
Button.inline("Tᴀɢ Lᴏɢɢᴇʀ", data="taglog"), | |
Button.inline("SᴜᴘᴇʀFʙᴀɴ", data="cbs_sfban"), | |
], | |
[ | |
Button.inline("Sᴜᴅᴏ Mᴏᴅᴇ", data="sudo"), | |
Button.inline("Hᴀɴᴅʟᴇʀ", data="hhndlr"), | |
], | |
[ | |
Button.inline("Exᴛʀᴀ Pʟᴜɢɪɴs", data="plg"), | |
Button.inline("Aᴅᴅᴏɴs", data="eaddon"), | |
], | |
[ | |
Button.inline("Eᴍᴏᴊɪ ɪɴ Hᴇʟᴘ", data="emoj"), | |
Button.inline("Sᴇᴛ ɢDʀɪᴠᴇ", data="gdrive"), | |
], | |
[ | |
Button.inline("Iɴʟɪɴᴇ Pɪᴄ", data="inli_pic"), | |
Button.inline("Sᴜᴅᴏ HNDLR", data="shndlr"), | |
], | |
[Button.inline("Dᴜᴀʟ Mᴏᴅᴇ", "cbs_oofdm")], | |
[Button.inline("« Bᴀᴄᴋ", data="setter")], | |
], | |
}, | |
"sfban": { | |
"text": "SuperFban Settings:", | |
"buttons": [ | |
[Button.inline("FBᴀɴ Gʀᴏᴜᴘ", data="sfgrp")], | |
[Button.inline("Exᴄʟᴜᴅᴇ Fᴇᴅs", data="abs_sfexf")], | |
[Button.inline("« Bᴀᴄᴋ", data="cbs_otvars")], | |
], | |
}, | |
"apauto": { | |
"text": "This'll auto approve on outgoing messages", | |
"buttons": [ | |
[Button.inline("Aᴜᴛᴏ Aᴘᴘʀᴏᴠᴇ ON", data="apon")], | |
[Button.inline("Aᴜᴛᴏ Aᴘᴘʀᴏᴠᴇ OFF", data="apof")], | |
[Button.inline("« Bᴀᴄᴋ", data="cbs_pmcstm")], | |
], | |
}, | |
"alvcstm": { | |
"text": f"Customise your {HNDLR}alive. Choose from the below options -", | |
"buttons": [ | |
[Button.inline("Aʟɪᴠᴇ Tᴇxᴛ", data="abs_alvtx")], | |
[Button.inline("Aʟɪᴠᴇ ᴍᴇᴅɪᴀ", data="alvmed")], | |
[Button.inline("Dᴇʟᴇᴛᴇ Aʟɪᴠᴇ Mᴇᴅɪᴀ", data="delmed")], | |
[Button.inline("« Bᴀᴄᴋ", data="setter")], | |
], | |
}, | |
"pmcstm": { | |
"text": "Customise your PMPERMIT Settings -", | |
"buttons": [ | |
[ | |
Button.inline("Pᴍ Tᴇxᴛ", data="pmtxt"), | |
Button.inline("Pᴍ Mᴇᴅɪᴀ", data="pmmed"), | |
], | |
[ | |
Button.inline("Aᴜᴛᴏ Aᴘᴘʀᴏᴠᴇ", data="cbs_apauto"), | |
Button.inline("PMLOGGER", data="pml"), | |
], | |
[ | |
Button.inline("Sᴇᴛ Wᴀʀɴs", data="swarn"), | |
Button.inline("Dᴇʟᴇᴛᴇ Pᴍ Mᴇᴅɪᴀ", data="delpmmed"), | |
], | |
[Button.inline("PMPermit Type", data="cbs_pmtype")], | |
[Button.inline("« Bᴀᴄᴋ", data="cbs_ppmset")], | |
], | |
}, | |
"pmtype": { | |
"text": "Select the type of PMPermit needed.", | |
"buttons": [ | |
[Button.inline("Inline", data="inpm_in")], | |
[Button.inline("Normal", data="inpm_no")], | |
[Button.inline("« Bᴀᴄᴋ", data="cbs_pmcstm")], | |
], | |
}, | |
"ppmset": { | |
"text": "PMPermit Settings:", | |
"buttons": [ | |
[Button.inline("Tᴜʀɴ PMPᴇʀᴍɪᴛ Oɴ", data="pmon")], | |
[Button.inline("Tᴜʀɴ PMPᴇʀᴍɪᴛ Oғғ", data="pmoff")], | |
[Button.inline("Cᴜsᴛᴏᴍɪᴢᴇ PMPᴇʀᴍɪᴛ", data="cbs_pmcstm")], | |
[Button.inline("« Bᴀᴄᴋ", data="setter")], | |
], | |
}, | |
"chatbot": { | |
"text": "From This Feature U can chat with ppls Via ur Assistant Bot.\n[More info](https://t.me/UltroidUpdates/2)", | |
"buttons": [ | |
[ | |
Button.inline("Cʜᴀᴛ Bᴏᴛ Oɴ", data="onchbot"), | |
Button.inline("Cʜᴀᴛ Bᴏᴛ Oғғ", data="ofchbot"), | |
], | |
[ | |
Button.inline("Bᴏᴛ Wᴇʟᴄᴏᴍᴇ", data="bwel"), | |
Button.inline("Bᴏᴛ Wᴇʟᴄᴏᴍᴇ Mᴇᴅɪᴀ", data="botmew"), | |
], | |
[Button.inline("Bᴏᴛ Iɴғᴏ Tᴇxᴛ", data="botinfe")], | |
[Button.inline("Fᴏʀᴄᴇ Sᴜʙsᴄʀɪʙᴇ", data="pmfs")], | |
[Button.inline("« Bᴀᴄᴋ", data="setter")], | |
], | |
}, | |
"vcb": { | |
"text": "From This Feature U can play songs in group voice chat\n\n[moreinfo](https://t.me/UltroidUpdates/4)", | |
"buttons": [ | |
[Button.inline("VC Sᴇssɪᴏɴ", data="abs_vcs")], | |
[Button.inline("« Bᴀᴄᴋ", data="setter")], | |
], | |
}, | |
"oofdm": { | |
"text": "About [Dual Mode](https://t.me/UltroidUpdates/18)", | |
"buttons": [ | |
[ | |
Button.inline("Dᴜᴀʟ Mᴏᴅᴇ Oɴ", "dmof"), | |
Button.inline("Dᴜᴀʟ Mᴏᴅᴇ Oғғ", "dmof"), | |
], | |
[Button.inline("Dᴜᴀʟ Mᴏᴅᴇ Hɴᴅʟʀ", "dmhn")], | |
[Button.inline("« Back", data="cbs_otvars")], | |
], | |
}, | |
"apiset": { | |
"text": get_string("ast_1"), | |
"buttons": [ | |
[Button.inline("Remove.bg API", data="abs_rmbg")], | |
[Button.inline("DEEP API", data="abs_dapi")], | |
[Button.inline("OCR API", data="abs_oapi")], | |
[Button.inline("« Back", data="setter")], | |
], | |
}, | |
} | |
_convo = { | |
"rmbg": { | |
"var": "RMBG_API", | |
"name": "Remove.bg API Key", | |
"text": get_string("ast_2"), | |
"back": "cbs_apiset", | |
}, | |
"dapi": { | |
"var": "DEEP_AI", | |
"name": "Deep AI Api Key", | |
"text": "Get Your Deep Api from deepai.org and send here.", | |
"back": "cbs_apiset", | |
}, | |
"oapi": { | |
"var": "OCR_API", | |
"name": "Ocr Api Key", | |
"text": "Get Your OCR api from ocr.space and send that Here.", | |
"back": "cbs_apiset", | |
}, | |
"pmlgg": { | |
"var": "PMLOGGROUP", | |
"name": "Pm Log Group", | |
"text": "Send chat id of chat which you want to save as Pm log Group.", | |
"back": "pml", | |
}, | |
"vcs": { | |
"var": "VC_SESSION", | |
"name": "Vc Session", | |
"text": "**Vc session**\nEnter the New session u generated for vc bot.\n\nUse /cancel to terminate the operation.", | |
"back": "cbs_vcb", | |
}, | |
"settag": { | |
"var": "TAG_LOG", | |
"name": "Tag Log Group", | |
"text": f"Make a group, add your assistant and make it admin.\nGet the `{HNDLR}id` of that group and send it here for tag logs.\n\nUse /cancel to cancel.", | |
"back": "taglog", | |
}, | |
"alvtx": { | |
"var": "ALIVE_TEXT", | |
"name": "Alive Text", | |
"text": "**Alive Text**\nEnter the new alive text.\n\nUse /cancel to terminate the operation.", | |
"back": "cbs_alvcstm", | |
}, | |
"sfexf": { | |
"var": "EXCLUDE_FED", | |
"name": "Excluded Fed", | |
"text": "Send the Fed IDs you want to exclude in the ban. Split by a space.\neg`id1 id2 id3`\nSet is as `None` if you dont want any.\nUse /cancel to go back.", | |
"back": "cbs_sfban", | |
}, | |
} | |
TOKEN_FILE = "resources/auths/auth_token.txt" | |
async def send(eve): | |
key, name = (eve.data_match.group(1)).decode("UTF-8").split("_") | |
thumb = "resources/extras/inline.jpg" | |
await eve.answer("■ Sending ■") | |
data = f"uh_{key}_" | |
index = None | |
if "|" in name: | |
name, index = name.split("|") | |
key = "plugins" if key == "Official" else key.lower() | |
plugin = f"{key}/{name}.py" | |
_ = f"pasta-{plugin}" | |
if index is not None: | |
data += f"|{index}" | |
_ += f"|{index}" | |
buttons = [ | |
[ | |
Button.inline( | |
"« Pᴀsᴛᴇ »", | |
data=_, | |
) | |
], | |
[ | |
Button.inline("« Bᴀᴄᴋ", data=data), | |
], | |
] | |
try: | |
await eve.edit(file=plugin, thumb=thumb, buttons=buttons) | |
except Exception as er: | |
await eve.answer(str(er), alert=True) | |
heroku_api, app_name = Var.HEROKU_API, Var.HEROKU_APP_NAME | |
async def update(eve): | |
repo = Repo() | |
ac_br = repo.active_branch | |
ups_rem = repo.remote("upstream") | |
if heroku_api: | |
import heroku3 | |
try: | |
heroku = heroku3.from_key(heroku_api) | |
heroku_app = None | |
heroku_applications = heroku.apps() | |
except BaseException as er: | |
LOGS.exception(er) | |
return await eve.edit("`Wrong HEROKU_API.`") | |
for app in heroku_applications: | |
if app.name == app_name: | |
heroku_app = app | |
if not heroku_app: | |
await eve.edit("`Wrong HEROKU_APP_NAME.`") | |
repo.__del__() | |
return | |
await eve.edit(get_string("clst_1")) | |
ups_rem.fetch(ac_br) | |
repo.git.reset("--hard", "FETCH_HEAD") | |
heroku_git_url = heroku_app.git_url.replace( | |
"https://", f"https://api:{heroku_api}@" | |
) | |
if "heroku" in repo.remotes: | |
remote = repo.remote("heroku") | |
remote.set_url(heroku_git_url) | |
else: | |
remote = repo.create_remote("heroku", heroku_git_url) | |
try: | |
remote.push(refspec=f"HEAD:refs/heads/{ac_br}", force=True) | |
except GitCommandError as error: | |
await eve.edit(f"`Here is the error log:\n{error}`") | |
repo.__del__() | |
return | |
await eve.edit("`Successfully Updated!\nRestarting, please wait...`") | |
else: | |
await eve.edit(get_string("clst_1")) | |
call_back() | |
await bash("git pull && pip3 install -r requirements.txt") | |
execl(sys.executable, sys.executable, "-m", "xteam") | |
async def changes(okk): | |
match = okk.data_match.group(1).decode("utf-8") | |
await okk.answer(get_string("clst_3")) | |
repo = Repo.init() | |
button = [[Button.inline("Update Now", data="updatenow")]] | |
changelog, tl_chnglog = await gen_chlog( | |
repo, f"HEAD..upstream/{repo.active_branch}" | |
) | |
cli = "\n\nClick the below button to update!" | |
if not match: | |
try: | |
if len(tl_chnglog) > 700: | |
tl_chnglog = f"{tl_chnglog[:700]}..." | |
button.append([Button.inline("View Complete", "changesall")]) | |
await okk.edit("• Writing Changelogs 📝 •") | |
img = await Carbon( | |
file_name="changelog", | |
code=tl_chnglog, | |
backgroundColor=choice(ATRA_COL), | |
language="md", | |
) | |
return await okk.edit( | |
f"**• Ultroid Userbot •**{cli}", file=img, buttons=button | |
) | |
except Exception as er: | |
LOGS.exception(er) | |
changelog_str = changelog + cli | |
if len(changelog_str) > 1024: | |
await okk.edit(get_string("upd_4")) | |
await asyncio.sleep(2) | |
with open("ultroid_updates.txt", "w+") as file: | |
file.write(tl_chnglog) | |
await okk.edit( | |
get_string("upd_5"), | |
file="ultroid_updates.txt", | |
buttons=button, | |
) | |
remove("ultroid_updates.txt") | |
return | |
await okk.edit( | |
changelog_str, | |
buttons=button, | |
parse_mode="html", | |
) | |
async def _(e): | |
ok = (e.data_match.group(1)).decode("UTF-8") | |
index = None | |
if "|" in ok: | |
ok, index = ok.split("|") | |
with open(ok, "r") as hmm: | |
_, key = await get_paste(hmm.read()) | |
link = f"https://spaceb.in/{key}" | |
raw = f"https://spaceb.in/api/v1/documents/{key}/raw" | |
if not _: | |
return await e.answer(key[:30], alert=True) | |
if ok.startswith("addons"): | |
key = "Addons" | |
elif ok.startswith("vcbot"): | |
key = "VCBot" | |
else: | |
key = "Official" | |
data = f"uh_{key}_" | |
if index is not None: | |
data += f"|{index}" | |
await e.edit( | |
"", | |
buttons=[ | |
[Button.url("Lɪɴᴋ", link), Button.url("Rᴀᴡ", raw)], | |
[Button.inline("« Bᴀᴄᴋ", data=data)], | |
], | |
) | |
async def _edit_to(event): | |
match = event.data_match.group(1).decode("utf-8") | |
data = _buttons.get(match) | |
if not data: | |
return | |
await event.edit(data["text"], buttons=data["buttons"], link_preview=False) | |
async def convo_handler(event: events.CallbackQuery): | |
match = event.data_match.group(1).decode("utf-8") | |
if not _convo.get(match): | |
return | |
await event.delete() | |
get_ = _convo[match] | |
back = get_["back"] | |
async with event.client.conversation(event.sender_id) as conv: | |
await conv.send_message(get_["text"]) | |
response = await conv.get_response() | |
themssg = response.message | |
try: | |
themssg = ast.literal_eval(themssg) | |
except Exception: | |
pass | |
if themssg == "/cancel": | |
return await conv.send_message( | |
"Cancelled!!", | |
buttons=get_back_button(back), | |
) | |
await setit(event, get_["var"], themssg) | |
await conv.send_message( | |
f"{get_['name']} changed to `{themssg}`", | |
buttons=get_back_button(back), | |
) | |
async def _(e): | |
if not e.is_private: | |
return | |
url = GDrive._create_token_file() | |
await e.edit("Go to the below link and send the code!") | |
async with asst.conversation(e.sender_id) as conv: | |
await conv.send_message(url) | |
code = await conv.get_response() | |
if GDrive._create_token_file(code=code.text): | |
await conv.send_message( | |
"`Success!\nYou are all set to use Google Drive with Ultroid Userbot.`", | |
buttons=Button.inline("Main Menu", data="setter"), | |
) | |
else: | |
await conv.send_message("Wrong code! Click authorise again.") | |
async def _(e): | |
if not e.is_private: | |
return | |
msg = ( | |
"Send your FOLDER ID\n\n" | |
+ "For FOLDER ID:\n" | |
+ "1. Open Google Drive App.\n" | |
+ "2. Create Folder.\n" | |
+ "3. Make that folder public.\n" | |
+ "4. Send link of that folder." | |
) | |
await e.delete() | |
async with asst.conversation(e.sender_id, timeout=150) as conv: | |
await conv.send_message(msg) | |
repl = await conv.get_response() | |
id = repl.text | |
if id.startswith("https"): | |
id = id.split("?id=")[-1] | |
udB.set_key("GDRIVE_FOLDER_ID", id) | |
await repl.reply( | |
"`Success.`", | |
buttons=get_back_button("gdrive"), | |
) | |
async def _(e): | |
if not e.is_private: | |
return | |
await e.edit( | |
"Click Authorise and send the code.\n\nYou can use your own CLIENT ID and SECRET by [this](https://t.me/UltroidUpdates/37)", | |
buttons=[ | |
[ | |
Button.inline("Folder ID", data="folderid"), | |
Button.inline("Authorise", data="authorise"), | |
], | |
[Button.inline("« Back", data="cbs_otvars")], | |
], | |
link_preview=False, | |
) | |
async def rhwhe(e): | |
if udB.get_key("DUAL_MODE"): | |
udB.del_key("DUAL_MODE") | |
key = "Off" | |
else: | |
udB.set_key("DUAL_MODE", "True") | |
key = "On" | |
Msg = f"Dual Mode : {key}" | |
await e.edit(Msg, buttons=get_back_button("cbs_otvars")) | |
async def hndlrr(event): | |
await event.delete() | |
pru = event.sender_id | |
var = "DUAL_HNDLR" | |
name = "Dual Handler" | |
CH = udB.get_key(var) or "/" | |
async with event.client.conversation(pru) as conv: | |
await conv.send_message( | |
f"Send The Symbol Which u want as Handler/Trigger to use your Assistant bot\nUr Current Handler is [ `{CH}` ]\n\n use /cancel to cancel.", | |
) | |
response = conv.wait_event(events.NewMessage(chats=pru)) | |
response = await response | |
themssg = response.message.message | |
if themssg == "/cancel": | |
await conv.send_message( | |
"Cancelled!!", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
elif len(themssg) > 1: | |
await conv.send_message( | |
"Incorrect Handler", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
else: | |
await setit(event, var, themssg) | |
await conv.send_message( | |
f"{name} changed to {themssg}", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
async def emoji(event): | |
await event.delete() | |
pru = event.sender_id | |
var = "EMOJI_IN_HELP" | |
name = f"Emoji in `{HNDLR}help` menu" | |
async with event.client.conversation(pru) as conv: | |
await conv.send_message("Send emoji u want to set 🙃.\n\nUse /cancel to cancel.") | |
response = conv.wait_event(events.NewMessage(chats=pru)) | |
response = await response | |
themssg = response.message.message | |
if themssg == "/cancel": | |
await conv.send_message( | |
"Cancelled!!", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
elif themssg.startswith(("/", HNDLR)): | |
await conv.send_message( | |
"Incorrect Emoji", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
else: | |
await setit(event, var, themssg) | |
await conv.send_message( | |
f"{name} changed to {themssg}\n", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
async def pluginch(event): | |
await event.delete() | |
pru = event.sender_id | |
var = "PLUGIN_CHANNEL" | |
name = "Plugin Channel" | |
async with event.client.conversation(pru) as conv: | |
await conv.send_message( | |
"Send id or username of a channel from where u want to install all plugins\n\nOur Channel~ @ultroidplugins\n\nUse /cancel to cancel.", | |
) | |
response = conv.wait_event(events.NewMessage(chats=pru)) | |
response = await response | |
themssg = response.message.message | |
if themssg == "/cancel": | |
await conv.send_message( | |
"Cancelled!!", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
elif themssg.startswith(("/", HNDLR)): | |
await conv.send_message( | |
"Incorrect channel", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
else: | |
await setit(event, var, themssg) | |
await conv.send_message( | |
f"{name} changed to {themssg}\n After Setting All Things Do Restart", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
async def hndlrr(event): | |
await event.delete() | |
pru = event.sender_id | |
var = "HNDLR" | |
name = "Handler/ Trigger" | |
async with event.client.conversation(pru) as conv: | |
await conv.send_message( | |
f"Send The Symbol Which u want as Handler/Trigger to use bot\nUr Current Handler is [ `{HNDLR}` ]\n\n use /cancel to cancel.", | |
) | |
response = conv.wait_event(events.NewMessage(chats=pru)) | |
response = await response | |
themssg = response.message.message | |
if themssg == "/cancel": | |
await conv.send_message( | |
"Cancelled!!", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
elif len(themssg) > 1: | |
await conv.send_message( | |
"Incorrect Handler", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
elif themssg.startswith(("/", "#", "@")): | |
await conv.send_message( | |
"This cannot be used as handler", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
else: | |
await setit(event, var, themssg) | |
await conv.send_message( | |
f"{name} changed to {themssg}", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
async def hndlrr(event): | |
await event.delete() | |
pru = event.sender_id | |
var = "SUDO_HNDLR" | |
name = "Sudo Handler" | |
async with event.client.conversation(pru) as conv: | |
await conv.send_message( | |
"Send The Symbol Which u want as Sudo Handler/Trigger to use bot\n\n use /cancel to cancel." | |
) | |
response = conv.wait_event(events.NewMessage(chats=pru)) | |
response = await response | |
themssg = response.message.message | |
if themssg == "/cancel": | |
await conv.send_message( | |
"Cancelled!!", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
elif len(themssg) > 1: | |
await conv.send_message( | |
"Incorrect Handler", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
elif themssg.startswith(("/", "#", "@")): | |
await conv.send_message( | |
"This cannot be used as handler", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
else: | |
await setit(event, var, themssg) | |
await conv.send_message( | |
f"{name} changed to {themssg}", | |
buttons=get_back_button("cbs_otvars"), | |
) | |
async def tagloggrr(e): | |
BUTTON = [ | |
[Button.inline("SET TAG LOG", data="abs_settag")], | |
[Button.inline("DELETE TAG LOG", data="deltag")], | |
get_back_button("cbs_otvars"), | |
] | |
await e.edit( | |
"Choose Options", | |
buttons=BUTTON, | |
) | |
async def _(e): | |
udB.del_key("TAG_LOG") | |
await e.answer("Done!!! Tag Logger has been turned Off") | |
async def pmset(event): | |
BT = ( | |
[Button.inline("Aᴅᴅᴏɴs Oғғ", data="edof")] | |
if udB.get_key("ADDONS") | |
else [Button.inline("Aᴅᴅᴏɴs Oɴ", data="edon")] | |
) | |
await event.edit( | |
"ADDONS~ Extra Plugins:", | |
buttons=[ | |
BT, | |
[Button.inline("« Bᴀᴄᴋ", data="cbs_otvars")], | |
], | |
) | |
async def eddon(event): | |
var = "ADDONS" | |
await setit(event, var, "True") | |
await event.edit( | |
"Done! ADDONS has been turned on!!\n\n After Setting All Things Do Restart", | |
buttons=get_back_button("eaddon"), | |
) | |
async def eddof(event): | |
udB.set_key("ADDONS", "False") | |
await event.edit( | |
"Done! ADDONS has been turned off!! After Setting All Things Do Restart", | |
buttons=get_back_button("eaddon"), | |
) | |
async def pmset(event): | |
BT = ( | |
[Button.inline("Sᴜᴅᴏ Mᴏᴅᴇ Oғғ", data="ofsudo")] | |
if udB.get_key("SUDO") | |
else [Button.inline("Sᴜᴅᴏ Mᴏᴅᴇ Oɴ", data="onsudo")] | |
) | |
await event.edit( | |
f"SUDO MODE ~ Some peoples can use ur Bot which u selected. To know More use `{HNDLR}help sudo`", | |
buttons=[ | |
BT, | |
[Button.inline("« Bᴀᴄᴋ", data="cbs_otvars")], | |
], | |
) | |
async def eddon(event): | |
var = "SUDO" | |
await setit(event, var, "True") | |
await event.edit( | |
"Done! SUDO MODE has been turned on!!\n\n After Setting All Things Do Restart", | |
buttons=get_back_button("sudo"), | |
) | |
async def eddof(event): | |
var = "SUDO" | |
await setit(event, var, "False") | |
await event.edit( | |
"Done! SUDO MODE has been turned off!! After Setting All Things Do Restart", | |
buttons=get_back_button("sudo"), | |
) | |
async def sfgrp(event): | |
await event.delete() | |
name = "FBan Group ID" | |
var = "FBAN_GROUP_ID" | |
pru = event.sender_id | |
async with asst.conversation(pru) as conv: | |
await conv.send_message( | |
f"Make a group, add @MissRose_Bot, send `{HNDLR}id`, copy that and send it here.\nUse /cancel to go back.", | |
) | |
response = conv.wait_event(events.NewMessage(chats=pru)) | |
response = await response | |
themssg = response.message.message | |
if themssg == "/cancel": | |
return await conv.send_message( | |
"Cancelled!!", | |
buttons=get_back_button("cbs_sfban"), | |
) | |
await setit(event, var, themssg) | |
await conv.send_message( | |
f"{name} changed to {themssg}", | |
buttons=get_back_button("cbs_sfban"), | |
) | |
async def media(event): | |
await event.delete() | |
pru = event.sender_id | |
var = "ALIVE_PIC" | |
name = "Alive Media" | |
async with event.client.conversation(pru) as conv: | |
await conv.send_message( | |
"**Alive Media**\nSend me a pic/gif/media to set as alive media.\n\nUse /cancel to terminate the operation.", | |
) | |
response = await conv.get_response() | |
try: | |
themssg = response.message | |
if themssg == "/cancel": | |
return await conv.send_message( | |
"Operation cancelled!!", | |
buttons=get_back_button("cbs_alvcstm"), | |
) | |
except BaseException as er: | |
LOGS.exception(er) | |
if ( | |
not (response.text).startswith("/") | |
and response.text != "" | |
and (not response.media or isinstance(response.media, MessageMediaWebPage)) | |
): | |
url = text_to_url(response) | |
elif response.sticker: | |
url = response.file.id | |
else: | |
media = await event.client.download_media(response, "alvpc") | |
try: | |
x = upl(media) | |
url = f"https://graph.org/{x[0]}" | |
remove(media) | |
except BaseException as er: | |
LOGS.exception(er) | |
return await conv.send_message( | |
"Terminated.", | |
buttons=get_back_button("cbs_alvcstm"), | |
) | |
await setit(event, var, url) | |
await conv.send_message( | |
f"{name} has been set.", | |
buttons=get_back_button("cbs_alvcstm"), | |
) | |
async def dell(event): | |
try: | |
udB.del_key("ALIVE_PIC") | |
return await event.edit( | |
get_string("clst_5"), buttons=get_back_button("cbs_alabs_vcstm") | |
) | |
except BaseException as er: | |
LOGS.exception(er) | |
return await event.edit( | |
get_string("clst_4"), | |
buttons=get_back_button("cbs_alabs_vcstm"), | |
) | |
async def inl_on(event): | |
var = "INLINE_PM" | |
await setit(event, var, "True") | |
await event.edit( | |
"Done!! PMPermit type has been set to inline!", | |
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_pmtype")]], | |
) | |
async def inl_on(event): | |
var = "INLINE_PM" | |
await setit(event, var, "False") | |
await event.edit( | |
"Done!! PMPermit type has been set to normal!", | |
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_pmtype")]], | |
) | |
async def name(event): | |
await event.delete() | |
pru = event.sender_id | |
var = "PM_TEXT" | |
name = "PM Text" | |
async with event.client.conversation(pru) as conv: | |
await conv.send_message( | |
"**PM Text**\nEnter the new Pmpermit text.\n\nu can use `{name}` `{fullname}` `{count}` `{mention}` `{username}` to get this from user Too\n\nUse /cancel to terminate the operation.", | |
) | |
response = conv.wait_event(events.NewMessage(chats=pru)) | |
response = await response | |
themssg = response.message.message | |
if themssg == "/cancel": | |
return await conv.send_message( | |
"Cancelled!!", | |
buttons=get_back_button("cbs_pmcstm"), | |
) | |
if len(themssg) > 4090: | |
return await conv.send_message( | |
"Message too long!\nGive a shorter message please!!", | |
buttons=get_back_button("cbs_pmcstm"), | |
) | |
await setit(event, var, themssg) | |
await conv.send_message( | |
f"{name} changed to {themssg}\n\nAfter Setting All Things Do restart", | |
buttons=get_back_button("cbs_pmcstm"), | |
) | |
async def name(event): | |
m = range(1, 10) | |
tultd = [Button.inline(f"{x}", data=f"wrns_{x}") for x in m] | |
lst = list(zip(tultd[::3], tultd[1::3], tultd[2::3])) | |
lst.append([Button.inline("« Bᴀᴄᴋ", data="cbs_pmcstm")]) | |
await event.edit( | |
"Select the number of warnings for a user before getting blocked in PMs.", | |
buttons=lst, | |
) | |
async def set_wrns(event): | |
value = int(event.data_match.group(1).decode("UTF-8")) | |
if dn := udB.set_key("PMWARNS", value): | |
await event.edit( | |
f"PM Warns Set to {value}.\nNew users will have {value} chances in PMs before getting banned.", | |
buttons=get_back_button("cbs_pmcstm"), | |
) | |
else: | |
await event.edit( | |
f"Something went wrong, please check your {HNDLR}logs!", | |
buttons=get_back_button("cbs_pmcstm"), | |
) | |
async def media(event): | |
await event.delete() | |
pru = event.sender_id | |
var = "PMPIC" | |
name = "PM Media" | |
async with event.client.conversation(pru) as conv: | |
await conv.send_message( | |
"**PM Media**\nSend me a pic/gif/sticker/link to set as pmpermit media.\n\nUse /cancel to terminate the operation.", | |
) | |
response = await conv.get_response() | |
try: | |
themssg = response.message | |
if themssg == "/cancel": | |
return await conv.send_message( | |
"Operation cancelled!!", | |
buttons=get_back_button("cbs_pmcstm"), | |
) | |
except BaseException as er: | |
LOGS.exception(er) | |
media = await event.client.download_media(response, "pmpc") | |
if ( | |
not (response.text).startswith("/") | |
and response.text != "" | |
and (not response.media or isinstance(response.media, MessageMediaWebPage)) | |
): | |
url = text_to_url(response) | |
elif response.sticker: | |
url = response.file.id | |
else: | |
try: | |
x = upl(media) | |
url = f"https://graph.org/{x[0]}" | |
remove(media) | |
except BaseException as er: | |
LOGS.exception(er) | |
return await conv.send_message( | |
"Terminated.", | |
buttons=get_back_button("cbs_pmcstm"), | |
) | |
await setit(event, var, url) | |
await conv.send_message( | |
f"{name} has been set.", | |
buttons=get_back_button("cbs_pmcstm"), | |
) | |
async def dell(event): | |
try: | |
udB.del_key("PMPIC") | |
return await event.edit( | |
get_string("clst_5"), buttons=get_back_button("cbs_pmcstm") | |
) | |
except BaseException as er: | |
LOGS.exception(er) | |
return await event.edit( | |
get_string("clst_4"), | |
buttons=[[Button.inline("« Sᴇᴛᴛɪɴɢs", data="setter")]], | |
) | |
async def apon(event): | |
var = "AUTOAPPROVE" | |
await setit(event, var, "True") | |
await event.edit( | |
"Done!! AUTOAPPROVE Started!!", | |
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_apauto")]], | |
) | |
async def apof(event): | |
try: | |
udB.set_key("AUTOAPPROVE", "False") | |
return await event.edit( | |
"Done! AUTOAPPROVE Stopped!!", | |
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_apauto")]], | |
) | |
except BaseException as er: | |
LOGS.exception(er) | |
return await event.edit( | |
get_string("clst_4"), | |
buttons=[[Button.inline("« Sᴇᴛᴛɪɴɢs", data="setter")]], | |
) | |
async def l_vcs(event): | |
BT = ( | |
[Button.inline("PMLOGGER OFF", data="pmlogof")] | |
if udB.get_key("PMLOG") | |
else [Button.inline("PMLOGGER ON", data="pmlog")] | |
) | |
await event.edit( | |
"PMLOGGER This Will Forward Ur Pm to Ur Private Group -", | |
buttons=[ | |
BT, | |
[Button.inline("PᴍLᴏɢɢᴇʀ Gʀᴏᴜᴘ", "abs_pmlgg")], | |
[Button.inline("« Bᴀᴄᴋ", data="cbs_pmcstm")], | |
], | |
) | |
async def pmlog(event): | |
await setit(event, "PMLOG", "True") | |
await event.edit( | |
"Done!! PMLOGGER Started!!", | |
buttons=[[Button.inline("« Bᴀᴄᴋ", data="pml")]], | |
) | |
async def pmlogof(event): | |
try: | |
udB.del_key("PMLOG") | |
return await event.edit( | |
"Done! PMLOGGER Stopped!!", | |
buttons=[[Button.inline("« Bᴀᴄᴋ", data="pml")]], | |
) | |
except BaseException as er: | |
LOGS.exception(er) | |
return await event.edit( | |
get_string("clst_4"), | |
buttons=[[Button.inline("« Sᴇᴛᴛɪɴɢs", data="setter")]], | |
) | |
async def pmonn(event): | |
var = "PMSETTING" | |
await setit(event, var, "True") | |
await event.edit( | |
"Done! PMPermit has been turned on!!", | |
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_ppmset")]], | |
) | |
async def pmofff(event): | |
var = "PMSETTING" | |
await setit(event, var, "False") | |
await event.edit( | |
"Done! PMPermit has been turned off!!", | |
buttons=[[Button.inline("« Bᴀᴄᴋ", data="cbs_ppmset")]], | |
) | |
async def hhh(e): | |
async with e.client.conversation(e.chat_id) as conv: | |
await conv.send_message("Send Any Media to keep at your Bot's welcome ") | |
msg = await conv.get_response() | |
if not msg.media or msg.text.startswith("/"): | |
return await conv.send_message( | |
"Terminated!", buttons=get_back_button("cbs_chatbot") | |
) | |
udB.set_key("STARTMEDIA", msg.file.id) | |
await conv.send_message("Done!", buttons=get_back_button("cbs_chatbot")) | |
async def hhh(e): | |
async with e.client.conversation(e.chat_id) as conv: | |
await conv.send_message( | |
"Send message to set to Display, when user Press Info button in Bot Welcome!\n\nsend `False` to completely remove that button.." | |
) | |
msg = await conv.get_response() | |
if msg.media or msg.text.startswith("/"): | |
return await conv.send_message( | |
"Terminated!", buttons=get_back_button("cbs_chatbot") | |
) | |
udB.set_key("BOT_INFO_START", msg.text) | |
await conv.send_message("Done!", buttons=get_back_button("cbs_chatbot")) | |
async def heheh(event): | |
Ll = [] | |
err = "" | |
async with event.client.conversation(event.chat_id) as conv: | |
await conv.send_message( | |
"• Send The Chat Id(s), which you want user to Join Before using Chat/Pm Bot\n\n• Send /clear to disable PmBot Force sub..\n• • Send /cancel to stop this process.." | |
) | |
await conv.send_message( | |
"Example : \n`-1001234567\n-100778888`\n\nFor Multiple Chat(s)." | |
) | |
try: | |
msg = await conv.get_response() | |
except AsyncTimeOut: | |
return await conv.send_message("**• TimeUp!**\nStart from /start back.") | |
if not msg.text or msg.text.startswith("/"): | |
timyork = "Cancelled!" | |
if msg.text == "/clear": | |
udB.del_key("PMBOT_FSUB") | |
timyork = "Done! Force Subscribe Stopped\nRestart your Bot!" | |
return await conv.send_message( | |
"Cancelled!", buttons=get_back_button("cbs_chatbot") | |
) | |
for chat in msg.message.split("\n"): | |
if chat.startswith("-") or chat.isdigit(): | |
chat = int(chat) | |
try: | |
CHSJSHS = await event.client.get_entity(chat) | |
Ll.append(get_peer_id(CHSJSHS)) | |
except Exception as er: | |
err += f"**{chat}** : {er}\n" | |
if err: | |
return await conv.send_message(err) | |
udB.set_key("PMBOT_FSUB", str(Ll)) | |
await conv.send_message( | |
"Done!\nRestart Your Bot.", buttons=get_back_button("cbs_chatbot") | |
) | |
async def name(event): | |
await event.delete() | |
pru = event.sender_id | |
var = "STARTMSG" | |
name = "Bot Welcome Message:" | |
async with event.client.conversation(pru) as conv: | |
await conv.send_message( | |
"**BOT WELCOME MSG**\nEnter the msg which u want to show when someone start your assistant Bot.\nYou Can use `{me}` , `{mention}` Parameters Too\nUse /cancel to terminate the operation.", | |
) | |
response = conv.wait_event(events.NewMessage(chats=pru)) | |
response = await response | |
themssg = response.message.message | |
if themssg == "/cancel": | |
return await conv.send_message( | |
"Cancelled!!", | |
buttons=get_back_button("cbs_chatbot"), | |
) | |
await setit(event, var, themssg) | |
await conv.send_message( | |
f"{name} changed to {themssg}", | |
buttons=get_back_button("cbs_chatbot"), | |
) | |
async def chon(event): | |
var = "PMBOT" | |
await setit(event, var, "True") | |
Loader(path="assistant/pmbot.py", key="PM Bot").load() | |
if AST_PLUGINS.get("pmbot"): | |
for i, e in AST_PLUGINS["pmbot"]: | |
event.client.remove_event_handler(i) | |
for i, e in AST_PLUGINS["pmbot"]: | |
event.client.add_event_handler(i, events.NewMessage(**e)) | |
await event.edit( | |
"Done! Now u Can Chat With People Via This Bot", | |
buttons=[Button.inline("« Bᴀᴄᴋ", data="cbs_chatbot")], | |
) | |
async def chon(event): | |
var = "PMBOT" | |
await setit(event, var, "False") | |
if AST_PLUGINS.get("pmbot"): | |
for i, e in AST_PLUGINS["pmbot"]: | |
event.client.remove_event_handler(i) | |
await event.edit( | |
"Done! Chat People Via This Bot Stopped.", | |
buttons=[Button.inline("« Bᴀᴄᴋ", data="cbs_chatbot")], | |
) | |
async def media(event): | |
await event.delete() | |
pru = event.sender_id | |
var = "INLINE_PIC" | |
name = "Inline Media" | |
async with event.client.conversation(pru) as conv: | |
await conv.send_message( | |
"**Inline Media**\nSend me a pic/gif/ or link to set as inline media.\n\nUse /cancel to terminate the operation.", | |
) | |
response = await conv.get_response() | |
try: | |
themssg = response.message | |
if themssg == "/cancel": | |
return await conv.send_message( | |
"Operation cancelled!!", | |
buttons=get_back_button("setter"), | |
) | |
except BaseException as er: | |
LOGS.exception(er) | |
media = await event.client.download_media(response, "inlpic") | |
if ( | |
not (response.text).startswith("/") | |
and response.text != "" | |
and (not response.media or isinstance(response.media, MessageMediaWebPage)) | |
): | |
url = text_to_url(response) | |
else: | |
try: | |
x = upl(media) | |
url = f"https://graph.org/{x[0]}" | |
remove(media) | |
except BaseException as er: | |
LOGS.exception(er) | |
return await conv.send_message( | |
"Terminated.", | |
buttons=get_back_button("setter"), | |
) | |
await setit(event, var, url) | |
await conv.send_message( | |
f"{name} has been set.", | |
buttons=get_back_button("setter"), | |
) | |
FD_MEDIA = {} | |
async def fdroid_dler(event): | |
uri = event.data_match.group(1).decode("utf-8") | |
if FD_MEDIA.get(uri): | |
return await event.edit(file=FD_MEDIA[uri]) | |
await event.answer("• Starting Download •", alert=True) | |
await event.edit("• Downloading.. •") | |
URL = f"https://f-droid.org/packages/{uri}" | |
conte = await async_searcher(URL, re_content=True) | |
BSC = bs(conte, "html.parser", from_encoding="utf-8") | |
dl_ = BSC.find("p", "package-version-download").find("a")["href"] | |
title = BSC.find("h3", "package-name").text.strip() | |
thumb = BSC.find("img", "package-icon")["src"] | |
if thumb.startswith("/"): | |
thumb = f"https://f-droid.org{thumb}" | |
thumb, _ = await fast_download(thumb, filename=f"{uri}.png") | |
s_time = time.time() | |
file, _ = await fast_download( | |
dl_, | |
filename=f"{title}.apk", | |
progress_callback=lambda d, t: asyncio.get_event_loop().create_task( | |
progress( | |
d, | |
t, | |
event, | |
s_time, | |
"Downloading...", | |
) | |
), | |
) | |
time.time() | |
n_file = await event.client.fast_uploader( | |
file, show_progress=True, event=event, message="Uploading...", to_delete=True | |
) | |
buttons = Button.switch_inline("Search Back", query="fdroid", same_peer=True) | |
try: | |
msg = await event.edit( | |
f"**• [{title}]({URL}) •**", file=n_file, thumb=thumb, buttons=buttons | |
) | |
except Exception as er: | |
LOGS.exception(er) | |
try: | |
msg = await event.client.edit_message( | |
await event.get_input_chat(), | |
event.message_id, | |
f"**• [{title}]({URL}) •**", | |
buttons=buttons, | |
thumb=thumb, | |
file=n_file, | |
) | |
except Exception as er: | |
os.remove(thumb) | |
LOGS.exception(er) | |
return await event.edit(f"**ERROR**: `{er}`", buttons=buttons) | |
if msg and hasattr(msg, "media"): | |
FD_MEDIA.update({uri: msg.media}) | |
os.remove(thumb) | |