privateone's picture
Update FileStream/utils/FileProcessors/bot_utils.py
1d0b593 verified
import urllib.parse
import asyncio
import functools
from typing import (Union)
from pyrogram.errors import UserNotParticipant, FloodWait
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message,WebAppInfo
from FileStream.utils.FileProcessors.translation import LANG
from FileStream.Database import Database
from FileStream.utils.FileProcessors.human_readable import humanbytes
from FileStream.config import Telegram, Server
from FileStream.bot import FileStream
from FileStream.Tools.cleanup import Get_Title_Year
# Module-level database handle shared by the helper coroutines below;
# it is constructed once, at import time, from the Telegram config values.
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
async def get_invite_link(bot, chat_id: Union[str, int]):
    """Create an invite link for ``chat_id``, retrying after FloodWait errors.

    Args:
        bot: Client object exposing an awaitable ``create_chat_invite_link``.
        chat_id: Chat identifier forwarded unchanged to the client call.

    Returns:
        Whatever ``bot.create_chat_invite_link`` returns.
    """
    while True:
        try:
            return await bot.create_chat_invite_link(chat_id=chat_id)
        except FloodWait as flood:
            # Sleep for the duration carried by the error, then try again.
            print(f"Sleep of {flood.value}s caused by FloodWait ...")
            await asyncio.sleep(flood.value)
async def is_user_joined(bot, message: Message):
    """Check whether the sender of ``message`` has joined the force-sub channel.

    Args:
        bot: Client used to query chat membership and create invite links.
        message: Incoming message; its ``from_user.id`` is the user checked.

    Returns:
        ``200`` (truthy) when ``Telegram.FORCE_SUB_ID`` is not configured,
        ``True`` when the membership lookup succeeds and the user is not
        banned, ``False`` when the user is banned, is not a participant, or
        the lookup fails for any other reason.
    """
    # Local import keeps this block self-contained; pyrogram is already a
    # dependency of this module.
    from pyrogram.enums import ChatMemberStatus

    force_sub_id = Telegram.FORCE_SUB_ID
    if not force_sub_id:
        # Kept as the original truthy value so existing callers are unaffected.
        return 200

    # Ids starting with -100 are converted to int; anything else is passed
    # through to the client as a string.
    if force_sub_id.startswith("-100"):
        channel_chat_id = int(force_sub_id)
    else:
        channel_chat_id = force_sub_id

    try:
        user = await bot.get_chat_member(chat_id=channel_chat_id,
                                         user_id=message.from_user.id)
        # ChatMember.status is a ChatMemberStatus enum member, so comparing it
        # with the plain string "BANNED" never matched.
        if user.status == ChatMemberStatus.BANNED:
            await message.reply_text(text=LANG.BAN_TEXT.format(Telegram.OWNER_ID),
                                     parse_mode=ParseMode.MARKDOWN,
                                     disable_web_page_preview=True)
            return False
    except UserNotParticipant:
        invite_link = await get_invite_link(bot, chat_id=channel_chat_id)
        join_markup = InlineKeyboardMarkup(
            [[InlineKeyboardButton("❆ Jᴏɪɴ Oᴜʀ Cʜᴀɴɴᴇʟ ❆", url=invite_link.invite_link)]])
        if Telegram.VERIFY_PIC:
            ver = await message.reply_photo(
                photo=Telegram.VERIFY_PIC,
                caption="<i>Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐</i>",
                parse_mode=ParseMode.HTML,
                reply_markup=join_markup)
        else:
            ver = await message.reply_text(
                text="<i>Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐</i>",
                reply_markup=join_markup,
                parse_mode=ParseMode.HTML)
        # Leave the prompt visible for 30 seconds, then remove both the prompt
        # and the triggering message.
        await asyncio.sleep(30)
        try:
            await ver.delete()
            await message.delete()
        except Exception:
            # Best-effort cleanup: a failed delete must not change the result.
            pass
        return False
    except Exception:
        await message.reply_text(
            text=f"<i>Sᴏᴍᴇᴛʜɪɴɢ ᴡʀᴏɴɢ ᴄᴏɴᴛᴀᴄᴛ ᴍʏ ᴅᴇᴠᴇʟᴏᴘᴇʀ</i> <b><a href='https://t.me/{Telegram.UPDATES_CHANNEL}'>[ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]</a></b>",
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True)
        return False
    return True
#-------------------Polls Handler
async def send_polls_func(file_info, replied_message):
    """Placeholder for the polls handler; does nothing and returns ``None``."""
    return None
