File size: 5,414 Bytes
a8e9b84
 
 
 
 
 
 
 
 
 
 
 
 
39a9010
 
e1ab2db
a8e9b84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39a9010
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
import re
import subprocess
import sys
import traceback
from inspect import getfullargspec
from io import StringIO
from time import time

from pyrogram import filters
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message, CallbackQuery

from DragMusic import app
from config import OWNER_ID, BANNED_USERS
from DragMusic.utils.database import is_banned_user, remove_banned_user, is_gbanned_user, remove_gban_user
from DragMusic.misc import SUDOERS


async def aexec(code, client, message):
    """Compile *code* as the body of a coroutine, run it and return its result.

    The snippet is wrapped in ``async def __aexec(client, message)`` so it may
    use ``await`` and ``return`` and can refer to ``client`` and ``message``
    by name. Module-level names of this file are visible to the snippet as
    globals.

    WARNING: this executes arbitrary Python code. Callers must only pass code
    that comes from a fully trusted user.

    Args:
        code: Python source; every line is indented by one space and placed
            inside the generated coroutine.
        client: Passed through as the ``client`` argument of the snippet.
        message: Passed through as the ``message`` argument of the snippet.

    Returns:
        Whatever the snippet returns (``None`` when it has no ``return``).

    Raises:
        SyntaxError: If the wrapped source is not valid Python.
        Exception: Anything raised by the snippet itself propagates unchanged.
    """
    source = "async def __aexec(client, message): " + "".join(
        f"\n {line}" for line in code.split("\n")
    )
    # Use an explicit namespace instead of exec()'s implicit locals(): on
    # Python 3.13+ exec() and a later locals() call inside a function work on
    # independent snapshots, so the generated function could not be found.
    namespace = {}
    exec(source, globals(), namespace)
    return await namespace["__aexec"](client, message)


async def edit_or_reply(msg: Message, **kwargs):
    """Answer *msg* by editing it or by replying to it.

    ``msg.edit_text`` is used when ``msg.from_user.is_self`` is truthy,
    ``msg.reply`` otherwise. All keyword arguments are forwarded unchanged
    to the selected method; its result is not returned.
    """
    if msg.from_user.is_self:
        responder = msg.edit_text
    else:
        responder = msg.reply
    await responder(**kwargs)


def _result_keyboard(elapsed, user_id):
    """Build the single-row keyboard attached to a command result.

    The first button shows the elapsed time rounded to three decimals and
    carries it in its ``runtime ...`` callback data; the second button
    carries ``close|<user_id>``.
    """
    runtime = round(elapsed, 3)
    return InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(f"Runtime: {runtime}s", callback_data=f"runtime {runtime}"),
                InlineKeyboardButton(" Close", callback_data=f"close|{user_id}"),
            ]
        ]
    )


async def _run_eval(client, message, code):
    """Run *code* through ``aexec`` and answer with the captured result."""
    from html import escape

    started = time()
    saved_stdout, saved_stderr = sys.stdout, sys.stderr
    captured_out = sys.stdout = StringIO()
    captured_err = sys.stderr = StringIO()
    failure = None
    try:
        await aexec(code, client, message)
    except Exception:
        failure = traceback.format_exc()
    finally:
        # Always put the real streams back, even if the snippet is cancelled
        # or raises something that is not an Exception subclass.
        sys.stdout, sys.stderr = saved_stdout, saved_stderr

    evaluation = failure or captured_err.getvalue() or captured_out.getvalue() or "Success"
    # Escape the text so that pieces such as "<module>" in a traceback are
    # displayed literally instead of being treated as HTML markup.
    final_output = (
        f"<b>⥤ ʀᴇsᴜʟᴛ :</b>\n<pre language='python'>{escape(evaluation, quote=False)}</pre>"
    )
    keyboard = _result_keyboard(time() - started, message.from_user.id)
    await edit_or_reply(message, text=final_output, reply_markup=keyboard)


