Spaces:
Paused
Paused
import re | |
from collections import defaultdict | |
from datetime import datetime, timedelta | |
from pyrogram import filters | |
from pyrogram.types import Message, ChatPermissions | |
from DragMusic import app | |
from DragMusic.utils.database import get_flood_settings, update_flood_setting | |
from DragMusic.utils.decorators.admins import AdminActual | |
from DragMusic.utils.formatters import parse_time | |
# In-memory storage for flood tracking.
# All three maps are keyed first by chat id, then by user id, and are created
# lazily on first access. Nothing here is persisted across restarts.


def _new_counter_map():
    """Build the per-chat inner map of user id -> integer count (default 0)."""
    return defaultdict(int)


def _new_timestamp_map():
    """Build the per-chat inner map of user id -> list of timestamps."""
    return defaultdict(list)


# chat id -> user id -> length of that user's current run of messages.
user_flood_count = defaultdict(_new_counter_map)
# chat id -> user id -> datetimes of that user's recent messages.
user_timed_messages = defaultdict(_new_timestamp_map)
# chat id -> user id -> number of times a flood limit was tripped.
user_offenses = defaultdict(_new_counter_map)
async def flood_control(client, message: Message):
    """Record a message for flood detection and punish the sender on a hit.

    Two checks run in order, each disabled when its limit is 0:

    * Timed flood: the sender has ``t_limit`` or more messages inside the
      last ``t_duration`` seconds.
    * Consecutive flood: the sender has posted ``limit`` or more messages in
      a row without another user posting in between.

    Args:
        client: Client forwarded to ``take_action``.
        message: The incoming message to account for.

    Returns:
        The result of ``take_action`` when a limit is tripped, otherwise None.
    """
    if not message.from_user:
        # No sender to attribute the message to, so there is nothing to count.
        return

    chat_id = message.chat.id
    user_id = message.from_user.id
    now = datetime.now()
    settings = await get_flood_settings(chat_id)

    # --- Timed Flood Check ---
    # "or 0" treats a stored None the same as "disabled" instead of raising
    # TypeError on the comparison below.
    t_limit = settings.get("t_limit", 0) or 0
    t_duration = settings.get("t_duration", 30)
    if t_limit > 0:
        window = timedelta(seconds=t_duration)
        history = user_timed_messages[chat_id][user_id]
        history.append(now)
        # Keep only the timestamps that are still inside the window.
        recent = [stamp for stamp in history if now - stamp < window]
        user_timed_messages[chat_id][user_id] = recent
        if len(recent) >= t_limit:
            user_offenses[chat_id][user_id] += 1
            return await take_action(client, message, settings, "timed flood")

    # --- Consecutive Flood Check ---
    limit = settings.get("limit", 0) or 0
    if limit > 0:
        streak = user_flood_count[chat_id]
        # The first key is the user whose run is being counted; a message
        # from anyone else ends that run.
        if streak and next(iter(streak)) != user_id:
            streak.clear()
        streak[user_id] += 1
        if streak[user_id] >= limit:
            user_offenses[chat_id][user_id] += 1
            return await take_action(client, message, settings, "consecutive flood")