#-------------------------------------------------------------
async def upload_type_func(file_info, replied_message):
    """Build the reply for a newly received file.

    Looks the file up by ``file_info['file']['file_unique_id']`` and
    ``file_info['privacy_type']``. When a record is found, the markup and text
    produced by ``gen_link`` for that record are returned. Otherwise a prompt
    is built with PUBLIC / PRIVATE / TEMPORARY buttons whose callback data
    carries ``file_info['user_id']`` and ``file_info['message_id']``.

    Args:
        file_info: Mapping with ``file``, ``privacy_type``, ``user_id`` and
            ``message_id`` entries.
        replied_message: Not used by this function.

    Returns:
        Tuple ``(reply_markup, stream_text)``.
    """
    media = file_info['file']
    known_file = await db.get_file_by_fileuniqueid_only(media['file_unique_id'], file_info['privacy_type'])
    if known_file:
        reply_markup, stream_text = await gen_link(known_file['_id'])
        return reply_markup, stream_text

    # Prefer the caption as the source for title/year extraction; fall back to
    # the file name when the caption is empty.
    display_name = media['caption'] or media['file_name']
    title, year = Get_Title_Year(display_name)
    stream_text = LANG.STREAM_TEXT_Y.format(
        media['file_name'],
        title + " " + str(year),
        humanbytes(media['file_size']),
    )

    owner_id = file_info['user_id']
    source_message_id = file_info['message_id']
    upload_choices = (
        ("PUBLIC", "usercheck"),
        ("PRIVATE", "privup"),
        ("TEMPORARY", "tempup"),
    )
    choice_row = [
        InlineKeyboardButton(label, callback_data=f"{prefix}_{owner_id}_{source_message_id}")
        for label, prefix in upload_choices
    ]
    close_row = [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]
    return InlineKeyboardMarkup([choice_row, close_row]), stream_text
async def priv_func(file_name, file_size):
    """Format ``LANG.PRIV_FILE_RENAME`` for a file and attach a close button.

    Args:
        file_name: Name inserted into the message template.
        file_size: Raw size, converted with ``humanbytes`` before formatting.

    Returns:
        Tuple ``(reply_markup, stream_text)``.
    """
    text = LANG.PRIV_FILE_RENAME.format(file_name, humanbytes(file_size))
    close_row = [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]
    return InlineKeyboardMarkup([close_row]), text
#------------------[PRIV_FILE GEN LINK FUNC ]------------------#
async def gen_priv_file_link(_id):
    """Build the link message and keyboard for a private file record.

    Args:
        _id: Identifier passed to ``db.get_privfile`` and embedded in the
            generated URLs and callback data.

    Returns:
        Tuple ``(reply_markup, stream_text)``. Records whose mime type
        contains "video" additionally get a stream button.
    """
    record = await db.get_privfile(_id)
    name = record['file_name']
    size = humanbytes(record['file_size'])
    mime = record['mime_type']

    page_link = f"{Server.URL}app/watch/{_id}"
    stream_link = f"{Server.URL}api/dl/{_id}"
    file_link = f"https://t.me/{FileStream.username}?start=privfile_{_id}"

    if "video" in mime:
        # NOTE(review): gen_link passes nine values to LANG.STREAM_TEXT while
        # this call passes five — confirm the template matches this arity.
        stream_text = LANG.STREAM_TEXT.format(name, size, stream_link, page_link, file_link)
        top_row = [
            InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link),
            InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link),
        ]
    else:
        stream_text = LANG.STREAM_TEXT_X.format(name, size, stream_link, file_link)
        top_row = [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)]

    # The remaining rows are the same for both kinds of file.
    manage_row = [
        InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link),
        InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelpvt_{_id}"),
    ]
    close_row = [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]
    return InlineKeyboardMarkup([top_row, manage_row, close_row]), stream_text
#---------------------[ PRIVATE GEN LINK + CALLBACK ]---------------------#
async def gen_link(_id):
    """Build the link message and keyboard for a stored file record.

    Args:
        _id: Identifier passed to ``db.get_file`` and embedded in the
            generated URLs and callback data.

    Returns:
        Tuple ``(reply_markup, stream_text)``. Records whose mime type
        contains "video" get the richer ``LANG.STREAM_TEXT`` message and an
        extra stream button; everything else uses ``LANG.STREAM_TEXT_X``.
    """
    record = await db.get_file(_id)
    media = record['file']
    name = media['file_name']
    size = humanbytes(media['file_size'])
    mime = media['mime_type']
    poster = record['poster']
    title = record["title"]
    description = record['description']
    release_date = record["release_date"]

    page_link = f"{Server.URL}app/watch/{_id}"
    stream_link = f"{Server.URL}api/dl/{_id}"
    file_link = f"https://t.me/{FileStream.username}?start=file_{_id}"

    if "video" in mime:
        stream_text = LANG.STREAM_TEXT.format(
            poster, description, title, release_date,
            name, size, stream_link, page_link, file_link,
        )
        top_row = [
            InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link),
            InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link),
        ]
    else:
        stream_text = LANG.STREAM_TEXT_X.format(name, size, stream_link, file_link)
        top_row = [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)]

    # The remaining rows are the same for both kinds of file.
    manage_row = [
        InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link),
        InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelpvt_{_id}"),
    ]
    close_row = [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]
    return InlineKeyboardMarkup([top_row, manage_row, close_row]), stream_text
