Spaces:
Runtime error
Runtime error
import os | |
import re | |
import subprocess | |
import sys | |
import traceback | |
from inspect import getfullargspec | |
from io import StringIO | |
from time import time | |
from pyrogram import filters | |
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message | |
from Devine import app | |
from config import OWNER_ID | |
async def aexec(code, client, message): | |
exec( | |
"async def __aexec(client, message): " | |
+ "".join(f"\n {a}" for a in code.split("\n")) | |
) | |
return await locals()["__aexec"](client, message) | |
async def edit_or_reply(msg: Message, **kwargs): | |
func = msg.edit_text if msg.from_user.is_self else msg.reply | |
spec = getfullargspec(func.__wrapped__).args | |
await func(**{k: v for k, v in kwargs.items() if k in spec}) | |
async def executor(client: app, message: Message): | |
if len(message.command) < 2: | |
return await edit_or_reply(message, text="<b>ᴡʜᴀᴛ ʏᴏᴜ ᴡᴀɴɴᴀ ᴇxᴇᴄᴜᴛᴇ ᴍʏ ʟᴏʀᴅ ?</b>") | |
try: | |
cmd = message.text.split(" ", maxsplit=1)[1] | |
except IndexError: | |
return await message.delete() | |
t1 = time() | |
old_stderr = sys.stderr | |
old_stdout = sys.stdout | |
redirected_output = sys.stdout = StringIO() | |
redirected_error = sys.stderr = StringIO() | |
stdout, stderr, exc = None, None, None | |
try: | |
await aexec(cmd, client, message) | |
except Exception: | |
exc = traceback.format_exc() | |
stdout = redirected_output.getvalue() | |
stderr = redirected_error.getvalue() | |
sys.stdout = old_stdout | |
sys.stderr = old_stderr | |
evaluation = "\n" | |
if exc: | |
evaluation += exc | |
elif stderr: | |
evaluation += stderr | |
elif stdout: | |
evaluation += stdout | |
else: | |
evaluation += "Success" | |
final_output = f"<b>⥤ ʀᴇsᴜʟᴛ :</b>\n<pre language='python'>{evaluation}</pre>" | |
if len(final_output) > 4096: | |
filename = "output.txt" | |
with open(filename, "w+", encoding="utf8") as out_file: | |
out_file.write(str(evaluation)) | |
t2 = time() | |
keyboard = InlineKeyboardMarkup( | |
[ | |
[ | |
InlineKeyboardButton( | |
text="⏳", | |
callback_data=f"runtime {t2-t1} Seconds", | |
) | |
] | |
] | |
) | |
await message.reply_document( | |
document=filename, | |
caption=f"<b>⥤ ᴇᴠᴀʟ :</b>\n<code>{cmd[0:980]}</code>\n\n<b>⥤ ʀᴇsᴜʟᴛ :</b>\nAttached Document", | |
quote=False, | |
reply_markup=keyboard, | |
) | |
await message.delete() | |
os.remove(filename) | |
else: | |
t2 = time() | |
keyboard = InlineKeyboardMarkup( | |
[ | |
[ | |
InlineKeyboardButton( | |
text="⏳", | |
callback_data=f"runtime {round(t2-t1, 3)} Seconds", | |
), | |
InlineKeyboardButton( | |
text="🗑", | |
callback_data=f"forceclose abc|{message.from_user.id}", | |
), | |
] | |
] | |
) | |
await edit_or_reply(message, text=final_output, reply_markup=keyboard) | |
async def runtime_func_cq(_, cq): | |
runtime = cq.data.split(None, 1)[1] | |
await cq.answer(runtime, show_alert=True) | |
async def forceclose_command(_, CallbackQuery): | |
callback_data = CallbackQuery.data.strip() | |
callback_request = callback_data.split(None, 1)[1] | |
query, user_id = callback_request.split("|") | |
if CallbackQuery.from_user.id != int(user_id): | |
try: | |
return await CallbackQuery.answer( | |
"<b>‣ ɪᴛ'ʟʟ ʙᴇ ʙᴇᴛᴛᴇʀ ɪғ ʏᴏᴜ sᴛᴀʏ ɪɴ ʏᴏᴜʀ ʟɪᴍɪᴛs ʙᴀʙʏe.</b>", show_alert=True | |
) | |
except: | |
return | |
await CallbackQuery.message.delete() | |
try: | |
await CallbackQuery.answer() | |
except: | |
return | |
async def shellrunner(_, message: Message): | |
if len(message.command) < 2: | |
return await edit_or_reply(message, text="<b>ᴇxᴀᴍᴩʟᴇ :</b>\n/sh git pull") | |
text = message.text.split(None, 1)[1] | |
if "\n" in text: | |
code = text.split("\n") | |
output = "" | |
for x in code: | |
shell = re.split(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""", x) | |
try: | |
process = subprocess.Popen( | |
shell, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
) | |
except Exception as err: | |
await edit_or_reply(message, text=f"<b>ERROR :</b>\n<pre>{err}</pre>") | |
output += f"<b>{code}</b>\n" | |
output += process.stdout.read()[:-1].decode("utf-8") | |
output += "\n" | |
else: | |
shell = re.split(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""", text) | |
for a in range(len(shell)): | |
shell[a] = shell[a].replace('"', "") | |
try: | |
process = subprocess.Popen( | |
shell, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
) | |
except Exception as err: | |
print(err) | |
exc_type, exc_obj, exc_tb = sys.exc_info() | |
errors = traceback.format_exception( | |
etype=exc_type, | |
value=exc_obj, | |
tb=exc_tb, | |
) | |
return await edit_or_reply( | |
message, text=f"<b>ERROR :</b>\n<pre>{''.join(errors)}</pre>" | |
) | |
output = process.stdout.read()[:-1].decode("utf-8") | |
if str(output) == "\n": | |
output = None | |
if output: | |
if len(output) > 4096: | |
with open("output.txt", "w+") as file: | |
file.write(output) | |
await app.send_document( | |
message.chat.id, | |
"output.txt", | |
reply_to_message_id=message.id, | |
caption="<code>Output</code>", | |
) | |
return os.remove("output.txt") | |
await edit_or_reply(message, text=f"<b>OUTPUT :</b>\n<pre>{output}</pre>") | |
else: | |
await edit_or_reply(message, text="<b>OUTPUT :</b>\n<code>None</code>") | |
await message.stop_propagation() | |