"""Sudo-only commands for globally banning and unbanning users.

A "global ban" bans the target user in every chat returned by
``get_served_chats`` and records the user both in the database
(``add_banned_user``) and in the in-memory ``BANNED_USERS`` collection.
"""

import asyncio

from pyrogram import filters
from pyrogram.errors import FloodWait
from pyrogram.types import Message

from Devine import app
from Devine.misc import SUDOERS
from Devine.utils import get_readable_time
from Devine.utils.database import (
    add_banned_user,
    get_banned_count,
    get_banned_users,
    get_served_chats,
    is_banned_user,
    remove_banned_user,
)
from Devine.utils.decorators.language import language
from Devine.utils.extraction import extract_user
from config import BANNED_USERS

# User that can never be globally banned and is the only one allowed to
# run /ungbanall.
SPECIAL_USER_ID = 6338745050
# Chat that receives an audit message for every global (un)ban.
LOG_CHANNEL_ID = -1002105459243

_NO_REASON = "ɴᴏ ʀᴇᴀsᴏɴ ᴘʀᴏᴠɪᴅᴇᴅ"


def _reason_from(message: Message) -> str:
    """Return the free-text reason given after the target, or a placeholder.

    NOTE(review): the reason is always read from ``command[2:]``. When the
    command is used as a reply (``/gban some reason``) this presumably drops
    the first word of the reason - confirm against ``extract_user``.
    """
    if len(message.command) > 2:
        return " ".join(message.command[2:])
    return _NO_REASON


async def _apply_in_chats(action, chat_ids, user_id):
    """Run ``action(chat_id, user_id)`` in every chat and report successes.

    ``action`` is a coroutine function such as ``app.ban_chat_member``.
    The operation is best-effort: a chat where the call fails is skipped.
    On ``FloodWait`` the requested delay is honoured and the same chat is
    retried once, so a rate limit no longer causes the chat to be missed.

    Returns the list of chat ids in which the action succeeded.
    """
    succeeded = []
    for chat_id in chat_ids:
        try:
            await action(chat_id, user_id)
        except FloodWait as fw:
            await asyncio.sleep(int(fw.value))
            try:
                await action(chat_id, user_id)
            except Exception:
                continue
        except Exception:
            # Deliberately not a bare ``except``: task cancellation and
            # interpreter shutdown must still propagate.
            continue
        succeeded.append(chat_id)
    return succeeded


@app.on_message(filters.command(["gban", "globalban"]) & SUDOERS)
@language
async def global_ban(client, message: Message, _):
    """Globally ban the replied-to or named user in all served chats.

    Refuses to ban the special user, the caller, the bot itself, sudo users
    and users that are already globally banned. On success a log message is
    sent to ``LOG_CHANNEL_ID`` and a summary is posted in the calling chat.
    """
    if not message.reply_to_message and len(message.command) < 2:
        return await message.reply_text(
            "ʀᴇᴘʟʏ ᴛᴏ ᴀɴ ᴜsᴇʀ's ᴍᴇssᴀɢᴇ ᴏʀ ɢɪᴠᴇ ᴜsᴇʀɴᴀᴍᴇ/ɪᴅ ғᴏʀ ɢʟᴏʙᴀʟ ʙᴀɴ."
        )

    user = await extract_user(message)
    reason = _reason_from(message)

    if user.id == SPECIAL_USER_ID:
        return await message.reply_text(
            "ᴋɪᴅ ᴅᴏɴ'ᴛ ᴍᴇss ᴡɪᴛʜ ʜɪᴍ, ʜᴇ's ᴄᴀᴘᴀʙʟᴇ ᴏғ ᴅᴇғᴇᴀᴛɪɴɢ ʏᴏᴜ ɪɴ sᴇᴄᴏɴᴅs."
        )
    if user.id == message.from_user.id:
        return await message.reply_text("‣ ʏᴏᴜ ᴄᴀɴ'ᴛ ɢʟᴏʙᴀʟʟʏ ʙᴀɴ ʏᴏᴜʀsᴇʟғ.")
    if user.id == app.id:
        return await message.reply_text("‣ ᴡʜʏ sʜᴏᴜʟᴅ ɪ ɢʙᴀɴ ᴍʏsᴇʟғ ?")
    if user.id in SUDOERS:
        return await message.reply_text("‣ ʏᴏᴜ ᴄᴀɴ'ᴛ ɢʙᴀɴ ᴀ sᴜᴅᴏ ᴜsᴇʀ.")

    if await is_banned_user(user.id):
        return await message.reply_text(_["gban_4"].format(user.mention))

    # Update the in-memory collection first so the ban takes effect for the
    # bot immediately, before the (slow) per-chat loop runs.
    if user.id not in BANNED_USERS:
        BANNED_USERS.add(user.id)

    served_chats = [int(chat["chat_id"]) for chat in await get_served_chats()]
    time_expected = get_readable_time(len(served_chats))
    mystic = await message.reply_text(
        _["gban_5"].format(user.mention, time_expected)
    )

    banned_in = await _apply_in_chats(
        app.ban_chat_member, served_chats, user.id
    )
    number_of_chats = len(banned_in)
    await add_banned_user(user.id)

    reason_message = f"ʀᴇᴀsᴏɴ : {reason}"
    link_message = (
        f"@{message.chat.username}" if message.chat.username else "ɴᴏɴᴇ"
    )

    await app.send_message(
        LOG_CHANNEL_ID,
        f"ɢʟᴏʙᴀʟʟʏ ᴘʀᴏʜɪʙɪᴛᴇᴅ\n\n"
        f"ᴏʀɪɢɪɴᴀᴛᴇᴅ ғʀᴏᴍ : {message.chat.title}\n"
        f"• ᴄʜᴀᴛ ʟɪɴᴋ : {link_message} [{message.chat.id}]\n"
        f"• ʙᴀɴɴᴇᴅ ᴜsᴇʀ : {user.mention}\n"
        f"• ʙᴀɴɴᴇᴅ ᴜsᴇʀ ɪᴅ : {user.id}\n"
        f"• ᴀᴅᴍɪɴ : {message.from_user.mention}\n"
        f"• ᴀғғᴇᴄᴛᴇᴅ ᴄʜᴀᴛs : {number_of_chats}\n"
        f"• {reason_message}",
    )

    await message.reply_text(
        _["gban_6"].format(
            app.mention,
            message.chat.title,
            message.chat.id,
            user.mention,
            user.id,
            message.from_user.mention,
            number_of_chats,
            reason,
        )
    )
    await mystic.delete()


