Spaces:
Paused
Paused
| from html import escape | |
| from secrets import choice | |
| from traceback import format_exc | |
| from typing import List | |
| from pyrogram import emoji, enums, filters | |
| from pyrogram.errors import ChannelPrivate, ChatAdminRequired, RPCError | |
| from pyrogram.types import Message, User | |
| from Powers import LOGGER | |
| from Powers.bot_class import Gojo | |
| from Powers.database.antispam_db import GBan | |
| from Powers.database.greetings_db import Greetings | |
| from Powers.supports import get_support_staff | |
| from Powers.utils.cmd_senders import send_cmd | |
| from Powers.utils.custom_filters import (admin_filter, bot_admin_filter, | |
| captcha_filter, command) | |
| from Powers.utils.kbhelpers import ikb | |
| from Powers.utils.msg_types import Types, get_wlcm_type | |
| from Powers.utils.parser import escape_markdown, mention_html | |
| from Powers.utils.string import (build_keyboard, escape_invalid_curly_brackets, | |
| parse_button) | |
| # Initialize | |
| gdb = GBan() | |
| ChatType = enums.ChatType | |
| async def escape_mentions_using_curly_brackets_wl( | |
| user: User, | |
| m: Message, | |
| text: str, | |
| parse_words: list, | |
| ) -> str: | |
| teks = await escape_invalid_curly_brackets(text, parse_words) | |
| if teks: | |
| teks = teks.format( | |
| first=escape(user.first_name), | |
| last=escape(user.last_name or user.first_name), | |
| fullname=" ".join( | |
| [ | |
| escape(user.first_name), | |
| escape(user.last_name), | |
| ] | |
| if user.last_name | |
| else [escape(user.first_name)], | |
| ), | |
| username=( | |
| "@" + (await escape_markdown(escape(user.username))) | |
| if user.username | |
| else (await (mention_html(escape(user.first_name), user.id))) | |
| ), | |
| mention=await (mention_html(escape(user.first_name), user.id)), | |
| chatname=escape(m.chat.title) | |
| if m.chat.type != ChatType.PRIVATE | |
| else escape(user.first_name), | |
| id=user.id, | |
| ) | |
| else: | |
| teks = "" | |
| return teks | |
| async def cleanwlcm(_, m: Message): | |
| db = Greetings(m.chat.id) | |
| status = db.get_current_cleanwelcome_settings() | |
| args = m.text.split(" ", 1) | |
| if len(args) >= 2: | |
| if args[1].lower() == "on": | |
| db.set_current_cleanwelcome_settings(True) | |
| await m.reply_text("Turned on!") | |
| return | |
| if args[1].lower() == "off": | |
| db.set_current_cleanwelcome_settings(False) | |
| await m.reply_text("Turned off!") | |
| return | |
| await m.reply_text("what are you trying to do ??") | |
| return | |
| await m.reply_text(f"Current settings:- {status}") | |
| return | |
| async def cleangdbye(_, m: Message): | |
| db = Greetings(m.chat.id) | |
| status = db.get_current_cleangoodbye_settings() | |
| args = m.text.split(" ", 1) | |
| if len(args) >= 2: | |
| if args[1].lower() == "on": | |
| db.set_current_cleangoodbye_settings(True) | |
| await m.reply_text("Turned on!") | |
| return | |
| if args[1].lower() == "off": | |
| db.set_current_cleangoodbye_settings(False) | |
| await m.reply_text("Turned off!") | |
| return | |
| await m.reply_text("what are you trying to do ??") | |
| return | |
| await m.reply_text(f"Current settings:- {status}") | |
| return | |
| async def cleanservice(_, m: Message): | |
| db = Greetings(m.chat.id) | |
| status = db.get_current_cleanservice_settings() | |
| args = m.text.split(" ", 1) | |
| if len(args) >= 2: | |
| if args[1].lower() == "on": | |
| db.set_current_cleanservice_settings(True) | |
| await m.reply_text("Turned on!") | |
| return | |
| if args[1].lower() == "off": | |
| db.set_current_cleanservice_settings(False) | |
| await m.reply_text("Turned off!") | |
| return | |
| await m.reply_text("what are you trying to do ??") | |
| return | |
| await m.reply_text(f"Current settings:- {status}") | |
| return | |
| async def save_wlcm(_, m: Message): | |
| db = Greetings(m.chat.id) | |
| if m and not m.from_user: | |
| return | |
| args = m.text.split(None, 1) | |
| if len(args) >= 4096: | |
| await m.reply_text( | |
| "Word limit exceed !!", | |
| ) | |
| return | |
| if not (m.reply_to_message and m.reply_to_message.text) and len(m.command) == 0: | |
| await m.reply_text( | |
| "Error: There is no text in here! and only text with buttons are supported currently !", | |
| ) | |
| return | |
| text, msgtype, file = await get_wlcm_type(m) | |
| if not m.reply_to_message and msgtype == Types.TEXT and len(m.command) <= 2: | |
| await m.reply_text(f"<code>{m.text}</code>\n\nError: There is no data in here!") | |
| return | |
| if not text and not file: | |
| await m.reply_text( | |
| "Please provide some data!", | |
| ) | |
| return | |
| if not msgtype: | |
| await m.reply_text("Please provide some data for this to reply with!") | |
| return | |
| db.set_welcome_text(text, msgtype, file) | |
| await m.reply_text("Saved welcome!") | |
| return | |
| async def save_gdbye(_, m: Message): | |
| db = Greetings(m.chat.id) | |
| if m and not m.from_user: | |
| return | |
| args = m.text.split(None, 1) | |
| if len(args) >= 4096: | |
| await m.reply_text( | |
| "Word limit exceeds !!", | |
| ) | |
| return | |
| if not (m.reply_to_message and m.reply_to_message.text) and len(m.command) == 0: | |
| await m.reply_text( | |
| "Error: There is no text in here! and only text with buttons are supported currently !", | |
| ) | |
| return | |
| text, msgtype, file = await get_wlcm_type(m) | |
| if not m.reply_to_message and msgtype == Types.TEXT and len(m.command) <= 2: | |
| await m.reply_text(f"<code>{m.text}</code>\n\nError: There is no data in here!") | |
| return | |
| if not text and not file: | |
| await m.reply_text( | |
| "Please provide some data!", | |
| ) | |
| return | |
| if not msgtype: | |
| await m.reply_text("Please provide some data for this to reply with!") | |
| return | |
| db.set_goodbye_text(text, msgtype, file) | |
| await m.reply_text("Saved goodbye!") | |
| return | |
| async def resetgb(_, m: Message): | |
| db = Greetings(m.chat.id) | |
| if m and not m.from_user: | |
| return | |
| text = "Sad to see you leaving {first}.\nTake Care!" | |
| db.set_goodbye_text(text, None) | |
| await m.reply_text("Ok Done!") | |
| return | |
| async def resetwlcm(_, m: Message): | |
| db = Greetings(m.chat.id) | |
| if m and not m.from_user: | |
| return | |
| text = "Hey {first}, welcome to {chatname}!" | |
| db.set_welcome_text(text, None) | |
| await m.reply_text("Done!") | |
| return | |
| async def cleannnnn(_, m: Message): | |
| db = Greetings(m.chat.id) | |
| clean = db.get_current_cleanservice_settings() | |
| try: | |
| if clean: | |
| await m.delete() | |
| except Exception: | |
| pass | |
| async def member_has_joined(c: Gojo, m: Message): | |
| users: List[User] = m.new_chat_members | |
| db = Greetings(m.chat.id) | |
| for user in users: | |
| banned_users = gdb.check_gban(user.id) | |
| try: | |
| if user.id == c.me.id: | |
| continue | |
| if user.id in get_support_staff("dev"): | |
| await c.send_animation( | |
| chat_id=m.chat.id, | |
| animation="./extras/william.gif", | |
| caption=f"😳 My **DEV** {user.mention} has also joined the chat!", | |
| ) | |
| continue | |
| if banned_users: | |
| await m.chat.ban_member(user.id) | |
| await c.send_message( | |
| m.chat.id, | |
| f"{user.mention} was globally banned so i banned!", | |
| ) | |
| continue | |
| if user.is_bot: | |
| continue # ignore bots | |
| except ChatAdminRequired: | |
| continue | |
| status = db.get_welcome_status() | |
| oo = db.get_welcome_text() | |
| UwU = db.get_welcome_media() | |
| mtype = db.get_welcome_msgtype() | |
| parse_words = [ | |
| "first", | |
| "last", | |
| "fullname", | |
| "username", | |
| "mention", | |
| "id", | |
| "chatname", | |
| ] | |
| hmm = await escape_mentions_using_curly_brackets_wl(user, m, oo, parse_words) | |
| if not status: | |
| continue | |
| tek, button = await parse_button(hmm) | |
| button = await build_keyboard(button) | |
| button = ikb(button) if button else None | |
| if "%%%" in tek: | |
| filter_reply = tek.split("%%%") | |
| teks = choice(filter_reply) | |
| else: | |
| teks = tek | |
| if not teks: | |
| teks = f"A wild {user.mention} appeared in {m.chat.title}! Everyone be aware." | |
| ifff = db.get_current_cleanwelcome_id() | |
| gg = db.get_current_cleanwelcome_settings() | |
| if ifff and gg: | |
| try: | |
| await c.delete_messages(m.chat.id, int(ifff)) | |
| except RPCError: | |
| pass | |
| if not teks: | |
| teks = "Hey {first}, welcome to {chatname}" | |
| try: | |
| if not UwU: | |
| jj = await c.send_message( | |
| m.chat.id, | |
| text=teks, | |
| reply_markup=button, | |
| disable_web_page_preview=True, | |
| ) | |
| else: | |
| jj = await (await send_cmd(c, mtype))( | |
| m.chat.id, | |
| UwU, | |
| caption=teks, | |
| reply_markup=button, | |
| ) | |
| if jj: | |
| db.set_cleanwlcm_id(int(jj.id)) | |
| except ChannelPrivate: | |
| continue | |
| except RPCError as e: | |
| LOGGER.error(e) | |
| LOGGER.error(format_exc(e)) | |
| async def member_has_left(c: Gojo, m: Message): | |
| db = Greetings(m.chat.id) | |
| status = db.get_goodbye_status() | |
| oo = db.get_goodbye_text() | |
| UwU = db.get_goodbye_media() | |
| mtype = db.get_goodbye_msgtype() | |
| parse_words = [ | |
| "first", | |
| "last", | |
| "fullname", | |
| "id", | |
| "username", | |
| "mention", | |
| "chatname", | |
| ] | |
| user = m.left_chat_member or m.from_user | |
| hmm = await escape_mentions_using_curly_brackets_wl(user, m, oo, parse_words) | |
| if not status: | |
| return | |
| tek, button = await parse_button(hmm) | |
| button = await build_keyboard(button) | |
| button = ikb(button) if button else None | |
| if "%%%" in tek: | |
| filter_reply = tek.split("%%%") | |
| teks = choice(filter_reply) | |
| else: | |
| teks = tek | |
| if not teks: # Just in case | |
| teks = f"Thanks for being part of this group {user.mention}. But I don't like your arrogance and leaving the group {emoji.EYES}" | |
| ifff = db.get_current_cleangoodbye_id() | |
| iii = db.get_current_cleangoodbye_settings() | |
| if ifff and iii: | |
| try: | |
| await c.delete_messages(m.chat.id, int(ifff)) | |
| except RPCError: | |
| pass | |
| if user.id in get_support_staff("dev"): | |
| await c.send_message( | |
| m.chat.id, | |
| f"Will miss you my master {user.mention} :(", | |
| ) | |
| return | |
| if not teks: | |
| teks = "Sad to see you leaving {first}\nTake Care!" | |
| try: | |
| ooo = ( | |
| await (await send_cmd(c, mtype))( | |
| m.chat.id, | |
| UwU, | |
| caption=teks, | |
| reply_markup=button, | |
| ) if UwU else await c.send_message( | |
| m.chat.id, | |
| text=teks, | |
| reply_markup=button, | |
| disable_web_page_preview=True, | |
| ) | |
| ) | |
| if ooo: | |
| db.set_cleangoodbye_id(int(ooo.id)) | |
| return | |
| except ChannelPrivate: | |
| pass | |
| except RPCError as e: | |
| LOGGER.error(e) | |
| LOGGER.error(format_exc(e)) | |
| return | |
| async def welcome(c: Gojo, m: Message): | |
| db = Greetings(m.chat.id) | |
| status = db.get_welcome_status() | |
| oo = db.get_welcome_text() | |
| args = m.text.split(" ", 1) | |
| if m and not m.from_user: | |
| return | |
| if len(args) >= 2: | |
| if args[1].lower() == "noformat": | |
| await m.reply_text( | |
| f"""Current welcome settings:- | |
| Welcome power: {status} | |
| Clean Welcome: {db.get_current_cleanwelcome_settings()} | |
| Cleaning service: {db.get_current_cleanservice_settings()} | |
| Welcome text in no formating: | |
| """, | |
| ) | |
| await c.send_message( | |
| m.chat.id, text=oo, parse_mode=enums.ParseMode.DISABLED | |
| ) | |
| return | |
| if args[1].lower() == "on": | |
| db.set_current_welcome_settings(True) | |
| await m.reply_text("I will greet newly joined member from now on.") | |
| return | |
| if args[1].lower() == "off": | |
| db.set_current_welcome_settings(False) | |
| await m.reply_text("I will stay quiet when someone joins.") | |
| return | |
| await m.reply_text("what are you trying to do ??") | |
| return | |
| await m.reply_text( | |
| f"""Current welcome settings:- | |
| Welcome power: {status} | |
| Clean Welcome: {db.get_current_cleanwelcome_settings()} | |
| Cleaning service: {db.get_current_cleanservice_settings()} | |
| Welcome text: | |
| """, | |
| ) | |
| UwU = db.get_welcome_media() | |
| mtype = db.get_welcome_msgtype() | |
| tek, button = await parse_button(oo) | |
| button = await build_keyboard(button) | |
| button = ikb(button) if button else None | |
| if not UwU: | |
| await c.send_message( | |
| m.chat.id, | |
| text=tek, | |
| reply_markup=button, | |
| disable_web_page_preview=True, | |
| ) | |
| else: | |
| await (await send_cmd(c, mtype))( | |
| m.chat.id, | |
| UwU, | |
| caption=tek, | |
| reply_markup=button, | |
| ) | |
| return | |
| async def goodbye(c: Gojo, m: Message): | |
| db = Greetings(m.chat.id) | |
| status = db.get_goodbye_status() | |
| oo = db.get_goodbye_text() | |
| args = m.text.split(" ", 1) | |
| if m and not m.from_user: | |
| return | |
| if len(args) >= 2: | |
| if args[1].lower() == "noformat": | |
| await m.reply_text( | |
| f"""Current goodbye settings:- | |
| Goodbye power: {status} | |
| Clean Goodbye: {db.get_current_cleangoodbye_settings()} | |
| Cleaning service: {db.get_current_cleanservice_settings()} | |
| Goodbye text in no formating: | |
| """, | |
| ) | |
| await c.send_message( | |
| m.chat.id, text=oo, parse_mode=enums.ParseMode.DISABLED | |
| ) | |
| return | |
| if args[1].lower() == "on": | |
| db.set_current_goodbye_settings(True) | |
| await m.reply_text("I don't want but I will say goodbye to the fugitives") | |
| return | |
| if args[1].lower() == "off": | |
| db.set_current_goodbye_settings(False) | |
| await m.reply_text("I will stay quiet for fugitives") | |
| return | |
| await m.reply_text("what are you trying to do ??") | |
| return | |
| await m.reply_text( | |
| f"""Current Goodbye settings:- | |
| Goodbye power: {status} | |
| Clean Goodbye: {db.get_current_cleangoodbye_settings()} | |
| Cleaning service: {db.get_current_cleanservice_settings()} | |
| Goodbye text: | |
| """, | |
| ) | |
| UwU = db.get_goodbye_media() | |
| mtype = db.get_goodbye_msgtype() | |
| tek, button = await parse_button(oo) | |
| button = await build_keyboard(button) | |
| button = ikb(button) if button else None | |
| if not UwU: | |
| await c.send_message( | |
| m.chat.id, | |
| text=tek, | |
| reply_markup=button, | |
| disable_web_page_preview=True, | |
| ) | |
| else: | |
| await (await send_cmd(c, mtype))( | |
| m.chat.id, | |
| UwU, | |
| caption=tek, | |
| reply_markup=button, | |
| ) | |
| return | |
| __PLUGIN__ = "greetings" | |
| __alt_name__ = ["welcome", "goodbye", "cleanservice"] | |
| __HELP__ = """ | |
| **Greetings** | |
| Customize your group's welcome / goodbye messages that can be personalised in multiple ways. | |
| **Note:** | |
| × Currently it supports only text! | |
| × Gojo must be an admin to greet and goodbye users. | |
| **Admin Commands:** | |
| • /setwelcome <reply> : Sets a custom welcome message. | |
| • /setgoodbye <reply> : Sets a custom goodbye message. | |
| • /resetwelcome : Resets to bot default welcome message. | |
| • /resetgoodbye : Resets to bot default goodbye message. | |
| • /welcome <on/off> | noformat : enable/disable | Shows the current welcome message | settings. | |
| • /goodbye <on/off> | noformat : enable/disable | Shows the current goodbye message | settings. | |
| • /cleanwelcome <on/off> : Shows or sets the current clean welcome settings. | |
| • /cleangoodbye <on/off> : Shows or sets the current clean goodbye settings. | |
| **Cleaner:** | |
| • /cleanservice <on/off> : Use it to clean all service messages automatically or to view current status. | |
| **Format** | |
| Check /markdownhelp for help related to formatting!""" | |