import urllib.parse import asyncio import functools from typing import (Union) from pyrogram.errors import UserNotParticipant, FloodWait from pyrogram.enums.parse_mode import ParseMode from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message,WebAppInfo from FileStream.utils.FileProcessors.translation import LANG from FileStream.Database import Database from FileStream.utils.FileProcessors.human_readable import humanbytes from FileStream.config import Telegram, Server from FileStream.bot import FileStream from FileStream.Tools.cleanup import Get_Title_Year db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) async def get_invite_link(bot, chat_id: Union[str, int]): try: invite_link = await bot.create_chat_invite_link(chat_id=chat_id) return invite_link except FloodWait as e: print(f"Sleep of {e.value}s caused by FloodWait ...") await asyncio.sleep(e.value) return await get_invite_link(bot, chat_id) async def is_user_joined(bot, message: Message): if Telegram.FORCE_SUB_ID and Telegram.FORCE_SUB_ID.startswith("-100"): channel_chat_id = int( Telegram.FORCE_SUB_ID) # When id startswith with -100 elif Telegram.FORCE_SUB_ID and (not Telegram.FORCE_SUB_ID.startswith("-100")): channel_chat_id = Telegram.FORCE_SUB_ID # When id not startswith -100 else: return 200 try: user = await bot.get_chat_member(chat_id=channel_chat_id,user_id=message.from_user.id) if user.status == "BANNED": await message.reply_text(text=LANG.BAN_TEXT.format(Telegram.OWNER_ID), parse_mode=ParseMode.MARKDOWN, disable_web_page_preview=True) return False except UserNotParticipant: invite_link = await get_invite_link(bot, chat_id=channel_chat_id) #------------------------------------------------------------------# if Telegram.VERIFY_PIC: ver = await message.reply_photo( photo=Telegram.VERIFY_PIC, caption="Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐", parse_mode=ParseMode.HTML, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("❆ Jᴏɪɴ Oᴜʀ Cʜᴀɴɴᴇʟ ❆",url=invite_link.invite_link)]])) else: #---------------------------------------------------------------# ver = await message.reply_text( text="Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("❆ Jᴏɪɴ Oᴜʀ Cʜᴀɴɴᴇʟ ❆",url=invite_link.invite_link)]]), parse_mode=ParseMode.HTML) await asyncio.sleep(30) try: await ver.delete() await message.delete() except Exception: pass return False except Exception: await message.reply_text( text=f"Sᴏᴍᴇᴛʜɪɴɢ ᴡʀᴏɴɢ ᴄᴏɴᴛᴀᴄᴛ ᴍʏ ᴅᴇᴠᴇʟᴏᴘᴇʀ [ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]", parse_mode=ParseMode.HTML, disable_web_page_preview=True) return False return True #-------------------Polls Handler async def send_polls_func(file_info,replied_message): return #------------------------------------------------------------- async def upload_type_func(file_info,replied_message): """ //Used values Directly from file Info file_name = file_info['file_name'] file_size = humanbytes(file_info['file_size']) mime_type = file_info['mime_type'] user_id = file_info['user_id'] message_id = file_info['message_id'] """ existing_file = await db.get_file_by_fileuniqueid_only(file_info['file']['file_unique_id'], file_info['privacy_type']) if existing_file : reply_markup,stream_text = await gen_link(existing_file['_id']) #await update.message.edit_text(text=stream_text,parse_mode=ParseMode.HTML,disable_web_page_preview=True,reply_markup=reply_markup,) #return {"reply_markup" : response.get("reply_markup"), "stream_text": response.get("stream_text"),"poster":response.get("poster"),"type":"ExistingFile"} return reply_markup,stream_text else: name = file_info['file']['caption'] if file_info['file']['caption'] else file_info['file']['file_name'] title, year = Get_Title_Year(name) poster="None" stream_text = LANG.STREAM_TEXT_Y.format(file_info['file']['file_name'],title+" "+str(year),humanbytes(file_info['file']['file_size'])) reply_markup = InlineKeyboardMarkup([ [ InlineKeyboardButton("PUBLIC",callback_data=f"pubup_{file_info['user_id']}_{file_info['message_id']}"), InlineKeyboardButton("PRIVATE",callback_data=f"privup_{file_info['user_id']}_{file_info['message_id']}"), InlineKeyboardButton("TEMPORARY",callback_data=f"tempup_{file_info['user_id']}_{file_info['message_id']}") ], [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] ]) return reply_markup, stream_text async def priv_func(file_name, file_size): file_size = humanbytes(file_size) stream_text = LANG.PRIV_FILE_RENAME.format(file_name, file_size) reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]]) return reply_markup, stream_text #------------------[PRIV_FILE GEN LINK FUNC ]------------------# async def gen_priv_file_link(_id): file_info = await db.get_privfile(_id) file_name = file_info['file_name'] file_size = humanbytes(file_info['file_size']) mime_type = file_info['mime_type'] page_link = f"{Server.URL}app/watch/{_id}" stream_link = f"{Server.URL}api/dl/{_id}" file_link = f"https://t.me/{FileStream.username}?start=privfile_{_id}" if "video" in mime_type: stream_text = LANG.STREAM_TEXT.format(file_name, file_size, stream_link, page_link, file_link) reply_markup = InlineKeyboardMarkup([ [ InlineKeyboardButton("sᴛʀᴇᴀᴍ",url=page_link), InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) ], [ InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link), InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",callback_data=f"msgdelpvt_{_id}") ], [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] ]) else: stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link,file_link) reply_markup = InlineKeyboardMarkup([ [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], [ InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link), InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",callback_data=f"msgdelpvt_{_id}") ], [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] ]) return reply_markup, stream_text #---------------------[ PRIVATE GEN LINK + CALLBACK ]---------------------# async def gen_link(_id): file_info = await db.get_file(_id) #print("File Info at Gen Link :",file_info,type(file_info)) #file_name = file_info.get("file").get("file_name", ""), file_name = file_info['file']['file_name'] file_size = humanbytes(file_info['file']['file_size']) mime_type = file_info['file']['mime_type'] poster = file_info['poster'] title= file_info["title"] description= file_info['description'] release_date= file_info["release_date"] page_link = f"{Server.URL}app/watch/{_id}" stream_link = f"{Server.URL}api/dl/{_id}" file_link = f"https://t.me/{FileStream.username}?start=file_{_id}" #poster=''.join(poster) #print("\n Poster", poster, type(poster)) if "video" in mime_type: stream_text = LANG.STREAM_TEXT.format(poster,description,title,release_date,file_name, file_size, stream_link, page_link, file_link) reply_markup = InlineKeyboardMarkup([ [ InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) ], [ InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link), InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",callback_data=f"msgdelpvt_{_id}") ], [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] ]) else: stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link,file_link) reply_markup = InlineKeyboardMarkup([ [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], [ InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link), InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",callback_data=f"msgdelpvt_{_id}") ], [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] ]) return reply_markup, stream_text #---------------------[ GEN STREAM LINKS FOR CHANNEL ]---------------------# async def gen_linkx(m: Message, _id, name: list): file_info = await db.get_file(_id) file_name = file_info['file_name'] mime_type = file_info['mime_type'] file_size = humanbytes(file_info['file_size']) page_link = f"{Server.URL}app/watch/{_id}" stream_link = f"{Server.URL}api/dl/{_id}" file_link = f"https://t.me/{FileStream.username}?start=file_{_id}" if "video" in mime_type: stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link,page_link) reply_markup = InlineKeyboardMarkup([[ InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) ]]) else: stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link,file_link) reply_markup = InlineKeyboardMarkup( [[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)]]) return reply_markup, stream_text #---------------------[ USER BANNED ]---------------------# async def is_user_banned(message): if await db.is_user_banned(message.from_user.id): await message.reply_text(text=LANG.BAN_TEXT.format(Telegram.OWNER_ID), parse_mode=ParseMode.MARKDOWN, disable_web_page_preview=True) return True return False #---------------------[ CHANNEL BANNED ]---------------------# async def is_channel_banned(bot, message): if await db.is_user_banned(message.chat.id): await bot.edit_message_reply_markup( chat_id=message.chat.id, message_id=message.id, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(f"ᴄʜᴀɴɴᴇʟ ɪs ʙᴀɴɴᴇᴅ",callback_data="N/A")]] )) return True return False #---------------------[ USER AUTH ]---------------------# async def is_user_authorized(message): if hasattr(Telegram, 'AUTH_USERS') and Telegram.AUTH_USERS: user_id = message.from_user.id if user_id == Telegram.OWNER_ID: return True if not (user_id in Telegram.AUTH_USERS): await message.reply_text(text="Yᴏᴜ ᴀʀᴇ ɴᴏᴛ ᴀᴜᴛʜᴏʀɪᴢᴇᴅ ᴛᴏ ᴜsᴇ ᴛʜɪs ʙᴏᴛ.",parse_mode=ParseMode.MARKDOWN,disable_web_page_preview=True) return False return True #---------------------[ USER EXIST ]---------------------# async def is_user_exist(bot, message): if not bool(await db.get_user(message.from_user.id)): #await db.add_user(message.from_user.id) await bot.send_message(Telegram.ULOG_GROUP,f"**#NᴇᴡUsᴇʀ**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}`") #------------------[ User Status]--------------------# async def user_status(bot, message): user = await db.get_user(message.from_user.id) #await db.add_user(message.from_user.id) return user async def is_channel_exist(bot, message): if not bool(await db.get_user(message.chat.id)): await db.add_user(message.chat.id) members = await bot.get_chat_members_count(message.chat.id) await bot.send_message(Telegram.ULOG_GROUP,f"**#NᴇᴡCʜᴀɴɴᴇʟ** \n**⬩ ᴄʜᴀᴛ ɴᴀᴍᴇ :** `{message.chat.title}`\n**⬩ ᴄʜᴀᴛ ɪᴅ :** `{message.chat.id}`\n**⬩ ᴛᴏᴛᴀʟ ᴍᴇᴍʙᴇʀs :** `{members}`" ) # Decorator Function to check if user is authorized #---------------------------------------------------------------- def verify_users(func): @functools.wraps(func) async def wrapper(bot, message): response = {} user = await user_status(bot, message) if user: if user['tele_status']['status'] == "ACTIVE": response['status'] = "AUTHORIZED" elif user['tele_status']['status'] == "BANNED": response['status'] = "BANNED" await bot.send_message( Telegram.ULOG_GROUP, f"**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name} {message.from_user.last_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}` \n ** Status : {response['status']} \n Trying to access Services**" ) await bot.send_message( message.from_user.id, f"**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}` \n** You are Banned by Admins Contact Admins to unban you**" ) else: response['status'] = "NOT EXIST" await bot.send_message( Telegram.ULOG_GROUP, f"**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name} {message.from_user.last_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}` \n ** Status : {response['status']}**" ) await bot.send_message( message.from_user.id, f"**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}` \n** Ask Admins to Authorise You**" ) return await func(bot, message, response) return wrapper async def verify_user(bot, message): if not await is_user_authorized(message): return False if await is_user_banned(message): return False await is_user_exist(bot, message) if Telegram.FORCE_SUB: if not await is_user_joined(bot, message): return False return True