Spaces:
Paused
Paused
from datetime import datetime, timedelta | |
import re | |
from pyrogram import filters | |
from pyrogram.errors import PeerIdInvalid | |
from pyrogram.types import Message, ChatPermissions | |
from DragMusic import app | |
from DragMusic.utils.decorators.admins import AdminActual | |
from DragMusic.utils.formatters import parse_time | |
# Helper that resolves the target user and the optional time argument of a command.
async def extract_user_and_time(message: Message, client):
    """Resolve the user a moderation command targets, plus its time argument.

    Two invocation forms are supported:

    * Reply form: the command is sent as a reply. The target is the author of
      the replied-to message and the first argument, if any, is the time value.
    * Argument form: the first argument is a numeric user ID or a username and
      the second argument, if any, is the time value.

    Args:
        message: The command message; ``message.command`` holds the command
            name followed by its arguments.
        client: Client whose ``get_users`` is used to look the user up.

    Returns:
        A ``(user, time_val)`` tuple. ``time_val`` is the raw argument string
        or ``None``. ``(None, None)`` is returned, after replying with an
        explanation, when no user could be resolved.
    """
    # Imported locally so this fix does not depend on the file's import block.
    from pyrogram.errors import UsernameInvalid, UsernameNotOccupied

    args = message.command
    replied = message.reply_to_message

    if replied:
        user = replied.from_user
        if user is None:
            # The replied-to message carries no user object, so there is
            # nobody to act on; say so instead of failing silently.
            await message.reply_text("Could not identify the user you replied to.")
            return None, None
        time_val = args[1] if len(args) > 1 else None
        return user, time_val

    if len(args) < 2:
        await message.reply_text("Reply to a user or provide a username/user ID.")
        return None, None

    identifier = args[1]
    try:
        # Prefer a numeric lookup; int() raises ValueError for usernames.
        user = await client.get_users(int(identifier))
    except (ValueError, PeerIdInvalid):
        try:
            user = await client.get_users(identifier)
        except (PeerIdInvalid, UsernameInvalid, UsernameNotOccupied):
            # Unknown or malformed usernames raise their own error types, not
            # PeerIdInvalid; treat all of them as "no such user".
            await message.reply_text("Invalid user ID or username.")
            return None, None

    time_val = args[2] if len(args) > 2 else None
    return user, time_val
async def ban_command(client, message: Message, _):
    """Ban the user targeted by the command from the current chat.

    The target is resolved with ``extract_user_and_time``; any time argument
    is ignored. A confirmation, or the error text on failure, is sent as a
    reply to the command.

    Args:
        client: Client used to issue the ban.
        message: The command message.
        _: Not used by this handler. NOTE(review): presumably injected by a
            decorator that is not visible here -- confirm.
    """
    member, _time_arg = await extract_user_and_time(message, client)
    if not member:
        # No target could be resolved, so there is nothing to do.
        return

    try:
        await client.ban_chat_member(message.chat.id, member.id)
        await message.reply_text(f"Banned {member.mention}.")
    except Exception as exc:
        await message.reply_text(f"Failed to ban user. Error: {exc}")
async def tban_command(client, message: Message, _):
    """Ban the targeted user for a limited duration.

    The duration string (for example ``1h`` or ``3d``) is converted by the
    project helper ``parse_time`` and added to the current local time to get
    the ban's expiry.

    Args:
        client: Client used to issue the ban.
        message: The command message.
        _: Not used by this handler. NOTE(review): presumably injected by a
            decorator that is not visible here -- confirm.

    Returns:
        The sent usage/format hint when the duration is missing or invalid,
        otherwise ``None``.
    """
    member, raw_duration = await extract_user_and_time(message, client)
    if not member:
        return

    if not raw_duration:
        return await message.reply_text(
            "Usage: /tban <user> <duration> (e.g., 1h, 3d)."
        )

    # parse_time is defined outside this file; a falsy result is treated as
    # "could not parse".
    delta = parse_time(raw_duration)
    if not delta:
        return await message.reply_text(
            "Invalid time format. Use d, h, m, s (e.g., 3d, 10m)."
        )

    try:
        expires_at = datetime.now() + delta
        await client.ban_chat_member(
            message.chat.id, member.id, until_date=expires_at
        )
        await message.reply_text(f"Banned {member.mention} for {raw_duration}.")
    except Exception as exc:
        await message.reply_text(f"Failed to ban user. Error: {exc}")