async def _run_shell(message, command_line):
    """Run *command_line* as a program (no shell) and answer with its output."""
    import asyncio
    import shlex
    from html import escape

    started = time()
    try:
        # shlex honours quoting and drops the quote characters, the way a
        # shell would, so `git commit -m "some text"` gets one clean argument.
        argv = shlex.split(command_line)
        # The asyncio subprocess API keeps the event loop free while the
        # command runs; communicate() drains both pipes and waits for exit.
        process = await asyncio.create_subprocess_exec(
            *argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        raw_out, raw_err = await process.communicate()
    except Exception:
        errors = traceback.format_exc()
        await edit_or_reply(
            message, text=f"<b>ERROR :</b>\n<pre>{escape(errors, quote=False)}</pre>"
        )
        return

    # Prefer stdout; fall back to stderr so failing commands are not silent.
    output = (
        raw_out.decode("utf-8", errors="replace").strip()
        or raw_err.decode("utf-8", errors="replace").strip()
        or "None"
    )
    keyboard = _result_keyboard(time() - started, message.from_user.id)
    await edit_or_reply(
        message,
        text=f"<b>OUTPUT :</b>\n<pre>{escape(output, quote=False)}</pre>",
        reply_markup=keyboard,
    )


@app.on_message(
    filters.text
    & filters.regex(r"^(?:eval|sh)(?:\s|$)")
    & filters.user(OWNER_ID)
    & ~filters.forwarded
    & ~filters.via_bot
)
async def executor(client: app, message: Message):
    """Execute Python (``eval <code>``) or a program (``sh <command>``).

    The filter only matches messages whose first word is exactly ``eval`` or
    ``sh``, sent by ``OWNER_ID`` and neither forwarded nor sent via a bot.
    Other text messages from the owner (for example slash commands or a
    sentence that merely starts with "sh") are therefore left to other
    handlers instead of being consumed here.

    WARNING: both commands run arbitrary code on the host. Access is
    restricted solely by the ``OWNER_ID`` filter.

    Args:
        client: The client that received the update; passed to the evaluated
            code as ``client``.
        message: The incoming message; its text holds the command.

    A usage hint is sent when the command has no argument. After a command
    has been run, propagation of the update is stopped.
    """
    if not message.text:
        return  # Prevent errors when message.text is None

    # Split on any whitespace so "eval\n<code>" works as well as "eval <code>".
    parts = message.text.split(None, 1)
    if not parts:
        return
    keyword = parts[0]
    payload = parts[1] if len(parts) > 1 else ""

    if keyword == "eval":
        if not payload:
            return await edit_or_reply(message, text="<b>ᴡʜᴀᴛ ʏᴏᴜ ᴡᴀɴɴᴀ ᴇxᴇᴄᴜᴛᴇ ʙᴀʙʏ ?</b>")
        await _run_eval(client, message, payload)
    elif keyword == "sh":
        if not payload:
            return await edit_or_reply(message, text="<b>ᴇxᴀᴍᴩʟᴇ :</b>\nsh git pull")
        await _run_shell(message, payload)
    else:
        # Not one of our commands; leave the update for other handlers.
        return

    await message.stop_propagation()


@app.on_callback_query(filters.regex(r"runtime"))
async def runtime_func_cq(_, cq):
    """Show the runtime stored in the callback data as an alert.

    The callback data is split once on whitespace and the second piece is
    sent back to the user through ``cq.answer`` with ``show_alert=True``.
    """
    pieces = cq.data.split(maxsplit=1)
    await cq.answer(pieces[1], show_alert=True)


@app.on_callback_query(filters.regex(r"close\|"))
async def close_command(_, CallbackQuery: CallbackQuery):
    """Delete the message carrying the button, if the right user pressed it.

    The callback data has the form ``close|<user_id>``. When the id of the
    user who pressed the button differs from ``<user_id>``, an alert is shown
    and nothing is deleted; otherwise the message is deleted and the callback
    query is answered without text.
    """
    id_field = CallbackQuery.data.split("|")[1]
    allowed_id = int(id_field)

    if CallbackQuery.from_user.id == allowed_id:
        await CallbackQuery.message.delete()
        await CallbackQuery.answer()
    else:
        return await CallbackQuery.answer(" You can't close this!", show_alert=True)


@app.on_message(filters.command("checkban") & filters.user(OWNER_ID))
async def check_ban_status(client, message):
    """Lift every ban recorded for the sender and report what was done.

    Three sources are checked independently: the local ban list
    (``is_banned_user``), the global ban list (``is_gbanned_user``) and the
    ``BANNED_USERS`` collection from config. Each one that contains the
    sender is cleared and gets its own confirmation reply. When none of them
    contains the sender, a single "not banned" reply is sent.

    Args:
        client: The client that received the update (unused).
        message: The ``/checkban`` message; its sender is the user checked.
    """
    user_id = message.from_user.id
    notices = []

    # Each check is separate (not elif) so a user who is banned in more than
    # one place is fully unbanned by a single command.
    if await is_banned_user(user_id):
        await remove_banned_user(user_id)
        notices.append("✅ You were locally banned. You have been unbanned.")

    if await is_gbanned_user(user_id):
        await remove_gban_user(user_id)
        notices.append("✅ You were globally banned. You have been unbanned.")

    # Also remove from BANNED_USERS set if present
    if user_id in BANNED_USERS:
        BANNED_USERS.remove(user_id)
        notices.append("✅ Removed from BANNED_USERS set.")

    if not notices:
        # Only claim "not banned" when nothing at all had to be cleared.
        notices.append("✅ You are not banned.")

    for notice in notices:
        await message.reply_text(notice)