Spaces:
Paused
Paused
| import io | |
| from pyrogram import filters, Client, enums | |
| from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup | |
| from database.filters_mdb import( | |
| add_filter, | |
| get_filters, | |
| delete_filter, | |
| count_filters | |
| ) | |
| from database.connections_mdb import active_connection | |
| from utils import get_file_id, parser, split_quotes | |
| from info import ADMINS | |
async def addfilter(client, message):
    """Save a keyword filter for a group from a command message.

    The target group is the chat the command was sent in (group or
    supergroup), or, in a private chat, the group returned by
    ``active_connection`` for the sender. Only chat administrators, the
    chat owner, or users whose stringified id is in ``ADMINS`` may proceed;
    anyone else is ignored silently.

    The filter content comes from one of four places, checked in order:

    1. the text following the keyword in the command itself (no reply);
    2. a replied-to message that carries a reply markup;
    3. a replied-to media message (its caption, or the command text for
       stickers);
    4. a replied-to plain text message.

    Args:
        client: Pyrogram client used for chat and membership lookups.
        message: The incoming command message.

    Returns:
        ``None`` in every path except the anonymous-sender guard, which
        returns the result of ``message.reply``.
    """
    userid = message.from_user.id if message.from_user else None
    if not userid:
        return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM")

    chat_type = message.chat.type
    args = message.text.html.split(None, 1)

    # Resolve which group the filter belongs to.
    if chat_type == enums.ChatType.PRIVATE:
        grp_id = await active_connection(str(userid))
        if grp_id is None:
            await message.reply_text("I'm not connected to any groups!", quote=True)
            return
        try:
            chat = await client.get_chat(grp_id)
            title = chat.title
        except Exception:
            # Best effort: any lookup failure is reported as "bot not in group".
            await message.reply_text("Make sure I'm present in your group!!", quote=True)
            return
    elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
        grp_id = message.chat.id
        title = message.chat.title
    else:
        return

    # NOTE(review): the ADMINS check compares the id as a string; confirm
    # that ADMINS actually stores strings.
    st = await client.get_chat_member(grp_id, userid)
    if (
        st.status != enums.ChatMemberStatus.ADMINISTRATOR
        and st.status != enums.ChatMemberStatus.OWNER
        and str(userid) not in ADMINS
    ):
        return

    if len(args) < 2:
        await message.reply_text("Command Incomplete :(", quote=True)
        return

    extracted = split_quotes(args[1])
    text = extracted[0].lower()
    replied = message.reply_to_message

    if not replied and len(extracted) < 2:
        await message.reply_text("Add some content to save your filter!", quote=True)
        return

    # Initialised up front so every branch below (including the exception
    # fallbacks) reaches add_filter with fileid bound.
    fileid = None

    if not replied:
        # The guard above guarantees extracted has at least two parts here.
        reply_text, btn, alert = parser(extracted[1], text)
        if not reply_text:
            await message.reply_text("You cannot have buttons alone, give some text to go with it!", quote=True)
            return
    elif replied.reply_markup:
        try:
            btn = replied.reply_markup.inline_keyboard
            media = get_file_id(replied)
            if media:
                fileid = media.file_id
                reply_text = replied.caption.html
            else:
                reply_text = replied.text.html
            alert = None
        except Exception:
            # Fall back to an empty filter body rather than failing the command.
            reply_text = ""
            btn = "[]"
            fileid = None
            alert = None
    elif replied.media:
        try:
            media = get_file_id(replied)
            fileid = media.file_id if media else None
            if replied.sticker:
                # Stickers have no caption; use the text given in the command.
                reply_text, btn, alert = parser(extracted[1], text)
            else:
                reply_text, btn, alert = parser(replied.caption.html, text)
        except Exception:
            # Keeps whatever fileid was resolved before the failure.
            reply_text = ""
            btn = "[]"
            alert = None
    elif replied.text:
        try:
            reply_text, btn, alert = parser(replied.text.html, text)
        except Exception:
            reply_text = ""
            btn = "[]"
            alert = None
    else:
        return

    await add_filter(grp_id, text, reply_text, btn, fileid, alert)
    await message.reply_text(
        f"Filter for `{text}` added in **{title}**",
        quote=True,
        parse_mode=enums.ParseMode.MARKDOWN
    )
async def get_all(client, message):
    """Reply with the list of filters saved for a group.

    The target group is the chat the command was sent in (group or
    supergroup), or, in a private chat, the group returned by
    ``active_connection`` for the sender. Only chat administrators, the
    chat owner, or users whose stringified id is in ``ADMINS`` get a
    response; anyone else is ignored silently.

    When the formatted listing is longer than 4096 characters it is sent as
    a ``keywords.txt`` document (with backticks stripped) instead of a text
    message.

    Args:
        client: Pyrogram client used for chat and membership lookups.
        message: The incoming command message.

    Returns:
        ``None`` in every path except the anonymous-sender guard, which
        returns the result of ``message.reply``.
    """
    chat_type = message.chat.type
    userid = message.from_user.id if message.from_user else None
    if not userid:
        return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM")

    # Resolve which group's filters to list.
    if chat_type == enums.ChatType.PRIVATE:
        grp_id = await active_connection(str(userid))
        if grp_id is None:
            await message.reply_text("I'm not connected to any groups!", quote=True)
            return
        try:
            chat = await client.get_chat(grp_id)
            title = chat.title
        except Exception:
            # Best effort: any lookup failure is reported as "bot not in group".
            await message.reply_text("Make sure I'm present in your group!!", quote=True)
            return
    elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
        grp_id = message.chat.id
        title = message.chat.title
    else:
        return

    # NOTE(review): the ADMINS check compares the id as a string; confirm
    # that ADMINS actually stores strings.
    st = await client.get_chat_member(grp_id, userid)
    if (
        st.status != enums.ChatMemberStatus.ADMINISTRATOR
        and st.status != enums.ChatMemberStatus.OWNER
        and str(userid) not in ADMINS
    ):
        return

    texts = await get_filters(grp_id)
    count = await count_filters(grp_id)

    if count:
        header = f"Total number of filters in **{title}** : {count}\n\n"
        filterlist = header + "".join(f" × `{text}`\n" for text in texts)

        if len(filterlist) > 4096:
            # Too long for a single text message: send as a file instead.
            with io.BytesIO(str.encode(filterlist.replace("`", ""))) as keyword_file:
                keyword_file.name = "keywords.txt"
                await message.reply_document(
                    document=keyword_file,
                    quote=True
                )
            return
    else:
        filterlist = f"There are no active filters in **{title}**"

    await message.reply_text(
        text=filterlist,
        quote=True,
        parse_mode=enums.ParseMode.MARKDOWN
    )