async def unban_command(client, message: Message, _):
    """Lift the ban of the targeted user in the current chat.

    Args:
        client: Client used to issue the unban.
        message: The command message.
        _: Not used by this handler. NOTE(review): presumably injected by a
            decorator that is not visible here -- confirm.
    """
    member, _time_arg = await extract_user_and_time(message, client)
    if not member:
        # No target could be resolved, so there is nothing to do.
        return

    try:
        await client.unban_chat_member(message.chat.id, member.id)
        await message.reply_text(f"Unbanned {member.mention}.")
    except Exception as exc:
        await message.reply_text(f"Failed to unban user. Error: {exc}")
async def mute_command(client, message: Message, _):
    """Mute the targeted user by restricting them with empty permissions.

    Args:
        client: Client used to apply the restriction.
        message: The command message.
        _: Not used by this handler. NOTE(review): presumably injected by a
            decorator that is not visible here -- confirm.
    """
    member, _time_arg = await extract_user_and_time(message, client)
    if not member:
        # No target could be resolved, so there is nothing to do.
        return

    try:
        # A ChatPermissions object built with no arguments grants nothing.
        await client.restrict_chat_member(
            message.chat.id, member.id, ChatPermissions()
        )
        await message.reply_text(f"Muted {member.mention}.")
    except Exception as exc:
        await message.reply_text(f"Failed to mute user. Error: {exc}")
async def tmute_command(client, message: Message, _):
    """Mute the targeted user for a limited duration.

    The duration string (for example ``1h`` or ``3d``) is converted by the
    project helper ``parse_time`` and added to the current local time to get
    the moment the restriction ends.

    Args:
        client: Client used to apply the restriction.
        message: The command message.
        _: Not used by this handler. NOTE(review): presumably injected by a
            decorator that is not visible here -- confirm.

    Returns:
        The sent usage/format hint when the duration is missing or invalid,
        otherwise ``None``.
    """
    member, raw_duration = await extract_user_and_time(message, client)
    if not member:
        return

    if not raw_duration:
        return await message.reply_text(
            "Usage: /tmute <user> <duration> (e.g., 1h, 3d)."
        )

    # parse_time is defined outside this file; a falsy result is treated as
    # "could not parse".
    delta = parse_time(raw_duration)
    if not delta:
        return await message.reply_text(
            "Invalid time format. Use d, h, m, s (e.g., 3d, 10m)."
        )

    try:
        expires_at = datetime.now() + delta
        await client.restrict_chat_member(
            message.chat.id, member.id, ChatPermissions(), until_date=expires_at
        )
        await message.reply_text(f"Muted {member.mention} for {raw_duration}.")
    except Exception as exc:
        await message.reply_text(f"Failed to mute user. Error: {exc}")
async def unmute_command(client, message: Message, _):
    """Remove the mute of the targeted user in the current chat.

    Args:
        client: Client used to lift the restriction.
        message: The command message.
        _: Not used by this handler. NOTE(review): presumably injected by a
            decorator that is not visible here -- confirm.
    """
    member, _time_arg = await extract_user_and_time(message, client)
    if not member:
        # No target could be resolved, so there is nothing to do.
        return

    try:
        # Un-restricting is done with unban_chat_member.
        await client.unban_chat_member(message.chat.id, member.id)
        await message.reply_text(f"Unmuted {member.mention}.")
    except Exception as exc:
        await message.reply_text(f"Failed to unmute user. Error: {exc}")
