# Spaces:
# Runtime error
# Runtime error
import time, re | |
from config import BOT_USERNAME | |
from pyrogram.enums import MessageEntityType | |
from pyrogram import filters | |
from pyrogram.types import Message | |
from Devine import app | |
from Devine.mongo.readable_time import get_readable_time | |
from Devine.mongo.afkdb import add_afk, is_afk, remove_afk | |
async def active_afk(_, message: "Message"):
    """Mark the sender of an AFK command as away.

    Messages sent on behalf of a chat, or without a sender, are ignored.

    If the sender already has an AFK record, that record is removed and a
    "back online" notice is sent first. The function then carries on and
    stores a fresh record, so issuing the command while away restarts the
    AFK period.

    The stored record is a dict with the keys ``type`` (``"text"``,
    ``"text_reason"``, ``"animation"`` or ``"photo"``), ``time`` (epoch
    seconds from ``time.time()``), ``data`` (animation file id or ``None``)
    and ``reason`` (at most 100 characters, or ``None``).

    Args:
        _: First positional argument supplied by the caller; unused.
        message: The incoming command message.
    """
    sender = message.from_user
    if message.sender_chat or sender is None:
        return
    user_id = sender.id
    first_name = sender.first_name

    verifier, reasondb = await is_afk(user_id)
    if verifier:
        await remove_afk(user_id)
        try:
            await _afk_announce_return(message, user_id, first_name, reasondb)
        except Exception:
            # Best effort: a malformed record or a failed media send must not
            # stop the user from being told they are back.
            await message.reply_text(
                f"**{first_name}** ɪs ʙᴀᴄᴋ ᴏɴʟɪɴᴇ",
                disable_web_page_preview=True,
            )

    details = await _afk_build_details(message, user_id)
    await add_afk(user_id, details)
    await message.reply_text(f"{first_name} ɪs ɴᴏᴡ ᴀғᴋ!")


async def _afk_announce_return(message, user_id, first_name, record):
    """Reply with the "back online" notice matching the stored AFK type.

    Any exception (missing key, failed send) propagates to the caller,
    which falls back to a plain-text notice. An unrecognised ``type``
    sends nothing.
    """
    afk_type = record["type"]
    started = record["time"]
    media = record["data"]
    reason = record["reason"]
    away_for = get_readable_time(int(time.time() - started))

    notice = f"{first_name} ɪs ʙᴀᴄᴋ ᴏɴʟɪɴᴇ ᴀɴᴅ ᴡᴀs ᴀᴡᴀʏ ғᴏʀ {away_for}"
    notice_with_reason = f"{notice}\n\nʀᴇᴀsᴏɴ: `{reason}`"
    # str() makes both a missing reason and a stored "None" string count
    # as "no reason" for the media captions.
    caption = notice if str(reason) == "None" else notice_with_reason

    if afk_type == "text":
        await message.reply_text(notice, disable_web_page_preview=True)
    elif afk_type == "text_reason":
        await message.reply_text(
            notice_with_reason, disable_web_page_preview=True
        )
    elif afk_type == "animation":
        await message.reply_animation(media, caption=caption)
    elif afk_type == "photo":
        await message.reply_photo(
            photo=f"downloads/{user_id}.jpg", caption=caption
        )


async def _afk_build_details(message, user_id):
    """Build the AFK record for *message*, downloading a replied image if any."""
    # The command may arrive as plain text or as a media caption.
    raw = message.text or message.caption or ""
    pieces = raw.split(None, 1)
    # Every branch caps the reason at 100 characters.
    reason = pieces[1].strip()[:100] if len(pieces) > 1 else None

    afk_type = "text" if reason is None else "text_reason"
    data = None

    replied = message.reply_to_message
    if replied:
        sticker = replied.sticker
        if replied.animation:
            afk_type, data = "animation", replied.animation.file_id
        elif replied.photo or (sticker and not sticker.is_animated):
            # The image is saved under the user's id; the notices later
            # read it back from downloads/<user_id>.jpg.
            await app.download_media(replied, file_name=f"{user_id}.jpg")
            afk_type = "photo"
        elif not sticker:
            # NOTE(review): a reason typed while replying to a non-media
            # message is discarded, exactly as before -- confirm intended.
            afk_type, reason = "text", None
        # Animated stickers keep the text / text_reason type chosen above.

    return {
        "type": afk_type,
        "time": time.time(),
        "data": data,
        "reason": reason,
    }
# NOTE(review): presumably the handler group the watcher below is registered
# under; the registration itself is not visible in this part of the file.
chat_watcher_group = 1


