Spaces:
Running
Running
| from random import shuffle | |
| from traceback import format_exc | |
| import pyrogram # don't remove | |
| from pyrogram import filters | |
| from pyrogram.enums import ChatMemberStatus as CMS | |
| from pyrogram.types import CallbackQuery, ChatMemberUpdated, ChatPermissions | |
| from pyrogram.types import InlineKeyboardButton as ikb | |
| from pyrogram.types import InlineKeyboardMarkup as ikm | |
| from pyrogram.types import Message | |
| from Powers import LOGGER | |
| from Powers.bot_class import Gojo | |
| from Powers.database.captcha_db import CAPTCHA, CAPTCHA_DATA | |
| from Powers.supports import get_support_staff | |
| from Powers.utils.captcha_helper import (genrator, get_image_captcha, | |
| get_qr_captcha) | |
| from Powers.utils.custom_filters import admin_filter, command | |
| SUPPORT_STAFF = get_support_staff() | |
| async def start_captcha(c: Gojo, m: Message): | |
| captcha = CAPTCHA() | |
| split = m.command | |
| if len(split) == 1: | |
| is_cap = captcha.is_captcha(m.chat.id) | |
| if is_cap: | |
| txt = "Captcha verification is currently **on** for this chat" | |
| else: | |
| txt = "Captcha verification is currently **off** for this chat" | |
| await m.reply_text(txt) | |
| return | |
| else: | |
| on_off = split[1].lower() | |
| if on_off in ["on","yes","enable"]: | |
| captcha.insert_captcha(m.chat.id) | |
| await m.reply_text("Captcha verification is now **on** for this chat") | |
| return | |
| elif on_off in ["off","no","disable"]: | |
| captcha.remove_captcha(m.chat.id) | |
| await m.reply_text("Captcha verification is now **off** for this chat") | |
| return | |
| else: | |
| await m.reply_text("**USAGE**\n/captcha [on | yes | enable | off | no | disable]") | |
| return | |
| async def set_captcha_mode(c: Gojo, m: Message): | |
| split = m.command | |
| captcha = CAPTCHA() | |
| if len(split) == 1: | |
| curr = captcha.get_captcha(m.chat.id) | |
| if curr: | |
| capatcha_type = curr["captcha_type"] | |
| await m.reply_text(f"Current captcha verification methode is {capatcha_type}\nAvailable methodes:\n■ qr\n■ image") | |
| return | |
| else: | |
| await m.reply_text("Captcha verification is off for the current chat") | |
| return | |
| else: | |
| type_ = split[1].lower() | |
| if type_ == "qr": | |
| captcha.update_type(m.chat.id, "qr") | |
| await m.reply_text("Captcha verification is now changed to qr code") | |
| return | |
| elif type_ == "image": | |
| captcha.update_type(m.chat.id,"image") | |
| await m.reply_text("Captcha verication is now changed to image") | |
| return | |
| else: | |
| await m.reply_text("**USAGE**\n/captchamode [qr | image]") | |
| return | |
| async def joinss(c: Gojo, u: ChatMemberUpdated): | |
| chat = u.chat.id | |
| if ( | |
| u.new_chat_member | |
| ): | |
| pass | |
| else: | |
| return | |
| user = u.new_chat_member.user.id | |
| userr = u.new_chat_member.user | |
| is_qr = CAPTCHA().is_captcha(chat) | |
| if not is_qr: | |
| return | |
| captcha = CAPTCHA() | |
| cap_data = CAPTCHA_DATA() | |
| if user in SUPPORT_STAFF: | |
| return | |
| captcha_type = captcha.get_captcha(chat) | |
| is_already = cap_data.is_already_data(chat, user) | |
| mess = False | |
| try: | |
| if is_already: | |
| mess = await c.get_messages(chat,int(is_already)) | |
| except Exception: | |
| cap_data.del_message_id(chat,is_already) | |
| mess = False | |
| is_already = False | |
| if is_already and not mess: | |
| cap_data.del_message_id(chat,is_already) | |
| return | |
| try: | |
| await c.restrict_chat_member(chat,user,ChatPermissions()) | |
| except Exception as e: | |
| LOGGER.error(e) | |
| LOGGER.error(format_exc()) | |
| return | |
| if not is_already: | |
| if captcha_type == "qr": | |
| pic = await get_qr_captcha(chat, user) | |
| cap = f"Please {userr.mention} scan this qr code with your phone to verify that you are human" | |
| ms = await c.send_photo(chat,pic,caption=cap) | |
| cap_data.store_message_id(chat,user,ms.id) | |
| return | |
| elif captcha_type == "image": | |
| img, code = await get_image_captcha(chat, user) | |
| cap = f"Please {userr.mention} please choose the correct code from the one given bellow\nYou have three tries if you get all three wrong u will be kicked from the chat.\nTries left: 3" | |
| cap_data.load_cap_data(chat, user, code) | |
| rand = [code] | |
| while len(rand) != 5: | |
| hehe = genrator() | |
| rand.append(hehe) | |
| shuffle(rand) | |
| ini = f"captcha_{chat}_{user}_" | |
| kb = ikm( | |
| [ | |
| [ | |
| ikb(rand[0],ini+rand[0]) | |
| ], | |
| [ | |
| ikb(rand[1],ini+rand[1]) | |
| ], | |
| [ | |
| ikb(rand[2],ini+rand[2]) | |
| ], | |
| [ | |
| ikb(rand[3],ini+rand[3]) | |
| ], | |
| [ | |
| ikb(rand[4],ini+rand[4]) | |
| ] | |
| ] | |
| ) | |
| await c.send_photo(chat,img,caption=cap,reply_markup=kb) | |
| return | |
| elif is_already and mess: | |
| kb = ikm( | |
| [ | |
| [ | |
| ikb("Click here to verify",url=mess.link) | |
| ] | |
| ] | |
| ) | |
| await c.send_message(f"{userr.mention} your verification is already pending",reply_markup=kb) | |
| return | |
| else: | |
| await c.unban_chat_member(chat,user) | |
| return | |
| async def captcha_codes_check(c: Gojo, q: CallbackQuery): | |
| split = q.data.split("_") | |
| chat = int(split[1]) | |
| user = int(split[2]) | |
| code = split[3] | |
| if q.from_user.id != user: | |
| await q.answer("Not for you BAKA!") | |
| return | |
| c_data = CAPTCHA_DATA() | |
| code_ = c_data.get_cap_data(chat,user) | |
| if code_ == code: | |
| cap = "You guessed the captcha right...Now you can talk in the chat with no restrictions" | |
| c_data.remove_cap_data(chat,user) | |
| await q.answer(cap,True) | |
| try: | |
| await q.message.chat.unban_member(user) | |
| except Exception as e: | |
| await q.message.reply_text(f"Unable to unmute {q.from_user.mention} this user") | |
| await q.message.reply_text(e) | |
| return | |
| await c.send_message(chat,f"{q.from_user.mention} now you are free to talk") | |
| await q.message.delete() | |
| return | |
| else: | |
| caps = q.message.caption.split(":") | |
| tries = int(caps[1].strip()) - 1 | |
| caps.pop(-1) | |
| caps.append(f" {tries}") | |
| new_cap = ":".join(caps) | |
| await q.answer(f"Wrong\nTries left: {tries}", True) | |
| if not tries: | |
| new_cap = f"You have zero tries left now. I am going to kick you know coz you failed to solve captcha...see yaa {q.from_user.mention}" | |
| try: | |
| await q.message.chat.ban_member(user) | |
| except Exception as e: | |
| await q.message.reply_text("Failed to kick member") | |
| return | |
| await q.message.delete() | |
| await q.message.reply_text(new_cap) | |
| await c.unban_chat_member(chat,user) | |
| else: | |
| await q.edit_message_caption(new_cap,reply_markup=q.message.reply_markup) | |
| return | |
| __PLUGIN__ = "captcha" | |
| __HELP__ = """ | |
| • /captcha [on|yes|enable|off|no|disable] : To enable or disable captcha verification | |
| • /captchamode [qr|image] : To change captcha mode | |
| """ |