taslim19
feat: Add advanced antiflood plugin and fix admin commands
98e44f0
raw
history blame
8.23 kB
import re
from collections import defaultdict
from datetime import datetime, timedelta
from pyrogram import filters
from pyrogram.types import Message, ChatPermissions
from DragMusic import app
from DragMusic.utils.database import get_flood_settings, update_flood_setting
from DragMusic.utils.decorators.admins import AdminActual
from DragMusic.utils.formatters import parse_time
# In-memory storage for flood tracking.
# Each map below is indexed as [chat_id][user_id]; missing entries are created
# on first access by the nested defaultdicts.

# Running count of consecutive messages for the user currently being tracked
# in a chat (flood_control clears the chat's entry when the sender changes).
user_flood_count = defaultdict(lambda: defaultdict(int))
# Timestamps of a user's recent messages, pruned to the timed-flood window.
user_timed_messages = defaultdict(lambda: defaultdict(list))
# Incremented each time a user trips a flood check; not read anywhere in the
# visible code.
user_offenses = defaultdict(lambda: defaultdict(int))
@app.on_message(filters.group & ~filters.service, group=11)
async def flood_control(client, message: Message):
    """Watch group messages and hand flooders over to ``take_action``.

    Two checks run against the chat's stored settings, in this order:

    * timed flood: ``t_limit`` or more messages from one user within
      ``t_duration`` seconds;
    * consecutive flood: ``limit`` or more messages in a row from one user.

    A check whose limit is 0 is skipped. The first check that trips records
    an offense for the user and returns the result of ``take_action``.
    """
    sender = message.from_user
    if not sender:
        # Nothing to attribute the message to, so nothing to track.
        return

    chat_id = message.chat.id
    user_id = sender.id
    now = datetime.now()
    settings = await get_flood_settings(chat_id)

    # --- Timed flood check ---
    t_limit = settings.get("t_limit", 0)
    t_duration = settings.get("t_duration", 30)
    if t_limit > 0:
        chat_stamps = user_timed_messages[chat_id]
        chat_stamps[user_id].append(now)
        window = timedelta(seconds=t_duration)
        # Drop timestamps that have aged out of the window.
        recent = [stamp for stamp in chat_stamps[user_id] if now - stamp < window]
        chat_stamps[user_id] = recent
        if len(recent) >= t_limit:
            user_offenses[chat_id][user_id] += 1
            return await take_action(client, message, settings, "timed flood")

    # --- Consecutive flood check ---
    limit = settings.get("limit", 0)
    if limit > 0:
        streak = user_flood_count[chat_id]
        # A message from anyone other than the tracked sender ends the streak.
        if streak and user_id != next(iter(streak)):
            streak.clear()
        streak[user_id] += 1
        if streak[user_id] >= limit:
            user_offenses[chat_id][user_id] += 1
            return await take_action(client, message, settings, "consecutive flood")
