Spaces:
Paused
Paused
| from secrets import choice | |
| from pyrogram import filters | |
| from traceback import format_exc | |
| from re import escape as re_escape | |
| from pyrogram.errors import RPCError | |
| from Powers.utils.kbhelpers import ikb | |
| from Powers.bot_class import LOGGER, Gojo | |
| from Powers.utils.cmd_senders import send_cmd | |
| from Powers.database.filters_db import Filters | |
| from pyrogram.enums import ChatMemberStatus as CMS | |
| from Powers.utils.regex_utils import regex_searcher | |
| from Powers.utils.msg_types import Types, get_filter_type | |
| from pyrogram.types import Message, CallbackQuery, InlineKeyboardMarkup | |
| from Powers.utils.custom_filters import command, admin_filter, owner_filter | |
| from Powers.utils.string import ( | |
| parse_button, split_quotes, build_keyboard, | |
| escape_mentions_using_curly_brackets) | |
# Initialise the shared filters-database handle used by the handlers in this module
db = Filters()
async def view_filters(_, m: Message):
    """List every filter saved in the current chat.

    Each stored filter gets its own bullet line; aliases of one filter
    (stored as a single ``|``-separated string) are shown side by side.

    Args:
        _: Client instance supplied by the dispatcher (unused).
        m: The message that requested the listing.

    Returns:
        The sent listing message, or ``None`` when the chat has no filters.
    """
    # ``from_user`` can be empty (e.g. anonymous senders), so never
    # dereference it blindly just for a log line.
    requester = m.from_user.id if m.from_user else "anonymous sender"
    LOGGER.info(f"{requester} checking filters in {m.chat.id}")

    all_filters = db.get_all_filters(m.chat.id)
    if not all_filters:
        await m.reply_text(f"There are no filters in {m.chat.title}")
        return

    bullet_lines = [
        " • " + " | ".join(f"<code>{alias}</code>" for alias in entry.split("|"))
        for entry in all_filters
    ]
    filters_chat = f"Filters in <b>{m.chat.title}</b>:\n" + "\n".join(bullet_lines)
    return await m.reply_text(filters_chat, disable_web_page_preview=True)
async def add_filter(_, m: Message):
    """Save a new filter (keyword, optional aliases and reply content).

    The keyword is the first (optionally quoted) token after the command;
    several aliases may be given at once, separated by ``|``. The reply
    content is either the remaining text or the replied-to message.

    Args:
        _: Client instance supplied by the dispatcher (unused).
        m: The command message.

    Returns:
        The error reply message when validation fails, otherwise ``None``
        (the function ends by stopping propagation of the update).
    """
    args = m.text.split(" ", 1)
    all_filters = db.get_all_filters(m.chat.id)
    actual_filters = {alias for entry in all_filters for alias in entry.split("|")}

    # Reaching EITHER limit must block new filters; requiring both at once
    # would let a chat with fewer than 50 filters grow aliases without bound.
    if len(all_filters) >= 50 or len(actual_filters) >= 150:
        await m.reply_text(
            "Only 50 filters and 150 aliases are allowed per chat!\nTo add more filters, remove the existing ones.",
        )
        return

    # Without a replied-to message we need command + keyword + text;
    # with one, command + keyword is enough.
    if m.reply_to_message:
        too_short = len(args) < 2
    else:
        too_short = len(m.text.split()) < 3
    if too_short:
        return await m.reply_text("Please read help section for how to save a filter!")

    extracted = await split_quotes(args[1])
    keyword = extracted[0].lower()

    if not keyword:
        return await m.reply_text(
            f"<code>{m.text}</code>\n\nError: You must give a name for this Filter!",
        )

    aliases = keyword.split("|")
    for alias in aliases:
        if alias in actual_filters:
            return await m.reply_text(f"Filter <code>{alias}</code> already exists!")

    if keyword.startswith(("<", ">")):
        return await m.reply_text("Cannot save a filter which starts with '<' or '>'")

    content, msgtype, file_id = await get_filter_type(m)
    # For a plain-text filter typed inline, the reply is the text after the
    # keyword; in every other case use what get_filter_type extracted.
    if msgtype == Types.TEXT and not m.reply_to_message:
        teks = extracted[1]
    else:
        teks = content

    if not teks and not msgtype:
        return await m.reply_text(
            'Please provide keyword for this filter reply with!\nEnclose filter in <code>"double quotes"</code>',
        )

    if not msgtype:
        return await m.reply_text(
            "Please provide data for this filter reply with!",
        )

    add = db.save_filter(m.chat.id, keyword, teks, msgtype, file_id)
    # ``from_user`` can be empty (e.g. anonymous senders); do not let the
    # log line crash the handler after the filter has been stored.
    requester = m.from_user.id if m.from_user else "anonymous sender"
    LOGGER.info(f"{requester} added new filter ({keyword}) in {m.chat.id}")
    if add:
        shown = ", ".join(aliases)
        await m.reply_text(
            f"Saved filter for '<code>{shown}</code>' in <b>{m.chat.title}</b>!",
        )
    await m.stop_propagation()
