Spaces:
Paused
Paused
| import asyncio | |
| from pyrogram import filters | |
| from pyrogram.errors import FloodWait, PeerIdInvalid | |
| from pyrogram.types import Message | |
| from DragMusic import app | |
| from DragMusic.misc import SUDOERS | |
| from DragMusic.utils import get_readable_time | |
| from DragMusic.utils.database import ( | |
| add_banned_user, | |
| get_banned_count, | |
| get_banned_users, | |
| get_served_chats, | |
| is_banned_user, | |
| remove_banned_user, | |
| is_on_off | |
| ) | |
| from DragMusic.utils.decorators.language import language | |
| from DragMusic.utils.extraction import extract_user | |
| from config import BANNED_USERS, LOGGER_ID | |
| async def global_ban(client, message: Message, _): | |
| if not message.reply_to_message: | |
| if len(message.command) != 2: | |
| return await message.reply_text(_["general_1"]) | |
| user = message.command[1] | |
| if user.isdigit(): | |
| user = int(user) | |
| else: | |
| user = user.replace("@", "") | |
| try: | |
| user = await app.get_users(user) | |
| except PeerIdInvalid: | |
| return await message.reply_text("Invalid user ID or username. Please check and try again.") | |
| else: | |
| user = await extract_user(message) | |
| if user.id == message.from_user.id: | |
| return await message.reply_text(_["gban_1"]) | |
| elif user.id == app.id: | |
| return await message.reply_text(_["gban_2"]) | |
| elif user.id in SUDOERS: | |
| return await message.reply_text(_["gban_3"]) | |
| is_gbanned = await is_banned_user(user.id) | |
| if is_gbanned: | |
| return await message.reply_text(_["gban_4"].format(user.mention)) | |
| if user.id not in BANNED_USERS: | |
| BANNED_USERS.add(user.id) | |
| served_chats = [] | |
| chats = await get_served_chats() | |
| for chat in chats: | |
| served_chats.append(int(chat["chat_id"])) | |
| time_expected = get_readable_time(len(served_chats)) | |
| mystic = await message.reply_text(_["gban_5"].format(user.mention, time_expected)) | |
| number_of_chats = 0 | |
| for chat_id in served_chats: | |
| try: | |
| await app.ban_chat_member(chat_id, user.id) | |
| number_of_chats += 1 | |
| except FloodWait as fw: | |
| await asyncio.sleep(int(fw.value)) | |
| except: | |
| continue | |
| await add_banned_user(user.id) | |
| if await is_on_off(2): | |
| await app.send_message( | |
| chat_id=LOGGER_ID, | |
| text=f"{message.from_user.mention} has globally banned {user.mention}.\n" | |
| f"User ID: <code>{user.id}</code>\n" | |
| f"Banned in {number_of_chats} chats.", | |
| message_thread_id=12357 | |
| ) | |
| await message.reply_text( | |
| _["gban_6"].format( | |
| app.mention, | |
| message.chat.title, | |
| message.chat.id, | |
| user.mention, | |
| user.id, | |
| message.from_user.mention, | |
| number_of_chats, | |
| ) | |
| ) | |
| await mystic.delete() | |
| async def global_un(client, message: Message, _): | |
| if not message.reply_to_message: | |
| if len(message.command) != 2: | |
| return await message.reply_text(_["general_1"]) | |
| user = message.command[1] | |
| if user.isdigit(): | |
| user = int(user) | |
| else: | |
| user = user.replace("@", "") | |
| try: | |
| user = await app.get_users(user) | |
| except PeerIdInvalid: | |
| return await message.reply_text("Invalid user ID or username. Please check and try again.") | |
| else: | |
| user = await extract_user(message) | |
| is_gbanned = await is_banned_user(user.id) | |
| if not is_gbanned: | |
| return await message.reply_text(_["gban_7"].format(user.mention)) | |
| if user.id in BANNED_USERS: | |
| BANNED_USERS.remove(user.id) | |
| served_chats = [] | |
| chats = await get_served_chats() | |
| for chat in chats: | |
| served_chats.append(int(chat["chat_id"])) | |
| time_expected = get_readable_time(len(served_chats)) | |
| mystic = await message.reply_text(_["gban_8"].format(user.mention, time_expected)) | |
| number_of_chats = 0 | |
| for chat_id in served_chats: | |
| try: | |
| await app.unban_chat_member(chat_id, user.id) | |
| number_of_chats += 1 | |
| except FloodWait as fw: | |
| await asyncio.sleep(int(fw.value)) | |
| except: | |
| continue | |
| await remove_banned_user(user.id) | |
| if await is_on_off(2): | |
| await app.send_message( | |
| chat_id=LOGGER_ID, | |
| text=f"{message.from_user.mention} has unbanned {user.mention}.\n" | |
| f"User ID: <code>{user.id}</code>\n" | |
| f"Unbanned from {number_of_chats} chats.", | |
| message_thread_id=12357 | |
| ) | |
| await message.reply_text(_["gban_9"].format(user.mention, number_of_chats)) | |
| await mystic.delete() | |
| async def gbanned_list(client, message: Message, _): | |
| counts = await get_banned_count() | |
| if counts == 0: | |
| return await message.reply_text(_["gban_10"]) | |
| mystic = await message.reply_text(_["gban_11"]) | |
| msg = _["gban_12"] | |
| count = 0 | |
| users = await get_banned_users() | |
| for user_id in users: | |
| count += 1 | |
| try: | |
| user = await app.get_users(user_id) | |
| user = user.first_name if not user.mention else user.mention | |
| msg += f"{count}➤ {user}\n" | |
| except Exception: | |
| msg += f"{count}➤ {user_id}\n" | |
| continue | |
| if count == 0: | |
| return await mystic.edit_text(_["gban_10"]) | |
| else: | |
| return await mystic.edit_text(msg) | |