async def kick_command(client, message: Message, _):
    """Kick the targeted user: ban them and immediately lift the ban.

    Args:
        client: Client used to issue the ban and the unban.
        message: The command message.
        _: Not used by this handler. NOTE(review): presumably injected by a
            decorator that is not visible here -- confirm.
    """
    member, _time_arg = await extract_user_and_time(message, client)
    if not member:
        # No target could be resolved, so there is nothing to do.
        return

    try:
        # Ban followed by unban removes the user without leaving a ban behind.
        await client.ban_chat_member(message.chat.id, member.id)
        await client.unban_chat_member(message.chat.id, member.id)
        await message.reply_text(f"Kicked {member.mention}.")
    except Exception as exc:
        await message.reply_text(f"Failed to kick user. Error: {exc}")
async def kickme_command(client, message: Message):
    """Kick the sender of the command from the chat at their own request.

    The sender is banned and then immediately unbanned. Any failure is
    reported back in a reply.

    Args:
        client: Client used to issue the ban and the unban.
        message: The command message; its sender is the user to remove.
    """
    try:
        # Looked up inside the try so a message without chat/sender is
        # reported like any other failure.
        chat_id = message.chat.id
        sender_id = message.from_user.id
        await client.ban_chat_member(chat_id, sender_id)
        await client.unban_chat_member(chat_id, sender_id)
        await message.reply_text("As you wish.")
    except Exception as exc:
        await message.reply_text(
            f"I can't kick you. Maybe I'm not an admin? Error: {exc}"
        )
# Silent ("s...") and delete ("d...") variants of the ban / mute / kick commands.
async def action_variants(client, message: Message, _):
    """Run a ban, mute or kick in its silent or delete-message variant.

    The behaviour is derived from the command name itself:

    * a name starting with ``s`` is silent: the command message is deleted
      and neither a confirmation nor an error is sent;
    * a name starting with ``d`` also deletes the replied-to message and
      sends a confirmation;
    * the action is chosen by whether the name contains ``ban``, ``mute`` or
      ``kick``, checked in that order.

    For ban and mute an optional time argument limits the action, but only
    when the project helper ``parse_time`` can parse it. Otherwise the action
    is permanent and the confirmation does not mention a duration.

    Args:
        client: Client used to perform the moderation calls.
        message: The command message.
        _: Not used by this handler. NOTE(review): presumably injected by a
            decorator that is not visible here -- confirm.
    """
    command = message.command[0].lower()
    silent = command.startswith("s")
    delete_replied = command.startswith("d")

    target_user, time_val = await extract_user_and_time(message, client)
    if not target_user:
        return

    if silent:
        try:
            await message.delete()
        except Exception:
            # Best effort: failing to remove the command must not stop the action.
            pass

    if delete_replied and message.reply_to_message:
        try:
            await message.reply_to_message.delete()
        except Exception:
            # Best effort: the moderation action matters more than the clean-up.
            pass

    action_text = ""
    try:
        if "ban" in command or "mute" in command:
            duration = parse_time(time_val) if time_val else None
            # Only pass until_date when a duration was parsed; an explicit
            # None would override the library default with a non-datetime.
            limits = {"until_date": datetime.now() + duration} if duration else {}
            # Mention the duration only if it was really applied.
            suffix = f" for {time_val}." if duration else "."

            if "ban" in command:
                await client.ban_chat_member(
                    message.chat.id, target_user.id, **limits
                )
                action_text = f"Banned {target_user.mention}{suffix}"
            else:
                await client.restrict_chat_member(
                    message.chat.id, target_user.id, ChatPermissions(), **limits
                )
                action_text = f"Muted {target_user.mention}{suffix}"
        elif "kick" in command:
            await client.ban_chat_member(message.chat.id, target_user.id)
            await client.unban_chat_member(message.chat.id, target_user.id)
            action_text = f"Kicked {target_user.mention}."

        # Only "d" commands confirm; skip the reply if no action matched so
        # an empty message is never sent.
        if delete_replied and action_text:
            await message.reply_text(action_text)
    except Exception as e:
        if not silent:  # Silent commands never report errors.
            await message.reply_text(f"Failed to perform action. Error: {e}")