async def take_action(client, message: Message, settings: dict, reason: str):
    """Apply the configured antiflood punishment to the sender of ``message``.

    The punishment is chosen by ``settings["action"]`` (default ``"ban"``):

    * ``ban``   - ban the user.
    * ``kick``  - ban and immediately unban the user.
    * ``mute``  - restrict the user with a default-constructed
      ``ChatPermissions()``.
    * ``tban`` / ``tmute`` - as above, but with ``until_date`` set from
      ``settings["action_duration"]`` (default ``"1h"``).

    Any other action value performs no moderation call. After a successful
    run the flood counters are reset; on any error the counters are left
    untouched and the error text is sent to the chat.

    Args:
        client: Client used to ban/restrict the chat member.
        message: The message that tripped the flood limit.
        settings: Flood settings mapping for the chat.
        reason: Human-readable reason interpolated into the reply.
    """
    chat_id = message.chat.id
    user_id = message.from_user.id
    action = settings.get("action", "ban")
    duration_str = settings.get("action_duration", "1h")

    # Clear flood messages if enabled
    if settings.get("clear", False):
        # This is a placeholder; deleting messages requires storing message_ids
        pass

    try:
        mention = message.from_user.mention
        if action == "ban":
            await client.ban_chat_member(chat_id, user_id)
            await message.reply_text(f"{mention} has been banned for {reason}.")
        elif action == "kick":
            # A kick is a ban that is lifted straight away.
            await client.ban_chat_member(chat_id, user_id)
            await client.unban_chat_member(chat_id, user_id)
            await message.reply_text(f"{mention} has been kicked for {reason}.")
        elif action == "mute":
            await client.restrict_chat_member(chat_id, user_id, permissions=ChatPermissions())
            await message.reply_text(f"{mention} has been muted for {reason}.")
        elif action in ("tban", "tmute"):
            # NOTE(review): parse_time is a project helper; presumably it
            # returns a timedelta for a valid duration string - confirm.
            until_date = datetime.now() + parse_time(duration_str)
            if action == "tban":
                await client.ban_chat_member(chat_id, user_id, until_date=until_date)
                await message.reply_text(f"{mention} has been banned for {duration_str} due to {reason}.")
            else:  # tmute
                await client.restrict_chat_member(
                    chat_id, user_id, permissions=ChatPermissions(), until_date=until_date
                )
                await message.reply_text(f"{mention} has been muted for {duration_str} due to {reason}.")

        # Reset counters. The consecutive-run map only tracks the current
        # run, so it is cleared outright. The timed history is per user, so
        # only the offender's entry is dropped: wiping the whole chat would
        # let every other user restart their window for free.
        user_flood_count[chat_id].clear()
        user_timed_messages[chat_id].pop(user_id, None)
    except Exception as e:
        # Best-effort boundary: report the failure to the chat rather than
        # letting the handler crash.
        await message.reply_text(f"Failed to take action. Error: {e}")
# --- Admin Commands --- | |
async def get_flood_command(client, message: Message, _):
    """Reply with the chat's current antiflood configuration.

    Missing or None settings fall back to the same defaults that
    ``flood_control`` and ``take_action`` use (limits 0, window 30 seconds,
    action ``ban``, duration ``1h``), so the summary matches what is
    actually enforced and a missing key cannot raise on the ``> 0`` checks.

    Args:
        client: Unused.
        message: The command message; the reply is sent to its chat.
        _: Unused third handler argument.
    """
    settings = await get_flood_settings(message.chat.id)
    limit = settings.get("limit") or 0
    t_limit = settings.get("t_limit") or 0
    t_duration = settings.get("t_duration", 30)
    action = settings.get("action", "ban")
    duration = settings.get("action_duration", "1h")

    status_limit = f"{limit} consecutive messages" if limit > 0 else "Disabled"
    status_t_limit = f"{t_limit} messages in {t_duration}s" if t_limit > 0 else "Disabled"

    # Only the timed actions have a meaningful duration to show.
    if action in ("tban", "tmute"):
        action_str = f"{action} for {duration}"
    else:
        action_str = action

    await message.reply_text(
        f"**Antiflood Settings:**\n"
        f"- **Consecutive Flood:** {status_limit}\n"
        f"- **Timed Flood:** {status_t_limit}\n"
        f"- **Action:** {action_str}\n"
        f"- **Clear messages:** {'On' if settings.get('clear') else 'Off'}"
    )
async def set_flood_command(client, message: Message, _):
    """Set or disable the consecutive-message flood limit for the chat.

    Accepts ``off`` / ``no`` / ``0`` to disable, or an integer of at least 3
    to enable. Anything else gets an error reply and changes nothing.

    Args:
        client: Unused.
        message: The command message; ``message.command[1]`` is the argument.
        _: Unused third handler argument.
    """
    if len(message.command) < 2:
        return await message.reply_text("Usage: /setflood <number/off>")

    arg = message.command[1].lower()
    if arg in ("off", "no", "0"):
        await update_flood_setting(message.chat.id, "limit", 0)
        await message.reply_text("Consecutive antiflood has been disabled.")
    elif arg.isdecimal():
        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as superscript digits that int() cannot parse.
        limit = int(arg)
        if limit < 3:
            return await message.reply_text("Flood limit must be at least 3.")
        await update_flood_setting(message.chat.id, "limit", limit)
        await message.reply_text(f"Consecutive flood limit set to {limit} messages.")
    else:
        await message.reply_text("Invalid argument. Use a number or 'off'.")
