Spaces:
Running
Running
import urllib.parse | |
import asyncio | |
import functools | |
from typing import (Union) | |
from pyrogram.errors import UserNotParticipant, FloodWait | |
from pyrogram.enums.parse_mode import ParseMode | |
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message,WebAppInfo | |
from FileStream.utils.FileProcessors.translation import LANG | |
from FileStream.Database import Database | |
from FileStream.utils.FileProcessors.human_readable import humanbytes | |
from FileStream.config import Telegram, Server | |
from FileStream.bot import FileStream | |
from FileStream.Tools.cleanup import Get_Title_Year | |
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) | |
async def get_invite_link(bot, chat_id: Union[str, int]): | |
try: | |
invite_link = await bot.create_chat_invite_link(chat_id=chat_id) | |
return invite_link | |
except FloodWait as e: | |
print(f"Sleep of {e.value}s caused by FloodWait ...") | |
await asyncio.sleep(e.value) | |
return await get_invite_link(bot, chat_id) | |
async def is_user_joined(bot, message: Message): | |
if Telegram.FORCE_SUB_ID and Telegram.FORCE_SUB_ID.startswith("-100"): | |
channel_chat_id = int( | |
Telegram.FORCE_SUB_ID) # When id startswith with -100 | |
elif Telegram.FORCE_SUB_ID and (not Telegram.FORCE_SUB_ID.startswith("-100")): | |
channel_chat_id = Telegram.FORCE_SUB_ID # When id not startswith -100 | |
else: | |
return 200 | |
try: | |
user = await bot.get_chat_member(chat_id=channel_chat_id,user_id=message.from_user.id) | |
if user.status == "BANNED": | |
await message.reply_text(text=LANG.BAN_TEXT.format(Telegram.OWNER_ID), | |
parse_mode=ParseMode.MARKDOWN, | |
disable_web_page_preview=True) | |
return False | |
except UserNotParticipant: | |
invite_link = await get_invite_link(bot, chat_id=channel_chat_id) | |
#------------------------------------------------------------------# | |
if Telegram.VERIFY_PIC: | |
ver = await message.reply_photo( | |
photo=Telegram.VERIFY_PIC, | |
caption="<i>Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐</i>", | |
parse_mode=ParseMode.HTML, | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("❆ Jᴏɪɴ Oᴜʀ Cʜᴀɴɴᴇʟ ❆",url=invite_link.invite_link)]])) | |
else: | |
#---------------------------------------------------------------# | |
ver = await message.reply_text( | |
text="<i>Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐</i>", | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("❆ Jᴏɪɴ Oᴜʀ Cʜᴀɴɴᴇʟ ❆",url=invite_link.invite_link)]]), | |
parse_mode=ParseMode.HTML) | |
await asyncio.sleep(30) | |
try: | |
await ver.delete() | |
await message.delete() | |
except Exception: | |
pass | |
return False | |
except Exception: | |
await message.reply_text( | |
text=f"<i>Sᴏᴍᴇᴛʜɪɴɢ ᴡʀᴏɴɢ ᴄᴏɴᴛᴀᴄᴛ ᴍʏ ᴅᴇᴠᴇʟᴏᴘᴇʀ</i> <b><a href='https://t.me/{Telegram.UPDATES_CHANNEL}'>[ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]</a></b>", | |
parse_mode=ParseMode.HTML, | |
disable_web_page_preview=True) | |
return False | |
return True | |
#-------------------Polls Handler | |
async def send_polls_func(file_info,replied_message): | |
return | |
#------------------------------------------------------------- | |
async def upload_type_func(file_info,replied_message): | |
""" | |
//Used values Directly from file Info | |
file_name = file_info['file_name'] | |
file_size = humanbytes(file_info['file_size']) | |
mime_type = file_info['mime_type'] | |
user_id = file_info['user_id'] | |
message_id = file_info['message_id'] | |
""" | |
existing_file = await db.get_file_by_fileuniqueid_only(file_info['file']['file_unique_id'], file_info['privacy_type']) | |
if existing_file : | |
response = await gen_link(existing_file['_id']) | |
#await update.message.edit_text(text=stream_text,parse_mode=ParseMode.HTML,disable_web_page_preview=True,reply_markup=reply_markup,) | |
return {"reply_markup" : response["reply_markup"], "stream_text": response["stream_text"],"poster":response["poster"],"type":"ExistingFile"} | |
else: | |
name = file_info['file']['caption'] if file_info['file']['caption'] else file_info['file']['file_name'] | |
title, year = Get_Title_Year(name) | |
stream_text = LANG.STREAM_TEXT_Y.format(file_info['file']['file_name'],title+" "+str(year),humanbytes(file_info['file']['file_size'])) | |
reply_markup = InlineKeyboardMarkup([ | |
[ | |
InlineKeyboardButton("PUBLIC",callback_data=f"pubup_{file_info['user_id']}_{file_info['message_id']}"), | |
InlineKeyboardButton("PRIVATE",callback_data=f"privup_{file_info['user_id']}_{file_info['message_id']}"), | |
InlineKeyboardButton("TEMPORARY",callback_data=f"tempup_{file_info['user_id']}_{file_info['message_id']}") | |
], | |
[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] | |
]) | |
return {"reply_markup" : reply_markup, "stream_text": stream_text, "type":"OPTIONS"} | |
async def priv_func(file_name, file_size): | |
file_size = humanbytes(file_size) | |
stream_text = LANG.PRIV_FILE_RENAME.format(file_name, file_size) | |
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]]) | |
return reply_markup, stream_text | |
#------------------[PRIV_FILE GEN LINK FUNC ]------------------# | |
async def gen_priv_file_link(_id): | |
file_info = await db.get_privfile(_id) | |
file_name = file_info['file_name'] | |
file_size = humanbytes(file_info['file_size']) | |
mime_type = file_info['mime_type'] | |
page_link = f"{Server.URL}app/watch/{_id}" | |
stream_link = f"{Server.URL}api/dl/{_id}" | |
file_link = f"https://t.me/{FileStream.username}?start=privfile_{_id}" | |
if "video" in mime_type: | |
stream_text = LANG.STREAM_TEXT.format(file_name, file_size, stream_link, page_link, file_link) | |
reply_markup = InlineKeyboardMarkup([ | |
[ | |
InlineKeyboardButton("sᴛʀᴇᴀᴍ",url=page_link), | |
InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) | |
], | |
[ | |
InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link), | |
InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",callback_data=f"msgdelpvt_{_id}") | |
], | |
[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] | |
]) | |
else: | |
stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link,file_link) | |
reply_markup = InlineKeyboardMarkup([ | |
[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], | |
[ | |
InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link), | |
InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",callback_data=f"msgdelpvt_{_id}") | |
], [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] | |
]) | |
return reply_markup, stream_text | |
#---------------------[ PRIVATE GEN LINK + CALLBACK ]---------------------# | |
async def gen_link(_id): | |
file_info = await db.get_file(_id) | |
print("File Info at Gen Link :",file_info,type(file_info)) | |
#file_name = file_info.get("file").get("file_name", ""), | |
file_name = file_info['file']['file_name'], | |
file_size = humanbytes(file_info['file']['file_size']), | |
mime_type = file_info['file']['mime_type'], | |
poster = ''.join(file_info.get("poster","")), | |
title= file_info["title"], | |
description= file_info['description'], | |
release_date= file_info["release_date"], | |
page_link = f"{Server.URL}app/watch/{_id}", | |
stream_link = f"{Server.URL}api/dl/{_id}", | |
file_link = f"https://t.me/{FileStream.username}?start=file_{_id}" | |
print(poster,type(poster)) | |
if "video" in mime_type: | |
stream_text = LANG.STREAM_TEXT.format(poster,description,title,release_date,file_name, file_size, stream_link, page_link, file_link) | |
reply_markup = InlineKeyboardMarkup([ | |
[ | |
InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), | |
InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) | |
], | |
[ | |
InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link), | |
InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",callback_data=f"msgdelpvt_{_id}") | |
], [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] | |
]) | |
else: | |
stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link,file_link) | |
reply_markup = InlineKeyboardMarkup([ | |
[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], | |
[ | |
InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link), | |
InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",callback_data=f"msgdelpvt_{_id}") | |
], [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] | |
]) | |
return {"reply_markup": reply_markup, "stream_text": stream_text, "poster": urllib.parse.urlencode(poster) } | |
#---------------------[ GEN STREAM LINKS FOR CHANNEL ]---------------------# | |
async def gen_linkx(m: Message, _id, name: list): | |
file_info = await db.get_file(_id) | |
file_name = file_info['file_name'] | |
mime_type = file_info['mime_type'] | |
file_size = humanbytes(file_info['file_size']) | |
page_link = f"{Server.URL}app/watch/{_id}" | |
stream_link = f"{Server.URL}api/dl/{_id}" | |
file_link = f"https://t.me/{FileStream.username}?start=file_{_id}" | |
if "video" in mime_type: | |
stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link,page_link) | |
reply_markup = InlineKeyboardMarkup([[ | |
InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), | |
InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) | |
]]) | |
else: | |
stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link,file_link) | |
reply_markup = InlineKeyboardMarkup( | |
[[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)]]) | |
return reply_markup, stream_text | |
#---------------------[ USER BANNED ]---------------------# | |
async def is_user_banned(message): | |
if await db.is_user_banned(message.from_user.id): | |
await message.reply_text(text=LANG.BAN_TEXT.format(Telegram.OWNER_ID), | |
parse_mode=ParseMode.MARKDOWN, | |
disable_web_page_preview=True) | |
return True | |
return False | |
#---------------------[ CHANNEL BANNED ]---------------------# | |
async def is_channel_banned(bot, message): | |
if await db.is_user_banned(message.chat.id): | |
await bot.edit_message_reply_markup( | |
chat_id=message.chat.id, | |
message_id=message.id, | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(f"ᴄʜᴀɴɴᴇʟ ɪs ʙᴀɴɴᴇᴅ",callback_data="N/A")]] | |
)) | |
return True | |
return False | |
#---------------------[ USER AUTH ]---------------------# | |
async def is_user_authorized(message): | |
if hasattr(Telegram, 'AUTH_USERS') and Telegram.AUTH_USERS: | |
user_id = message.from_user.id | |
if user_id == Telegram.OWNER_ID: | |
return True | |
if not (user_id in Telegram.AUTH_USERS): | |
await message.reply_text(text="Yᴏᴜ ᴀʀᴇ ɴᴏᴛ ᴀᴜᴛʜᴏʀɪᴢᴇᴅ ᴛᴏ ᴜsᴇ ᴛʜɪs ʙᴏᴛ.",parse_mode=ParseMode.MARKDOWN,disable_web_page_preview=True) | |
return False | |
return True | |
#---------------------[ USER EXIST ]---------------------# | |
async def is_user_exist(bot, message): | |
if not bool(await db.get_user(message.from_user.id)): | |
#await db.add_user(message.from_user.id) | |
await bot.send_message(Telegram.ULOG_GROUP,f"**#NᴇᴡUsᴇʀ**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}`") | |
#------------------[ User Status]--------------------# | |
async def user_status(bot, message): | |
user = await db.get_user(message.from_user.id) | |
#await db.add_user(message.from_user.id) | |
return user | |
async def is_channel_exist(bot, message): | |
if not bool(await db.get_user(message.chat.id)): | |
await db.add_user(message.chat.id) | |
members = await bot.get_chat_members_count(message.chat.id) | |
await bot.send_message(Telegram.ULOG_GROUP,f"**#NᴇᴡCʜᴀɴɴᴇʟ** \n**⬩ ᴄʜᴀᴛ ɴᴀᴍᴇ :** `{message.chat.title}`\n**⬩ ᴄʜᴀᴛ ɪᴅ :** `{message.chat.id}`\n**⬩ ᴛᴏᴛᴀʟ ᴍᴇᴍʙᴇʀs :** `{members}`" ) | |
# Decorator Function to check if user is authorized | |
#---------------------------------------------------------------- | |
def verify_users(func): | |
async def wrapper(bot, message): | |
response = {} | |
user = await user_status(bot, message) | |
if user: | |
if user['tele_status']['status'] == "ACTIVE": | |
response['status'] = "AUTHORIZED" | |
elif user['tele_status']['status'] == "BANNED": | |
response['status'] = "BANNED" | |
await bot.send_message( | |
Telegram.ULOG_GROUP, | |
f"**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name} {message.from_user.last_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}` \n ** Status : {response['status']} \n Trying to access Services**" | |
) | |
await bot.send_message( | |
message.from_user.id, | |
f"**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}` \n** You are Banned by Admins Contact Admins to unban you**" | |
) | |
else: | |
response['status'] = "NOT EXIST" | |
await bot.send_message( | |
Telegram.ULOG_GROUP, | |
f"**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name} {message.from_user.last_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}` \n ** Status : {response['status']}**" | |
) | |
await bot.send_message( | |
message.from_user.id, | |
f"**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}` \n** Ask Admins to Authorise You**" | |
) | |
return await func(bot, message, response) | |
return wrapper | |
async def verify_user(bot, message): | |
if not await is_user_authorized(message): | |
return False | |
if await is_user_banned(message): | |
return False | |
await is_user_exist(bot, message) | |
if Telegram.FORCE_SUB: | |
if not await is_user_joined(bot, message): | |
return False | |
return True | |