async def deletefilter(client, message):
    """Delete a single named filter from a group.

    The target group is the chat the command was sent in (group or
    supergroup), or, in a private chat, the group returned by
    ``active_connection`` for the sender. Only chat administrators, the
    chat owner, or users whose stringified id is in ``ADMINS`` may proceed;
    anyone else is ignored silently.

    The filter name is everything after the first space in the command
    text, lower-cased. When no name is given, a usage hint is sent instead.

    Args:
        client: Pyrogram client used for chat and membership lookups.
        message: The incoming command message.

    Returns:
        ``None`` in every path except the anonymous-sender guard, which
        returns the result of ``message.reply``.
    """
    userid = message.from_user.id if message.from_user else None
    if not userid:
        return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM")

    chat_type = message.chat.type

    # Resolve which group the filter should be removed from.
    if chat_type == enums.ChatType.PRIVATE:
        grp_id = await active_connection(str(userid))
        if grp_id is None:
            await message.reply_text("I'm not connected to any groups!", quote=True)
            # Must stop here: there is no group id to continue with.
            return
        try:
            # Only used to confirm the bot can still see the connected group.
            await client.get_chat(grp_id)
        except Exception:
            await message.reply_text("Make sure I'm present in your group!!", quote=True)
            return
    elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
        grp_id = message.chat.id
    else:
        return

    # NOTE(review): the ADMINS check compares the id as a string; confirm
    # that ADMINS actually stores strings.
    st = await client.get_chat_member(grp_id, userid)
    if (
        st.status != enums.ChatMemberStatus.ADMINISTRATOR
        and st.status != enums.ChatMemberStatus.OWNER
        and str(userid) not in ADMINS
    ):
        return

    # Split "<command> <filter name>"; a missing name yields a usage hint.
    parts = message.text.split(" ", 1) if message.text else []
    if len(parts) < 2:
        await message.reply_text(
            "<i>Mention the filtername which you wanna delete!</i>\n\n"
            "<code>/del filtername</code>\n\n"
            "Use /viewfilters to view all available filters",
            quote=True
        )
        return

    query = parts[1].lower()
    await delete_filter(message, query, grp_id)
async def delallconfirm(client, message):
    """Ask for confirmation before deleting every filter in a group.

    The target group is the chat the command was sent in (group or
    supergroup), or, in a private chat, the group returned by
    ``active_connection`` for the sender. The confirmation prompt is only
    shown to the chat owner or to users whose stringified id is in
    ``ADMINS``; anyone else is ignored silently.

    This handler only sends the YES / CANCEL inline keyboard (callback data
    ``delallconfirm`` / ``delallcancel``); it does not delete anything
    itself.

    Args:
        client: Pyrogram client used for chat and membership lookups.
        message: The incoming command message.

    Returns:
        ``None`` in every path except the anonymous-sender guard, which
        returns the result of ``message.reply``.
    """
    userid = message.from_user.id if message.from_user else None
    if not userid:
        return await message.reply(f"You are anonymous admin. Use /connect {message.chat.id} in PM")

    chat_type = message.chat.type

    # Resolve which group the bulk delete applies to.
    if chat_type == enums.ChatType.PRIVATE:
        grp_id = await active_connection(str(userid))
        if grp_id is None:
            await message.reply_text("I'm not connected to any groups!", quote=True)
            return
        try:
            chat = await client.get_chat(grp_id)
            title = chat.title
        except Exception:
            # Best effort: any lookup failure is reported as "bot not in group".
            await message.reply_text("Make sure I'm present in your group!!", quote=True)
            return
    elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
        grp_id = message.chat.id
        title = message.chat.title
    else:
        return

    # Stricter than the other filter commands: plain administrators are not
    # allowed, only the owner or an ADMINS entry.
    # NOTE(review): the ADMINS check compares the id as a string; confirm
    # that ADMINS actually stores strings.
    st = await client.get_chat_member(grp_id, userid)
    if (st.status == enums.ChatMemberStatus.OWNER) or (str(userid) in ADMINS):
        await message.reply_text(
            f"This will delete all filters from '{title}'.\nDo you want to continue??",
            reply_markup=InlineKeyboardMarkup([
                [InlineKeyboardButton(text="YES", callback_data="delallconfirm")],
                [InlineKeyboardButton(text="CANCEL", callback_data="delallcancel")]
            ]),
            quote=True
        )