async def take_action(client, message: Message, settings: dict, reason: str):
    """Apply the chat's configured antiflood punishment to the message sender.

    Args:
        client: Client used to ban/restrict the user.
        message: The message that tripped the flood check; replies are sent
            to it and its sender is the user being punished.
        settings: Flood settings for the chat. Reads ``action`` (default
            ``"ban"``), ``action_duration`` (default ``"1h"``) and ``clear``.
        reason: Short human-readable cause inserted into the reply text.

    Any exception raised while applying the action is reported to the chat
    instead of propagating. The chat's flood counters are reset whether or
    not the action succeeded: without that, a failed action leaves the count
    at or above the limit, so every following message would trigger this
    function again and produce another error reply.
    """
    chat_id = message.chat.id
    user_id = message.from_user.id
    action = settings.get("action", "ban")
    duration_str = settings.get("action_duration", "1h")

    # Clear flood messages if enabled
    if settings.get("clear", False):
        # This is a placeholder; deleting messages requires storing message_ids
        pass

    try:
        if action == "ban":
            await client.ban_chat_member(chat_id, user_id)
            await message.reply_text(f"{message.from_user.mention} has been banned for {reason}.")
        elif action == "kick":
            # Ban followed by an immediate unban removes the user from the chat.
            await client.ban_chat_member(chat_id, user_id)
            await client.unban_chat_member(chat_id, user_id)
            await message.reply_text(f"{message.from_user.mention} has been kicked for {reason}.")
        elif action == "mute":
            await client.restrict_chat_member(chat_id, user_id, permissions=ChatPermissions())
            await message.reply_text(f"{message.from_user.mention} has been muted for {reason}.")
        elif action in ["tban", "tmute"]:
            until_date = datetime.now() + parse_time(duration_str)
            if action == "tban":
                await client.ban_chat_member(chat_id, user_id, until_date=until_date)
                await message.reply_text(
                    f"{message.from_user.mention} has been banned for {duration_str} due to {reason}."
                )
            else:  # tmute
                await client.restrict_chat_member(
                    chat_id, user_id, permissions=ChatPermissions(), until_date=until_date
                )
                await message.reply_text(
                    f"{message.from_user.mention} has been muted for {duration_str} due to {reason}."
                )
    except Exception as e:
        await message.reply_text(f"Failed to take action. Error: {e}")
    finally:
        # Reset counters on success and on failure alike (see docstring).
        user_flood_count[chat_id].clear()
        user_timed_messages[chat_id].clear()
# --- Admin Commands ---
@app.on_message(filters.command("flood") & filters.group)
@AdminActual
async def get_flood_command(client, message: Message, _):
    """Reply with the chat's current antiflood configuration.

    Missing settings fall back to the same defaults that ``flood_control``
    and ``take_action`` use when enforcing them, so the report matches what
    is actually applied and a missing key cannot break the ``> 0``
    comparisons below.
    """
    settings = await get_flood_settings(message.chat.id)

    limit = settings.get("limit", 0)
    t_limit = settings.get("t_limit", 0)
    t_duration = settings.get("t_duration", 30)
    action = settings.get("action", "ban")
    duration = settings.get("action_duration", "1h")

    status_limit = f"{limit} consecutive messages" if limit > 0 else "Disabled"
    status_t_limit = f"{t_limit} messages in {t_duration}s" if t_limit > 0 else "Disabled"

    # Only the timed actions have a meaningful duration to show.
    if action in ["tban", "tmute"]:
        action_str = f"{action} for {duration}"
    else:
        action_str = action

    await message.reply_text(
        f"**Antiflood Settings:**\n"
        f"- **Consecutive Flood:** {status_limit}\n"
        f"- **Timed Flood:** {status_t_limit}\n"
        f"- **Action:** {action_str}\n"
        f"- **Clear messages:** {'On' if settings.get('clear') else 'Off'}"
    )
@app.on_message(filters.command("setflood") & filters.group)
@AdminActual
async def set_flood_command(client, message: Message, _):
    """Set or disable the consecutive-message flood limit for the chat.

    Accepts ``off``/``no``/``0`` to disable, or a number of at least 3 to
    set the limit. Anything else gets an explanatory reply and leaves the
    stored setting untouched.
    """
    if len(message.command) < 2:
        return await message.reply_text("Usage: /setflood <number/off>")

    arg = message.command[1].lower()
    if arg in ["off", "no", "0"]:
        await update_flood_setting(message.chat.id, "limit", 0)
        await message.reply_text("Consecutive antiflood has been disabled.")
    # isdecimal(), not isdigit(): isdigit() also accepts characters such as
    # superscript digits that int() cannot parse, which would raise here.
    elif arg.isdecimal():
        limit = int(arg)
        if limit < 3:
            return await message.reply_text("Flood limit must be at least 3.")
        await update_flood_setting(message.chat.id, "limit", limit)
        await message.reply_text(f"Consecutive flood limit set to {limit} messages.")
    else:
        await message.reply_text("Invalid argument. Use a number or 'off'.")