async def chat_watcher_func(_, message):
    """Watch a chat message and report AFK status changes.

    Three things are checked, in this order:

    1. If the sender has an AFK record it is removed and a "back online"
       notice is produced.
    2. If the message replies to a user who is AFK, that is reported.
    3. Every ``@username`` mention and text mention of an AFK user is
       reported, skipping users already covered in this message.

    Text notices are collected and sent as one reply at the end; animation
    and photo notices are sent immediately as separate replies. Messages
    sent on behalf of a chat, messages without a sender, and messages that
    start with the ``/afk`` command are ignored.

    Args:
        _: First positional argument supplied by the caller; unused.
        message: The incoming message.
    """
    sender = message.from_user
    if message.sender_chat or sender is None:
        return
    if _watcher_is_afk_command(message):
        return

    userid = sender.id
    user_name = sender.first_name
    notices = []

    # 1. The sender spoke, so they are no longer away.
    verifier, reasondb = await is_afk(userid)
    if verifier:
        await remove_afk(userid)
        notices.append(
            await _watcher_afk_notice(
                message,
                reasondb,
                user_name,
                userid,
                "ɪs ʙᴀᴄᴋ ᴏɴʟɪɴᴇ ᴀɴᴅ ᴡᴀs ᴀᴡᴀʏ ғᴏʀ",
                f"{user_name[:25]} ɪs ʙᴀᴄᴋ ᴏɴʟɪɴᴇ\n\n",
            )
        )

    # Users already handled for this message, so nobody is reported twice.
    reported = set()

    # 2. The user being replied to.
    replied = message.reply_to_message
    replied_user = replied.from_user if replied else None
    if replied_user is not None:
        replied_first_name = replied_user.first_name
        reported.add(replied_user.id)
        verifier, reasondb = await is_afk(replied_user.id)
        if verifier:
            notices.append(
                await _watcher_afk_notice(
                    message,
                    reasondb,
                    replied_first_name,
                    replied_user.id,
                    "ɪs ᴀғᴋ sɪɴᴄᴇ",
                    f"{replied_first_name} ɪs ᴀғᴋ,\nᴩᴀᴛᴀ ɴɪ ʙᴄ ᴋᴀʙ sᴇ\n\n",
                )
            )

    # 3. Users mentioned in the text.
    text = message.text or ""
    for entity in message.entities or []:
        if entity.type == MessageEntityType.MENTION:
            try:
                user = await app.get_users(
                    _watcher_mention_username(text, entity)
                )
            except Exception:
                # Unknown or unresolvable username: nothing to report.
                continue
        elif entity.type == MessageEntityType.TEXT_MENTION:
            user = entity.user
        else:
            continue

        if user is None or user.id in reported:
            continue
        reported.add(user.id)

        verifier, reasondb = await is_afk(user.id)
        if verifier:
            notices.append(
                await _watcher_afk_notice(
                    message,
                    reasondb,
                    user.first_name,
                    user.id,
                    "ɪs ᴀғᴋ sɪɴᴄᴇ",
                    f"{user.first_name[:25]} ɪs ᴀғᴋ\n\n",
                )
            )

    msg = "".join(notices)
    if msg:
        try:
            await message.reply_text(msg, disable_web_page_preview=True)
        except Exception:
            # Sending the summary is best effort; a failed reply is ignored.
            return


def _watcher_is_afk_command(message):
    """Return True when *message* starts with the ``/afk`` bot command."""
    text = message.text or message.caption or ""
    # Both sides are lower-cased so a bot username with capitals still matches.
    accepted = {"/afk", f"/afk@{BOT_USERNAME}".lower()}
    for entity in message.entities or []:
        if (
            entity.type == MessageEntityType.BOT_COMMAND
            and entity.offset == 0
            and text[: entity.length].lower() in accepted
        ):
            return True
    return False


def _watcher_mention_username(text, entity):
    """Return the username covered by a mention *entity*, without the ``@``.

    Telegram expresses entity offsets and lengths in UTF-16 code units, so
    the slice is taken on the UTF-16 encoding rather than on the ``str``.
    """
    encoded = text.encode("utf-16-le")
    start = entity.offset * 2
    span = encoded[start : start + entity.length * 2]
    return span.decode("utf-16-le", errors="ignore").lstrip("@")


async def _watcher_afk_notice(
    message, record, name, media_owner_id, phrase, fallback
):
    """Produce the notice for one AFK record.

    Text-type records return the line to append to the summary reply.
    Animation and photo records are sent right away as their own reply and
    return an empty string; an unrecognised type also returns an empty
    string. If anything fails (malformed record, failed send) *fallback*
    is returned instead.

    Args:
        message: Message to reply to for media notices.
        record: Stored AFK dict with ``type``, ``time``, ``data``, ``reason``.
        name: Display name; truncated to 25 characters in the notice.
        media_owner_id: User id whose ``downloads/<id>.jpg`` is the photo.
        phrase: Wording placed between the name and the elapsed time.
        fallback: Text returned when the notice cannot be built or sent.
    """
    try:
        afk_type = record["type"]
        started = record["time"]
        media = record["data"]
        reason = record["reason"]
        elapsed = get_readable_time(int(time.time() - started))

        line = f"{name[:25]} {phrase} {elapsed}\n\n"
        line_with_reason = (
            f"{name[:25]} {phrase} {elapsed}\n\nʀᴇᴀsᴏɴ: `{reason}`\n\n"
        )
        if afk_type == "text":
            return line
        if afk_type == "text_reason":
            return line_with_reason

        # str() makes both a missing reason and a stored "None" string
        # count as "no reason".
        caption = line if str(reason) == "None" else line_with_reason
        if afk_type == "animation":
            await message.reply_animation(media, caption=caption)
        elif afk_type == "photo":
            await message.reply_photo(
                photo=f"downloads/{media_owner_id}.jpg", caption=caption
            )
        return ""
    except Exception:
        return fallback