async def stop_filter(_, m: Message):
    """Remove a filter (and, per the reply text, its aliases) from the chat.

    Args:
        _: Client instance supplied by the dispatcher (unused).
        m: The command message; everything after the command is the keyword.

    Returns:
        The reply message when the command is unusable (no keyword given or
        no filters in the chat); otherwise the function ends by stopping
        propagation of the update.
    """
    # Split once: the command itself is always the first part, so a keyword
    # exists only when a second part is present. Checking this up front
    # avoids an IndexError on a bare command.
    parts = m.text.split(None, 1)
    if len(parts) < 2:
        return await m.reply_text("What should I stop replying to?")
    keyword = parts[1].lower()

    chat_filters = db.get_all_filters(m.chat.id)
    if not chat_filters:
        return await m.reply_text("No filters active here!")

    act_filters = {alias for entry in chat_filters for alias in entry.split("|")}
    if keyword in act_filters:
        db.rm_filter(m.chat.id, keyword)
        # ``from_user`` can be empty (e.g. anonymous senders).
        requester = m.from_user.id if m.from_user else "anonymous sender"
        LOGGER.info(f"{requester} removed filter ({keyword}) in {m.chat.id}")
        await m.reply_text(
            f"Okay, I'll stop replying to that filter and it's aliases in <b>{m.chat.title}</b>.",
        )
    else:
        await m.reply_text(
            "That's not a filter - Click: /filters to get currently active filters.",
        )
    await m.stop_propagation()
async def rm_allfilters(_, m: Message):
    """Ask for confirmation before clearing every filter in the chat.

    Nothing is deleted here: the reply only carries an inline keyboard whose
    "Confirm" button emits the ``rm_allfilters`` callback data.

    Args:
        _: Client instance supplied by the dispatcher (unused).
        m: The command message.

    Returns:
        The sent reply message.
    """
    if not db.get_all_filters(m.chat.id):
        return await m.reply_text("No filters to stop in this chat.")

    button_row = [("⚠️ Confirm", "rm_allfilters"), ("❌ Cancel", "close_admin")]
    confirm_keyboard = ikb([button_row])
    return await m.reply_text(
        "Are you sure you want to clear all filters?",
        reply_markup=confirm_keyboard,
    )
async def rm_allfilters_callback(_, q: CallbackQuery):
    """Handle the confirmation button for clearing all filters.

    Only the chat owner may proceed; administrators and everyone else get a
    refusal alert and nothing is changed.

    Args:
        _: Client instance supplied by the dispatcher (unused).
        q: The callback query raised by the pressed button.
    """
    presser_id = q.from_user.id
    member = await q.message.chat.get_member(presser_id)
    status = member.status

    if status == CMS.OWNER:
        db.rm_all_filters(q.message.chat.id)
        await q.message.edit_text(f"Cleared all filters for {q.message.chat.title}")
        LOGGER.info(f"{presser_id} removed all filter from {q.message.chat.id}")
        await q.answer("Cleared all Filters!", show_alert=True)
        return

    # Not the owner: pick the refusal that matches the presser's rank.
    if status == CMS.ADMINISTRATOR:
        refusal = "You're just an admin, not owner\nStay in your limits!"
    else:
        refusal = "You're not even an admin, don't try this explosive shit!"
    await q.answer(refusal, show_alert=True)
