taslim19
Update MongoDB database name in editwatch.py and apply recent changes
f0aea90
raw
history blame
3.27 kB
from pyrogram import filters
from pyrogram.types import Message, ChatPermissions
from pyrogram.enums import ChatMembersFilter, ChatMemberStatus
from pyrogram.errors import PeerIdInvalid
from DragMusic import app
from DragMusic.utils.decorators.admins import AdminActual
# Helper function to extract user info from a message
async def extract_user(message: Message, client):
    """Resolve the user that a moderation command is aimed at.

    The target is taken from the replied-to message when there is one,
    otherwise from the first command argument, which is tried first as a
    numeric user ID and then as given (e.g. a username).

    Args:
        message: The incoming command message.
        client: Client whose ``get_users`` is used for the lookup.

    Returns:
        The resolved user object, or ``None`` when no target could be
        determined. Whenever ``None`` is returned, an explanatory reply
        has already been sent, so callers can simply stop.
    """
    # Imported here because these error classes are not part of the
    # module-level imports.
    from pyrogram.errors import UsernameInvalid, UsernameNotOccupied

    replied = message.reply_to_message
    if replied:
        if replied.from_user is None:
            # Without this branch the caller would exit silently, leaving
            # the admin with no feedback at all.
            await message.reply_text(
                "Could not identify a user from the replied message."
            )
            return None
        return replied.from_user

    if len(message.command) > 1:
        identifier = message.command[1]

        # First attempt: interpret the argument as a numeric user ID.
        try:
            return await client.get_users(int(identifier))
        except (ValueError, PeerIdInvalid):
            pass  # Not numeric, or unknown ID: fall through to the raw lookup.

        # Second attempt: pass the argument through unchanged.
        try:
            return await client.get_users(identifier)
        except (PeerIdInvalid, UsernameInvalid, UsernameNotOccupied, KeyError):
            # NOTE(review): KeyError is included because the peer-cache
            # lookup may surface it for unresolvable usernames -- confirm
            # against the installed Pyrogram version.
            await message.reply_text("Invalid user ID or username.")
            return None

    await message.reply_text("Reply to a user or provide a username/user ID.")
    return None
@app.on_message(filters.command("promote") & filters.group)
@AdminActual
async def promote_user(client, message: Message, _):
    """Handle ``/promote`` in groups: promote the targeted user.

    The target is resolved through ``extract_user``; when that yields
    nothing the handler does no further work. The outcome (success or the
    raised error) is reported back as a reply to the command.

    Args:
        client: Client used to perform the promotion.
        message: The command message.
        _: Unused third argument (presumably injected by ``AdminActual``).
    """
    member = await extract_user(message, client)
    if member:
        try:
            await client.promote_chat_member(message.chat.id, member.id)
            await message.reply_text(f"Successfully promoted {member.mention}.")
        except Exception as err:
            # Handler boundary: report any failure to the chat.
            await message.reply_text(f"Failed to promote user. Error: {err}")
@app.on_message(filters.command("demote") & filters.group)
@AdminActual
async def demote_user(client, message: Message, _):
    """Handle ``/demote`` in groups: strip admin rights from the target.

    The target is resolved through ``extract_user``; when that yields
    nothing the handler returns without further action. Demotion is done
    by calling ``promote_chat_member`` with a privileges object in which
    every right is switched off. The outcome is reported as a reply.

    Args:
        client: Client used to perform the demotion.
        message: The command message.
        _: Unused third argument (presumably injected by ``AdminActual``).
    """
    # ``privileges`` must be a ChatPrivileges object. ChatPermissions is the
    # member-restriction type and does not accept ``can_manage_chat``, so
    # constructing it here raised TypeError on every call.
    from pyrogram.types import ChatPrivileges

    target_user = await extract_user(message, client)
    if not target_user:
        return
    try:
        await client.promote_chat_member(
            message.chat.id,
            target_user.id,
            # Every right is cleared explicitly; ``can_manage_chat`` in
            # particular must be turned off for the user to lose admin
            # status.
            privileges=ChatPrivileges(
                can_manage_chat=False,
                can_delete_messages=False,
                can_manage_video_chats=False,
                can_restrict_members=False,
                can_promote_members=False,
                can_change_info=False,
                can_post_messages=False,
                can_edit_messages=False,
                can_invite_users=False,
                can_pin_messages=False,
                is_anonymous=False,
            ),
        )
        await message.reply_text(f"Successfully demoted {target_user.mention}.")
    except Exception as e:
        # Handler boundary: report any failure to the chat.
        await message.reply_text(f"Failed to demote user. Error: {e}")
@app.on_message(filters.command("adminlist") & filters.group)
async def admin_list(client, message: Message):
    """Handle ``/adminlist`` in groups: reply with the chat's administrators.

    Members are fetched with the ADMINISTRATORS filter and listed one per
    line, owner entries first, each tagged "Owner" or "Admin". A member is
    shown as ``@username`` when a username is set, otherwise by mention.

    Args:
        client: Client used to enumerate chat members.
        message: The command message; the reply is sent to it.
    """
    members = [
        entry
        async for entry in client.get_chat_members(
            message.chat.id, filter=ChatMembersFilter.ADMINISTRATORS
        )
    ]
    if not members:
        await message.reply_text("No admins found in this chat.")
        return

    # Stable sort: False (owner) orders before True, so owners come first
    # while everyone else keeps the order the API returned them in.
    ordered = sorted(members, key=lambda m: m.status != ChatMemberStatus.OWNER)

    rows = []
    for entry in ordered:
        role = "Owner" if entry.status == ChatMemberStatus.OWNER else "Admin"
        person = entry.user
        label = f"@{person.username}" if person.username else person.mention
        rows.append(f"• {label} ({role})\n")

    await message.reply_text("Admins in this chat:\n\n" + "".join(rows))
@app.on_message(filters.command("admincache") & filters.group)
@AdminActual
async def admin_cache(client, message: Message, _):
    """Handle ``/admincache`` in groups: drop this chat's cached entry.

    Removes the current chat's key from ``adminlist`` if one is present,
    then confirms with a reply regardless of whether anything was removed.

    Args:
        client: Unused here.
        message: The command message; its chat ID is the cache key.
        _: Unused third argument (presumably injected by ``AdminActual``).
    """
    from DragMusic.utils.decorators.admins import adminlist as cache

    chat_id = message.chat.id
    try:
        del cache[chat_id]
    except KeyError:
        # Nothing cached for this chat; there is nothing to clear.
        pass
    await message.reply_text("Admin cache has been refreshed.")