"""Owner-only developer tools: inline ``eval``/``sh`` runners and ban checks."""

import asyncio
import os
import re
import subprocess
import sys
import traceback
from inspect import getfullargspec
from io import StringIO
from time import time

from pyrogram import filters
from pyrogram.types import (
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    Message,
    CallbackQuery,
)

from DragMusic import app
from config import OWNER_ID, BANNED_USERS
from DragMusic.utils.database import (
    is_banned_user,
    remove_banned_user,
    is_gbanned_user,
    remove_gban_user,
)
from DragMusic.misc import SUDOERS

# Splits a command line on spaces that are not inside single or double
# quotes. Quote characters are kept as part of the resulting arguments.
_SHELL_ARG_SPLIT = re.compile(r""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""")


async def aexec(code, client, message):
    """Run *code* as the body of a coroutine and return its result.

    The source is wrapped in ``async def __aexec(client, message)`` so that
    it may use ``await`` and ``return`` and can refer to ``client`` and
    ``message`` directly. Module globals stay visible to the code.

    SECURITY: this executes arbitrary Python. It must only ever be reached
    with input from a trusted user (the handler below is owner-only).
    """
    body = "".join(f"\n {line}" for line in code.split("\n"))
    # Use an explicit namespace: reading the function back through locals()
    # after a bare exec() fails on Python 3.13+, where locals() returns an
    # independent snapshot.
    namespace = {}
    exec("async def __aexec(client, message): " + body, globals(), namespace)
    return await namespace["__aexec"](client, message)


async def edit_or_reply(msg: Message, **kwargs):
    """Edit *msg* when it was sent by this account, otherwise reply to it.

    ``kwargs`` are forwarded unchanged to ``msg.edit_text`` or ``msg.reply``.
    """
    sender = msg.from_user
    # A message without a sender cannot be our own, so fall back to a reply.
    func = msg.edit_text if sender and sender.is_self else msg.reply
    await func(**kwargs)


def _result_keyboard(elapsed, user_id):
    """Build the "Runtime / Close" inline keyboard attached to results."""
    runtime = round(elapsed, 3)
    return InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    f"Runtime: {runtime}s",
                    callback_data=f"runtime {runtime}",
                ),
                InlineKeyboardButton(
                    " Close",
                    callback_data=f"close|{user_id}",
                ),
            ]
        ]
    )


async def _run_eval(client, message, code):
    """Execute *code* with ``aexec`` and send back what it printed or raised."""
    started = time()
    old_stdout, old_stderr = sys.stdout, sys.stderr
    captured_out = sys.stdout = StringIO()
    captured_err = sys.stderr = StringIO()
    exc = None
    try:
        await aexec(code, client, message)
    except Exception:
        exc = traceback.format_exc()
    finally:
        # Always restore the real streams, including when the evaluated
        # code is cancelled or exits; they are process-wide.
        sys.stdout, sys.stderr = old_stdout, old_stderr

    stdout, stderr = captured_out.getvalue(), captured_err.getvalue()
    evaluation = exc or stderr or stdout or "Success"
    final_output = f"⥤ ʀᴇsᴜʟᴛ :\n\n{evaluation}"
    keyboard = _result_keyboard(time() - started, message.from_user.id)
    await edit_or_reply(message, text=final_output, reply_markup=keyboard)


async def _run_shell(message, command_line):
    """Run *command_line* as a subprocess and send its output.

    Returns ``True`` when the output was delivered and ``False`` when the
    command could not be run and an error report was sent instead.
    """
    # Repeated spaces would otherwise yield empty arguments.
    argv = [arg for arg in _SHELL_ARG_SPLIT.split(command_line) if arg]
    started = time()
    try:
        # Run without blocking the event loop. communicate() drains both
        # pipes and waits for exit, so a chatty stderr cannot stall the child.
        process = await asyncio.create_subprocess_exec(
            *argv,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        raw_out, raw_err = await process.communicate()
    except Exception:
        await edit_or_reply(
            message,
            text=f"ERROR :\n\n{traceback.format_exc()}",
        )
        return False

    output = (
        raw_out.decode("utf-8", errors="replace").strip()
        # Show stderr when the command printed nothing to stdout.
        or raw_err.decode("utf-8", errors="replace").strip()
        or "None"
    )
    keyboard = _result_keyboard(time() - started, message.from_user.id)
    await edit_or_reply(
        message,
        text=f"OUTPUT :\n\n{output}",
        reply_markup=keyboard,
    )
    return True


@app.on_message(
    filters.text
    & filters.user(OWNER_ID)
    & ~filters.forwarded
    & ~filters.via_bot
)
async def executor(client: app, message: Message):
    """Run ``eval <python>`` or ``sh <command>`` sent as plain text by the owner.

    Only messages whose first word is exactly ``eval`` or ``sh`` are acted
    on; any other text is ignored.
    """
    if not message.text:
        return  # Prevent errors when message.text is None

    parts = message.text.split(None, 1)
    if not parts:
        return
    command = parts[0]
    argument = parts[1] if len(parts) > 1 else None

    if command == "eval":
        if argument is None:
            return await edit_or_reply(
                message, text="ᴡʜᴀᴛ ʏᴏᴜ ᴡᴀɴɴᴀ ᴇxᴇᴄᴜᴛᴇ ʙᴀʙʏ ?"
            )
        await _run_eval(client, message, argument)
    elif command == "sh":
        if argument is None:
            return await edit_or_reply(
                message, text="ᴇxᴀᴍᴩʟᴇ :\nsh git pull"
            )
        if await _run_shell(message, argument):
            # NOTE(review): the original layout does not show whether this
            # call belonged to the `sh` branch only or to the whole handler;
            # it is kept on the successful `sh` path - confirm.
            await message.stop_propagation()


@app.on_callback_query(filters.regex(r"runtime"))
async def runtime_func_cq(_, cq):
    """Show the runtime stored in the button's callback data as an alert."""
    parts = cq.data.split(None, 1)
    if len(parts) < 2:
        # Callback data matched the pattern but carries no runtime value.
        return
    await cq.answer(parts[1], show_alert=True)


@app.on_callback_query(filters.regex(r"close\|"))
async def close_command(_, CallbackQuery: CallbackQuery):
    """Delete the result message when pressed by the user it was made for.

    The parameter name shadows the imported ``CallbackQuery`` class inside
    this function; it is kept as is so the handler signature is unchanged.
    """
    user_id = int(CallbackQuery.data.split("|")[1])
    if CallbackQuery.from_user.id != user_id:
        return await CallbackQuery.answer(
            " You can't close this!", show_alert=True
        )
    await CallbackQuery.message.delete()
    await CallbackQuery.answer()


@app.on_message(filters.command("checkban") & filters.user(OWNER_ID))
async def check_ban_status(client, message):
    """Check whether the owner is banned and lift every ban that is found."""
    user_id = message.from_user.id

    was_banned = await is_banned_user(user_id)
    was_gbanned = await is_gbanned_user(user_id)

    # The two bans are independent, so each one is checked and removed on
    # its own rather than stopping at the first hit.
    if was_banned:
        await remove_banned_user(user_id)
        await message.reply_text(
            "✅ You were locally banned. You have been unbanned."
        )
    if was_gbanned:
        await remove_gban_user(user_id)
        await message.reply_text(
            "✅ You were globally banned. You have been unbanned."
        )
    if not (was_banned or was_gbanned):
        await message.reply_text("✅ You are not banned.")

    # Also remove from BANNED_USERS set if present
    if user_id in BANNED_USERS:
        BANNED_USERS.remove(user_id)
        await message.reply_text("✅ Removed from BANNED_USERS set.")