Spaces:
Paused
Paused
| from pyrogram import filters, Client, enums | |
| from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup | |
| from database.connections_mdb import add_connection, all_connections, if_active, delete_connection | |
| from info import ADMINS | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| logger.setLevel(logging.ERROR) | |
| async def addconnection(client, message): | |
| userid = message.from_user.id if message.from_user else None | |
| if not userid: | |
| return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM") | |
| chat_type = message.chat.type | |
| if chat_type == enums.ChatType.PRIVATE: | |
| try: | |
| cmd, group_id = message.text.split(" ", 1) | |
| except: | |
| await message.reply_text( | |
| "<b>Enter in correct format!</b>\n\n" | |
| "<code>/connect groupid</code>\n\n" | |
| "<i>Get your Group id by adding this bot to your group and use <code>/id</code></i>", | |
| quote=True | |
| ) | |
| return | |
| elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]: | |
| group_id = message.chat.id | |
| try: | |
| st = await client.get_chat_member(group_id, userid) | |
| if ( | |
| st.status != enums.ChatMemberStatus.ADMINISTRATOR | |
| and st.status != enums.ChatMemberStatus.OWNER | |
| and userid not in ADMINS | |
| ): | |
| await message.reply_text("You should be an admin in Given group!", quote=True) | |
| return | |
| except Exception as e: | |
| logger.exception(e) | |
| await message.reply_text( | |
| "Invalid Group ID!\n\nIf correct, Make sure I'm present in your group!!", | |
| quote=True, | |
| ) | |
| return | |
| try: | |
| st = await client.get_chat_member(group_id, "me") | |
| if st.status == enums.ChatMemberStatus.ADMINISTRATOR: | |
| ttl = await client.get_chat(group_id) | |
| title = ttl.title | |
| addcon = await add_connection(str(group_id), str(userid)) | |
| if addcon: | |
| await message.reply_text( | |
| f"Successfully connected to **{title}**\nNow manage your group from my pm !", | |
| quote=True, | |
| parse_mode=enums.ParseMode.MARKDOWN | |
| ) | |
| if chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]: | |
| await client.send_message( | |
| userid, | |
| f"Connected to **{title}** !", | |
| parse_mode=enums.ParseMode.MARKDOWN | |
| ) | |
| else: | |
| await message.reply_text( | |
| "You're already connected to this chat!", | |
| quote=True | |
| ) | |
| else: | |
| await message.reply_text("Add me as an admin in group", quote=True) | |
| except Exception as e: | |
| logger.exception(e) | |
| await message.reply_text('Some error occurred! Try again later.', quote=True) | |
| return | |
| async def deleteconnection(client, message): | |
| userid = message.from_user.id if message.from_user else None | |
| if not userid: | |
| return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM") | |
| chat_type = message.chat.type | |
| if chat_type == enums.ChatType.PRIVATE: | |
| await message.reply_text("Run /connections to view or disconnect from groups!", quote=True) | |
| elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]: | |
| group_id = message.chat.id | |
| st = await client.get_chat_member(group_id, userid) | |
| if ( | |
| st.status != enums.ChatMemberStatus.ADMINISTRATOR | |
| and st.status != enums.ChatMemberStatus.OWNER | |
| and str(userid) not in ADMINS | |
| ): | |
| return | |
| delcon = await delete_connection(str(userid), str(group_id)) | |
| if delcon: | |
| await message.reply_text("Successfully disconnected from this chat", quote=True) | |
| else: | |
| await message.reply_text("This chat isn't connected to me!\nDo /connect to connect.", quote=True) | |
| async def connections(client, message): | |
| userid = message.from_user.id | |
| groupids = await all_connections(str(userid)) | |
| if groupids is None: | |
| await message.reply_text( | |
| "There are no active connections!! Connect to some groups first.", | |
| quote=True | |
| ) | |
| return | |
| buttons = [] | |
| for groupid in groupids: | |
| try: | |
| ttl = await client.get_chat(int(groupid)) | |
| title = ttl.title | |
| active = await if_active(str(userid), str(groupid)) | |
| act = " - ACTIVE" if active else "" | |
| buttons.append( | |
| [ | |
| InlineKeyboardButton( | |
| text=f"{title}{act}", callback_data=f"groupcb:{groupid}:{act}" | |
| ) | |
| ] | |
| ) | |
| except: | |
| pass | |
| if buttons: | |
| await message.reply_text( | |
| "Your connected group details ;\n\n", | |
| reply_markup=InlineKeyboardMarkup(buttons), | |
| quote=True | |
| ) | |
| else: | |
| await message.reply_text( | |
| "There are no active connections!! Connect to some groups first.", | |
| quote=True | |
| ) | |