#---------------------[ GEN STREAM LINKS FOR CHANNEL ]---------------------#
async def gen_linkx(m: Message, _id, name: list):
    """Build the link message and keyboard used for channel posts.

    Args:
        m: Not used by this function.
        _id: Identifier passed to ``db.get_file`` and embedded in the URLs.
        name: Not used by this function.

    Returns:
        Tuple ``(reply_markup, stream_text)``. Records whose mime type
        contains "video" get stream and download buttons; everything else
        gets a download button only.
    """
    file_info = await db.get_file(_id)

    # gen_link reads the same db.get_file result through a nested "file"
    # mapping, whereas this function originally read the keys at the top
    # level. Accept both layouts so a nested record no longer raises KeyError.
    nested = file_info.get('file') if isinstance(file_info, dict) else None
    meta = nested if isinstance(nested, dict) else file_info

    file_name = meta['file_name']
    mime_type = meta['mime_type']
    file_size = humanbytes(meta['file_size'])

    page_link = f"{Server.URL}app/watch/{_id}"
    stream_link = f"{Server.URL}api/dl/{_id}"
    file_link = f"https://t.me/{FileStream.username}?start=file_{_id}"

    if "video" in mime_type:
        # The video variant puts the watch page in the template's last slot.
        stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link, page_link)
        reply_markup = InlineKeyboardMarkup([[
            InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link),
            InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)
        ]])
    else:
        stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link, file_link)
        reply_markup = InlineKeyboardMarkup(
            [[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)]])
    return reply_markup, stream_text
#---------------------[ USER BANNED ]---------------------#
async def is_user_banned(message):
    """Tell the sender they are banned, if the database says so.

    Args:
        message: Incoming message; ``from_user.id`` is looked up in the
            database.

    Returns:
        ``True`` when the user is banned (a ban notice is sent as a reply),
        otherwise ``False``.
    """
    banned = await db.is_user_banned(message.from_user.id)
    if not banned:
        return False

    await message.reply_text(
        text=LANG.BAN_TEXT.format(Telegram.OWNER_ID),
        parse_mode=ParseMode.MARKDOWN,
        disable_web_page_preview=True,
    )
    return True
#---------------------[ CHANNEL BANNED ]---------------------#
async def is_channel_banned(bot, message):
    """Mark a channel post as banned, if the channel is banned in the database.

    Args:
        bot: Client used to edit the message's reply markup.
        message: Channel message; ``chat.id`` is looked up in the database.

    Returns:
        ``True`` when the chat is banned (the message's keyboard is replaced
        with a single notice button), otherwise ``False``.
    """
    banned = await db.is_user_banned(message.chat.id)
    if not banned:
        return False

    notice_button = InlineKeyboardButton("ᴄʜᴀɴɴᴇʟ ɪs ʙᴀɴɴᴇᴅ", callback_data="N/A")
    await bot.edit_message_reply_markup(
        chat_id=message.chat.id,
        message_id=message.id,
        reply_markup=InlineKeyboardMarkup([[notice_button]]),
    )
    return True
#---------------------[ USER AUTH ]---------------------#
async def is_user_authorized(message):
    """Check the sender against the optional ``Telegram.AUTH_USERS`` list.

    Args:
        message: Incoming message; ``from_user.id`` is the id checked.

    Returns:
        ``True`` when no ``AUTH_USERS`` list is configured, or when the sender
        is the owner or is in the list. Otherwise a refusal is sent as a reply
        and ``False`` is returned.
    """
    allowed_users = getattr(Telegram, 'AUTH_USERS', None)
    if not allowed_users:
        # No allow-list configured (or it is empty): everyone may proceed.
        return True

    user_id = message.from_user.id
    if user_id == Telegram.OWNER_ID or user_id in allowed_users:
        return True

    await message.reply_text(
        text="Yᴏᴜ ᴀʀᴇ ɴᴏᴛ ᴀᴜᴛʜᴏʀɪᴢᴇᴅ ᴛᴏ ᴜsᴇ ᴛʜɪs ʙᴏᴛ.",
        parse_mode=ParseMode.MARKDOWN,
        disable_web_page_preview=True,
    )
    return False