@app.on_message(filters.command("setfloodtimer") & filters.group)
@AdminActual
async def set_flood_timer_command(client, message: Message, _):
    """Set or disable the timed flood limit (<count> messages in <n>s).

    With no argument, or ``off``/``no``/``0``, timed antiflood is disabled.
    Otherwise exactly two arguments are required: a message count and a
    duration written as whole seconds followed by ``s`` (e.g. ``10 30s``).
    Invalid input gets the usage reply and changes nothing.
    """
    args = message.command[1:]
    if not args or args[0].lower() in ["off", "no", "0"]:
        await update_flood_setting(message.chat.id, "t_limit", 0)
        return await message.reply_text("Timed antiflood has been disabled.")

    usage = "Usage: /setfloodtimer <count> <duration>s (e.g., 10 30s)"
    # isdecimal() guarantees int() can parse the count (isdigit() does not).
    if len(args) != 2 or not args[0].isdecimal():
        return await message.reply_text(usage)

    # fullmatch: the whole token must be "<digits>s". A prefix match would
    # let input like "30sec" through and then fail when converting to int.
    duration_match = re.fullmatch(r"(\d+)s", args[1].lower())
    if duration_match is None:
        return await message.reply_text(usage)

    count = int(args[0])
    seconds = int(duration_match.group(1))
    if seconds == 0:
        # No message can be newer than a zero-length window, so the timed
        # check would silently never trigger.
        return await message.reply_text(usage)

    await update_flood_setting(message.chat.id, "t_limit", count)
    await update_flood_setting(message.chat.id, "t_duration", seconds)
    await message.reply_text(f"Timed flood set to {args[0]} messages in {args[1]}.")
@app.on_message(filters.command("floodmode") & filters.group)
@AdminActual
async def set_flood_mode_command(client, message: Message, _):
    """Choose the punishment applied to flooders in this chat.

    Usage: ``/floodmode <ban|kick|mute|tban|tmute> [duration]``. The timed
    actions (``tban``/``tmute``) require a duration that ``parse_time``
    accepts. All arguments are validated before anything is stored, so a
    rejected command never leaves a half-applied configuration behind.
    """
    args = message.command[1:]
    if not args:
        return await message.reply_text("Usage: /floodmode <ban|kick|mute|tban|tmute> [duration]")

    action = args[0].lower()
    if action not in ["ban", "kick", "mute", "tban", "tmute"]:
        return await message.reply_text("Invalid action. Use: ban, kick, mute, tban, tmute.")

    if action in ["tban", "tmute"]:
        if len(args) < 2:
            return await message.reply_text(f"Usage: /floodmode {action} <duration> (e.g., 1h, 3d)")
        duration_str = args[1]
        if not parse_time(duration_str):
            return await message.reply_text("Invalid time format. Use d, h, m, s (e.g., 3d, 10m).")
        # Store the duration first so the timed action is never active with
        # a stale duration from an earlier configuration.
        await update_flood_setting(message.chat.id, "action_duration", duration_str)
        await update_flood_setting(message.chat.id, "action", action)
        await message.reply_text(f"Antiflood action set to {action} for {duration_str}.")
    else:
        await update_flood_setting(message.chat.id, "action", action)
        await message.reply_text(f"Antiflood action set to {action}.")
@app.on_message(filters.command("clearflood") & filters.group)
@AdminActual
async def set_clear_flood_command(client, message: Message, _):
    """Turn the chat's "clear flood messages" setting on or off.

    Accepts ``on``/``yes`` to enable and ``off``/``no`` to disable; any
    other input (or no argument) gets the usage reply.
    """
    parts = message.command
    choice = parts[1].lower() if len(parts) >= 2 else None
    if choice not in ("on", "off", "yes", "no"):
        return await message.reply_text("Usage: /clearflood <on/off>")

    status = choice in ("on", "yes")
    await update_flood_setting(message.chat.id, "clear", status)
    state_word = "enabled" if status else "disabled"
    await message.reply_text(f"Deleting flood messages is now {state_word}.")