async def set_flood_timer_command(client, message: Message, _):
    """Set or disable the timed flood limit (N messages within S seconds).

    With no argument, or ``off`` / ``no`` / ``0``, the timed check is
    disabled. Otherwise exactly two arguments are required: a message count
    and a duration written as ``<seconds>s``; both must be at least 1.

    Args:
        client: Unused.
        message: The command message; arguments are ``message.command[1:]``.
        _: Unused third handler argument.
    """
    args = message.command[1:]
    if not args or args[0].lower() in ("off", "no", "0"):
        await update_flood_setting(message.chat.id, "t_limit", 0)
        return await message.reply_text("Timed antiflood has been disabled.")

    usage = "Usage: /setfloodtimer <count> <duration>s (e.g., 10 30s)"
    # isdecimal() guarantees int() can parse the count.
    if len(args) != 2 or not args[0].isdecimal():
        return await message.reply_text(usage)

    # fullmatch, not match: the whole token must be "<digits>s", otherwise
    # input such as "30sec" would pass validation and break int() below.
    duration_match = re.fullmatch(r"(\d+)s", args[1].lower())
    if duration_match is None:
        return await message.reply_text(usage)

    count = int(args[0])
    seconds = int(duration_match.group(1))
    if count < 1 or seconds < 1:
        # A zero-second window never contains a message, and a zero count is
        # the "disabled" value, so neither can be reported as an active limit.
        return await message.reply_text("Count and duration must both be at least 1.")

    await update_flood_setting(message.chat.id, "t_limit", count)
    await update_flood_setting(message.chat.id, "t_duration", seconds)
    await message.reply_text(f"Timed flood set to {args[0]} messages in {args[1]}.")
async def set_flood_mode_command(client, message: Message, _):
    """Choose the punishment applied when a flood limit is tripped.

    Valid actions are ``ban``, ``kick``, ``mute``, ``tban`` and ``tmute``.
    The two timed actions also require a duration argument, which is checked
    with ``parse_time``. All validation happens before anything is stored,
    so a rejected command leaves the existing settings unchanged.

    Args:
        client: Unused.
        message: The command message; arguments are ``message.command[1:]``.
        _: Unused third handler argument.
    """
    args = message.command[1:]
    if not args:
        return await message.reply_text("Usage: /floodmode <ban|kick|mute|tban|tmute> [duration]")

    action = args[0].lower()
    if action not in ("ban", "kick", "mute", "tban", "tmute"):
        return await message.reply_text("Invalid action. Use: ban, kick, mute, tban, tmute.")

    is_timed = action in ("tban", "tmute")
    duration_str = None
    if is_timed:
        if len(args) < 2:
            return await message.reply_text(f"Usage: /floodmode {action} <duration> (e.g., 1h, 3d)")
        duration_str = args[1]
        # NOTE(review): relies on parse_time returning a falsy value for an
        # unparseable duration - confirm against the helper.
        if not parse_time(duration_str):
            return await message.reply_text("Invalid time format. Use d, h, m, s (e.g., 3d, 10m).")

    # Persist only once every argument has been accepted.
    await update_flood_setting(message.chat.id, "action", action)
    if is_timed:
        await update_flood_setting(message.chat.id, "action_duration", duration_str)
        await message.reply_text(f"Antiflood action set to {action} for {duration_str}.")
    else:
        await message.reply_text(f"Antiflood action set to {action}.")
async def set_clear_flood_command(client, message: Message, _):
    """Turn the "clear" flood setting for the chat on or off.

    The single argument must be ``on`` / ``yes`` to enable or ``off`` / ``no``
    to disable; anything else (or no argument) gets the usage reply.

    Args:
        client: Unused.
        message: The command message; ``message.command[1]`` is the argument.
        _: Unused third handler argument.
    """
    enable_words = ["on", "yes"]
    disable_words = ["off", "no"]

    if len(message.command) >= 2:
        choice = message.command[1].lower()
    else:
        choice = None

    if choice not in enable_words + disable_words:
        return await message.reply_text("Usage: /clearflood <on/off>")

    status = choice in enable_words
    await update_flood_setting(message.chat.id, "clear", status)

    state_word = "enabled" if status else "disabled"
    await message.reply_text(f"Deleting flood messages is now {state_word}.")