Spaces:
Paused
Paused
File size: 8,498 Bytes
2068ffe cfd8710 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
from datetime import datetime, timedelta
import re
from pyrogram import filters
from pyrogram.errors import PeerIdInvalid
from pyrogram.types import Message, ChatPermissions
from DragMusic import app
from DragMusic.utils.decorators.admins import AdminActual
from DragMusic.utils.formatters import parse_time
# A helper function to extract user and time from a message
async def extract_user_and_time(message: Message, client):
user = None
time_val = None
if message.reply_to_message:
user = message.reply_to_message.from_user
if len(message.command) > 1:
time_val = message.command[1]
elif len(message.command) > 1:
identifier = message.command[1]
try:
user = await client.get_users(int(identifier))
except (ValueError, PeerIdInvalid):
try:
user = await client.get_users(identifier)
except PeerIdInvalid:
await message.reply_text("Invalid user ID or username.")
return None, None
if len(message.command) > 2:
time_val = message.command[2]
else:
await message.reply_text("Reply to a user or provide a username/user ID.")
return None, None
return user, time_val
@app.on_message(filters.command("ban") & filters.group)
@AdminActual
async def ban_command(client, message: Message, _):
target_user, _ = await extract_user_and_time(message, client)
if not target_user:
return
try:
await client.ban_chat_member(message.chat.id, target_user.id)
await message.reply_text(f"Banned {target_user.mention}.")
except Exception as e:
await message.reply_text(f"Failed to ban user. Error: {e}")
@app.on_message(filters.command("tban") & filters.group)
@AdminActual
async def tban_command(client, message: Message, _):
target_user, time_val = await extract_user_and_time(message, client)
if not target_user:
return
if not time_val:
return await message.reply_text("Usage: /tban <user> <duration> (e.g., 1h, 3d).")
duration = parse_time(time_val)
if not duration:
return await message.reply_text("Invalid time format. Use d, h, m, s (e.g., 3d, 10m).")
try:
until_date = datetime.now() + duration
await client.ban_chat_member(message.chat.id, target_user.id, until_date=until_date)
await message.reply_text(f"Banned {target_user.mention} for {time_val}.")
except Exception as e:
await message.reply_text(f"Failed to ban user. Error: {e}")
@app.on_message(filters.command("unban") & filters.group)
@AdminActual
async def unban_command(client, message: Message, _):
target_user, _ = await extract_user_and_time(message, client)
if not target_user:
return
try:
await client.unban_chat_member(message.chat.id, target_user.id)
await message.reply_text(f"Unbanned {target_user.mention}.")
except Exception as e:
await message.reply_text(f"Failed to unban user. Error: {e}")
@app.on_message(filters.command("mute") & filters.group)
@AdminActual
async def mute_command(client, message: Message, _):
target_user, _ = await extract_user_and_time(message, client)
if not target_user:
return
try:
await client.restrict_chat_member(message.chat.id, target_user.id, ChatPermissions())
await message.reply_text(f"Muted {target_user.mention}.")
except Exception as e:
await message.reply_text(f"Failed to mute user. Error: {e}")
@app.on_message(filters.command("tmute") & filters.group)
@AdminActual
async def tmute_command(client, message: Message, _):
target_user, time_val = await extract_user_and_time(message, client)
if not target_user:
return
if not time_val:
return await message.reply_text("Usage: /tmute <user> <duration> (e.g., 1h, 3d).")
duration = parse_time(time_val)
if not duration:
return await message.reply_text("Invalid time format. Use d, h, m, s (e.g., 3d, 10m).")
try:
until_date = datetime.now() + duration
await client.restrict_chat_member(message.chat.id, target_user.id, ChatPermissions(), until_date=until_date)
await message.reply_text(f"Muted {target_user.mention} for {time_val}.")
except Exception as e:
await message.reply_text(f"Failed to mute user. Error: {e}")
@app.on_message(filters.command("unmute") & filters.group)
@AdminActual
async def unmute_command(client, message: Message, _):
target_user, _ = await extract_user_and_time(message, client)
if not target_user:
return
try:
await client.unban_chat_member(message.chat.id, target_user.id) # Un-restricting is done with unban_chat_member
await message.reply_text(f"Unmuted {target_user.mention}.")
except Exception as e:
await message.reply_text(f"Failed to unmute user. Error: {e}")
@app.on_message(filters.command("kick") & filters.group)
@AdminActual
async def kick_command(client, message: Message, _):
target_user, _ = await extract_user_and_time(message, client)
if not target_user:
return
try:
await client.ban_chat_member(message.chat.id, target_user.id)
await client.unban_chat_member(message.chat.id, target_user.id)
await message.reply_text(f"Kicked {target_user.mention}.")
except Exception as e:
await message.reply_text(f"Failed to kick user. Error: {e}")
@app.on_message(filters.command("kickme") & filters.group)
async def kickme_command(client, message: Message):
try:
await client.ban_chat_member(message.chat.id, message.from_user.id)
await client.unban_chat_member(message.chat.id, message.from_user.id)
await message.reply_text("As you wish.")
except Exception as e:
await message.reply_text(f"I can't kick you. Maybe I'm not an admin? Error: {e}")
# Delete variants
@app.on_message(filters.command(["dban", "sban", "dmute", "smute", "dkick", "skick"]) & filters.group)
@AdminActual
async def action_variants(client, message: Message, _):
command = message.command[0].lower()
if command.startswith('s') and not message.reply_to_message:
target_user, time_val = await extract_user_and_time(message, client)
else:
target_user, time_val = await extract_user_and_time(message, client)
if not target_user:
return
# Delete the command message for silent actions
if command.startswith('s'):
try:
await message.delete()
except Exception:
pass # Ignore if it fails
# Delete the replied-to message for 'd' actions
if command.startswith('d') and message.reply_to_message:
try:
await message.reply_to_message.delete()
except Exception:
pass # Ignore if it fails
action_text = ""
# Perform the action
try:
if 'ban' in command:
duration = parse_time(time_val) if time_val else None
until_date = datetime.now() + duration if duration else None
await client.ban_chat_member(message.chat.id, target_user.id, until_date=until_date)
action_text = f"Banned {target_user.mention}"
if time_val:
action_text += f" for {time_val}"
action_text += "."
elif 'mute' in command:
duration = parse_time(time_val) if time_val else None
until_date = datetime.now() + duration if duration else None
await client.restrict_chat_member(message.chat.id, target_user.id, ChatPermissions(), until_date=until_date)
action_text = f"Muted {target_user.mention}"
if time_val:
action_text += f" for {time_val}"
action_text += "."
elif 'kick' in command:
await client.ban_chat_member(message.chat.id, target_user.id)
await client.unban_chat_member(message.chat.id, target_user.id)
action_text = f"Kicked {target_user.mention}."
# Send confirmation only for 'd' commands, as 's' commands are silent
if command.startswith('d'):
await message.reply_text(action_text)
except Exception as e:
if not command.startswith('s'): # Don't send error for silent commands
await message.reply_text(f"Failed to perform action. Error: {e}") |