@app.on_message(filters.command(["ungban"]) & SUDOERS)
@language
async def global_un(client, message: Message, _):
    """Revoke the global ban of the replied-to or named user.

    Unbans the user in every served chat, removes them from the database
    and from ``BANNED_USERS``, logs the action and posts a summary.
    """
    if not message.reply_to_message and len(message.command) < 2:
        return await message.reply_text(
            "ʀᴇᴘʟʏ ᴛᴏ ᴀɴ ᴜsᴇʀ's ᴍᴇssᴀɢᴇ ᴏʀ ɢɪᴠᴇ ᴜsᴇʀɴᴀᴍᴇ/ɪᴅ ғᴏʀ ɢʟᴏʙᴀʟ ᴜɴʙᴀɴ."
        )

    user = await extract_user(message)
    reason = _reason_from(message)

    if not await is_banned_user(user.id):
        return await message.reply_text(_["gban_7"].format(user.mention))

    if user.id in BANNED_USERS:
        BANNED_USERS.remove(user.id)

    served_chats = [int(chat["chat_id"]) for chat in await get_served_chats()]
    time_expected = get_readable_time(len(served_chats))
    mystic = await message.reply_text(
        _["gban_8"].format(user.mention, time_expected)
    )

    unbanned_in = await _apply_in_chats(
        app.unban_chat_member, served_chats, user.id
    )
    number_of_chats = len(unbanned_in)
    await remove_banned_user(user.id)

    reason_message = f"ʀᴇᴀsᴏɴ : {reason}"
    link_message = (
        f"@{message.chat.username}" if message.chat.username else "ɴᴏɴᴇ"
    )

    await app.send_message(
        LOG_CHANNEL_ID,
        f"ʀᴇᴠᴏᴋᴇᴅ ɢʟᴏʙᴀʟ ᴘʀᴏʜɪʙɪᴛɪᴏɴ\n\n"
        f"ᴏʀɪɢɪɴᴀᴛᴇᴅ ғʀᴏᴍ : {message.chat.title}\n"
        f"• ᴄʜᴀᴛ ʟɪɴᴋ : {link_message} [{message.chat.id}]\n"
        f"• ᴜɴʙᴀɴɴᴇᴅ ᴜsᴇʀ : {user.mention}\n"
        f"• ᴜɴʙᴀɴɴᴇᴅ ᴜsᴇʀ ɪᴅ : {user.id}\n"
        f"• ᴀᴅᴍɪɴ : {message.from_user.mention}\n"
        f"• ᴀғғᴇᴄᴛᴇᴅ ᴄʜᴀᴛs : {number_of_chats}\n"
        f"• {reason_message}",
    )

    await message.reply_text(
        _["gban_9"].format(user.mention, number_of_chats, reason)
    )
    await mystic.delete()