#---------------------[ USER EXIST ]---------------------#
async def is_user_exist(bot, message):
    """Announce a user in the log group when they have no database record.

    The user is only reported here; this function does not add them to the
    database.

    Args:
        bot: Client used to send the log message.
        message: Incoming message whose ``from_user`` is checked.
    """
    sender = message.from_user
    if await db.get_user(sender.id):
        return

    await bot.send_message(
        Telegram.ULOG_GROUP,
        f"**#NᴇᴡUsᴇʀ**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{sender.first_name}](tg://user?id={sender.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{sender.id}`",
    )
#------------------[ User Status]--------------------#
async def user_status(bot, message):
    """Return the database record for the sender of ``message``.

    Args:
        bot: Not used by this function.
        message: Incoming message; ``from_user.id`` is the lookup key.

    Returns:
        Whatever ``db.get_user`` returns for that id.
    """
    return await db.get_user(message.from_user.id)
async def is_channel_exist(bot, message):
    """Register a previously unseen chat and announce it in the log group.

    Args:
        bot: Client used to count chat members and send the log message.
        message: Incoming message whose ``chat`` is checked.
    """
    chat = message.chat
    if await db.get_user(chat.id):
        return

    await db.add_user(chat.id)
    members = await bot.get_chat_members_count(chat.id)
    await bot.send_message(
        Telegram.ULOG_GROUP,
        f"**#NᴇᴡCʜᴀɴɴᴇʟ** \n**⬩ ᴄʜᴀᴛ ɴᴀᴍᴇ :** `{chat.title}`\n**⬩ ᴄʜᴀᴛ ɪᴅ :** `{chat.id}`\n**⬩ ᴛᴏᴛᴀʟ ᴍᴇᴍʙᴇʀs :** `{members}`",
    )
# Decorator Function to check if user is authorized
#----------------------------------------------------------------
def verify_users(func):
    """Decorate a handler so it receives the sender's stored account status.

    The wrapped handler is registered with the signature ``(bot, message)``
    and is called as ``func(bot, message, response)``. ``response['status']``
    is ``"AUTHORIZED"`` for an ACTIVE user, ``"BANNED"`` for a BANNED user and
    ``"NOT EXIST"`` when no record is found. For the latter two, a notice is
    sent both to the log group and to the user before the handler runs.

    Args:
        func: Coroutine function accepting ``(bot, message, response)``.

    Returns:
        The wrapping coroutine function.
    """
    @functools.wraps(func)
    async def wrapper(bot, message):
        response = {}
        user = await user_status(bot, message)
        sender = message.from_user
        # A missing last name previously rendered as the literal text "None"
        # in the log message; join only the name parts that are present.
        full_name = " ".join(
            part for part in (sender.first_name, sender.last_name) if part)

        if user:
            stored_status = user['tele_status']['status']
            if stored_status == "ACTIVE":
                response['status'] = "AUTHORIZED"
            elif stored_status == "BANNED":
                response['status'] = "BANNED"
                await bot.send_message(
                    Telegram.ULOG_GROUP,
                    f"**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{full_name}](tg://user?id={sender.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{sender.id}` \n ** Status : {response['status']} \n Trying to access Services**"
                )
                await bot.send_message(
                    sender.id,
                    f"**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{sender.first_name}](tg://user?id={sender.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{sender.id}` \n** You are Banned by Admins Contact Admins to unban you**"
                )
            # NOTE(review): any other stored status leaves ``response`` empty;
            # confirm that wrapped handlers tolerate a missing 'status' key.
        else:
            response['status'] = "NOT EXIST"
            await bot.send_message(
                Telegram.ULOG_GROUP,
                f"**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{full_name}](tg://user?id={sender.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{sender.id}` \n ** Status : {response['status']}**"
            )
            await bot.send_message(
                sender.id,
                f"**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{sender.first_name}](tg://user?id={sender.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{sender.id}` \n** Ask Admins to Authorise You**"
            )
        return await func(bot, message, response)

    return wrapper
async def verify_user(bot, message):
    """Run the access checks for the sender of ``message`` in order.

    The checks are: authorization list, ban status, new-user announcement,
    and (only when ``Telegram.FORCE_SUB`` is set) channel membership.

    Args:
        bot: Client passed through to the individual checks.
        message: Incoming message being verified.

    Returns:
        ``True`` when every applicable check passes, otherwise ``False``.
    """
    authorized = await is_user_authorized(message)
    if not authorized:
        return False

    banned = await is_user_banned(message)
    if banned:
        return False

    await is_user_exist(bot, message)

    if not Telegram.FORCE_SUB:
        return True
    # is_user_joined may return a truthy non-bool; normalise it to True/False.
    return bool(await is_user_joined(bot, message))