async def send_filter_reply(c: Gojo, m: Message, trigger: str):
    """Reply to ``m`` with the content stored for ``trigger``.

    The stored reply text may hold several alternatives separated by
    ``%%%``; one of them is picked at random. Placeholders and button
    markup in the text are resolved by the project's string helpers before
    sending.

    Args:
        c: The bot client, used to resolve the send method for media types.
        m: The message that triggered the filter.
        trigger: The filter keyword (or alias) that matched.

    Returns:
        The filter's message type once a reply was attempted. ``None`` when
        the message has no ``from_user``. The sent error message when the
        filter or its type cannot be found.
    """
    # Messages without a user sender are ignored; check before touching
    # the database.
    if not m.from_user:
        return

    getfilter = db.get_filter(m.chat.id, trigger)
    if not getfilter:
        return await m.reply_text(
            "<b>Error:</b> Cannot find a type for this filter!!",
            quote=True,
        )

    msgtype = getfilter["msgtype"]
    if not msgtype:
        return await m.reply_text("<b>Error:</b> Cannot find a type for this filter!!")

    try:
        stored_reply = getfilter["filter_reply"]
    except KeyError:
        stored_reply = ""
    # Support for random filter texts: "%%%" separates the alternatives.
    # ``or ""`` also tolerates a stored empty/None reply (assumed possible
    # for media-only filters; the DB schema is not visible here).
    filter_reply = choice((stored_reply or "").split("%%%"))

    parse_words = [
        "first",
        "last",
        "fullname",
        "id",
        "mention",
        "username",
        "chatname",
    ]
    text = await escape_mentions_using_curly_brackets(m, filter_reply, parse_words)
    teks, button = await parse_button(text)
    button = await build_keyboard(button)
    button = InlineKeyboardMarkup(button) if button else None

    try:
        if msgtype == Types.TEXT:
            if button:
                try:
                    await m.reply_text(
                        teks,
                        reply_markup=button,
                        disable_web_page_preview=True,
                        quote=True,
                    )
                except RPCError as ef:
                    await m.reply_text(
                        "An error has occured! Cannot parse note.",
                        quote=True,
                    )
                    LOGGER.error(ef)
                    LOGGER.error(format_exc())
            else:
                await m.reply_text(
                    teks,
                    quote=True,
                    disable_web_page_preview=True,
                )
        elif msgtype in (
            Types.STICKER,
            Types.VIDEO_NOTE,
            Types.CONTACT,
            Types.ANIMATED_STICKER,
        ):
            # These types are sent without a caption argument.
            await (await send_cmd(c, msgtype))(
                m.chat.id,
                getfilter["fileid"],
                reply_markup=button,
                reply_to_message_id=m.id,
            )
        else:
            await (await send_cmd(c, msgtype))(
                m.chat.id,
                getfilter["fileid"],
                caption=teks,
                reply_markup=button,
                reply_to_message_id=m.id,
            )
    except Exception as ef:
        await m.reply_text(f"Error in filters: {ef}")
    # Every path that attempted a reply reports the type, so callers can
    # log it uniformly (text replies included).
    return msgtype
async def filters_watcher(c: Gojo, m: Message):
    """Check an incoming message against the chat's filters and reply once.

    Every keyword/alias saved for the chat is searched in the lower-cased
    message text, delimited by a space, a non-word character or the text
    boundary. Only the first trigger that matches is answered.

    Args:
        c: The bot client, forwarded to ``send_filter_reply``.
        m: The incoming message to inspect.
    """
    # Nothing to match against when the message carries no text.
    if not m.text:
        return
    lowered_text = m.text.lower()  # loop-invariant, compute once

    chat_filters = db.get_all_filters(m.chat.id)
    actual_filters = {alias for entry in chat_filters for alias in entry.split("|")}

    for trigger in actual_filters:
        pattern = r"( |^|[^\w])" + re_escape(trigger) + r"( |$|[^\w])"
        if not await regex_searcher(pattern, lowered_text):
            continue

        try:
            msgtype = await send_filter_reply(c, m, trigger)
            LOGGER.info(f"Replied with {msgtype} to {trigger} in {m.chat.id}")
        except Exception as ef:
            await m.reply_text(f"Error: {ef}")
            LOGGER.error(ef)
            LOGGER.error(format_exc())
        # Stop after the first matching trigger, whether or not the reply
        # succeeded.
        break
# Plugin metadata and help text. Presumably consumed by the bot's plugin/help
# loader, which is not visible in this file.
__PLUGIN__ = "filters"

_DISABLE_CMDS_ = ["filters"]

__alt_name__ = ["filters", "autoreply"]

# The limits quoted in the note at the end must match the ones enforced when
# a filter is added (50 filters, 150 aliases).
__HELP__ = """
**Filters**
• /filters: List all active filters saved in the chat.
**Admin only:**
• /filter "`<keyword>`" `<reply message>`: Add a filter to this chat. The bot will now reply that message whenever 'keyword'
is mentioned. If you reply to a sticker with a keyword, the bot will reply with that sticker.
If you want your keyword to be a sentence, use quotes. eg: /filter "hey there" How are you doin?
**Example:**
`/filter "filtername" Reply Text`
Aliases for filters can be too set, just put '|' between the filternames you want.
**Example:**
`/filter "filtername1|filtername2" Reply Text`
Using the you can make a single filter work on 2 filternames without manually adding another one.
• /stop `<filter keyword>`: Stop that filter.
**Note:**
For filters with aliases, if you stop one alias, the filter will stop working on other aliases too.
**For Example:**
If you stop the "filtername1" from above example, the bot will not respond to "filtername2".
**Chat creator only:**
• /removeallfilters: Remove all chat filters at once.
**Note:**
Currently there is a limit of 50 filters and 150 aliases per chat.
All filter keywords are in lowercase."""