@app.on_message(filters.command(["gbannedusers", "gbanlist"]) & SUDOERS)
@language
async def gbanned_list(client, message: Message, _):
    """Reply with the list of globally banned users.

    Each user is shown by mention (or first name). A user whose profile
    cannot be fetched is listed by raw id instead of being silently
    omitted, so the list always reflects every stored ban.
    """
    counts = await get_banned_count()
    if counts == 0:
        return await message.reply_text("ɴᴏ ᴜsᴇʀs ᴀʀᴇ ɢʟᴏʙᴀʟʟʏ ʙᴀɴɴᴇᴅ.")

    mystic = await message.reply_text("ɢᴇᴛᴛɪɴɢ ɢʟᴏʙᴀʟʟʏ ʙᴀɴɴᴇᴅ ᴜsᴇʀs ʟɪsᴛ...")

    entries = []
    for user_id in await get_banned_users():
        try:
            user = await app.get_users(user_id)
        except Exception:
            # Deleted or unreachable account: fall back to the stored id.
            entries.append(str(user_id))
            continue
        entries.append(str(user.mention or user.first_name))

    if not entries:
        return await mystic.edit_text("ɴᴏ ᴜsᴇʀs ᴀʀᴇ ɢʟᴏʙᴀʟʟʏ ʙᴀɴɴᴇᴅ.")

    msg = "ɢʟᴏʙᴀʟʟʏ ᴘʀᴏʜɪʙɪᴛᴇᴅ ᴜsᴇʀs:\n\n" + "".join(
        f"{entry}\n" for entry in entries
    )
    return await mystic.edit_text(msg)


@app.on_message(filters.command(["ungbanall"]) & SUDOERS)
@language
async def ungban_all(client, message: Message, _):
    """Revoke every global ban; only ``SPECIAL_USER_ID`` may run this.

    For each stored banned user (except the special user) the user is
    unbanned in all served chats, removed from the database and removed
    from the in-memory ``BANNED_USERS`` collection. The reported
    "affected chats" figure is the number of distinct chats in which at
    least one unban succeeded.
    """
    # Silently ignore anyone but the owner.
    if message.from_user.id != SPECIAL_USER_ID:
        return

    users = await get_banned_users()
    if not users:
        return await message.reply_text(
            "ᴛʜᴇʀᴇ ᴀʀᴇ ɴᴏ ɢʟᴏʙᴀʟʟʏ ʙᴀɴɴᴇᴅ ᴜsᴇʀs ᴛᴏ ᴜɴʙᴀɴ."
        )

    served_chats = [int(chat["chat_id"]) for chat in await get_served_chats()]
    time_expected = get_readable_time(len(served_chats))
    mystic = await message.reply_text(
        _["gban_8"].format("all banned users", time_expected)
    )

    affected_chats = set()
    number_of_users_unbanned = 0

    for user_id in users:
        if user_id == SPECIAL_USER_ID:
            continue

        affected_chats.update(
            await _apply_in_chats(app.unban_chat_member, served_chats, user_id)
        )
        await remove_banned_user(user_id)
        # Keep the in-memory collection in sync with the database, exactly
        # as the single-user unban does.
        if user_id in BANNED_USERS:
            BANNED_USERS.remove(user_id)
        number_of_users_unbanned += 1

    number_of_chats = len(affected_chats)

    await app.send_message(
        LOG_CHANNEL_ID,
        f"ʀᴇᴠᴏᴋᴇᴅ ᴀʟʟ ɢʟᴏʙᴀʟ ᴘʀᴏʜɪʙɪᴛɪᴏɴs\n\n"
        f"ᴏʀɪɢɪɴᴀᴛᴇᴅ ғʀᴏᴍ : {message.chat.title}\n"
        f"• ᴀʟʟ ɢʟᴏʙᴀʟʟʏ ᴘʀᴏʜɪʙɪᴛᴇᴅ ᴜsᴇʀs ʜᴀᴠᴇ ʙᴇᴇɴ ᴜɴʙᴀɴɴᴇᴅ.\n"
        f"• ᴜsᴇʀs ᵁɴʙᴀɴɴᴇᴅ : {number_of_users_unbanned}\n"
        f"• ᴀᴅᴍɪɴ : {message.from_user.mention}\n"
        f"• ᴀғғᴇᴄᴛᴇᴅ ᴄʜᴀᴛs : {number_of_chats}",
    )

    await message.reply_text(
        f"ᴀʟʟ ɢʟᴏʙᴀʟʟʏ ʙᴀɴɴᴇᴅ ᴜsᴇʀs ʜᴀᴠᴇ ʙᴇᴇɴ ᴜɴʙᴀɴɴᴇᴅ ɪɴ {number_of_chats} ᴄʜᴀᴛs."
    )
    await mystic.delete()